Remove Hazelcast dependency

The only remaining use of Hazelcast was as a testing REST cache backend in the LRE.

I wrote a replacement using Netty, which Bazel already uses for its networking, so we can now get rid of Hazelcast. :)
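
For reference, the new in-memory HTTP cache can be exercised roughly like this; the binary path, work path, and port below are illustrative, not part of this change:

  # Start the test worker with the embedded HTTP REST cache enabled.
  bazel-bin/src/tools/remote/worker --work_path=/tmp/worker --http_listen_port=8080 &

  # Point Bazel at it. Blobs are keyed by the request URI, so the old
  # /hazelcast/rest/maps suffix is no longer needed.
  bazel build --remote_http_cache=http://localhost:8080 //a:test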

I'll remove the Hazelcast files in a separate change when this is merged.

PiperOrigin-RevId: 205985996
diff --git a/src/test/shell/bazel/remote/remote_execution_http_test.sh b/src/test/shell/bazel/remote/remote_execution_http_test.sh
index 1fe3b1e..86350e6 100755
--- a/src/test/shell/bazel/remote/remote_execution_http_test.sh
+++ b/src/test/shell/bazel/remote/remote_execution_http_test.sh
@@ -29,12 +29,12 @@
   while [ $attempts -le 5 ]; do
     (( attempts++ ))
     worker_port=$(pick_random_unused_tcp_port) || fail "no port found"
-    hazelcast_port=$(pick_random_unused_tcp_port) || fail "no port found"
+    http_port=$(pick_random_unused_tcp_port) || fail "no port found"
     "${BAZEL_RUNFILES}/src/tools/remote/worker" \
         --work_path="${work_path}" \
         --listen_port=${worker_port} \
-        --hazelcast_standalone_listen_port=${hazelcast_port} \
-        --pid_file="${pid_file}" >& $TEST_log &
+        --http_listen_port=${http_port} \
+        --pid_file="${pid_file}" &
     local wait_seconds=0
     until [ -s "${pid_file}" ] || [ "$wait_seconds" -eq 15 ]; do
       sleep 1
@@ -72,21 +72,21 @@
 #include <iostream>
 int main() { std::cout << "Hello world!" << std::endl; return 0; }
 EOF
-  bazel build //a:test >& $TEST_log \
+  bazel build //a:test \
     || fail "Failed to build //a:test without remote cache"
   cp -f bazel-bin/a/test ${TEST_TMPDIR}/test_expected
 
-  bazel clean --expunge >& $TEST_log
+  bazel clean --expunge
   bazel build \
-      --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
-      //a:test >& $TEST_log \
+      --remote_http_cache=http://localhost:${http_port} \
+      //a:test \
       || fail "Failed to build //a:test with remote REST cache service"
   diff bazel-bin/a/test ${TEST_TMPDIR}/test_expected \
       || fail "Remote cache generated different result"
   # Check that persistent connections are closed after the build. Is there a good cross-platform way
   # to check this?
   if [[ "$PLATFORM" = "linux" ]]; then
-    if netstat -tn | grep -qE ":${hazelcast_port}\\s+ESTABLISHED$"; then
+    if netstat -tn | grep -qE ":${http_port}\\s+ESTABLISHED$"; then
       fail "connections to to cache not closed"
     fi
   fi
@@ -119,7 +119,7 @@
   # Check that persistent connections are closed after the build. Is there a good cross-platform way
   # to check this?
   if [[ "$PLATFORM" = "linux" ]]; then
-    if netstat -tn | grep -qE ":${hazelcast_port}\\s+ESTABLISHED$"; then
+    if netstat -tn | grep -qE ":${http_port}\\s+ESTABLISHED$"; then
       fail "connections to to cache not closed"
     fi
   fi
@@ -135,7 +135,7 @@
 EOF
     bazel build \
           --noremote_allow_symlink_upload \
-          --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+          --remote_http_cache=http://localhost:${http_port} \
           //:make-link &> $TEST_log \
           && fail "should have failed" || true
     expect_log "/l is a symbolic link"
@@ -151,7 +151,7 @@
 EOF
     bazel build \
           --noremote_allow_symlink_upload \
-          --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+          --remote_http_cache=http://localhost:${http_port} \
           //:make-link &> $TEST_log \
           && fail "should have failed" || true
     expect_log "dir/l is a symbolic link"
@@ -265,7 +265,7 @@
   set_directory_artifact_skylark_testfixtures
 
   bazel build \
-      --remote_rest_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+      --remote_rest_cache=http://localhost:${http_port} \
       //a:test >& $TEST_log \
       || fail "Failed to build //a:test with remote REST cache"
   diff bazel-genfiles/a/qux/out.txt a/test_expected \
@@ -276,7 +276,7 @@
   set_directory_artifact_skylark_testfixtures
 
   bazel build \
-      --remote_rest_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+      --remote_rest_cache=http://localhost:${http_port} \
       //a:test2 >& $TEST_log \
       || fail "Failed to build //a:test2 with remote REST cache"
   diff bazel-genfiles/a/test2-out.txt a/test_expected \
diff --git a/src/tools/remote/BUILD b/src/tools/remote/BUILD
index 31ce3ec..0255167 100644
--- a/src/tools/remote/BUILD
+++ b/src/tools/remote/BUILD
@@ -6,10 +6,6 @@
 
 java_binary(
     name = "worker",
-    jvm_flags = [
-        # Enables REST for Hazelcast server for testing.
-        "-Dhazelcast.rest.enabled=true",
-    ],
     main_class = "com.google.devtools.build.remote.worker.RemoteWorker",
     visibility = ["//visibility:public"],
     runtime_deps = ["//src/tools/remote/src/main/java/com/google/devtools/build/remote/worker"],
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index 0c126c5..9abb6af 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -27,7 +27,6 @@
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/common/options",
         "//third_party:guava",
-        "//third_party:hazelcast",
         "//third_party:netty",
         "//third_party/grpc:grpc-jar",
         "//third_party/protobuf:protobuf_java",
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java
new file mode 100644
index 0000000..d51292e
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java
@@ -0,0 +1,108 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.remote.worker;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/** A simple in-memory HTTP REST cache used for testing the LRE. */
+public class HttpCacheServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+  final ConcurrentMap<String, byte[]> cache = new ConcurrentHashMap<>();
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+    if (!request.decoderResult().isSuccess()) {
+      sendError(ctx, request, HttpResponseStatus.BAD_REQUEST);
+      return;
+    }
+
+    if (request.method() == HttpMethod.GET) {
+      handleGet(ctx, request);
+    } else if (request.method() == HttpMethod.PUT) {
+      handlePut(ctx, request);
+    } else {
+      sendError(ctx, request, HttpResponseStatus.METHOD_NOT_ALLOWED);
+    }
+  }
+
+  private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) {
+    byte[] contents = cache.get(request.uri());
+
+    if (contents == null) {
+      sendError(ctx, request, HttpResponseStatus.NOT_FOUND);
+      return;
+    }
+
+    FullHttpResponse response =
+        new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(contents));
+    HttpUtil.setContentLength(response, contents.length);
+    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+    ChannelFuture lastContentFuture = ctx.writeAndFlush(response);
+
+    if (!HttpUtil.isKeepAlive(request)) {
+      lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) {
+    if (!request.decoderResult().isSuccess()) {
+      sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    byte[] contentBytes = new byte[request.content().readableBytes()];
+    request.content().readBytes(contentBytes);
+    cache.putIfAbsent(request.uri(), contentBytes);
+
+    FullHttpResponse response =
+        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+    ChannelFuture lastContentFuture = ctx.writeAndFlush(response);
+
+    if (!HttpUtil.isKeepAlive(request)) {
+      lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  private static void sendError(
+      ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
+    FullHttpResponse response =
+        new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1,
+            status,
+            Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+    ChannelFuture future = ctx.writeAndFlush(response);
+
+    if (!HttpUtil.isKeepAlive(request)) {
+      future.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java
new file mode 100644
index 0000000..8d98655
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java
@@ -0,0 +1,33 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.remote.worker;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+/** The initializer used by the HttpCacheServerHandler. */
+public class HttpCacheServerInitializer extends ChannelInitializer<SocketChannel> {
+
+  @Override
+  protected void initChannel(SocketChannel ch) {
+    ChannelPipeline p = ch.pipeline();
+    p.addLast(new HttpServerCodec());
+    p.addLast(new HttpObjectAggregator(100 * 1024 * 1024));
+    p.addLast(new HttpCacheServerHandler());
+  }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index bf4d891..e8d8318 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -57,13 +57,17 @@
 import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
 import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
 import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import com.hazelcast.config.Config;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
 import io.grpc.Server;
 import io.grpc.ServerInterceptor;
 import io.grpc.ServerInterceptors;
 import io.grpc.netty.NettyServerBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -80,6 +84,7 @@
  * based on gRPC.
  */
 public final class RemoteWorker {
+
   // We need to keep references to the root and netty loggers to prevent them from being garbage
   // collected, which would cause us to loose their configuration.
   private static final Logger rootLogger = Logger.getLogger("");
@@ -195,23 +200,7 @@
             });
   }
 
-  /**
-   * Construct a {@link SimpleBlobStore} using Hazelcast's version of {@link ConcurrentMap}. This
-   * will start a standalone Hazelcast server in the same JVM. There will also be a REST server
-   * started for accessing the maps.
-   */
-  private static SimpleBlobStore createHazelcast(RemoteWorkerOptions options) {
-    Config config = new Config();
-    config
-        .getNetworkConfig()
-        .setPort(options.hazelcastStandaloneListenPort)
-        .getJoin()
-        .getMulticastConfig()
-        .setEnabled(false);
-    HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
-    return new ConcurrentMapBlobStore(instance.getMap("cache"));
-  }
-
+  @SuppressWarnings("FutureReturnValueIgnored")
   public static void main(String[] args) throws Exception {
     OptionsParser parser =
         OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
@@ -259,18 +248,14 @@
 
     // The instance of SimpleBlobStore used is based on these criteria in order:
     // 1. If remote cache or local disk cache is specified then use it first.
-    // 2. Otherwise start a standalone Hazelcast instance and use it as the blob store. This also
-    //    creates a REST server for testing.
-    // 3. Finally use a ConcurrentMap to back the blob store.
+    // 2. Finally use a ConcurrentMap to back the blob store.
     final SimpleBlobStore blobStore;
     if (usingRemoteCache) {
       blobStore = SimpleBlobStoreFactory.create(remoteOptions, null, null);
     } else if (remoteWorkerOptions.casPath != null) {
       blobStore = new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath));
-    } else if (remoteWorkerOptions.hazelcastStandaloneListenPort != 0) {
-      blobStore = createHazelcast(remoteWorkerOptions);
     } else {
-      blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
+      blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<>());
     }
 
     ListeningScheduledExecutorService retryService =
@@ -292,9 +277,40 @@
             digestUtil);
 
     final Server server = worker.startServer();
+
+    EventLoopGroup bossGroup = null, workerGroup = null;
+    Channel ch = null;
+    if (remoteWorkerOptions.httpListenPort != 0) {
+      // Configure the server.
+      bossGroup = new NioEventLoopGroup(1);
+      workerGroup = new NioEventLoopGroup();
+      ServerBootstrap b = new ServerBootstrap();
+      b.group(bossGroup, workerGroup)
+          .channel(NioServerSocketChannel.class)
+          .handler(new LoggingHandler(LogLevel.INFO))
+          .childHandler(new HttpCacheServerInitializer());
+      ch = b.bind(remoteWorkerOptions.httpListenPort).sync().channel();
+      logger.log(
+          INFO,
+          "Started HTTP cache server on port " + remoteWorkerOptions.httpListenPort);
+    } else {
+      logger.log(INFO, "Not starting HTTP cache server");
+    }
+
     worker.createPidFile();
+
     server.awaitTermination();
+    if (ch != null) {
+      ch.closeFuture().sync().get();
+    }
+
     retryService.shutdownNow();
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
   }
 
   private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
index ca6a431..ac0b29c 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
@@ -135,16 +135,16 @@
   public int jobs;
 
   @Option(
-    name = "hazelcast_standalone_listen_port",
-    defaultValue = "0",
-    category = "build_worker",
-    documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
-    effectTags = {OptionEffectTag.UNKNOWN},
-    help =
-        "Runs an embedded hazelcast server that listens to this port. The server does not join"
-            + " any cluster. This is useful for testing."
-  )
-  public int hazelcastStandaloneListenPort;
+      name = "http_listen_port",
+      defaultValue = "0",
+      category = "build_worker",
+      documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+      effectTags = {OptionEffectTag.UNKNOWN},
+      help =
+          "Starts an embedded HTTP REST server on the given port. The server will simply store PUT "
+              + "requests in memory and return them again on GET requests. This is useful for "
+              + "testing only.")
+  public int httpListenPort;
 
   private static final int MAX_JOBS = 16384;