Merging remote_cache and remote_worker into a single binary. 

It can still be used as only a cache server, or only a worker with
a wrapper of Hazelcast, so no functionality is lost, but it is now
simpler to use in local testing / prototyping. Changed README files
appropriately.

TESTED=locally

--
Change-Id: I3fdff9d434ce8cae5a6a700df0cb9f5bc364b60c
Reviewed-on: https://cr.bazel.build/9253
PiperOrigin-RevId: 149569790
MOS_MIGRATED_REVID=149569790
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
index 6867c22..6c3fbff 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
@@ -237,7 +237,9 @@
   }
 
   private static boolean isHazelcastOptions(RemoteOptions options) {
-    return options.hazelcastNode != null || options.hazelcastClientConfig != null;
+    return options.hazelcastNode != null
+        || options.hazelcastClientConfig != null
+        || options.hazelcastStandaloneListenPort != 0;
   }
 
   private static boolean isRestUrlOptions(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md
index a62fc38..5040732 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/README.md
+++ b/src/main/java/com/google/devtools/build/lib/remote/README.md
@@ -125,27 +125,26 @@
 
 #### Running the sample gRPC cache server
 
-Bazel currently provides a sample gRPC CAS implementation with Hazelcast as caching backend.
+Bazel currently provides a sample gRPC CAS implementation with a ConcurrentHashMap or Hazelcast as caching backend.
 To use it you need to clone from [Bazel](https://github.com/bazelbuild/bazel) and then build it.
 ```
-bazel build //src/tools/remote_worker:remote_cache
+bazel build //src/tools/remote_worker:all
 ```
 
-The following command will then start the cache server listening on port 8081 with the default
-Hazelcast settings.
+The following command will then start the cache server listening on port 8080 using a local in-memory cache.
 ```
-bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081
+bazel-bin/src/tools/remote_worker/remote_worker --listen_port=8080
 ```
 
-To run everything in a single command.
+To connect to a running instance of Hazelcast instead, use.
 ```
-bazel run //src/tools/remote_worker:remote_cache -- --listen_port 8081
+bazel run //src/tools/remote_worker:remote_worker -- --listen_port=8080 --hazelcast_node=address:port
 ```
 
 If you want to change Hazelcast settings to enable distributed memory cache you can provide your
 own hazelcast.xml with the following command.
 ```
-bazel-bin/src/tools/remote_worker/remote_cache --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8081
+bazel-bin/src/tools/remote_worker/remote_worker --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8080
 ```
 You can copy and edit the [default](https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/main/resources/hazelcast-default.xml) Hazelcast configuration. Refer to Hazelcast [manual](http://docs.hazelcast.org/docs/3.6/manual/html-single/index.html#checking-configuration)
 for more details.
@@ -153,10 +152,10 @@
 #### Using the gRPC CAS endpoint
 
 Use the following build options to use the gRPC CAS endpoint for sharing build artifacts. Change
-`address:8081` to the correct server address and port number.
+`address:8080` to the correct server address and port number.
 
 ```
-build --spawn_strategy=remote --remote_cache=address:8081
+build --spawn_strategy=remote --remote_cache=address:8080
 ```
 
 ### Distributed caching with Hazelcast (TO BE REMOVED)
@@ -201,19 +200,13 @@
 startup --host_jvm_args=-Dbazel.DigestFunction=SHA1
 ```
 
-## Running the sample gRPC cache server
-```
-bazel build //src/tools/remote_worker:remote_cache
-bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081
-```
-
-## Running the sample gRPC remote worker
+## Running the sample gRPC remote worker / cache server
 ```
 bazel build //src/tools/remote_worker:remote_worker
-bazel-bin/src/tools/remote_worker/remote_cache --work_path=/tmp --listen_port 8080
+bazel-bin/src/tools/remote_worker/remote_worker --work_path=/tmp --listen_port 8080
 ```
 
-The sample gRPC cache server and gRPC remote worker both use Hazelcast and shares the **same
+The sample gRPC cache server and gRPC remote worker share the **same
 distributed memory cluster** for storing and accessing CAS objects. It is important the CAS objects
 are shared between the two server processes.
 
@@ -225,5 +218,5 @@
 
 Use the following build options.
 ```
-build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8081
+build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8080
 ```
diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh
index 897bdbb..a338050 100755
--- a/src/test/shell/bazel/remote_execution_test.sh
+++ b/src/test/shell/bazel/remote_execution_test.sh
@@ -71,9 +71,9 @@
 
   bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
     --spawn_strategy=remote \
-    --hazelcast_node=localhost:${hazelcast_port} \
     --remote_worker=localhost:${worker_port} \
-    //a:test >& $TEST_log \
+    --remote_cache=localhost:${worker_port} \
+        //a:test >& $TEST_log \
     || fail "Failed to build //a:test with remote execution"
   diff bazel-bin/a/test ${TEST_TMPDIR}/test_expected \
     || fail "Remote execution generated different result"
diff --git a/src/tools/remote_worker/BUILD b/src/tools/remote_worker/BUILD
index a319119..b8ed480 100644
--- a/src/tools/remote_worker/BUILD
+++ b/src/tools/remote_worker/BUILD
@@ -10,10 +10,3 @@
     visibility = ["//visibility:public"],
     runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"],
 )
-
-java_binary(
-    name = "remote_cache",
-    main_class = "com.google.devtools.build.remote.RemoteCache",
-    visibility = ["//visibility:public"],
-    runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"],
-)
diff --git a/src/tools/remote_worker/README.md b/src/tools/remote_worker/README.md
index 7664d33..088ea78 100644
--- a/src/tools/remote_worker/README.md
+++ b/src/tools/remote_worker/README.md
@@ -1,5 +1,6 @@
 This program implements a remote execution worker that uses gRPC to accept work
-requests. It also serves as a Hazelcast server for distributed caching.
+requests. It can work as a remote execution worker, a cache worker, or both.
+The simplest setup is as follows:
 
 - First build remote_worker and run it.
 
@@ -9,9 +10,13 @@
 
 - Then you run Bazel pointing to the remote_worker instance.
 
-        bazel build --hazelcast_node=127.0.0.1:5701 --spawn_strategy=remote \
-            --remote_worker=127.0.0.1:8080 src/tools/generate_workspace:all
+        bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
+            --spawn_strategy=remote --remote_cache=localhost:8080 \
+            --remote_worker=localhost:8080 src/tools/generate_workspace:all
 
 The above command will build generate_workspace with remote spawn strategy that
-uses Hazelcast as the distributed caching backend and executes work remotely on
-the localhost remote_worker.
+uses the local worker as the distributed caching and execution backend.
+
+You can also use a Hazelcast server for the distributed cache as follows:
+Suppose your Hazelcast server is listening on address:port. Then, run the
+remote worker with --hazelcast_node=address:port.
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java
deleted file mode 100644
index 023145f..0000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java
+++ /dev/null
@@ -1,315 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
-import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
-import com.google.devtools.build.lib.remote.ContentDigests;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
-import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.util.Preconditions;
-import com.google.devtools.common.options.OptionsParser;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.stub.StreamObserver;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** A server which acts as a gRPC wrapper around concurrent map based remote cache. */
-public class RemoteCache {
-  private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
-  private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
-  private static final int MAX_MEMORY_KBYTES = 512 * 1024;
-  private final ConcurrentMapActionCache cache;
-  private final CasServiceImplBase casServer;
-  private final ExecutionCacheServiceImplBase execCacheServer;
-
-  public RemoteCache(ConcurrentMapActionCache cache) {
-    this.cache = cache;
-    casServer = new CasServer();
-    execCacheServer = new ExecutionCacheServer();
-  }
-
-  public CasServiceImplBase getCasServer() {
-    return casServer;
-  }
-
-  public ExecutionCacheServiceImplBase getExecCacheServer() {
-    return execCacheServer;
-  }
-
-  class CasServer extends CasServiceImplBase {
-    @Override
-    public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
-      CasLookupReply.Builder reply = CasLookupReply.newBuilder();
-      CasStatus.Builder status = reply.getStatusBuilder();
-      for (ContentDigest digest : request.getDigestList()) {
-        if (!cache.containsKey(digest)) {
-          status.addMissingDigest(digest);
-        }
-      }
-      if (status.getMissingDigestCount() > 0) {
-        status.setSucceeded(false);
-        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
-      } else {
-        status.setSucceeded(true);
-      }
-      responseObserver.onNext(reply.build());
-      responseObserver.onCompleted();
-    }
-
-    @Override
-    public void uploadTreeMetadata(
-        CasUploadTreeMetadataRequest request,
-        StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
-      try {
-        for (FileNode treeNode : request.getTreeNodeList()) {
-          cache.uploadBlob(treeNode.toByteArray());
-        }
-        responseObserver.onNext(
-            CasUploadTreeMetadataReply.newBuilder()
-                .setStatus(CasStatus.newBuilder().setSucceeded(true))
-                .build());
-      } catch (Exception e) {
-        LOG.warning("Request failed: " + e.toString());
-        CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
-        reply
-            .getStatusBuilder()
-            .setSucceeded(false)
-            .setError(CasStatus.ErrorCode.UNKNOWN)
-            .setErrorDetail(e.toString());
-        responseObserver.onNext(reply.build());
-      } finally {
-        responseObserver.onCompleted();
-      }
-    }
-
-    @Override
-    public void downloadBlob(
-        CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
-      CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
-      CasStatus.Builder status = reply.getStatusBuilder();
-      for (ContentDigest digest : request.getDigestList()) {
-        if (!cache.containsKey(digest)) {
-          status.addMissingDigest(digest);
-        }
-      }
-      if (status.getMissingDigestCount() > 0) {
-        status.setSucceeded(false);
-        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
-        responseObserver.onNext(reply.build());
-        responseObserver.onCompleted();
-        return;
-      }
-      status.setSucceeded(true);
-      try {
-        for (ContentDigest digest : request.getDigestList()) {
-          reply.setData(
-              BlobChunk.newBuilder()
-                  .setDigest(digest)
-                  .setData(ByteString.copyFrom(cache.downloadBlob(digest)))
-                  .build());
-          responseObserver.onNext(reply.build());
-          if (reply.hasStatus()) {
-            reply.clearStatus(); // Only send status on first chunk.
-          }
-        }
-      } catch (CacheNotFoundException e) {
-        // This can only happen if an item gets evicted right after we check.
-        reply.clearData();
-        status.setSucceeded(false);
-        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
-        status.addMissingDigest(e.getMissingDigest());
-        responseObserver.onNext(reply.build());
-      } finally {
-        responseObserver.onCompleted();
-      }
-    }
-
-    @Override
-    public StreamObserver<CasUploadBlobRequest> uploadBlob(
-        final StreamObserver<CasUploadBlobReply> responseObserver) {
-      return new StreamObserver<CasUploadBlobRequest>() {
-        byte[] blob = null;
-        ContentDigest digest = null;
-        long offset = 0;
-
-        @Override
-        public void onNext(CasUploadBlobRequest request) {
-          BlobChunk chunk = request.getData();
-          try {
-            if (chunk.hasDigest()) {
-              // Check if the previous chunk was really done.
-              Preconditions.checkArgument(
-                  digest == null || offset == 0,
-                  "Missing input chunk for digest %s",
-                  digest == null ? "" : ContentDigests.toString(digest));
-              digest = chunk.getDigest();
-              // This unconditionally downloads the whole blob into memory!
-              Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
-              blob = new byte[(int) digest.getSizeBytes()];
-            }
-            Preconditions.checkArgument(digest != null, "First chunk contains no digest");
-            Preconditions.checkArgument(
-                offset == chunk.getOffset(),
-                "Missing input chunk for digest %s",
-                ContentDigests.toString(digest));
-            if (digest.getSizeBytes() > 0) {
-              chunk.getData().copyTo(blob, (int) offset);
-              offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
-            }
-            if (offset == 0) {
-              ContentDigest uploadedDigest = cache.uploadBlob(blob);
-              Preconditions.checkArgument(
-                  uploadedDigest.equals(digest),
-                  "Digest mismatch: client sent %s, server computed %s",
-                  ContentDigests.toString(digest),
-                  ContentDigests.toString(uploadedDigest));
-            }
-          } catch (Exception e) {
-            LOG.warning("Request failed: " + e.toString());
-            CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
-            reply
-                .getStatusBuilder()
-                .setSucceeded(false)
-                .setError(
-                    e instanceof IllegalArgumentException
-                        ? CasStatus.ErrorCode.INVALID_ARGUMENT
-                        : CasStatus.ErrorCode.UNKNOWN)
-                .setErrorDetail(e.toString());
-            responseObserver.onNext(reply.build());
-          }
-        }
-
-        @Override
-        public void onError(Throwable t) {
-          LOG.warning("Request errored remotely: " + t);
-        }
-
-        @Override
-        public void onCompleted() {
-          responseObserver.onCompleted();
-        }
-      };
-    }
-  }
-
-  class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
-    @Override
-    public void getCachedResult(
-        ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
-      try {
-        ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
-        ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
-        ActionResult result = cache.getCachedActionResult(actionKey);
-        if (result != null) {
-          reply.setResult(result);
-        }
-        reply.getStatusBuilder().setSucceeded(true);
-        responseObserver.onNext(reply.build());
-      } catch (Exception e) {
-        LOG.warning("getCachedActionResult request failed: " + e.toString());
-        ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
-        reply
-            .getStatusBuilder()
-            .setSucceeded(false)
-            .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
-        responseObserver.onNext(reply.build());
-      } finally {
-        responseObserver.onCompleted();
-      }
-    }
-
-    @Override
-    public void setCachedResult(
-        ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
-      try {
-        ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
-        cache.setCachedActionResult(actionKey, request.getResult());
-        ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
-        reply.getStatusBuilder().setSucceeded(true);
-        responseObserver.onNext(reply.build());
-      } catch (Exception e) {
-        LOG.warning("setCachedActionResult request failed: " + e.toString());
-        ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
-        reply
-            .getStatusBuilder()
-            .setSucceeded(false)
-            .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
-        responseObserver.onNext(reply.build());
-      } finally {
-        responseObserver.onCompleted();
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    OptionsParser parser =
-        OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
-    parser.parseAndExitUponError(args);
-    RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class);
-    RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class);
-
-    System.out.println("*** Starting Hazelcast server.");
-    ConcurrentMapActionCache cache =
-        new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
-
-    System.out.println(
-        "*** Starting grpc server on all locally bound IPs on port "
-            + remoteWorkerOptions.listenPort
-            + ".");
-    RemoteCache worker = new RemoteCache(cache);
-    final Server server =
-        ServerBuilder.forPort(remoteWorkerOptions.listenPort)
-            .addService(worker.getCasServer())
-            .addService(worker.getExecCacheServer())
-            .build();
-    server.start();
-
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread() {
-              @Override
-              public void run() {
-                System.err.println("*** Shutting down grpc server.");
-                server.shutdown();
-                System.err.println("*** Server shut down.");
-              }
-            });
-    server.awaitTermination();
-  }
-}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index cca971a..3e2ef4a 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -16,31 +16,51 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.remote.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
 import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
 import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
 import com.google.devtools.build.lib.remote.ContentDigests;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
 import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
+import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
 import com.google.devtools.build.lib.remote.RemoteOptions;
 import com.google.devtools.build.lib.remote.RemoteProtocol;
 import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
 import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
 import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
 import com.google.devtools.build.lib.shell.AbnormalTerminationException;
 import com.google.devtools.build.lib.shell.Command;
 import com.google.devtools.build.lib.shell.CommandException;
 import com.google.devtools.build.lib.unix.UnixFileSystem;
 import com.google.devtools.build.lib.util.OS;
+import com.google.devtools.build.lib.util.Preconditions;
 import com.google.devtools.build.lib.util.ProcessUtils;
 import com.google.devtools.build.lib.vfs.FileSystem;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.common.options.OptionsParser;
+import com.google.protobuf.ByteString;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
@@ -55,6 +75,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -62,153 +84,381 @@
  * Implements a remote worker that accepts work items as protobufs. The server implementation is
  * based on grpc.
  */
-public class RemoteWorker extends ExecuteServiceImplBase {
+public class RemoteWorker {
   private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
   private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
-  private final Path workPath;
-  private final RemoteOptions remoteOptions;
-  private final RemoteWorkerOptions options;
   private final ConcurrentMapActionCache cache;
+  private final CasServiceImplBase casServer;
+  private final ExecuteServiceImplBase execServer;
+  private final ExecutionCacheServiceImplBase execCacheServer;
 
-  public RemoteWorker(
-      Path workPath,
-      RemoteOptions remoteOptions,
-      RemoteWorkerOptions options,
-      ConcurrentMapActionCache cache) {
-    this.workPath = workPath;
-    this.remoteOptions = remoteOptions;
-    this.options = options;
+  public RemoteWorker(Path workPath, RemoteWorkerOptions options, ConcurrentMapActionCache cache) {
     this.cache = cache;
+    casServer = new CasServer();
+    execServer = new ExecutionServer(workPath, options);
+    execCacheServer = new ExecutionCacheServer();
   }
 
-  private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
-    HashMap<String, String> result = new HashMap<>();
-    for (EnvironmentEntry entry : command.getEnvironmentList()) {
-      result.put(entry.getVariable(), entry.getValue());
-    }
-    return result;
+  public CasServiceImplBase getCasServer() {
+    return casServer;
   }
 
-  public ExecuteReply execute(Action action, Path execRoot)
-      throws IOException, InterruptedException {
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-    try {
-      RemoteProtocol.Command command =
-          RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
-      cache.downloadTree(action.getInputRootDigest(), execRoot);
-
-      List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
-      for (String output : action.getOutputPathList()) {
-        Path file = execRoot.getRelative(output);
-        if (file.exists()) {
-          throw new FileAlreadyExistsException("Output file already exists: " + file);
-        }
-        FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
-        outputs.add(file);
-      }
-
-      // TODO(olaola): time out after specified server-side deadline.
-      Command cmd =
-          new Command(
-              command.getArgvList().toArray(new String[] {}),
-              getEnvironmentVariables(command),
-              new File(execRoot.getPathString()));
-      cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
-
-      // Execute throws a CommandException on non-zero return values, so action has succeeded.
-      ImmutableList<ContentDigest> outErrDigests =
-          cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
-      ActionResult.Builder result =
-          ActionResult.newBuilder()
-              .setReturnCode(0)
-              .setStdoutDigest(outErrDigests.get(0))
-              .setStderrDigest(outErrDigests.get(1));
-      cache.uploadAllResults(execRoot, outputs, result);
-      cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
-      return ExecuteReply.newBuilder()
-          .setResult(result)
-          .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
-          .build();
-    } catch (CommandException e) {
-      ImmutableList<ContentDigest> outErrDigests =
-          cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
-      final int returnCode =
-          e instanceof AbnormalTerminationException
-              ? ((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode()
-              : -1;
-      return ExecuteReply.newBuilder()
-          .setResult(
-              ActionResult.newBuilder()
-                  .setReturnCode(returnCode)
-                  .setStdoutDigest(outErrDigests.get(0))
-                  .setStderrDigest(outErrDigests.get(1)))
-          .setStatus(
-              ExecutionStatus.newBuilder()
-                  .setExecuted(true)
-                  .setSucceeded(false)
-                  .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
-                  .setErrorDetail(e.toString()))
-          .build();
-    } catch (CacheNotFoundException e) {
-      LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
-      return ExecuteReply.newBuilder()
-          .setCasError(
-              CasStatus.newBuilder()
-                  .setSucceeded(false)
-                  .addMissingDigest(e.getMissingDigest())
-                  .setError(CasStatus.ErrorCode.MISSING_DIGEST)
-                  .setErrorDetail(e.toString()))
-          .setStatus(
-              ExecutionStatus.newBuilder()
-                  .setExecuted(false)
-                  .setSucceeded(false)
-                  .setError(
-                      e.getMissingDigest() == action.getCommandDigest()
-                          ? ExecutionStatus.ErrorCode.MISSING_COMMAND
-                          : ExecutionStatus.ErrorCode.MISSING_INPUT)
-                  .setErrorDetail(e.toString()))
-          .build();
-    }
+  public ExecuteServiceImplBase getExecutionServer() {
+    return execServer;
   }
 
-  @Override
-  public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
-    Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
-    try {
-      tempRoot.createDirectory();
-      if (LOG_FINER) {
-        LOG.fine(
-            "Work received has "
-                + request.getTotalInputFileCount()
-                + " input files and "
-                + request.getAction().getOutputPathCount()
-                + " output files.");
-      }
-      ExecuteReply reply = execute(request.getAction(), tempRoot);
-      responseObserver.onNext(reply);
-      if (options.debug) {
-        if (!reply.getStatus().getSucceeded()) {
-          LOG.warning("Work failed. Request: " + request.toString() + ".");
-        } else if (LOG_FINER) {
-          LOG.fine("Work completed.");
+  public ExecutionCacheServiceImplBase getExecCacheServer() {
+    return execCacheServer;
+  }
+
+  class CasServer extends CasServiceImplBase {
+    private static final int MAX_MEMORY_KBYTES = 512 * 1024;
+
+    @Override
+    public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
+      CasLookupReply.Builder reply = CasLookupReply.newBuilder();
+      CasStatus.Builder status = reply.getStatusBuilder();
+      for (ContentDigest digest : request.getDigestList()) {
+        if (!cache.containsKey(digest)) {
+          status.addMissingDigest(digest);
         }
       }
-      if (!options.debug) {
-        FileSystemUtils.deleteTree(tempRoot);
+      if (status.getMissingDigestCount() > 0) {
+        status.setSucceeded(false);
+        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
       } else {
-        LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
+        status.setSucceeded(true);
       }
-    } catch (IOException | InterruptedException e) {
-      ExecuteReply.Builder reply = ExecuteReply.newBuilder();
-      reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
       responseObserver.onNext(reply.build());
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-    } finally {
       responseObserver.onCompleted();
     }
+
+    @Override
+    public void uploadTreeMetadata(
+        CasUploadTreeMetadataRequest request,
+        StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
+      try {
+        for (FileNode treeNode : request.getTreeNodeList()) {
+          cache.uploadBlob(treeNode.toByteArray());
+        }
+        responseObserver.onNext(
+            CasUploadTreeMetadataReply.newBuilder()
+                .setStatus(CasStatus.newBuilder().setSucceeded(true))
+                .build());
+      } catch (Exception e) {
+        LOG.warning("Request failed: " + e.toString());
+        CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
+        reply
+            .getStatusBuilder()
+            .setSucceeded(false)
+            .setError(CasStatus.ErrorCode.UNKNOWN)
+            .setErrorDetail(e.toString());
+        responseObserver.onNext(reply.build());
+      } finally {
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void downloadBlob(
+        CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
+      CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
+      CasStatus.Builder status = reply.getStatusBuilder();
+      for (ContentDigest digest : request.getDigestList()) {
+        if (!cache.containsKey(digest)) {
+          status.addMissingDigest(digest);
+        }
+      }
+      if (status.getMissingDigestCount() > 0) {
+        status.setSucceeded(false);
+        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+        responseObserver.onNext(reply.build());
+        responseObserver.onCompleted();
+        return;
+      }
+      status.setSucceeded(true);
+      try {
+        for (ContentDigest digest : request.getDigestList()) {
+          reply.setData(
+              BlobChunk.newBuilder()
+                  .setDigest(digest)
+                  .setData(ByteString.copyFrom(cache.downloadBlob(digest)))
+                  .build());
+          responseObserver.onNext(reply.build());
+          if (reply.hasStatus()) {
+            reply.clearStatus(); // Only send status on first chunk.
+          }
+        }
+      } catch (CacheNotFoundException e) {
+        // This can only happen if an item gets evicted right after we check.
+        reply.clearData();
+        status.setSucceeded(false);
+        status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+        status.addMissingDigest(e.getMissingDigest());
+        responseObserver.onNext(reply.build());
+      } finally {
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public StreamObserver<CasUploadBlobRequest> uploadBlob(
+        final StreamObserver<CasUploadBlobReply> responseObserver) {
+      return new StreamObserver<CasUploadBlobRequest>() {
+        byte[] blob = null;
+        ContentDigest digest = null;
+        long offset = 0;
+
+        @Override
+        public void onNext(CasUploadBlobRequest request) {
+          BlobChunk chunk = request.getData();
+          try {
+            if (chunk.hasDigest()) {
+              // Check if the previous chunk was really done.
+              Preconditions.checkArgument(
+                  digest == null || offset == 0,
+                  "Missing input chunk for digest %s",
+                  digest == null ? "" : ContentDigests.toString(digest));
+              digest = chunk.getDigest();
+              // This unconditionally downloads the whole blob into memory!
+              Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
+              blob = new byte[(int) digest.getSizeBytes()];
+            }
+            Preconditions.checkArgument(digest != null, "First chunk contains no digest");
+            Preconditions.checkArgument(
+                offset == chunk.getOffset(),
+                "Missing input chunk for digest %s",
+                ContentDigests.toString(digest));
+            if (digest.getSizeBytes() > 0) {
+              chunk.getData().copyTo(blob, (int) offset);
+              offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
+            }
+            if (offset == 0) {
+              ContentDigest uploadedDigest = cache.uploadBlob(blob);
+              Preconditions.checkArgument(
+                  uploadedDigest.equals(digest),
+                  "Digest mismatch: client sent %s, server computed %s",
+                  ContentDigests.toString(digest),
+                  ContentDigests.toString(uploadedDigest));
+            }
+          } catch (Exception e) {
+            LOG.warning("Request failed: " + e.toString());
+            CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
+            reply
+                .getStatusBuilder()
+                .setSucceeded(false)
+                .setError(
+                    e instanceof IllegalArgumentException
+                        ? CasStatus.ErrorCode.INVALID_ARGUMENT
+                        : CasStatus.ErrorCode.UNKNOWN)
+                .setErrorDetail(e.toString());
+            responseObserver.onNext(reply.build());
+          }
+        }
+
+        @Override
+        public void onError(Throwable t) {
+          LOG.warning("Request errored remotely: " + t);
+        }
+
+        @Override
+        public void onCompleted() {
+          responseObserver.onCompleted();
+        }
+      };
+    }
+  }
+
+  class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
+    @Override
+    public void getCachedResult(
+        ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
+      try {
+        ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
+        ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+        ActionResult result = cache.getCachedActionResult(actionKey);
+        if (result != null) {
+          reply.setResult(result);
+        }
+        reply.getStatusBuilder().setSucceeded(true);
+        responseObserver.onNext(reply.build());
+      } catch (Exception e) {
+        LOG.warning("getCachedActionResult request failed: " + e.toString());
+        ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+        reply
+            .getStatusBuilder()
+            .setSucceeded(false)
+            .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
+        responseObserver.onNext(reply.build());
+      } finally {
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void setCachedResult(
+        ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
+      try {
+        ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
+        cache.setCachedActionResult(actionKey, request.getResult());
+        ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
+        reply.getStatusBuilder().setSucceeded(true);
+        responseObserver.onNext(reply.build());
+      } catch (Exception e) {
+        LOG.warning("setCachedActionResult request failed: " + e.toString());
+        ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
+        reply
+            .getStatusBuilder()
+            .setSucceeded(false)
+            .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
+        responseObserver.onNext(reply.build());
+      } finally {
+        responseObserver.onCompleted();
+      }
+    }
+  }
+
+  class ExecutionServer extends ExecuteServiceImplBase {
+    private final Path workPath;
+    private final RemoteWorkerOptions options;
+
+    public ExecutionServer(Path workPath, RemoteWorkerOptions options) {
+      this.workPath = workPath;
+      this.options = options;
+    }
+
+    private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
+      HashMap<String, String> result = new HashMap<>();
+      for (EnvironmentEntry entry : command.getEnvironmentList()) {
+        result.put(entry.getVariable(), entry.getValue());
+      }
+      return result;
+    }
+
+    public ExecuteReply execute(Action action, Path execRoot)
+        throws IOException, InterruptedException {
+      ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+      ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+      try {
+        RemoteProtocol.Command command =
+            RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
+        cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+        List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
+        for (String output : action.getOutputPathList()) {
+          Path file = execRoot.getRelative(output);
+          if (file.exists()) {
+            throw new FileAlreadyExistsException("Output file already exists: " + file);
+          }
+          FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
+          outputs.add(file);
+        }
+
+        // TODO(olaola): time out after specified server-side deadline.
+        Command cmd =
+            new Command(
+                command.getArgvList().toArray(new String[] {}),
+                getEnvironmentVariables(command),
+                new File(execRoot.getPathString()));
+        cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
+
+        // Execute throws a CommandException on non-zero return values, so action has succeeded.
+        ImmutableList<ContentDigest> outErrDigests =
+            cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+        ActionResult.Builder result =
+            ActionResult.newBuilder()
+                .setReturnCode(0)
+                .setStdoutDigest(outErrDigests.get(0))
+                .setStderrDigest(outErrDigests.get(1));
+        cache.uploadAllResults(execRoot, outputs, result);
+        cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
+        return ExecuteReply.newBuilder()
+            .setResult(result)
+            .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
+            .build();
+      } catch (CommandException e) {
+        ImmutableList<ContentDigest> outErrDigests =
+            cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+        final int returnCode =
+            e instanceof AbnormalTerminationException
+                ? ((AbnormalTerminationException) e)
+                    .getResult()
+                    .getTerminationStatus()
+                    .getExitCode()
+                : -1;
+        return ExecuteReply.newBuilder()
+            .setResult(
+                ActionResult.newBuilder()
+                    .setReturnCode(returnCode)
+                    .setStdoutDigest(outErrDigests.get(0))
+                    .setStderrDigest(outErrDigests.get(1)))
+            .setStatus(
+                ExecutionStatus.newBuilder()
+                    .setExecuted(true)
+                    .setSucceeded(false)
+                    .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
+                    .setErrorDetail(e.toString()))
+            .build();
+      } catch (CacheNotFoundException e) {
+        LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
+        return ExecuteReply.newBuilder()
+            .setCasError(
+                CasStatus.newBuilder()
+                    .setSucceeded(false)
+                    .addMissingDigest(e.getMissingDigest())
+                    .setError(CasStatus.ErrorCode.MISSING_DIGEST)
+                    .setErrorDetail(e.toString()))
+            .setStatus(
+                ExecutionStatus.newBuilder()
+                    .setExecuted(false)
+                    .setSucceeded(false)
+                    .setError(
+                        e.getMissingDigest() == action.getCommandDigest()
+                            ? ExecutionStatus.ErrorCode.MISSING_COMMAND
+                            : ExecutionStatus.ErrorCode.MISSING_INPUT)
+                    .setErrorDetail(e.toString()))
+            .build();
+      }
+    }
+
+    @Override
+    public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
+      Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
+      try {
+        tempRoot.createDirectory();
+        if (LOG_FINER) {
+          LOG.fine(
+              "Work received has "
+                  + request.getTotalInputFileCount()
+                  + " input files and "
+                  + request.getAction().getOutputPathCount()
+                  + " output files.");
+        }
+        ExecuteReply reply = execute(request.getAction(), tempRoot);
+        responseObserver.onNext(reply);
+        if (options.debug) {
+          if (!reply.getStatus().getSucceeded()) {
+            LOG.warning("Work failed. Request: " + request.toString() + ".");
+          } else if (LOG_FINER) {
+            LOG.fine("Work completed.");
+          }
+        }
+        if (!options.debug) {
+          FileSystemUtils.deleteTree(tempRoot);
+        } else {
+          LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
+        }
+      } catch (IOException | InterruptedException e) {
+        ExecuteReply.Builder reply = ExecuteReply.newBuilder();
+        reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
+        responseObserver.onNext(reply.build());
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+      } finally {
+        responseObserver.onCompleted();
+      }
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -223,9 +473,11 @@
       return;
     }
 
-    System.out.println("*** Starting Hazelcast server.");
-    ConcurrentMapActionCache cache =
-        new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
+    System.out.println("*** Initializing in-memory cache server.");
+    ConcurrentMap<String, byte[]> cache =
+        ConcurrentMapFactory.isRemoteCacheOptions(remoteOptions)
+            ? ConcurrentMapFactory.create(remoteOptions)
+            : new ConcurrentHashMap<String, byte[]>();
 
     System.out.println(
         "*** Starting grpc server on all locally bound IPs on port "
@@ -233,9 +485,14 @@
             + ".");
     Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath);
     FileSystemUtils.createDirectoryAndParents(workPath);
-    RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache);
+    RemoteWorker worker =
+        new RemoteWorker(workPath, remoteWorkerOptions, new ConcurrentMapActionCache(cache));
     final Server server =
-        ServerBuilder.forPort(remoteWorkerOptions.listenPort).addService(worker).build();
+        ServerBuilder.forPort(remoteWorkerOptions.listenPort)
+            .addService(worker.getCasServer())
+            .addService(worker.getExecutionServer())
+            .addService(worker.getExecCacheServer())
+            .build();
     server.start();
 
     final Path pidFile;
@@ -270,7 +527,7 @@
   }
 
   public static void printUsage(OptionsParser parser) {
-    System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a RPC service.");
+    System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a gRPC service.");
     System.out.println(
         parser.describeOptions(
             Collections.<String, String>emptyMap(), OptionsParser.HelpVerbosity.LONG));