Refactor remote package to only have one remote cache impl.

Historically the remote package shipped two implementations
for remote caching: GrpcRemoteCache and SimpleBlobStoreActionCache.
Today there is no good reason to keep this structure and the
duplication of code and tests that comes with it. This is the
final refactoring of a long series:

 * 5fc91a7
 * 12ebb84
 * 8ac6bdc
 * 9fb83b4
 * 36611c3
 * a6b5b05
 * d40933d
 * 797f292

This change makes it so that there is one RemoteCache that
has a RemoteCacheClient. The RemoteCacheClient has one
implementation per caching protocol:

 * DiskCacheClient (formerly OnDiskBlobStore)
 * GrpcCacheClient (formerly GrpcRemoteCache)
 * HttpCacheClient (formerly HttpBlobStore)
 * CombinedDiskHttpCacheClient (formerly CombinedDiskHttpBlobStore)

A single RemoteCache type for all protocols will allow
composition of caching protocols, in particular gRPC and disk
caching. Additionally, this change will allow us to fix a
longstanding bug affecting the HTTP caching protocol.

Closes #10200.

PiperOrigin-RevId: 279723411
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 4b6264d..31d074e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -7,7 +7,6 @@
     srcs = glob(["**"]) + [
         "//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
         "//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
-        "//src/main/java/com/google/devtools/build/lib/remote/blobstore:srcs",
         "//src/main/java/com/google/devtools/build/lib/remote/http:srcs",
         "//src/main/java/com/google/devtools/build/lib/remote/logging:srcs",
         "//src/main/java/com/google/devtools/build/lib/remote/options:srcs",
@@ -40,7 +39,6 @@
         "//src/main/java/com/google/devtools/build/lib/buildeventstream",
         "//src/main/java/com/google/devtools/build/lib/concurrent",
         "//src/main/java/com/google/devtools/build/lib/profiler",
-        "//src/main/java/com/google/devtools/build/lib/remote/blobstore",
         "//src/main/java/com/google/devtools/build/lib/remote/common",
         "//src/main/java/com/google/devtools/build/lib/remote/disk",
         "//src/main/java/com/google/devtools/build/lib/remote/http",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index b312d57..107023c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -28,6 +28,7 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
 import com.google.devtools.build.lib.collect.ImmutableIterable;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.vfs.Path;
 import io.grpc.Context;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
index e6b6cad..978c10f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.remote;
 
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
 import com.google.devtools.build.lib.runtime.CommandEnvironment;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
similarity index 76%
rename from src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
rename to src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index c8709b3..4d694f0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -15,15 +15,14 @@
 package com.google.devtools.build.lib.remote;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.lang.String.format;
 
 import build.bazel.remote.execution.v2.ActionCacheGrpc;
 import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
 import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub;
 import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
 import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
 import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
 import build.bazel.remote.execution.v2.GetActionResultRequest;
@@ -37,7 +36,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.HashingOutputStream;
 import com.google.common.util.concurrent.FutureCallback;
@@ -48,26 +46,24 @@
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
-import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
-import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.remote.util.Utils;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 import io.grpc.CallCredentials;
 import io.grpc.Context;
 import io.grpc.Status;
+import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -76,9 +72,11 @@
 
 /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
 @ThreadSafe
-public class GrpcRemoteCache extends AbstractRemoteActionCache {
+public class GrpcCacheClient implements RemoteCacheClient, MissingDigestsFinder {
   private final CallCredentials credentials;
   private final ReferenceCountedChannel channel;
+  private final RemoteOptions options;
+  private final DigestUtil digestUtil;
   private final RemoteRetrier retrier;
   private final ByteStreamUploader uploader;
   private final int maxMissingBlobsDigestsPerMessage;
@@ -86,16 +84,17 @@
   private AtomicBoolean closed = new AtomicBoolean();
 
   @VisibleForTesting
-  public GrpcRemoteCache(
+  public GrpcCacheClient(
       ReferenceCountedChannel channel,
       CallCredentials credentials,
       RemoteOptions options,
       RemoteRetrier retrier,
       DigestUtil digestUtil,
       ByteStreamUploader uploader) {
-    super(options, digestUtil);
     this.credentials = credentials;
     this.channel = channel;
+    this.options = options;
+    this.digestUtil = digestUtil;
     this.retrier = retrier;
     this.uploader = uploader;
     maxMissingBlobsDigestsPerMessage = computeMaxMissingBlobsDigestsPerMessage();
@@ -141,6 +140,13 @@
         .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
   }
 
+  private ActionCacheFutureStub acFutureStub() {
+    return ActionCacheGrpc.newFutureStub(channel)
+        .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+        .withCallCredentials(credentials)
+        .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+  }
+
   @Override
   public void close() {
     if (closed.getAndSet(true)) {
@@ -199,60 +205,28 @@
     return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
   }
 
-  /**
-   * Ensures that the tree structure of the inputs, the input files themselves, and the command are
-   * available in the remote cache, such that the tree can be reassembled and executed on another
-   * machine given the root digest.
-   *
-   * <p>The cache may check whether files or parts of the tree structure are already present, and do
-   * not need to be uploaded again.
-   *
-   * <p>Note that this method is only required for remote execution, not for caching itself.
-   * However, remote execution uses a cache to store input files, and that may be a separate
-   * end-point from the executor itself, so the functionality lives here.
-   */
-  public void ensureInputsPresent(
-      MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
-      throws IOException, InterruptedException {
-    Iterable<Digest> allDigests =
-        Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
-    ImmutableSet<Digest> missingDigests = Utils.getFromFuture(findMissingDigests(allDigests));
-    Map<HashCode, Chunker> inputsToUpload = Maps.newHashMapWithExpectedSize(missingDigests.size());
-    for (Digest missingDigest : missingDigests) {
-      Directory node = merkleTree.getDirectoryByDigest(missingDigest);
-      HashCode hash = HashCode.fromString(missingDigest.getHash());
-      if (node != null) {
-        Chunker c = Chunker.builder().setInput(node.toByteArray()).build();
-        inputsToUpload.put(hash, c);
-        continue;
-      }
+  private ListenableFuture<ActionResult> handleStatus(ListenableFuture<ActionResult> download) {
+    return Futures.catchingAsync(
+        download,
+        StatusRuntimeException.class,
+        (sre) ->
+            sre.getStatus().getCode() == Code.NOT_FOUND
+                // Return null to indicate that it was a cache miss.
+                ? Futures.immediateFuture(null)
+                : Futures.immediateFailedFuture(new IOException(sre)),
+        MoreExecutors.directExecutor());
+  }
 
-      PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
-      if (file != null) {
-        final Chunker c;
-        if (file.getPath() != null) {
-          c = Chunker.builder().setInput(missingDigest.getSizeBytes(), file.getPath()).build();
-        } else {
-          c = Chunker.builder().setInput(file.getBytes().toByteArray()).build();
-        }
-        inputsToUpload.put(hash, c);
-        continue;
-      }
-
-      Message message = additionalInputs.get(missingDigest);
-      if (message != null) {
-        Chunker c = Chunker.builder().setInput(message.toByteArray()).build();
-        inputsToUpload.put(hash, c);
-        continue;
-      }
-
-      throw new IOException(
-          format(
-              "findMissingDigests returned a missing digest that has not been requested: %s",
-              missingDigest));
-    }
-
-    uploader.uploadBlobs(inputsToUpload, /* forceUpload= */ true);
+  @Override
+  public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+    GetActionResultRequest request =
+        GetActionResultRequest.newBuilder()
+            .setInstanceName(options.remoteInstanceName)
+            .setActionDigest(actionKey.getDigest())
+            .build();
+    Context ctx = Context.current();
+    return retrier.executeAsync(
+        () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
   }
 
   private static String digestToString(Digest digest) {
@@ -260,7 +234,25 @@
   }
 
   @Override
-  protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+  public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
+      throws IOException, InterruptedException {
+    try {
+      retrier.execute(
+          () ->
+              acBlockingStub()
+                  .updateActionResult(
+                      UpdateActionResultRequest.newBuilder()
+                          .setInstanceName(options.remoteInstanceName)
+                          .setActionDigest(actionKey.getDigest())
+                          .setActionResult(actionResult)
+                          .build()));
+    } catch (StatusRuntimeException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
     if (digest.getSizeBytes() == 0) {
       return Futures.immediateFuture(null);
     }
@@ -363,7 +355,7 @@
               public void onCompleted() {
                 try {
                   if (hashSupplier != null) {
-                    verifyContents(
+                    Utils.verifyBlobContents(
                         digest.getHash(), DigestUtil.hashCodeToString(hashSupplier.get()));
                   }
                   out.flush();
@@ -377,7 +369,7 @@
   }
 
   @Override
-  protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
+  public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
     return uploader.uploadBlobAsync(
         HashCode.fromString(digest.getHash()),
         Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
@@ -385,51 +377,10 @@
   }
 
   @Override
-  protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
     return uploader.uploadBlobAsync(
         HashCode.fromString(digest.getHash()),
         Chunker.builder().setInput(data.toByteArray()).build(),
         /* forceUpload= */ true);
   }
-
-  // Execution Cache API
-
-  @Override
-  public ActionResult getCachedActionResult(ActionKey actionKey)
-      throws IOException, InterruptedException {
-    try {
-      return retrier.execute(
-          () ->
-              acBlockingStub()
-                  .getActionResult(
-                      GetActionResultRequest.newBuilder()
-                          .setInstanceName(options.remoteInstanceName)
-                          .setActionDigest(actionKey.getDigest())
-                          .build()));
-    } catch (StatusRuntimeException e) {
-      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
-        // Return null to indicate that it was a cache miss.
-        return null;
-      }
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  protected void setCachedActionResult(ActionKey actionKey, ActionResult result)
-      throws IOException, InterruptedException {
-    try {
-      retrier.execute(
-          () ->
-              acBlockingStub()
-                  .updateActionResult(
-                      UpdateActionResultRequest.newBuilder()
-                          .setInstanceName(options.remoteInstanceName)
-                          .setActionDigest(actionKey.getDigest())
-                          .setActionResult(result)
-                          .build()));
-    } catch (StatusRuntimeException e) {
-      throw new IOException(e);
-    }
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 535bba1..a929254 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -43,7 +43,7 @@
  */
 final class RemoteActionContextProvider extends ActionContextProvider {
   private final CommandEnvironment env;
-  private final AbstractRemoteActionCache cache;
+  private final RemoteCache cache;
   @Nullable private final GrpcRemoteExecutor executor;
   private final RemoteRetrier retrier;
   private final DigestUtil digestUtil;
@@ -53,7 +53,7 @@
 
   private RemoteActionContextProvider(
       CommandEnvironment env,
-      AbstractRemoteActionCache cache,
+      RemoteCache cache,
       @Nullable GrpcRemoteExecutor executor,
       RemoteRetrier retrier,
       DigestUtil digestUtil,
@@ -67,17 +67,14 @@
   }
 
   public static RemoteActionContextProvider createForRemoteCaching(
-      CommandEnvironment env,
-      AbstractRemoteActionCache cache,
-      RemoteRetrier retrier,
-      DigestUtil digestUtil) {
+      CommandEnvironment env, RemoteCache cache, RemoteRetrier retrier, DigestUtil digestUtil) {
     return new RemoteActionContextProvider(
         env, cache, /*executor=*/ null, retrier, digestUtil, /*logDir=*/ null);
   }
 
   public static RemoteActionContextProvider createForRemoteExecution(
       CommandEnvironment env,
-      GrpcRemoteCache cache,
+      RemoteExecutionCache cache,
       GrpcRemoteExecutor executor,
       RemoteRetrier retrier,
       DigestUtil digestUtil,
@@ -116,7 +113,7 @@
               env.getReporter(),
               buildRequestId,
               commandId,
-              (GrpcRemoteCache) cache,
+              (RemoteExecutionCache) cache,
               executor,
               retrier,
               digestUtil,
@@ -167,7 +164,7 @@
 
   /** Returns the remote cache object if any. */
   @Nullable
-  AbstractRemoteActionCache getRemoteCache() {
+  RemoteCache getRemoteCache() {
     return cache;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 2d82bd6..72c6d54 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -65,11 +65,11 @@
   @GuardedBy("lock")
   final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();
 
-  private final AbstractRemoteActionCache remoteCache;
+  private final RemoteCache remoteCache;
   private final Path execRoot;
   private final Context ctx;
 
-  RemoteActionInputFetcher(AbstractRemoteActionCache remoteCache, Path execRoot, Context ctx) {
+  RemoteActionInputFetcher(RemoteCache remoteCache, Path execRoot, Context ctx) {
     this.remoteCache = Preconditions.checkNotNull(remoteCache);
     this.execRoot = Preconditions.checkNotNull(execRoot);
     this.ctx = Preconditions.checkNotNull(ctx);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 1aacf14..4df9100 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -50,11 +50,11 @@
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.profiler.Profiler;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.FileMetadata;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.SymlinkMetadata;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.DirectoryMetadata;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.FileMetadata;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.SymlinkMetadata;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.Utils;
@@ -87,7 +87,7 @@
 
 /** A cache for storing artifacts (input and output) as well as the output of running an action. */
 @ThreadSafety.ThreadSafe
-public abstract class AbstractRemoteActionCache implements MissingDigestsFinder, AutoCloseable {
+public class RemoteCache implements AutoCloseable {
 
   /** See {@link SpawnExecutionContext#lockOutputFiles()}. */
   @FunctionalInterface
@@ -103,49 +103,21 @@
     ((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]);
   }
 
+  protected final RemoteCacheClient cacheProtocol;
   protected final RemoteOptions options;
   protected final DigestUtil digestUtil;
 
-  public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
+  public RemoteCache(
+      RemoteCacheClient cacheProtocol, RemoteOptions options, DigestUtil digestUtil) {
+    this.cacheProtocol = cacheProtocol;
     this.options = options;
     this.digestUtil = digestUtil;
   }
 
-  /**
-   * Attempts to look up the given action in the remote cache and return its result, if present.
-   * Returns {@code null} if there is no such entry. Note that a successful result from this method
-   * does not guarantee the availability of the corresponding output files in the remote cache.
-   *
-   * @throws IOException if the remote cache is unavailable.
-   */
-  @Nullable
-  abstract ActionResult getCachedActionResult(ActionKey actionKey)
-      throws IOException, InterruptedException;
-
-  protected abstract void setCachedActionResult(ActionKey actionKey, ActionResult action)
-      throws IOException, InterruptedException;
-
-  /**
-   * Uploads a file
-   *
-   * <p>Any errors are being propagated via the returned future. If the future completes without
-   * errors the upload was successful.
-   *
-   * @param digest the digest of the file.
-   * @param file the file to upload.
-   */
-  protected abstract ListenableFuture<Void> uploadFile(Digest digest, Path file);
-
-  /**
-   * Uploads a BLOB.
-   *
-   * <p>Any errors are being propagated via the returned future. If the future completes without
-   * errors the upload was successful
-   *
-   * @param digest the digest of the blob.
-   * @param data the blob to upload.
-   */
-  protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
+  public ActionResult downloadActionResult(ActionKey actionKey)
+      throws IOException, InterruptedException {
+    return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey));
+  }
 
   /**
    * Upload the result of a locally executed action to the remote cache.
@@ -167,7 +139,7 @@
     resultBuilder.setExitCode(exitCode);
     ActionResult result = resultBuilder.build();
     if (exitCode == 0 && !action.getDoNotCache()) {
-      setCachedActionResult(actionKey, result);
+      cacheProtocol.uploadActionResult(actionKey, result);
     }
     return result;
   }
@@ -209,19 +181,20 @@
     digests.addAll(digestToFile.keySet());
     digests.addAll(digestToBlobs.keySet());
 
-    ImmutableSet<Digest> digestsToUpload = Utils.getFromFuture(findMissingDigests(digests));
+    ImmutableSet<Digest> digestsToUpload =
+        Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
     ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
     for (Digest digest : digestsToUpload) {
       Path file = digestToFile.get(digest);
       if (file != null) {
-        uploads.add(uploadFile(digest, file));
+        uploads.add(cacheProtocol.uploadFile(digest, file));
       } else {
         ByteString blob = digestToBlobs.get(digest);
         if (blob == null) {
           String message = "FindMissingBlobs call returned an unknown digest: " + digest;
           throw new IOException(message);
         }
-        uploads.add(uploadBlob(digest, blob));
+        uploads.add(cacheProtocol.uploadBlob(digest, blob));
       }
     }
 
@@ -255,13 +228,6 @@
   }
 
   /**
-   * Downloads a blob with a content hash {@code digest} to {@code out}.
-   *
-   * @return a future that completes after the download completes (succeeds / fails).
-   */
-  protected abstract ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
-
-  /**
    * Downloads a blob with content hash {@code digest} and stores its content in memory.
    *
    * @return a future that completes after the download completes (succeeds / fails). If successful,
@@ -274,7 +240,7 @@
     ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
     SettableFuture<byte[]> outerF = SettableFuture.create();
     Futures.addCallback(
-        downloadBlob(digest, bOut),
+        cacheProtocol.downloadBlob(digest, bOut),
         new FutureCallback<Void>() {
           @Override
           public void onSuccess(Void aVoid) {
@@ -490,7 +456,7 @@
 
     OutputStream out = new LazyFileOutputStream(path);
     SettableFuture<Void> outerF = SettableFuture.create();
-    ListenableFuture<Void> f = downloadBlob(digest, out);
+    ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
     Futures.addCallback(
         f,
         new FutureCallback<Void>() {
@@ -530,7 +496,7 @@
     } else if (result.hasStdoutDigest()) {
       downloads.add(
           Futures.transform(
-              downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
+              cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
               (d) -> null,
               directExecutor()));
     }
@@ -540,7 +506,7 @@
     } else if (result.hasStderrDigest()) {
       downloads.add(
           Futures.transform(
-              downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
+              cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
               (d) -> null,
               directExecutor()));
     }
@@ -857,7 +823,7 @@
      * Adds an action and command protos to upload. They need to be uploaded as part of the action
      * result.
      */
-    public void addAction(SimpleBlobStore.ActionKey actionKey, Action action, Command command) {
+    public void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Command command) {
       digestToBlobs.put(actionKey.getDigest(), action.toByteString());
       digestToBlobs.put(action.getCommandDigest(), command.toByteString());
     }
@@ -993,20 +959,11 @@
     }
   }
 
-  protected void verifyContents(String expectedHash, String actualHash) throws IOException {
-    if (!expectedHash.equals(actualHash)) {
-      String msg =
-          String.format(
-              "An output download failed, because the expected hash"
-                  + "'%s' did not match the received hash '%s'.",
-              expectedHash, actualHash);
-      throw new IOException(msg);
-    }
-  }
-
   /** Release resources associated with the cache. The cache may not be used after calling this. */
   @Override
-  public abstract void close();
+  public void close() {
+    cacheProtocol.close();
+  }
 
   /**
    * Creates an {@link OutputStream} that isn't actually opened until the first data is written.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
similarity index 66%
rename from src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
index 89a3bbc..aeaabac 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -18,11 +18,12 @@
 import com.google.common.base.Ascii;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpBlobStore;
-import com.google.devtools.build.lib.remote.disk.OnDiskBlobStore;
-import com.google.devtools.build.lib.remote.http.HttpBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpCacheClient;
+import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
+import com.google.devtools.build.lib.remote.http.HttpCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import io.netty.channel.unix.DomainSocketAddress;
@@ -31,36 +32,29 @@
 import javax.annotation.Nullable;
 
 /**
- * A factory class for providing a {@link SimpleBlobStore} to be used with {@link
- * SimpleBlobStoreActionCache}. Currently implemented with HTTP or local.
+ * A factory class for providing a {@link RemoteCacheClient}. Currently implemented for HTTP and
+ * disk caching.
  */
-public final class SimpleBlobStoreFactory {
+public final class RemoteCacheClientFactory {
 
-  private SimpleBlobStoreFactory() {}
+  private RemoteCacheClientFactory() {}
 
-  public static SimpleBlobStore create(RemoteOptions remoteOptions, @Nullable Path casPath) {
-    if (isHttpUrlOptions(remoteOptions)) {
-      return createHttp(remoteOptions, /* creds= */ null);
-    } else if (casPath != null) {
-      return new OnDiskBlobStore(casPath);
-    } else {
-      return null;
-    }
-  }
-
-  public static SimpleBlobStore create(
-      RemoteOptions options, @Nullable Credentials creds, Path workingDirectory)
+  public static RemoteCacheClient create(
+      RemoteOptions options,
+      @Nullable Credentials creds,
+      Path workingDirectory,
+      DigestUtil digestUtil)
       throws IOException {
-
     Preconditions.checkNotNull(workingDirectory, "workingDirectory");
     if (isHttpUrlOptions(options) && isDiskCache(options)) {
-      return createCombinedCache(workingDirectory, options.diskCache, options, creds);
+      return createCombinedCache(workingDirectory, options.diskCache, options, creds, digestUtil);
     }
     if (isHttpUrlOptions(options)) {
-      return createHttp(options, creds);
+      return createHttp(options, creds, digestUtil);
     }
     if (isDiskCache(options)) {
-      return createDiskCache(workingDirectory, options.diskCache);
+      return createDiskCache(
+          workingDirectory, options.diskCache, options.remoteVerifyDownloads, digestUtil);
     }
     throw new IllegalArgumentException(
         "Unrecognized RemoteOptions configuration: remote Http cache URL and/or local disk cache"
@@ -71,7 +65,8 @@
     return isHttpUrlOptions(options) || isDiskCache(options);
   }
 
-  private static SimpleBlobStore createHttp(RemoteOptions options, Credentials creds) {
+  private static RemoteCacheClient createHttp(
+      RemoteOptions options, Credentials creds, DigestUtil digestUtil) {
     Preconditions.checkNotNull(options.remoteCache, "remoteCache");
 
     try {
@@ -82,22 +77,26 @@
 
       if (options.remoteProxy != null) {
         if (options.remoteProxy.startsWith("unix:")) {
-          return HttpBlobStore.create(
+          return HttpCacheClient.create(
               new DomainSocketAddress(options.remoteProxy.replaceFirst("^unix:", "")),
               uri,
               options.remoteTimeout,
               options.remoteMaxConnections,
+              options.remoteVerifyDownloads,
               ImmutableList.copyOf(options.remoteHeaders),
+              digestUtil,
               creds);
         } else {
           throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy);
         }
       } else {
-        return HttpBlobStore.create(
+        return HttpCacheClient.create(
             uri,
             options.remoteTimeout,
             options.remoteMaxConnections,
+            options.remoteVerifyDownloads,
             ImmutableList.copyOf(options.remoteHeaders),
+            digestUtil,
             creds);
       }
     } catch (Exception e) {
@@ -105,29 +104,38 @@
     }
   }
 
-  private static SimpleBlobStore createDiskCache(Path workingDirectory, PathFragment diskCachePath)
+  private static RemoteCacheClient createDiskCache(
+      Path workingDirectory,
+      PathFragment diskCachePath,
+      boolean verifyDownloads,
+      DigestUtil digestUtil)
       throws IOException {
     Path cacheDir =
         workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
     if (!cacheDir.exists()) {
       cacheDir.createDirectoryAndParents();
     }
-    return new OnDiskBlobStore(cacheDir);
+    return new DiskCacheClient(cacheDir, verifyDownloads, digestUtil);
   }
 
-  private static SimpleBlobStore createCombinedCache(
-      Path workingDirectory, PathFragment diskCachePath, RemoteOptions options, Credentials cred)
+  private static RemoteCacheClient createCombinedCache(
+      Path workingDirectory,
+      PathFragment diskCachePath,
+      RemoteOptions options,
+      Credentials cred,
+      DigestUtil digestUtil)
       throws IOException {
-
     Path cacheDir =
         workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
     if (!cacheDir.exists()) {
       cacheDir.createDirectoryAndParents();
     }
 
-    OnDiskBlobStore diskCache = new OnDiskBlobStore(cacheDir);
-    SimpleBlobStore httpCache = createHttp(options, cred);
-    return new CombinedDiskHttpBlobStore(diskCache, httpCache);
+    DiskCacheClient diskCache =
+        new DiskCacheClient(cacheDir, options.remoteVerifyDownloads, digestUtil);
+    RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
+
+    return new CombinedDiskHttpCacheClient(diskCache, httpCache);
   }
 
   private static boolean isDiskCache(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
new file mode 100644
index 0000000..bef7cc9
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -0,0 +1,128 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static java.lang.String.format;
+
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
+import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.Utils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link RemoteCache} that can also make sure that everything remote execution needs is
+ * available in the cache before an action is executed.
+ */
+public class RemoteExecutionCache extends RemoteCache {
+
+  public RemoteExecutionCache(
+      GrpcCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
+    super(protocolImpl, options, digestUtil);
+  }
+
+  /**
+   * Starts the upload of every given file and blob and then waits for all of them to finish.
+   *
+   * <p>When an upload fails, all uploads are cancelled and the failure is rethrown; a failure
+   * that is neither unchecked nor one of the declared exceptions is wrapped in an {@link
+   * IOException}.
+   */
+  private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blobs)
+      throws IOException, InterruptedException {
+    List<ListenableFuture<Void>> pending = new ArrayList<>();
+    for (Map.Entry<Digest, Path> file : files.entrySet()) {
+      pending.add(cacheProtocol.uploadFile(file.getKey(), file.getValue()));
+    }
+    for (Map.Entry<Digest, ByteString> blob : blobs.entrySet()) {
+      pending.add(cacheProtocol.uploadBlob(blob.getKey(), blob.getValue()));
+    }
+
+    try {
+      for (ListenableFuture<Void> transfer : pending) {
+        transfer.get();
+      }
+    } catch (ExecutionException e) {
+      // A failed upload makes the remaining ones pointless, so stop them.
+      for (ListenableFuture<Void> transfer : pending) {
+        transfer.cancel(/* mayInterruptIfRunning= */ true);
+      }
+
+      Throwable failure = e.getCause();
+      Throwables.propagateIfPossible(failure, IOException.class, InterruptedException.class);
+      throw new IOException(failure);
+    }
+  }
+
+  /**
+   * Makes sure that the input tree, the input files and the additional inputs (such as the
+   * command) are stored in the remote cache, so that another machine can reassemble and execute
+   * the action from the root digest alone.
+   *
+   * <p>The cache is asked which digests it lacks, and only those are uploaded.
+   *
+   * <p>Plain caching does not need this method, only remote execution does. It lives here
+   * because the cache holding the inputs may be a separate end-point from the executor.
+   *
+   * @throws IOException if an upload fails, or if the cache reports a missing digest that is
+   *     neither part of {@code merkleTree} nor of {@code additionalInputs}
+   */
+  public void ensureInputsPresent(
+      MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
+      throws IOException, InterruptedException {
+    Iterable<Digest> needed =
+        Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
+    ImmutableSet<Digest> missing = Utils.getFromFuture(cacheProtocol.findMissingDigests(needed));
+
+    Map<Digest, Path> files = new HashMap<>();
+    Map<Digest, ByteString> blobs = new HashMap<>();
+    for (Digest digest : missing) {
+      // Resolve in a fixed order: directory node, then input file, then additional input.
+      Directory directory = merkleTree.getDirectoryByDigest(digest);
+      PathOrBytes file = (directory == null) ? merkleTree.getFileByDigest(digest) : null;
+      Message message = (directory == null && file == null) ? additionalInputs.get(digest) : null;
+      if (directory != null) {
+        blobs.put(digest, directory.toByteString());
+      } else if (file != null && file.getBytes() != null) {
+        blobs.put(digest, file.getBytes());
+      } else if (file != null) {
+        files.put(digest, file.getPath());
+      } else if (message != null) {
+        blobs.put(digest, message.toByteString());
+      } else {
+        throw new IOException(
+            format(
+                "findMissingDigests returned a missing digest that has not been requested: %s",
+                digest));
+      }
+    }
+
+    uploadMissing(files, blobs);
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 1598e25..4eba377 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -43,6 +43,7 @@
 import com.google.devtools.build.lib.exec.ExecutorBuilder;
 import com.google.devtools.build.lib.packages.TargetUtils;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -160,8 +161,8 @@
     DigestHashFunction hashFn = env.getRuntime().getFileSystem().getDigestFunction();
     DigestUtil digestUtil = new DigestUtil(hashFn);
 
-    boolean enableBlobStoreCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions);
-    boolean enableGrpcCache = GrpcRemoteCache.isRemoteCacheOptions(remoteOptions);
+    boolean enableBlobStoreCache = RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions);
+    boolean enableGrpcCache = GrpcCacheClient.isRemoteCacheOptions(remoteOptions);
     boolean enableRemoteExecution = shouldEnableRemoteExecution(remoteOptions);
     if (enableBlobStoreCache && enableRemoteExecution) {
       throw new AbruptExitException(
@@ -206,7 +207,7 @@
                     interceptors.toArray(new ClientInterceptor[0])));
       }
       RemoteRetrier executeRetrier = null;
-      AbstractRemoteActionCache cache = null;
+      RemoteCacheClient cacheClient = null;
       if (enableGrpcCache || !Strings.isNullOrEmpty(remoteOptions.remoteExecutor)) {
         rpcRetrier =
               new RemoteRetrier(
@@ -259,8 +260,8 @@
                 remoteOptions.remoteTimeout,
                 rpcRetrier);
         cacheChannel.release();
-        cache =
-            new GrpcRemoteCache(
+        cacheClient =
+            new GrpcCacheClient(
                 cacheChannel.retain(),
                 credentials,
                 remoteOptions,
@@ -273,7 +274,7 @@
         buildEventArtifactUploaderFactoryDelegate.init(
             new ByteStreamBuildEventArtifactUploaderFactory(
                 uploader,
-                cache,
+                cacheClient,
                 cacheChannel.authority(),
                 requestContext,
                 remoteOptions.remoteInstanceName));
@@ -281,13 +282,11 @@
 
       if (enableBlobStoreCache) {
         executeRetrier = null;
-        cache =
-            new SimpleBlobStoreActionCache(
+        cacheClient =
+            RemoteCacheClientFactory.create(
                 remoteOptions,
-                SimpleBlobStoreFactory.create(
-                    remoteOptions,
-                    GoogleAuthUtils.newCredentials(authAndTlsOptions),
-                    Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory")),
+                GoogleAuthUtils.newCredentials(authAndTlsOptions),
+                Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
                 digestUtil);
       }
 
@@ -306,15 +305,18 @@
                 retrier);
         execChannel.release();
         Preconditions.checkState(
-            cache instanceof GrpcRemoteCache,
+            cacheClient instanceof GrpcCacheClient,
             "Only the gRPC cache is support for remote execution");
+        RemoteExecutionCache remoteCache =
+            new RemoteExecutionCache((GrpcCacheClient) cacheClient, remoteOptions, digestUtil);
         actionContextProvider =
             RemoteActionContextProvider.createForRemoteExecution(
-                env, (GrpcRemoteCache) cache, executor, executeRetrier, digestUtil, logDir);
-      } else if (cache != null) {
+                env, remoteCache, executor, executeRetrier, digestUtil, logDir);
+      } else if (cacheClient != null) {
+        RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
         actionContextProvider =
             RemoteActionContextProvider.createForRemoteCaching(
-                env, cache, executeRetrier, digestUtil);
+                env, remoteCache, executeRetrier, digestUtil);
       }
     } catch (IOException e) {
       env.getReporter().handle(Event.error(e.getMessage()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 3346b27..fc8064f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -20,7 +20,6 @@
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import io.grpc.Status;
-import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.time.Duration;
@@ -28,16 +27,28 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 
 /** Specific retry logic for remote execution/caching. */
 public class RemoteRetrier extends Retrier {
 
+  /** Finds the gRPC status of the first StatusRuntimeException in the cause chain, or null. */
+  @Nullable
+  private static Status fromException(Exception e) {
+    Throwable link = e;
+    while (link != null && !(link instanceof StatusRuntimeException)) {
+      link = link.getCause();
+    }
+    return link == null ? null : ((StatusRuntimeException) link).getStatus();
+  }
+
   public static final Predicate<? super Exception> RETRIABLE_GRPC_ERRORS =
       e -> {
-        if (!(e instanceof StatusException) && !(e instanceof StatusRuntimeException)) {
+        Status s = fromException(e);
+        if (s == null) {
+          // It's not a gRPC error.
           return false;
         }
-        Status s = Status.fromThrowable(e);
         switch (s.getCode()) {
           case CANCELLED:
             return !Thread.currentThread().isInterrupted();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index 52fb136..d32a79f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -46,7 +46,7 @@
 import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -75,7 +75,7 @@
   private final Path execRoot;
   private final RemoteOptions options;
 
-  private final AbstractRemoteActionCache remoteCache;
+  private final RemoteCache remoteCache;
   private final String buildRequestId;
   private final String commandId;
 
@@ -94,7 +94,7 @@
   RemoteSpawnCache(
       Path execRoot,
       RemoteOptions options,
-      AbstractRemoteActionCache remoteCache,
+      RemoteCache remoteCache,
       String buildRequestId,
       String commandId,
       @Nullable Reporter cmdlineReporter,
@@ -153,7 +153,7 @@
       try {
         ActionResult result;
         try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
-          result = remoteCache.getCachedActionResult(actionKey);
+          result = remoteCache.downloadActionResult(actionKey);
         }
         // In case the remote cache returned a failed action (exit code != 0) we treat it as a
         // cache miss
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index cef0be7..bd35367 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -57,7 +57,7 @@
 import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -99,7 +99,7 @@
   private final boolean verboseFailures;
 
   @Nullable private final Reporter cmdlineReporter;
-  private final GrpcRemoteCache remoteCache;
+  private final RemoteExecutionCache remoteCache;
   @Nullable private final GrpcRemoteExecutor remoteExecutor;
   @Nullable private final RemoteRetrier retrier;
   private final String buildRequestId;
@@ -125,7 +125,7 @@
       @Nullable Reporter cmdlineReporter,
       String buildRequestId,
       String commandId,
-      GrpcRemoteCache remoteCache,
+      RemoteExecutionCache remoteCache,
       GrpcRemoteExecutor remoteExecutor,
       @Nullable RemoteRetrier retrier,
       DigestUtil digestUtil,
@@ -191,7 +191,7 @@
         // Try to lookup the action in the action cache.
         ActionResult cachedResult;
         try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
-          cachedResult = acceptCachedResult ? remoteCache.getCachedActionResult(actionKey) : null;
+          cachedResult = acceptCachedResult ? remoteCache.downloadActionResult(actionKey) : null;
         }
         if (cachedResult != null) {
           if (cachedResult.getExitCode() != 0) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
deleted file mode 100644
index 363340c..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
-import build.bazel.remote.execution.v2.DirectoryNode;
-import build.bazel.remote.execution.v2.FileNode;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
-import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.remote.util.Utils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.annotation.Nullable;
-
-/**
- * A RemoteActionCache implementation that uses a simple blob store for files and action output.
- *
- * <p>The thread safety is guaranteed by the underlying simple blob store.
- *
- * <p>Note that this class is used from src/tools/remote.
- */
-@ThreadSafe
-public class SimpleBlobStoreActionCache extends AbstractRemoteActionCache {
-  protected final SimpleBlobStore blobStore;
-
-  public SimpleBlobStoreActionCache(
-      RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) {
-    super(options, digestUtil);
-    this.blobStore = blobStore;
-  }
-
-  public void downloadTree(Digest rootDigest, Path rootLocation)
-      throws IOException, InterruptedException {
-    rootLocation.createDirectoryAndParents();
-    Directory directory = Directory.parseFrom(Utils.getFromFuture(downloadBlob(rootDigest)));
-    for (FileNode file : directory.getFilesList()) {
-      Path dst = rootLocation.getRelative(file.getName());
-      Utils.getFromFuture(downloadFile(dst, file.getDigest()));
-      dst.setExecutable(file.getIsExecutable());
-    }
-    for (DirectoryNode child : directory.getDirectoriesList()) {
-      downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
-    }
-  }
-
-  @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
-    return blobStore.uploadFile(digest, file);
-  }
-
-  @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
-    return blobStore.uploadBlob(digest, data);
-  }
-
-  @Override
-  public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
-    return Futures.immediateFuture(ImmutableSet.copyOf(digests));
-  }
-
-  @Override
-  public ActionResult getCachedActionResult(ActionKey actionKey)
-      throws IOException, InterruptedException {
-    return Utils.getFromFuture(blobStore.downloadActionResult(actionKey));
-  }
-
-  public void setCachedActionResult(ActionKey actionKey, ActionResult result)
-      throws IOException, InterruptedException {
-    blobStore.uploadActionResult(actionKey, result);
-  }
-
-  @Override
-  public void close() {
-    blobStore.close();
-  }
-
-  @Override
-  protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-    SettableFuture<Void> outerF = SettableFuture.create();
-    @Nullable
-    HashingOutputStream hashOut =
-        options.remoteVerifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
-    Futures.addCallback(
-        blobStore.downloadBlob(digest, hashOut != null ? hashOut : out),
-        new FutureCallback<Void>() {
-          @Override
-          public void onSuccess(Void unused) {
-            try {
-              if (hashOut != null) {
-                verifyContents(digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
-              }
-              out.flush();
-              outerF.set(null);
-            } catch (IOException e) {
-              outerF.setException(e);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable throwable) {
-            outerF.setException(throwable);
-          }
-        },
-        MoreExecutors.directExecutor());
-    return outerF;
-  }
-
-  public DigestUtil getDigestUtil() {
-    return digestUtil;
-  }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
deleted file mode 100644
index c840c0d..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-load("@rules_java//java:defs.bzl", "java_library")
-
-package(default_visibility = ["//src:__subpackages__"])
-
-filegroup(
-    name = "srcs",
-    srcs = glob(["**"]),
-    visibility = ["//src/main/java/com/google/devtools/build/lib/remote:__pkg__"],
-)
-
-java_library(
-    name = "blobstore",
-    srcs = glob(["*.java"]),
-    tags = ["bazel"],
-    deps = [
-        "//src/main/java/com/google/devtools/build/lib/remote/common",
-        "//src/main/java/com/google/devtools/build/lib/remote/util",
-        "//src/main/java/com/google/devtools/build/lib/vfs",
-        "//src/main/java/com/google/devtools/common/options",
-        "//third_party:guava",
-        "//third_party/protobuf:protobuf_java",
-        "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
-    ],
-)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
deleted file mode 100644
index 2aaa588..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote.blobstore;
-
-
-/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
diff --git a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java
rename to src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
index 1a6bd9e..c9c1d40 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
@@ -11,14 +11,14 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package com.google.devtools.build.lib.remote;
+package com.google.devtools.build.lib.remote.common;
 
 import build.bazel.remote.execution.v2.Digest;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
 
 /** Supports querying a remote cache whether it contains a list of blobs. */
-interface MissingDigestsFinder {
+public interface MissingDigestsFinder {
 
   /**
    * Returns a set of digests that the remote cache does not know about. The returned set is
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
similarity index 90%
rename from src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index da6e6de..2c1bb83 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -29,13 +29,14 @@
  *
  * <p>Implementations must be thread-safe.
  */
-public interface SimpleBlobStore {
+public interface RemoteCacheClient extends MissingDigestsFinder {
 
   /**
    * A key in the remote action cache. The type wraps around a {@link Digest} of an {@link Action}.
    * Action keys are special in that they aren't content-addressable but refer to action results.
    */
   final class ActionKey {
+
     private final Digest digest;
 
     public Digest getDigest() {
@@ -45,6 +46,21 @@
     public ActionKey(Digest digest) {
       this.digest = Preconditions.checkNotNull(digest, "digest");
     }
+
+    /**
+     * Keys are equal exactly when they wrap equal digests; anything that is not an {@link
+     * ActionKey} (including {@code null}) is never equal.
+     */
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof ActionKey && digest.equals(((ActionKey) other).digest);
+    }
+
+    /** Hashes the wrapped digest only, which keeps this consistent with {@link #equals}. */
+    @Override
+    public int hashCode() {
+      return digest.hashCode();
+    }
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
similarity index 80%
rename from src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
index 2f38e3a..cc1c068 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
@@ -16,10 +16,11 @@
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -28,16 +29,16 @@
 import java.util.concurrent.ExecutionException;
 
 /**
- * A {@link SimpleBlobStore} implementation combining two blob stores. A local disk blob store and a
- * remote blob store. If a blob isn't found in the first store, the second store is used, and the
+ * A {@link RemoteCacheClient} implementation combining two blob stores. A local disk blob store and
+ * a remote blob store. If a blob isn't found in the first store, the second store is used, and the
  * blob added to the first. Put puts the blob on both stores.
  */
-public final class CombinedDiskHttpBlobStore implements SimpleBlobStore {
+public final class CombinedDiskHttpCacheClient implements RemoteCacheClient {
 
-  private final SimpleBlobStore remoteCache;
-  private final OnDiskBlobStore diskCache;
+  private final RemoteCacheClient remoteCache;
+  private final DiskCacheClient diskCache;
 
-  public CombinedDiskHttpBlobStore(OnDiskBlobStore diskCache, SimpleBlobStore remoteCache) {
+  public CombinedDiskHttpCacheClient(DiskCacheClient diskCache, RemoteCacheClient remoteCache) {
     this.diskCache = Preconditions.checkNotNull(diskCache);
     this.remoteCache = Preconditions.checkNotNull(remoteCache);
   }
@@ -81,6 +82,20 @@
     return Futures.immediateFuture(null);
   }
 
+  /** Reports every digest that the remote cache or the disk cache reports as missing. */
+  @Override
+  public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+    ListenableFuture<ImmutableSet<Digest>> remoteMissing = remoteCache.findMissingDigests(digests);
+    ListenableFuture<ImmutableSet<Digest>> diskMissing = diskCache.findMissingDigests(digests);
+    return Futures.whenAllSucceed(remoteMissing, diskMissing)
+        .call(
+            () -> {
+              ImmutableSet.Builder<Digest> union = ImmutableSet.builder();
+              return union.addAll(remoteMissing.get()).addAll(diskMissing.get()).build();
+            },
+            MoreExecutors.directExecutor());
+  }
+
   private Path newTempPath() {
     return diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false);
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
similarity index 76%
rename from src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index ddd0cec..68ad316 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -15,11 +15,15 @@
 
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashingOutputStream;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.Utils;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
@@ -27,14 +31,21 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.UUID;
+import javax.annotation.Nullable;
 
 /** A on-disk store for the remote action cache. */
-public class OnDiskBlobStore implements SimpleBlobStore {
-  private final Path root;
+public class DiskCacheClient implements RemoteCacheClient {
+
   private static final String ACTION_KEY_PREFIX = "ac_";
 
-  public OnDiskBlobStore(Path root) {
+  private final Path root;
+  private final boolean verifyDownloads;
+  private final DigestUtil digestUtil;
+
+  public DiskCacheClient(Path root, boolean verifyDownloads, DigestUtil digestUtil) {
     this.root = root;
+    this.verifyDownloads = verifyDownloads;
+    this.digestUtil = digestUtil;
   }
 
   /** Returns {@code true} if the provided {@code key} is stored in the CAS. */
@@ -68,7 +79,23 @@
 
   @Override
   public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-    return download(digest, out, /* isActionCache= */ false);
+    @Nullable
+    HashingOutputStream hashOut = verifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
+    return Futures.transformAsync(
+        download(digest, hashOut == null ? out : hashOut, /* isActionCache= */ false),
+        (unused) -> {
+          try {
+            if (hashOut != null) {
+              String actualHash = DigestUtil.hashCodeToString(hashOut.hash());
+              Utils.verifyBlobContents(digest.getHash(), actualHash);
+            }
+            out.flush();
+            return Futures.immediateFuture(null);
+          } catch (IOException e) {
+            return Futures.immediateFailedFuture(e);
+          }
+        },
+        MoreExecutors.directExecutor());
   }
 
   @Override
@@ -108,6 +135,11 @@
     return Futures.immediateFuture(null);
   }
 
+  @Override
+  public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+    return Futures.immediateFuture(ImmutableSet.copyOf(digests));
+  }
+
   protected Path toPath(String key, boolean actionResult) {
     return root.getChild(getDiskKey(key, actionResult));
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
index 9361235..e82e236 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
@@ -103,7 +103,7 @@
     if (!uri.getPath().endsWith("/")) {
       builder.append("/");
     }
-    builder.append(isCas ? HttpBlobStore.CAS_PREFIX : HttpBlobStore.AC_PREFIX);
+    builder.append(isCas ? HttpCacheClient.CAS_PREFIX : HttpCacheClient.AC_PREFIX);
     builder.append(hash);
     return builder.toString();
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 19e9bd9..c0f0051 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -17,11 +17,15 @@
 import build.bazel.remote.execution.v2.Digest;
 import com.google.auth.Credentials;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashingOutputStream;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.Utils;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
@@ -84,7 +88,7 @@
 import javax.net.ssl.SSLEngine;
 
 /**
- * Implementation of {@link SimpleBlobStore} that can talk to a HTTP/1.1 backend.
+ * Implementation of {@link RemoteCacheClient} that can talk to a HTTP/1.1 backend.
  *
  * <p>Blobs (Binary large objects) are uploaded using the {@code PUT} method. Action cache blobs are
  * stored under the path {@code /ac/base16-key}. CAS (Content Addressable Storage) blobs are stored
@@ -107,7 +111,7 @@
  *
  * <p>The implementation currently does not support transfer encoding chunked.
  */
-public final class HttpBlobStore implements SimpleBlobStore {
+public final class HttpCacheClient implements RemoteCacheClient {
 
   public static final String AC_PREFIX = "ac/";
   public static final String CAS_PREFIX = "cas/";
@@ -122,6 +126,8 @@
   private final int timeoutSeconds;
   private final ImmutableList<Entry<String, String>> extraHttpHeaders;
   private final boolean useTls;
+  private final boolean verifyDownloads;
+  private final DigestUtil digestUtil;
 
   private final Object closeLock = new Object();
 
@@ -136,51 +142,61 @@
   @GuardedBy("credentialsLock")
   private long lastRefreshTime;
 
-  public static HttpBlobStore create(
+  public static HttpCacheClient create(
       URI uri,
       int timeoutSeconds,
       int remoteMaxConnections,
+      boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
+      DigestUtil digestUtil,
       @Nullable final Credentials creds)
       throws Exception {
-    return new HttpBlobStore(
+    return new HttpCacheClient(
         NioEventLoopGroup::new,
         NioSocketChannel.class,
         uri,
         timeoutSeconds,
         remoteMaxConnections,
+        verifyDownloads,
         extraHttpHeaders,
+        digestUtil,
         creds,
         null);
   }
 
-  public static HttpBlobStore create(
+  public static HttpCacheClient create(
       DomainSocketAddress domainSocketAddress,
       URI uri,
       int timeoutSeconds,
       int remoteMaxConnections,
+      boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
+      DigestUtil digestUtil,
       @Nullable final Credentials creds)
       throws Exception {
 
     if (KQueue.isAvailable()) {
-      return new HttpBlobStore(
+      return new HttpCacheClient(
           KQueueEventLoopGroup::new,
           KQueueDomainSocketChannel.class,
           uri,
           timeoutSeconds,
           remoteMaxConnections,
+          verifyDownloads,
           extraHttpHeaders,
+          digestUtil,
           creds,
           domainSocketAddress);
     } else if (Epoll.isAvailable()) {
-      return new HttpBlobStore(
+      return new HttpCacheClient(
           EpollEventLoopGroup::new,
           EpollDomainSocketChannel.class,
           uri,
           timeoutSeconds,
           remoteMaxConnections,
+          verifyDownloads,
           extraHttpHeaders,
+          digestUtil,
           creds,
           domainSocketAddress);
     } else {
@@ -188,13 +204,15 @@
     }
   }
 
-  private HttpBlobStore(
+  private HttpCacheClient(
       Function<Integer, EventLoopGroup> newEventLoopGroup,
       Class<? extends Channel> channelClass,
       URI uri,
       int timeoutSeconds,
       int remoteMaxConnections,
+      boolean verifyDownloads,
       ImmutableList<Entry<String, String>> extraHttpHeaders,
+      DigestUtil digestUtil,
       @Nullable final Credentials creds,
       @Nullable SocketAddress socketAddress)
       throws Exception {
@@ -261,6 +279,8 @@
     this.creds = creds;
     this.timeoutSeconds = timeoutSeconds;
     this.extraHttpHeaders = extraHttpHeaders;
+    this.verifyDownloads = verifyDownloads;
+    this.digestUtil = digestUtil;
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
@@ -419,7 +439,23 @@
 
   @Override
   public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-    return get(digest, out, /* casDownload= */ true);
+    // When verifyDownloads is set, get() writes to hashOut so the received bytes can be hashed.
+    HashingOutputStream hashOut = verifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
+    return Futures.transformAsync(
+        get(digest, hashOut == null ? out : hashOut, /* casDownload= */ true),
+        (unused) -> {
+          try {
+            if (hashOut != null) {
+              String actualHash = DigestUtil.hashCodeToString(hashOut.hash());
+              Utils.verifyBlobContents(digest.getHash(), actualHash);
+            }
+            out.flush();
+            return Futures.immediateFuture(null);
+          } catch (IOException e) {
+            return Futures.immediateFailedFuture(e);
+          }
+        },
+        MoreExecutors.directExecutor());
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
@@ -607,6 +643,11 @@
     return Futures.immediateFuture(null);
   }
 
+  @Override
+  public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+    return Futures.immediateFuture(ImmutableSet.copyOf(digests));
+  }
+
   @SuppressWarnings("FutureReturnValueIgnored")
   private void putAfterCredentialRefresh(UploadCommand cmd) throws InterruptedException {
     Channel ch = null;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index 6fc1446..ee3f06d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -23,7 +23,7 @@
 import com.google.common.io.BaseEncoding;
 import com.google.devtools.build.lib.actions.cache.DigestUtils;
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.Message;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
index 7191511..e707260 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
@@ -18,7 +18,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import io.grpc.ClientInterceptor;
 import io.grpc.Context;
 import io.grpc.Contexts;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index f1c3083..79f7c68 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -26,7 +26,7 @@
 import com.google.devtools.build.lib.actions.SpawnResult;
 import com.google.devtools.build.lib.actions.SpawnResult.Status;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.protobuf.ByteString;
@@ -149,6 +149,17 @@
         .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
   }
 
+  /** Throws an {@link IOException} if {@code actualHash} differs from {@code expectedHash}. */
+  public static void verifyBlobContents(String expectedHash, String actualHash) throws IOException {
+    if (!expectedHash.equals(actualHash)) {
+      throw new IOException(
+          String.format(
+              "An output download failed, because the expected hash "
+                  + "'%s' did not match the received hash '%s'.",
+              expectedHash, actualHash));
+    }
+  }
+
   /** An in-memory output file. */
   public static final class InMemoryOutput {
     private final ActionInput output;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD
index 267072f..7421605 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD
@@ -51,7 +51,6 @@
         "//src/main/java/com/google/devtools/build/lib/buildeventstream",
         "//src/main/java/com/google/devtools/build/lib/clock",
         "//src/main/java/com/google/devtools/build/lib/remote",
-        "//src/main/java/com/google/devtools/build/lib/remote/blobstore",
         "//src/main/java/com/google/devtools/build/lib/remote/common",
         "//src/main/java/com/google/devtools/build/lib/remote/disk",
         "//src/main/java/com/google/devtools/build/lib/remote/http",
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index dc6b4e4..5affb4b 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -46,6 +46,7 @@
 import com.google.devtools.build.lib.clock.JavaClock;
 import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff;
 import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TestUtils;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
similarity index 84%
rename from src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
rename to src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index 4d8119ef..fffbec8 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -58,7 +58,7 @@
 import com.google.devtools.build.lib.clock.JavaClock;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
 import com.google.devtools.build.lib.remote.Retrier.Backoff;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -88,6 +88,7 @@
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.StreamObserver;
 import io.grpc.util.MutableHandlerRegistry;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -107,9 +108,9 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-/** Tests for {@link GrpcRemoteCache}. */
+/** Tests for {@link GrpcCacheClient}. */
 @RunWith(JUnit4.class)
-public class GrpcRemoteCacheTest {
+public class GrpcCacheClientTest {
 
   private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);
 
@@ -183,15 +184,15 @@
     }
   }
 
-  private GrpcRemoteCache newClient() throws IOException {
+  private GrpcCacheClient newClient() throws IOException {
     return newClient(Options.getDefaults(RemoteOptions.class));
   }
 
-  private GrpcRemoteCache newClient(RemoteOptions remoteOptions) throws IOException {
+  private GrpcCacheClient newClient(RemoteOptions remoteOptions) throws IOException {
     return newClient(remoteOptions, () -> new ExponentialBackoff(remoteOptions));
   }
 
-  private GrpcRemoteCache newClient(RemoteOptions remoteOptions, Supplier<Backoff> backoffSupplier)
+  private GrpcCacheClient newClient(RemoteOptions remoteOptions, Supplier<Backoff> backoffSupplier)
       throws IOException {
     AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class);
     authTlsOptions.useGoogleDefaultCredentials = true;
@@ -214,22 +215,35 @@
         TestUtils.newRemoteRetrier(
             backoffSupplier, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService);
     ReferenceCountedChannel channel =
-        new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor()
-            .intercept(new CallCredentialsInterceptor(creds)).build());
+        new ReferenceCountedChannel(
+            InProcessChannelBuilder.forName(fakeServerName)
+                .directExecutor()
+                .intercept(new CallCredentialsInterceptor(creds))
+                .build());
     ByteStreamUploader uploader =
-        new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds,
-            remoteOptions.remoteTimeout, retrier);
-    return new GrpcRemoteCache(channel.retain(),
-        creds,
-        remoteOptions,
-        retrier,
-        DIGEST_UTIL,
-        uploader);
+        new ByteStreamUploader(
+            remoteOptions.remoteInstanceName,
+            channel.retain(),
+            creds,
+            remoteOptions.remoteTimeout,
+            retrier);
+    return new GrpcCacheClient(
+        channel.retain(), creds, remoteOptions, retrier, DIGEST_UTIL, uploader);
+  }
+
+  /** Downloads {@code digest} into an in-memory buffer and returns the buffered bytes. */
+  private static byte[] downloadBlob(GrpcCacheClient cacheClient, Digest digest)
+      throws IOException, InterruptedException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    getFromFuture(cacheClient.downloadBlob(digest, buffer));
+    return buffer.toByteArray();
+  }
 
   @Test
   public void testVirtualActionInputSupport() throws Exception {
-    GrpcRemoteCache client = newClient();
+    RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+    RemoteExecutionCache client =
+        new RemoteExecutionCache(newClient(options), options, DIGEST_UTIL);
     PathFragment execPath = PathFragment.create("my/exec/path");
     VirtualActionInput virtualActionInput = new StringActionInput("hello", execPath);
     MerkleTree merkleTree =
@@ -289,15 +303,15 @@
 
   @Test
   public void testDownloadEmptyBlob() throws Exception {
-    GrpcRemoteCache client = newClient();
+    GrpcCacheClient client = newClient();
     Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
     // Will not call the mock Bytestream interface at all.
-    assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
+    assertThat(downloadBlob(client, emptyDigest)).isEmpty();
   }
 
   @Test
   public void testDownloadBlobSingleChunk() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    final GrpcCacheClient client = newClient();
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -309,12 +323,12 @@
             responseObserver.onCompleted();
           }
         });
-    assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
+    assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg");
   }
 
   @Test
   public void testDownloadBlobMultipleChunks() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    final GrpcCacheClient client = newClient();
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -330,12 +344,15 @@
             responseObserver.onCompleted();
           }
         });
-    assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
+    assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg");
   }
 
   @Test
   public void testDownloadAllResults() throws Exception {
-    GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
     Digest barDigest = DIGEST_UTIL.computeAsUtf8("bar-contents");
     Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
@@ -346,7 +363,7 @@
     result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
     result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest);
     result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest);
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar"))).isEqualTo(barDigest);
@@ -355,7 +372,10 @@
 
   @Test
   public void testDownloadDirectory() throws Exception {
-    GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
     Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
     Tree barTreeMessage =
@@ -379,7 +399,7 @@
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
     result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
 
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/qux"))).isEqualTo(quxDigest);
@@ -388,7 +408,10 @@
 
   @Test
   public void testDownloadDirectoryEmpty() throws Exception {
-    GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     Tree barTreeMessage = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
     Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
     serviceRegistry.addService(
@@ -397,14 +420,17 @@
 
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
 
     assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue();
   }
 
   @Test
   public void testDownloadDirectoryNested() throws Exception {
-    GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
     Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
     Directory wobbleDirMessage =
@@ -436,7 +462,7 @@
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
     result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
 
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
     assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/wobble/qux"))).isEqualTo(quxDigest);
@@ -494,7 +520,10 @@
 
   @Test
   public void testUploadDirectory() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
     final Digest quxDigest =
@@ -540,7 +569,7 @@
           }
         });
 
-    ActionResult result = uploadDirectory(client, ImmutableList.<Path>of(fooFile, barDir));
+    ActionResult result = uploadDirectory(remoteCache, ImmutableList.<Path>of(fooFile, barDir));
     ActionResult.Builder expectedResult = ActionResult.newBuilder();
     expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
     expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
@@ -549,7 +578,10 @@
 
   @Test
   public void testUploadDirectoryEmpty() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest barDigest =
         fakeFileCache.createScratchInputDirectory(
             ActionInputHelper.fromPath("bar"),
@@ -577,7 +609,7 @@
           }
         });
 
-    ActionResult result = uploadDirectory(client, ImmutableList.<Path>of(barDir));
+    ActionResult result = uploadDirectory(remoteCache, ImmutableList.<Path>of(barDir));
     ActionResult.Builder expectedResult = ActionResult.newBuilder();
     expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
     assertThat(result).isEqualTo(expectedResult.build());
@@ -585,7 +617,10 @@
 
   @Test
   public void testUploadDirectoryNested() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest wobbleDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
     final Digest quxDigest =
@@ -636,23 +671,26 @@
           }
         });
 
-    ActionResult result = uploadDirectory(client, ImmutableList.of(barDir));
+    ActionResult result = uploadDirectory(remoteCache, ImmutableList.of(barDir));
     ActionResult.Builder expectedResult = ActionResult.newBuilder();
     expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
     assertThat(result).isEqualTo(expectedResult.build());
   }
 
-  private ActionResult uploadDirectory(GrpcRemoteCache client, List<Path> outputs)
+  private ActionResult uploadDirectory(RemoteCache remoteCache, List<Path> outputs)
       throws Exception {
     Action action = Action.getDefaultInstance();
     ActionKey actionKey = DIGEST_UTIL.computeActionKey(action);
     Command cmd = Command.getDefaultInstance();
-    return client.upload(actionKey, action, cmd, execRoot, outputs, outErr);
+    return remoteCache.upload(actionKey, action, cmd, execRoot, outputs, outErr);
   }
 
   @Test
   public void testUpload() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
     final Digest barDigest =
@@ -698,7 +736,7 @@
         });
 
     ActionResult result =
-        client.upload(
+        remoteCache.upload(
             DIGEST_UTIL.asActionKey(actionDigest),
             action,
             command,
@@ -719,6 +757,11 @@
 
   @Test
   public void testUploadSplitMissingDigestsCall() throws Exception {
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    remoteOptions.maxOutboundMessageSize = 80; // Enough for one digest, but not two.
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
     final Digest barDigest =
@@ -754,11 +797,8 @@
           }
         });
 
-    RemoteOptions options = Options.getDefaults(RemoteOptions.class);
-    options.maxOutboundMessageSize = 80; // Enough for one digest, but not two.
-    final GrpcRemoteCache client = newClient(options);
     ActionResult result =
-        client.upload(
+        remoteCache.upload(
             DIGEST_UTIL.asActionKey(actionDigest),
             action,
             command,
@@ -778,7 +818,10 @@
 
   @Test
   public void testUploadCacheMissesWithRetries() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    GrpcCacheClient client = newClient(remoteOptions);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
+
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
     final Digest barDigest =
@@ -903,7 +946,7 @@
                 }))
         .when(mockByteStreamImpl)
         .queryWriteStatus(any(), any());
-    client.upload(
+    remoteCache.upload(
         actionKey,
         Action.getDefaultInstance(),
         Command.getDefaultInstance(),
@@ -917,7 +960,7 @@
 
   @Test
   public void testGetCachedActionResultWithRetries() throws Exception {
-    final GrpcRemoteCache client = newClient();
+    final GrpcCacheClient client = newClient();
     ActionKey actionKey = DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key"));
     serviceRegistry.addService(
         new ActionCacheImplBase() {
@@ -930,13 +973,13 @@
                 (numErrors-- <= 0 ? Status.NOT_FOUND : Status.UNAVAILABLE).asRuntimeException());
           }
         });
-    assertThat(client.getCachedActionResult(actionKey)).isNull();
+    assertThat(getFromFuture(client.downloadActionResult(actionKey))).isNull();
   }
 
   @Test
   public void downloadBlobIsRetriedWithProgress() throws IOException, InterruptedException {
     Backoff mockBackoff = Mockito.mock(Backoff.class);
-    final GrpcRemoteCache client =
+    final GrpcCacheClient client =
         newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
@@ -959,16 +1002,15 @@
             }
           }
         });
-    assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
+    assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg");
     Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis();
   }
 
   @Test
-  public void downloadBlobPassesThroughDeadlineExceededWithoutProgress()
-      throws IOException, InterruptedException {
+  public void downloadBlobPassesThroughDeadlineExceededWithoutProgress() throws IOException {
     Backoff mockBackoff = Mockito.mock(Backoff.class);
     Mockito.when(mockBackoff.nextDelayMillis()).thenReturn(-1L);
-    final GrpcRemoteCache client =
+    final GrpcCacheClient client =
         newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
@@ -984,19 +1026,60 @@
             responseObserver.onError(Status.DEADLINE_EXCEEDED.asException());
           }
         });
-    IOException e =
-        assertThrows(IOException.class, () -> getFromFuture(client.downloadBlob(digest)));
+    IOException e = assertThrows(IOException.class, () -> downloadBlob(client, digest));
     Status st = Status.fromThrowable(e);
     assertThat(st.getCode()).isEqualTo(Status.Code.DEADLINE_EXCEEDED);
     Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis();
   }
 
   @Test
+  public void testDownloadFailsOnDigestMismatch() throws Exception {
+    // Test that the download fails when a blob/file has a different content hash than expected.
+
+    GrpcCacheClient client = newClient();
+    Digest digest = DIGEST_UTIL.computeAsUtf8("foo");
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            ByteString data = ByteString.copyFromUtf8("bar");
+            responseObserver.onNext(ReadResponse.newBuilder().setData(data).build());
+            responseObserver.onCompleted();
+          }
+        });
+    IOException e = assertThrows(IOException.class, () -> downloadBlob(client, digest));
+    assertThat(e).hasMessageThat().contains(digest.getHash());
+    assertThat(e).hasMessageThat().contains(DIGEST_UTIL.computeAsUtf8("bar").getHash());
+  }
+
+  @Test
+  public void testDisablingDigestVerification() throws Exception {
+    // Test that when digest verification is disabled a corrupted download works.
+
+    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    remoteOptions.remoteVerifyDownloads = false;
+
+    GrpcCacheClient client = newClient(remoteOptions);
+    Digest digest = DIGEST_UTIL.computeAsUtf8("foo");
+    ByteString downloadContents = ByteString.copyFromUtf8("bar");
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            responseObserver.onNext(ReadResponse.newBuilder().setData(downloadContents).build());
+            responseObserver.onCompleted();
+          }
+        });
+
+    assertThat(downloadBlob(client, digest)).isEqualTo(downloadContents.toByteArray());
+  }
+
+  @Test
   public void isRemoteCacheOptionsWhenGrpcEnabled() {
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "grpc://some-host.com";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1004,7 +1087,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "GRPC://some-host.com";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1012,7 +1095,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "localhost:1234";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1020,7 +1103,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "some-host.com:1234";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1028,7 +1111,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "http://some-host.com";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isFalse();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isFalse();
   }
 
   @Test
@@ -1036,7 +1119,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "HTTP://some-host.com";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isFalse();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isFalse();
   }
 
   @Test
@@ -1044,7 +1127,7 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "https://some-host.com";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isFalse();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isFalse();
   }
 
   @Test
@@ -1053,7 +1136,7 @@
     options.remoteCache = "grp://some-host.com";
 
     // TODO(ishikhman): add proper vaildation and flip to false
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1062,7 +1145,7 @@
     options.remoteCache = "grpcsss://some-host.com";
 
     // TODO(ishikhman): add proper vaildation and flip to false
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isTrue();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isTrue();
   }
 
   @Test
@@ -1070,13 +1153,13 @@
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteCache = "";
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isFalse();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptionsWhenRemoteCacheDisabled() {
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
 
-    assertThat(GrpcRemoteCache.isRemoteCacheOptions(options)).isFalse();
+    assertThat(GrpcCacheClient.isRemoteCacheOptions(options)).isFalse();
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index e1b1d8e..a60c71e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -19,6 +19,7 @@
 import static org.mockito.AdditionalAnswers.answerVoid;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
@@ -213,7 +214,8 @@
             RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
             retryService);
     ReferenceCountedChannel channel =
-        new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor().build());
+        new ReferenceCountedChannel(
+            InProcessChannelBuilder.forName(fakeServerName).directExecutor().build());
     GrpcRemoteExecutor executor =
         new GrpcRemoteExecutor(channel.retain(), null, retrier);
     CallCredentials creds =
@@ -221,8 +223,10 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds,
             remoteOptions.remoteTimeout, retrier);
-    GrpcRemoteCache remoteCache =
-        new GrpcRemoteCache(channel.retain(), creds, remoteOptions, retrier, DIGEST_UTIL, uploader);
+    GrpcCacheClient cacheProtocol =
+        new GrpcCacheClient(channel.retain(), creds, remoteOptions, retrier, DIGEST_UTIL, uploader);
+    RemoteExecutionCache remoteCache =
+        new RemoteExecutionCache(cacheProtocol, remoteOptions, DIGEST_UTIL);
     client =
         new RemoteSpawnRunner(
             execRoot,
@@ -441,7 +445,7 @@
         return new StreamObserver<WriteRequest>() {
           @Override
           public void onNext(WriteRequest request) {
-            assertThat(request.getResourceName()).contains(DIGEST_UTIL.toString(digest));
+            assertThat(request.getResourceName()).contains(DigestUtil.toString(digest));
             assertThat(request.getFinishWrite()).isTrue();
             assertThat(request.getData().toByteArray()).isEqualTo(data);
             responseObserver.onNext(
@@ -488,6 +492,7 @@
 
   /** Capture the request headers from a client. Useful for testing metadata propagation. */
   private static class RequestHeadersValidator implements ServerInterceptor {
+
     @Override
     public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
         ServerCall<ReqT, RespT> call,
@@ -567,7 +572,7 @@
     assertThat(result.isCacheHit()).isFalse();
     assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
     assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
-    Mockito.verify(mockByteStreamImpl).write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
+    verify(mockByteStreamImpl).write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
   }
 
   private Answer<Void> answerWith(@Nullable Operation op, Status status) {
@@ -724,19 +729,19 @@
     assertThat(result.isCacheHit()).isFalse();
     assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
     assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
-    Mockito.verify(mockExecutionImpl, Mockito.times(4))
+    verify(mockExecutionImpl, Mockito.times(4))
         .execute(
             ArgumentMatchers.<ExecuteRequest>any(),
             ArgumentMatchers.<StreamObserver<Operation>>any());
-    Mockito.verify(mockExecutionImpl, Mockito.times(2))
+    verify(mockExecutionImpl, Mockito.times(2))
         .waitExecution(
             ArgumentMatchers.<WaitExecutionRequest>any(),
             ArgumentMatchers.<StreamObserver<Operation>>any());
-    Mockito.verify(mockByteStreamImpl, Mockito.times(2))
+    verify(mockByteStreamImpl, Mockito.times(2))
         .read(
             ArgumentMatchers.<ReadRequest>any(),
             ArgumentMatchers.<StreamObserver<ReadResponse>>any());
-    Mockito.verify(mockByteStreamImpl, Mockito.times(3))
+    verify(mockByteStreamImpl, Mockito.times(3))
         .write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
   }
 
@@ -834,19 +839,19 @@
     assertThat(result.isCacheHit()).isFalse();
     assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
     assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
-    Mockito.verify(mockExecutionImpl, Mockito.times(3))
+    verify(mockExecutionImpl, Mockito.times(3))
         .execute(
             ArgumentMatchers.<ExecuteRequest>any(),
             ArgumentMatchers.<StreamObserver<Operation>>any());
-    Mockito.verify(mockExecutionImpl, Mockito.times(2))
+    verify(mockExecutionImpl, Mockito.times(2))
         .waitExecution(
             ArgumentMatchers.<WaitExecutionRequest>any(),
             ArgumentMatchers.<StreamObserver<Operation>>any());
-    Mockito.verify(mockByteStreamImpl)
+    verify(mockByteStreamImpl)
         .read(
             ArgumentMatchers.<ReadRequest>any(),
             ArgumentMatchers.<StreamObserver<ReadResponse>>any());
-    Mockito.verify(mockByteStreamImpl, Mockito.times(1))
+    verify(mockByteStreamImpl, Mockito.times(1))
         .write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
   }
 
@@ -867,8 +872,7 @@
     SpawnResult result = client.exec(simpleSpawn, policy);
     assertThat(result.status()).isEqualTo(SpawnResult.Status.EXECUTION_FAILED_CATASTROPHICALLY);
     // Ensure we also got back the stack trace due to verboseFailures=true
-    assertThat(result.getFailureMessage())
-        .contains("GrpcRemoteExecutionClientTest.passUnavailableErrorWithStackTrace");
+    assertThat(result.getFailureMessage()).contains("com.google.devtools.build.lib.remote");
   }
 
   @Test
@@ -886,9 +890,8 @@
         new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);
     SpawnResult result = client.exec(simpleSpawn, policy);
     assertThat(result.getFailureMessage()).contains("whoa"); // Error details.
-    // Ensure we also got back the stack trace.
-    assertThat(result.getFailureMessage())
-        .contains("GrpcRemoteExecutionClientTest.passInternalErrorWithStackTrace");
+    // Ensure we also got back the stack trace due to verboseFailures=true
+    assertThat(result.getFailureMessage()).contains("com.google.devtools.build.lib.remote");
   }
 
   @Test
@@ -934,7 +937,7 @@
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
-            assertThat(request.getResourceName().contains(DIGEST_UTIL.toString(stdOutDigest)))
+            assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
                 .isTrue();
             responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
           }
@@ -945,7 +948,7 @@
 
     SpawnResult result = client.exec(simpleSpawn, policy);
     assertThat(result.status()).isEqualTo(SpawnResult.Status.REMOTE_CACHE_FAILED);
-    assertThat(result.getFailureMessage()).contains(DIGEST_UTIL.toString(stdOutDigest));
+    assertThat(result.getFailureMessage()).contains(DigestUtil.toString(stdOutDigest));
     // Ensure we also got back the stack trace.
     assertThat(result.getFailureMessage())
         .contains("GrpcRemoteExecutionClientTest.passCacheMissErrorWithStackTrace");
@@ -995,7 +998,7 @@
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
-            assertThat(request.getResourceName().contains(DIGEST_UTIL.toString(stdOutDigest)))
+            assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
                 .isTrue();
             responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
           }
@@ -1005,7 +1008,7 @@
         new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);
     SpawnResult result = client.exec(simpleSpawn, policy);
     assertThat(result.status()).isEqualTo(SpawnResult.Status.REMOTE_CACHE_FAILED);
-    assertThat(result.getFailureMessage()).contains(DIGEST_UTIL.toString(stdOutDigest));
+    assertThat(result.getFailureMessage()).contains(DigestUtil.toString(stdOutDigest));
     // Ensure we also got back the stack trace because verboseFailures=true
     assertThat(result.getFailureMessage())
         .contains("passRepeatedOrphanedCacheMissErrorWithStackTrace");
@@ -1275,11 +1278,11 @@
     assertThat(result.setupSuccess()).isTrue();
     assertThat(result.exitCode()).isEqualTo(0);
     assertThat(result.isCacheHit()).isFalse();
-    Mockito.verify(mockExecutionImpl, Mockito.times(1))
+    verify(mockExecutionImpl, Mockito.times(1))
         .execute(
             ArgumentMatchers.<ExecuteRequest>any(),
             ArgumentMatchers.<StreamObserver<Operation>>any());
-    Mockito.verify(mockExecutionImpl, Mockito.times(2))
+    verify(mockExecutionImpl, Mockito.times(2))
         .waitExecution(
             Mockito.eq(waitExecutionRequest), ArgumentMatchers.<StreamObserver<Operation>>any());
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
index 0615c92..5913677 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
@@ -16,14 +16,11 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
 
-import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.common.hash.HashCode;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.ArtifactRoot;
@@ -33,10 +30,9 @@
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
 import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
 import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
 import com.google.devtools.build.lib.remote.util.StringActionInput;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
@@ -49,7 +45,6 @@
 import com.google.protobuf.ByteString;
 import io.grpc.Context;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -88,8 +83,7 @@
     Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
     Artifact a2 = createRemoteArtifact("file2", "fizz buzz", metadata, cacheEntries);
     MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
-    AbstractRemoteActionCache remoteCache =
-        new StaticRemoteActionCache(options, digestUtil, cacheEntries);
+    RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
     RemoteActionInputFetcher actionInputFetcher =
         new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
 
@@ -112,8 +106,7 @@
   public void testStagingVirtualActionInput() throws Exception {
     // arrange
     MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
-    AbstractRemoteActionCache remoteCache =
-        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
     RemoteActionInputFetcher actionInputFetcher =
         new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
     VirtualActionInput a = new StringActionInput("hello world", PathFragment.create("file1"));
@@ -138,8 +131,7 @@
     Artifact a =
         createRemoteArtifact("file1", "hello world", metadata, /* cacheEntries= */ new HashMap<>());
     MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
-    AbstractRemoteActionCache remoteCache =
-        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
     RemoteActionInputFetcher actionInputFetcher =
         new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
 
@@ -163,8 +155,7 @@
     Artifact a = ActionsTestUtil.createArtifact(artifactRoot, p);
     FileArtifactValue f = FileArtifactValue.createForTesting(a);
     MetadataProvider metadataProvider = new StaticMetadataProvider(ImmutableMap.of(a, f));
-    AbstractRemoteActionCache remoteCache =
-        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
     RemoteActionInputFetcher actionInputFetcher =
         new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
 
@@ -182,8 +173,7 @@
     Map<ActionInput, FileArtifactValue> metadata = new HashMap<>();
     Map<Digest, ByteString> cacheEntries = new HashMap<>();
     Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
-    AbstractRemoteActionCache remoteCache =
-        new StaticRemoteActionCache(options, digestUtil, cacheEntries);
+    RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
     RemoteActionInputFetcher actionInputFetcher =
         new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
 
@@ -214,58 +204,13 @@
     return a;
   }
 
-  private static class StaticRemoteActionCache extends AbstractRemoteActionCache {
-
-    private final ImmutableMap<Digest, ByteString> cacheEntries;
-
-    public StaticRemoteActionCache(
-        RemoteOptions options, DigestUtil digestUtil, Map<Digest, ByteString> cacheEntries) {
-      super(options, digestUtil);
-      this.cacheEntries = ImmutableMap.copyOf(cacheEntries);
+  private static RemoteCache newCache(
+      RemoteOptions options, DigestUtil digestUtil, Map<Digest, ByteString> cacheEntries) {
+    Map<Digest, byte[]> cacheEntriesByteArray =
+        Maps.newHashMapWithExpectedSize(cacheEntries.size());
+    for (Map.Entry<Digest, ByteString> entry : cacheEntries.entrySet()) {
+      cacheEntriesByteArray.put(entry.getKey(), entry.getValue().toByteArray());
     }
-
-    @Override
-    ActionResult getCachedActionResult(ActionKey actionKey) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void setCachedActionResult(ActionKey actionKey, ActionResult action) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-      ByteString data = cacheEntries.get(digest);
-      if (data == null) {
-        return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
-      }
-      try {
-        data.writeTo(out);
-      } catch (IOException e) {
-        return Futures.immediateFailedFuture(e);
-      }
-      return Futures.immediateFuture(null);
-    }
-
-    @Override
-    public void close() {
-      // Intentionally left empty.
-    }
+    return new RemoteCache(new InMemoryCacheClient(cacheEntriesByteArray), options, digestUtil);
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java
similarity index 66%
rename from src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactoryTest.java
rename to src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java
index 3b95dda..aa3e37f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java
@@ -18,11 +18,12 @@
 import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
 
 import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpBlobStore;
-import com.google.devtools.build.lib.remote.disk.OnDiskBlobStore;
-import com.google.devtools.build.lib.remote.http.HttpBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpCacheClient;
+import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
+import com.google.devtools.build.lib.remote.http.HttpCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
@@ -34,9 +35,11 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link SimpleBlobStoreFactory}. */
+/** Tests for {@link RemoteCacheClientFactory}. */
 @RunWith(JUnit4.class)
-public class SimpleBlobStoreFactoryTest {
+public class RemoteCacheClientFactoryTest {
+
+  private final DigestUtil digestUtil = new DigestUtil(DigestHashFunction.SHA256);
 
   private RemoteOptions remoteOptions;
   private Path workingDirectory;
@@ -55,10 +58,11 @@
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
     fs.getPath("/etc/something/cache/here").createDirectoryAndParents();
 
-    SimpleBlobStore blobStore =
-        SimpleBlobStoreFactory.create(remoteOptions, /* creds= */ null, workingDirectory);
+    RemoteCacheClient blobStore =
+        RemoteCacheClientFactory.create(
+            remoteOptions, /* creds= */ null, workingDirectory, digestUtil);
 
-    assertThat(blobStore).isInstanceOf(CombinedDiskHttpBlobStore.class);
+    assertThat(blobStore).isInstanceOf(CombinedDiskHttpCacheClient.class);
   }
 
   @Test
@@ -67,10 +71,11 @@
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
     assertThat(workingDirectory.exists()).isFalse();
 
-    SimpleBlobStore blobStore =
-        SimpleBlobStoreFactory.create(remoteOptions, /* creds= */ null, workingDirectory);
+    RemoteCacheClient blobStore =
+        RemoteCacheClientFactory.create(
+            remoteOptions, /* creds= */ null, workingDirectory, digestUtil);
 
-    assertThat(blobStore).isInstanceOf(CombinedDiskHttpBlobStore.class);
+    assertThat(blobStore).isInstanceOf(CombinedDiskHttpCacheClient.class);
     assertThat(workingDirectory.exists()).isTrue();
   }
 
@@ -83,8 +88,8 @@
     assertThrows(
         NullPointerException.class,
         () ->
-            SimpleBlobStoreFactory.create(
-                remoteOptions, /* creds= */ null, /* workingDirectory= */ null));
+            RemoteCacheClientFactory.create(
+                remoteOptions, /* creds= */ null, /* workingDirectory= */ null, digestUtil));
   }
 
   @Test
@@ -92,10 +97,11 @@
     remoteOptions.remoteCache = "http://doesnotexist.com";
     remoteOptions.remoteProxy = "unix://some-proxy";
 
-    SimpleBlobStore blobStore =
-        SimpleBlobStoreFactory.create(remoteOptions, /* creds= */ null, workingDirectory);
+    RemoteCacheClient blobStore =
+        RemoteCacheClientFactory.create(
+            remoteOptions, /* creds= */ null, workingDirectory, digestUtil);
 
-    assertThat(blobStore).isInstanceOf(HttpBlobStore.class);
+    assertThat(blobStore).isInstanceOf(HttpCacheClient.class);
   }
 
   @Test
@@ -107,8 +113,8 @@
             assertThrows(
                 RuntimeException.class,
                 () ->
-                    SimpleBlobStoreFactory.create(
-                        remoteOptions, /* creds= */ null, workingDirectory)))
+                    RemoteCacheClientFactory.create(
+                        remoteOptions, /* creds= */ null, workingDirectory, digestUtil)))
         .hasMessageThat()
         .contains("Remote cache proxy unsupported: bad-proxy");
   }
@@ -117,50 +123,52 @@
   public void createHttpCacheWithoutProxy() throws IOException {
     remoteOptions.remoteCache = "http://doesnotexist.com";
 
-    SimpleBlobStore blobStore =
-        SimpleBlobStoreFactory.create(remoteOptions, /* creds= */ null, workingDirectory);
+    RemoteCacheClient blobStore =
+        RemoteCacheClientFactory.create(
+            remoteOptions, /* creds= */ null, workingDirectory, digestUtil);
 
-    assertThat(blobStore).isInstanceOf(HttpBlobStore.class);
+    assertThat(blobStore).isInstanceOf(HttpCacheClient.class);
   }
 
   @Test
   public void createDiskCache() throws IOException {
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
 
-    SimpleBlobStore blobStore =
-        SimpleBlobStoreFactory.create(remoteOptions, /* creds= */ null, workingDirectory);
+    RemoteCacheClient blobStore =
+        RemoteCacheClientFactory.create(
+            remoteOptions, /* creds= */ null, workingDirectory, digestUtil);
 
-    assertThat(blobStore).isInstanceOf(OnDiskBlobStore.class);
+    assertThat(blobStore).isInstanceOf(DiskCacheClient.class);
   }
 
   @Test
   public void isRemoteCacheOptions_httpCacheEnabled() {
     remoteOptions.remoteCache = "http://doesnotexist:90";
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
   public void isRemoteCacheOptions_httpCacheEnabledInUpperCase() {
     remoteOptions.remoteCache = "HTTP://doesnotexist:90";
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
   public void isRemoteCacheOptions_httpsCacheEnabled() {
     remoteOptions.remoteCache = "https://doesnotexist:90";
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
   public void isRemoteCacheOptions_badProtocolStartsWithHttp() {
     remoteOptions.remoteCache = "httplolol://doesnotexist:90";
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptions_diskCacheEnabled() {
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
@@ -168,7 +176,7 @@
     remoteOptions.remoteCache = "http://doesnotexist:90";
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
 
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
@@ -176,37 +184,37 @@
     remoteOptions.remoteCache = "https://doesnotexist:90";
     remoteOptions.diskCache = PathFragment.create("/etc/something/cache/here");
 
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isTrue();
   }
 
   @Test
   public void isRemoteCacheOptions_httpCacheDisabledWhenGrpcEnabled() {
     remoteOptions.remoteCache = "grpc://doesnotexist:90";
 
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptions_httpCacheDisabledWhenNoProtocol() {
     remoteOptions.remoteCache = "doesnotexist:90";
 
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptions_diskCacheOptionEmpty() {
     remoteOptions.diskCache = PathFragment.EMPTY_FRAGMENT;
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptions_remoteHttpCacheOptionEmpty() {
     remoteOptions.remoteCache = "";
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 
   @Test
   public void isRemoteCacheOptions_defaultOptions() {
-    assertThat(SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
+    assertThat(RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions)).isFalse();
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
similarity index 74%
rename from src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java
rename to src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index af3e4f6..5869d7d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -24,7 +24,9 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
 import build.bazel.remote.execution.v2.Digest;
 import build.bazel.remote.execution.v2.Directory;
 import build.bazel.remote.execution.v2.DirectoryNode;
@@ -33,16 +35,14 @@
 import build.bazel.remote.execution.v2.OutputFile;
 import build.bazel.remote.execution.v2.SymlinkNode;
 import build.bazel.remote.execution.v2.Tree;
+import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifactType;
@@ -52,12 +52,12 @@
 import com.google.devtools.build.lib.actions.cache.MetadataInjector;
 import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
 import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.OutputFilesLocker;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.UploadManifest;
-import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteCache.OutputFilesLocker;
+import com.google.devtools.build.lib.remote.RemoteCache.UploadManifest;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
 import com.google.devtools.build.lib.remote.util.Utils;
 import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput;
 import com.google.devtools.build.lib.util.io.FileOutErr;
@@ -75,13 +75,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -92,9 +89,9 @@
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-/** Tests for {@link AbstractRemoteActionCache}. */
+/** Tests for {@link RemoteCache}. */
 @RunWith(JUnit4.class)
-public class AbstractRemoteActionCacheTests {
+public class RemoteCacheTests {
 
   @Mock private OutputFilesLocker outputFilesLocker;
 
@@ -102,6 +99,7 @@
   private Path execRoot;
   ArtifactRoot artifactRoot;
   private final DigestUtil digestUtil = new DigestUtil(DigestHashFunction.SHA256);
+  private FakeActionInputFileCache fakeFileCache;
 
   private static ListeningScheduledExecutorService retryService;
 
@@ -116,6 +114,7 @@
     fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
     execRoot = fs.getPath("/execroot");
     execRoot.createDirectoryAndParents();
+    fakeFileCache = new FakeActionInputFileCache(execRoot);
     artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, execRoot.getChild("outputs"));
     artifactRoot.getRoot().asPath().createDirectoryAndParents();
   }
@@ -534,7 +533,7 @@
 
   @Test
   public void downloadRelativeFileSymlink() throws Exception {
-    AbstractRemoteActionCache cache = newTestCache();
+    RemoteCache cache = newRemoteCache();
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputFileSymlinksBuilder().setPath("a/b/link").setTarget("../../foo");
     // Doesn't check for dangling links, hence download succeeds.
@@ -547,7 +546,7 @@
 
   @Test
   public void downloadRelativeDirectorySymlink() throws Exception {
-    AbstractRemoteActionCache cache = newTestCache();
+    RemoteCache cache = newRemoteCache();
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectorySymlinksBuilder().setPath("a/b/link").setTarget("foo");
     // Doesn't check for dangling links, hence download succeeds.
@@ -560,7 +559,7 @@
 
   @Test
   public void downloadRelativeSymlinkInDirectory() throws Exception {
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Tree tree =
         Tree.newBuilder()
             .setRoot(
@@ -580,7 +579,7 @@
 
   @Test
   public void downloadAbsoluteDirectorySymlinkError() throws Exception {
-    AbstractRemoteActionCache cache = newTestCache();
+    RemoteCache cache = newRemoteCache();
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectorySymlinksBuilder().setPath("foo").setTarget("/abs/link");
     IOException expected =
@@ -594,7 +593,7 @@
 
   @Test
   public void downloadAbsoluteFileSymlinkError() throws Exception {
-    AbstractRemoteActionCache cache = newTestCache();
+    RemoteCache cache = newRemoteCache();
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputFileSymlinksBuilder().setPath("foo").setTarget("/abs/link");
     IOException expected =
@@ -608,7 +607,7 @@
 
   @Test
   public void downloadAbsoluteSymlinkInDirectoryError() throws Exception {
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Tree tree =
         Tree.newBuilder()
             .setRoot(
@@ -631,7 +630,7 @@
 
   @Test
   public void downloadFailureMaintainsDirectories() throws Exception {
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Tree tree = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
     Digest treeDigest = cache.addContents(tree.toByteArray());
     Digest outputFileDigest =
@@ -661,7 +660,7 @@
     Path stdout = fs.getPath("/execroot/stdout");
     Path stderr = fs.getPath("/execroot/stderr");
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Digest digest1 = cache.addContents("file1");
     Digest digest2 = cache.addException("file2", new IOException("download failed"));
     Digest digest3 = cache.addContents("file3");
@@ -682,7 +681,6 @@
     assertThat(e.getSuppressed()).isEmpty();
     assertThat(cache.getNumSuccessfulDownloads()).isEqualTo(2);
     assertThat(cache.getNumFailedDownloads()).isEqualTo(1);
-    assertThat(cache.getDownloadQueueSize()).isEqualTo(3);
     assertThat(Throwables.getRootCause(e)).hasMessageThat().isEqualTo("download failed");
     verify(outputFilesLocker, never()).lock();
   }
@@ -692,7 +690,7 @@
     Path stdout = fs.getPath("/execroot/stdout");
     Path stderr = fs.getPath("/execroot/stderr");
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Digest digest1 = cache.addContents("file1");
     Digest digest2 = cache.addException("file2", new IOException("file2 failed"));
     Digest digest3 = cache.addException("file3", new IOException("file3 failed"));
@@ -722,7 +720,7 @@
     Path stdout = fs.getPath("/execroot/stdout");
     Path stderr = fs.getPath("/execroot/stderr");
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Digest digest1 = cache.addContents("file1");
     IOException reusedException = new IOException("reused io exception");
     Digest digest2 = cache.addException("file2", reusedException);
@@ -751,7 +749,7 @@
     Path stdout = fs.getPath("/execroot/stdout");
     Path stderr = fs.getPath("/execroot/stderr");
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Digest digest1 = cache.addContents("file1");
     InterruptedException reusedInterruption = new InterruptedException("reused interruption");
     Digest digest2 = cache.addException("file2", reusedInterruption);
@@ -787,7 +785,7 @@
     FileOutErr spyChildOutErr = Mockito.spy(childOutErr);
     when(spyOutErr.childOutErr()).thenReturn(spyChildOutErr);
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     Digest digestStdout = cache.addContents("stdout");
     Digest digestStderr = cache.addContents("stderr");
 
@@ -828,7 +826,7 @@
     FileOutErr spyChildOutErr = Mockito.spy(childOutErr);
     when(spyOutErr.childOutErr()).thenReturn(spyChildOutErr);
 
-    DefaultRemoteActionCache cache = newTestCache();
+    InMemoryRemoteCache cache = newRemoteCache();
     // Don't add stdout/stderr as a known blob to the remote cache so that downloading it will fail
     Digest digestStdout = digestUtil.computeAsUtf8("stdout");
     Digest digestStderr = digestUtil.computeAsUtf8("stderr");
@@ -862,7 +860,7 @@
     // Test that injecting the metadata for a remote output file works
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     Digest d1 = remoteCache.addContents("content1");
     Digest d2 = remoteCache.addContents("content2");
     ActionResult r =
@@ -893,7 +891,7 @@
     // Test that injecting the metadata for a remote output file works
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     Digest d1 = remoteCache.addContents("content1");
     Digest d2 = remoteCache.addContents("content2");
     ActionResult r =
@@ -937,7 +935,7 @@
     // Test that injecting the metadata for a tree artifact / remote output directory works
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     // Output Directory:
     // dir/file1
     // dir/a/file2
@@ -1007,7 +1005,7 @@
     // directory fails
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     // Output Directory:
     // dir/file1
     // dir/a/file2
@@ -1063,7 +1061,7 @@
     // Test that downloading of non-embedded stdout and stderr works
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     Digest dOut = remoteCache.addContents("stdout");
     Digest dErr = remoteCache.addContents("stderr");
     ActionResult r =
@@ -1102,7 +1100,7 @@
     // Test that downloading an in memory output works
 
     // arrange
-    DefaultRemoteActionCache remoteCache = newTestCache();
+    InMemoryRemoteCache remoteCache = newRemoteCache();
     Digest d1 = remoteCache.addContents("content1");
     Digest d2 = remoteCache.addContents("content2");
     ActionResult r =
@@ -1145,126 +1143,387 @@
     verify(outputFilesLocker).lock();
   }
 
-  private DefaultRemoteActionCache newTestCache() {
-    RemoteOptions options = Options.getDefaults(RemoteOptions.class);
-    return new DefaultRemoteActionCache(options, digestUtil);
+  @Test
+  public void testDownloadEmptyBlobAndFile() throws Exception {
+    // Test that downloading an empty BLOB/file does not try to perform a download.
+
+    // arrange
+    Path file = fs.getPath("/execroot/file");
+    RemoteCache remoteCache = newRemoteCache();
+    Digest emptyDigest = digestUtil.compute(new byte[0]);
+
+    // act and assert
+    assertThat(Utils.getFromFuture(remoteCache.downloadBlob(emptyDigest))).isEmpty();
+
+    // Do not pre-open an OutputStream on the path: that would create the file by itself and
+    // the two assertions below could no longer tell whether downloadFile() produced it.
+    Utils.getFromFuture(remoteCache.downloadFile(file, emptyDigest));
+    assertThat(file.exists()).isTrue();
+    assertThat(file.getFileSize()).isEqualTo(0);
   }
 
-  private static class DefaultRemoteActionCache extends AbstractRemoteActionCache {
+  @Test
+  public void testDownloadDirectory() throws Exception {
+    // Test that downloading an output file and an output directory (described by a Tree) works.
 
-    Map<Digest, ListenableFuture<byte[]>> downloadResults = new HashMap<>();
-    List<ListenableFuture<?>> blockingDownloads = new ArrayList<>();
-    AtomicInteger numSuccess = new AtomicInteger();
-    AtomicInteger numFailures = new AtomicInteger();
+    // arrange: seed the CAS with both file blobs and the serialized Tree of "a/bar".
+    final ConcurrentMap<Digest, byte[]> cas = new ConcurrentHashMap<>();
 
-    public DefaultRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
-      super(options, digestUtil);
+    Digest fooDigest = digestUtil.computeAsUtf8("foo-contents");
+    cas.put(fooDigest, "foo-contents".getBytes(StandardCharsets.UTF_8));
+    Digest quxDigest = digestUtil.computeAsUtf8("qux-contents");
+    cas.put(quxDigest, "qux-contents".getBytes(StandardCharsets.UTF_8));
+
+    Tree barTreeMessage =
+        Tree.newBuilder()
+            .setRoot(
+                Directory.newBuilder()
+                    .addFiles(
+                        FileNode.newBuilder()
+                            .setName("qux")
+                            .setDigest(quxDigest)
+                            .setIsExecutable(true)))
+            .build();
+    Digest barTreeDigest = digestUtil.compute(barTreeMessage);
+    cas.put(barTreeDigest, barTreeMessage.toByteArray());
+
+    ActionResult.Builder result = ActionResult.newBuilder();
+    result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+    result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
+
+    // act
+    RemoteCache remoteCache = newRemoteCache(cas);
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+
+    // assert: contents match and the executable bit from the FileNode was applied.
+    assertThat(digestUtil.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
+    assertThat(digestUtil.compute(execRoot.getRelative("a/bar/qux"))).isEqualTo(quxDigest);
+    assertThat(execRoot.getRelative("a/bar/qux").isExecutable()).isTrue();
+  }
+
+  @Test
+  public void testDownloadEmptyDirectory() throws Exception {
+    // An output directory whose Tree has an empty root must still be created locally.
+
+    // arrange: the CAS only holds the serialized (empty) Tree message.
+    Tree emptyTree = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
+    Digest emptyTreeDigest = digestUtil.compute(emptyTree);
+
+    ConcurrentMap<Digest, byte[]> cas = new ConcurrentHashMap<>();
+    cas.put(emptyTreeDigest, emptyTree.toByteArray());
+
+    ActionResult.Builder actionResult = ActionResult.newBuilder();
+    actionResult.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(emptyTreeDigest);
+
+    // act
+    RemoteCache cache = newRemoteCache(cas);
+    cache.download(actionResult.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+
+    // assert
+    assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue();
+  }
+
+  @Test
+  public void testDownloadNestedDirectory() throws Exception {
+    // Test that downloading a nested output directory works.
+
+    // arrange: the nested "wobble" Directory is carried inside the Tree message (addChildren).
+    Digest fooDigest = digestUtil.computeAsUtf8("foo-contents");
+    Digest quxDigest = digestUtil.computeAsUtf8("qux-contents");
+    Directory wobbleDirMessage =
+        Directory.newBuilder()
+            .addFiles(FileNode.newBuilder().setName("qux").setDigest(quxDigest))
+            .build();
+    Digest wobbleDirDigest = digestUtil.compute(wobbleDirMessage);
+    Tree barTreeMessage =
+        Tree.newBuilder()
+            .setRoot(
+                Directory.newBuilder()
+                    .addFiles(
+                        FileNode.newBuilder()
+                            .setName("qux")
+                            .setDigest(quxDigest)
+                            .setIsExecutable(true))
+                    .addDirectories(
+                        DirectoryNode.newBuilder().setName("wobble").setDigest(wobbleDirDigest)))
+            .addChildren(wobbleDirMessage)
+            .build();
+    Digest barTreeDigest = digestUtil.compute(barTreeMessage);
+
+    final ConcurrentMap<Digest, byte[]> map = new ConcurrentHashMap<>();
+    map.put(fooDigest, "foo-contents".getBytes(StandardCharsets.UTF_8));
+    map.put(barTreeDigest, barTreeMessage.toByteArray());
+    map.put(quxDigest, "qux-contents".getBytes(StandardCharsets.UTF_8));
+
+    ActionResult.Builder result = ActionResult.newBuilder();
+    result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+    result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
+
+    // act
+    RemoteCache remoteCache = newRemoteCache(map);
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+
+    // assert: the nested file is written and is not executable (its FileNode does not set it).
+    assertThat(digestUtil.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
+    assertThat(digestUtil.compute(execRoot.getRelative("a/bar/wobble/qux"))).isEqualTo(quxDigest);
+    assertThat(execRoot.getRelative("a/bar/wobble/qux").isExecutable()).isFalse();
+  }
+
+  @Test
+  public void testDownloadDirectoryWithSameHash() throws Exception {
+    // Test that downloading an output directory works when the same Directory proto (and hence
+    // the same digest) occurs more than once in the Tree, e.g. identical or empty directories.
+
+    /*
+     * /bar/foo/file
+     * /foo/file
+     */
+
+    // arrange
+    Digest fileDigest = digestUtil.computeAsUtf8("file");
+    FileNode file = FileNode.newBuilder().setName("file").setDigest(fileDigest).build();
+    Directory fooDir = Directory.newBuilder().addFiles(file).build();
+    Digest fooDigest = digestUtil.compute(fooDir);
+    DirectoryNode fooDirNode =
+        DirectoryNode.newBuilder().setName("foo").setDigest(fooDigest).build();
+    Directory barDir = Directory.newBuilder().addDirectories(fooDirNode).build();
+    Digest barDigest = digestUtil.compute(barDir);
+    DirectoryNode barDirNode =
+        DirectoryNode.newBuilder().setName("bar").setDigest(barDigest).build();
+    Directory rootDir =
+        Directory.newBuilder().addDirectories(fooDirNode).addDirectories(barDirNode).build();
+
+    Tree tree =
+        Tree.newBuilder()
+            .setRoot(rootDir)
+            .addChildren(barDir)
+            .addChildren(fooDir)
+            .addChildren(fooDir)
+            .build();
+    Digest treeDigest = digestUtil.compute(tree);
+
+    final ConcurrentMap<Digest, byte[]> map = new ConcurrentHashMap<>();
+    map.put(fileDigest, "file".getBytes(StandardCharsets.UTF_8));
+    map.put(treeDigest, tree.toByteArray());
+
+    ActionResult.Builder result = ActionResult.newBuilder();
+    result.addOutputDirectoriesBuilder().setPath("a/").setTreeDigest(treeDigest);
+
+    // act
+    RemoteCache remoteCache = newRemoteCache(map);
+    remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
+
+    // assert: the shared "foo" directory is materialized at both locations.
+    assertThat(digestUtil.compute(execRoot.getRelative("a/bar/foo/file"))).isEqualTo(fileDigest);
+    assertThat(digestUtil.compute(execRoot.getRelative("a/foo/file"))).isEqualTo(fileDigest);
+  }
+
+  @Test
+  public void testUploadDirectory() throws Exception {
+    // Upload a regular output file together with an output directory that contains a single
+    // executable file, then check the returned ActionResult and the contents of the cache.
+
+    // arrange
+    Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
+    Digest quxDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
+    FileNode quxNode =
+        FileNode.newBuilder().setIsExecutable(true).setName("qux").setDigest(quxDigest).build();
+    Tree barTree =
+        Tree.newBuilder().setRoot(Directory.newBuilder().addFiles(quxNode).build()).build();
+    Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), barTree);
+
+    // Paths of the outputs; the executable bit of bar/qux is set to match quxNode.
+    Path fooFile = execRoot.getRelative("a/foo");
+    Path quxFile = execRoot.getRelative("bar/qux");
+    quxFile.setExecutable(true);
+    Path barDir = execRoot.getRelative("bar");
+
+    // The action references the command by digest.
+    Command command = Command.newBuilder().addOutputFiles("bla").build();
+    Digest commandDigest = digestUtil.compute(command);
+    Action action = Action.newBuilder().setCommandDigest(commandDigest).build();
+    Digest actionDigest = digestUtil.compute(action);
+    FileOutErr outErr =
+        new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr"));
+
+    // act
+    InMemoryRemoteCache cache = newRemoteCache();
+    ActionResult uploaded =
+        cache.upload(
+            digestUtil.asActionKey(actionDigest),
+            action,
+            command,
+            execRoot,
+            ImmutableList.of(fooFile, barDir),
+            outErr);
+
+    // assert
+    ActionResult.Builder expected = ActionResult.newBuilder();
+    expected.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+    expected.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
+    assertThat(uploaded).isEqualTo(expected.build());
+
+    // Every blob involved, including the command and action messages, must now be in the cache.
+    ImmutableList<Digest> uploadedDigests =
+        ImmutableList.of(fooDigest, quxDigest, barDigest, commandDigest, actionDigest);
+    assertThat(cache.findMissingDigests(uploadedDigests)).isEmpty();
+  }
+
+  @Test
+  public void testUploadEmptyDirectory() throws Exception {
+    // Upload an output directory whose Tree consists of nothing but an empty root Directory,
+    // then check the returned ActionResult and the contents of the cache.
+
+    // arrange
+    Tree emptyTree = Tree.newBuilder().setRoot(Directory.newBuilder().build()).build();
+    Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), emptyTree);
+    Path barDir = execRoot.getRelative("bar");
+
+    Action action = Action.getDefaultInstance();
+    ActionKey actionKey = digestUtil.computeActionKey(action);
+    Command command = Command.getDefaultInstance();
+    FileOutErr outErr =
+        new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr"));
+
+    // act
+    InMemoryRemoteCache cache = newRemoteCache();
+    ActionResult uploaded =
+        cache.upload(actionKey, action, command, execRoot, ImmutableList.of(barDir), outErr);
+
+    // assert
+    ActionResult.Builder expected = ActionResult.newBuilder();
+    expected.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
+    assertThat(uploaded).isEqualTo(expected.build());
+
+    // The blob addressed by barDigest (the expected tree digest) must be present in the cache
+    // after the upload.
+    assertThat(cache.findMissingDigests(ImmutableList.of(barDigest))).isEmpty();
+  }
+
+  @Test
+  public void testUploadNestedDirectory() throws Exception {
+    // Upload an output directory ("bar") that holds a file and a subdirectory ("bar/test")
+    // containing a file of its own.
+
+    // arrange
+    Digest wobbleDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
+    Digest quxDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
+
+    // Subdirectory "test": referenced from the root by digest and added as a child of the Tree.
+    FileNode wobbleNode = FileNode.newBuilder().setName("wobble").setDigest(wobbleDigest).build();
+    Directory testDir = Directory.newBuilder().addFiles(wobbleNode).build();
+    DirectoryNode testNode =
+        DirectoryNode.newBuilder().setName("test").setDigest(digestUtil.compute(testDir)).build();
+
+    // Root of "bar": the executable file "qux" plus the reference to "test".
+    FileNode quxNode =
+        FileNode.newBuilder().setIsExecutable(true).setName("qux").setDigest(quxDigest).build();
+    Tree barTree =
+        Tree.newBuilder()
+            .setRoot(Directory.newBuilder().addFiles(quxNode).addDirectories(testNode))
+            .addChildren(testDir)
+            .build();
+    Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), barTree);
+
+    // The scratch file gets the executable bit too, matching quxNode.
+    Path quxFile = execRoot.getRelative("bar/qux");
+    quxFile.setExecutable(true);
+    Path barDir = execRoot.getRelative("bar");
+
+    Action action = Action.getDefaultInstance();
+    ActionKey actionKey = digestUtil.computeActionKey(action);
+    Command command = Command.getDefaultInstance();
+
+    // act
+    InMemoryRemoteCache cache = newRemoteCache();
+    ActionResult uploaded =
+        cache.upload(
+            actionKey,
+            action,
+            command,
+            execRoot,
+            ImmutableList.of(barDir),
+            new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
+
+    // assert
+    ActionResult.Builder expected = ActionResult.newBuilder();
+    expected.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
+    assertThat(uploaded).isEqualTo(expected.build());
+
+    // Both file blobs and the blob addressed by barDigest must be in the cache afterwards.
+    ImmutableList<Digest> uploadedDigests = ImmutableList.of(wobbleDigest, quxDigest, barDigest);
+    assertThat(cache.findMissingDigests(uploadedDigests)).isEmpty();
+  }
+
+  // Cache factories; both overloads use default RemoteOptions and the shared digestUtil.
+  private InMemoryRemoteCache newRemoteCache(Map<Digest, byte[]> casEntries) {
+    return new InMemoryRemoteCache(
+        casEntries, Options.getDefaults(RemoteOptions.class), digestUtil);
+  }
+
+  private InMemoryRemoteCache newRemoteCache() {
+    return new InMemoryRemoteCache(Options.getDefaults(RemoteOptions.class), digestUtil);
+  }
+
+  private static class InMemoryRemoteCache extends RemoteCache {
+
+    InMemoryRemoteCache(
+        Map<Digest, byte[]> casEntries, RemoteOptions options, DigestUtil digestUtil) {
+      super(new InMemoryCacheClient(casEntries), options, digestUtil);
     }
 
-    public Digest addContents(String txt) {
+    InMemoryRemoteCache(RemoteOptions options, DigestUtil digestUtil) {
+      super(new InMemoryCacheClient(), options, digestUtil);
+    }
+
+    Digest addContents(String txt) throws IOException, InterruptedException {
       return addContents(txt.getBytes(UTF_8));
     }
 
-    public Digest addContents(byte[] bytes) {
+    Digest addContents(byte[] bytes) throws IOException, InterruptedException {
       Digest digest = digestUtil.compute(bytes);
-      downloadResults.put(digest, Futures.immediateFuture(bytes));
+      Utils.getFromFuture(cacheProtocol.uploadBlob(digest, ByteString.copyFrom(bytes)));
       return digest;
     }
 
-    public Digest addContents(Message m) {
+    Digest addContents(Message m) throws IOException, InterruptedException {
       return addContents(m.toByteArray());
     }
 
-    public Digest addException(String txt, Exception e) {
+    Digest addException(String txt, Exception e) {
       Digest digest = digestUtil.compute(txt.getBytes(UTF_8));
-      downloadResults.put(digest, Futures.immediateFailedFuture(e));
+      ((InMemoryCacheClient) cacheProtocol).addDownloadFailure(digest, e);
       return digest;
     }
 
     Digest addException(Message m, Exception e) {
       Digest digest = digestUtil.compute(m);
-      downloadResults.put(digest, Futures.immediateFailedFuture(e));
+      ((InMemoryCacheClient) cacheProtocol).addDownloadFailure(digest, e);
       return digest;
     }
 
-    public int getNumSuccessfulDownloads() {
-      return numSuccess.get();
+    int getNumSuccessfulDownloads() {
+      return ((InMemoryCacheClient) cacheProtocol).getNumSuccessfulDownloads();
     }
 
-    public int getNumFailedDownloads() {
-      return numFailures.get();
+    int getNumFailedDownloads() {
+      return ((InMemoryCacheClient) cacheProtocol).getNumFailedDownloads();
     }
 
-    public int getDownloadQueueSize() {
-      return blockingDownloads.size();
-    }
-
-    @Override
-    protected <T> T getFromFuture(ListenableFuture<T> f) throws IOException, InterruptedException {
-      blockingDownloads.add(f);
-      return Utils.getFromFuture(f);
-    }
-
-    @Nullable
-    @Override
-    ActionResult getCachedActionResult(ActionKey actionKey) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void setCachedActionResult(ActionKey actionKey, ActionResult action) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-      SettableFuture<Void> result = SettableFuture.create();
-      ListenableFuture<byte[]> downloadResult = downloadResults.get(digest);
-      Futures.addCallback(
-          downloadResult != null
-              ? downloadResult
-              : Futures.immediateFailedFuture(new CacheNotFoundException(digest)),
-          new FutureCallback<byte[]>() {
-            @Override
-            public void onSuccess(byte[] bytes) {
-              numSuccess.incrementAndGet();
-              try {
-                out.write(bytes);
-                out.close();
-                result.set(null);
-              } catch (IOException e) {
-                result.setException(e);
-              }
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-              numFailures.incrementAndGet();
-              result.setException(throwable);
-            }
-          },
-          MoreExecutors.directExecutor());
-      return result;
+    ImmutableSet<Digest> findMissingDigests(Iterable<Digest> digests)
+        throws IOException, InterruptedException {
+      return Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
     }
 
     @Override
     public void close() {
-      throw new UnsupportedOperationException();
+      cacheProtocol.close();
     }
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
index 083491b..733ac65 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
@@ -58,7 +58,7 @@
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.exec.util.FakeOwner;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -106,7 +106,7 @@
   private Path execRoot;
   private SimpleSpawn simpleSpawn;
   private FakeActionInputFileCache fakeFileCache;
-  @Mock private AbstractRemoteActionCache remoteCache;
+  @Mock private RemoteCache remoteCache;
   private RemoteSpawnCache cache;
   private FileOutErr outErr;
   private final List<Pair<ProgressStatus, String>> progressUpdates = new ArrayList<>();
@@ -258,7 +258,7 @@
   @Test
   public void cacheHit() throws Exception {
     ActionResult actionResult = ActionResult.getDefaultInstance();
-    when(remoteCache.getCachedActionResult(any(ActionKey.class)))
+    when(remoteCache.downloadActionResult(any(ActionKey.class)))
         .thenAnswer(
             new Answer<ActionResult>() {
               @Override
@@ -361,7 +361,7 @@
         SimpleSpawn uncacheableSpawn =
             simpleSpawnWithExecutionInfo(ImmutableMap.of(requirement, ""));
         CacheHandle entry = remoteSpawnCache.lookup(uncacheableSpawn, simplePolicy);
-        verify(remoteCache, never()).getCachedActionResult(any(ActionKey.class));
+        verify(remoteCache, never()).downloadActionResult(any(ActionKey.class));
         assertThat(entry.hasResult()).isFalse();
         SpawnResult result =
             new SpawnResult.Builder()
@@ -393,7 +393,7 @@
             ExecutionRequirements.NO_REMOTE)) {
       SimpleSpawn uncacheableSpawn = simpleSpawnWithExecutionInfo(ImmutableMap.of(requirement, ""));
       CacheHandle entry = remoteSpawnCache.lookup(uncacheableSpawn, simplePolicy);
-      verify(remoteCache, never()).getCachedActionResult(any(ActionKey.class));
+      verify(remoteCache, never()).downloadActionResult(any(ActionKey.class));
       assertThat(entry.hasResult()).isFalse();
       SpawnResult result =
           new SpawnResult.Builder()
@@ -426,7 +426,7 @@
             ExecutionRequirements.NO_REMOTE)) {
       SimpleSpawn uncacheableSpawn = simpleSpawnWithExecutionInfo(ImmutableMap.of(requirement, ""));
       CacheHandle entry = remoteSpawnCache.lookup(uncacheableSpawn, simplePolicy);
-      verify(remoteCache, never()).getCachedActionResult(any(ActionKey.class));
+      verify(remoteCache, never()).downloadActionResult(any(ActionKey.class));
       assertThat(entry.hasResult()).isFalse();
       SpawnResult result =
           new SpawnResult.Builder()
@@ -449,7 +449,7 @@
     SimpleSpawn cacheableSpawn =
         simpleSpawnWithExecutionInfo(ImmutableMap.of(ExecutionRequirements.NO_REMOTE_CACHE, ""));
     cache.lookup(cacheableSpawn, simplePolicy);
-    verify(remoteCache).getCachedActionResult(any(ActionKey.class));
+    verify(remoteCache).downloadActionResult(any(ActionKey.class));
   }
 
   @Test
@@ -457,14 +457,14 @@
     SimpleSpawn cacheableSpawn =
         simpleSpawnWithExecutionInfo(ImmutableMap.of(ExecutionRequirements.NO_REMOTE_EXEC, ""));
     cache.lookup(cacheableSpawn, simplePolicy);
-    verify(remoteCache).getCachedActionResult(any(ActionKey.class));
+    verify(remoteCache).downloadActionResult(any(ActionKey.class));
   }
 
   @Test
   public void failedActionsAreNotUploaded() throws Exception {
     // Only successful action results are uploaded to the remote cache.
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
-    verify(remoteCache).getCachedActionResult(any(ActionKey.class));
+    verify(remoteCache).downloadActionResult(any(ActionKey.class));
     assertThat(entry.hasResult()).isFalse();
     SpawnResult result =
         new SpawnResult.Builder()
@@ -530,7 +530,7 @@
   public void printWarningIfDownloadFails() throws Exception {
     doThrow(new IOException(io.grpc.Status.UNAVAILABLE.asRuntimeException()))
         .when(remoteCache)
-        .getCachedActionResult(any(ActionKey.class));
+        .downloadActionResult(any(ActionKey.class));
 
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
     assertThat(entry.hasResult()).isFalse();
@@ -585,7 +585,7 @@
         ActionResult.newBuilder()
             .addOutputFiles(OutputFile.newBuilder().setPath("/random/file").setDigest(digest))
             .build();
-    when(remoteCache.getCachedActionResult(any(ActionKey.class)))
+    when(remoteCache.downloadActionResult(any(ActionKey.class)))
         .thenAnswer(
             new Answer<ActionResult>() {
               @Override
@@ -645,7 +645,7 @@
   @Test
   public void failedCacheActionAsCacheMiss() throws Exception {
     ActionResult actionResult = ActionResult.newBuilder().setExitCode(1).build();
-    when(remoteCache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
+    when(remoteCache.downloadActionResult(any(ActionKey.class))).thenReturn(actionResult);
 
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
 
@@ -660,7 +660,7 @@
     cache = remoteSpawnCacheWithOptions(remoteOptions);
 
     ActionResult success = ActionResult.newBuilder().setExitCode(0).build();
-    when(remoteCache.getCachedActionResult(any())).thenReturn(success);
+    when(remoteCache.downloadActionResult(any())).thenReturn(success);
 
     // act
     CacheHandle cacheHandle = cache.lookup(simpleSpawn, simplePolicy);
@@ -681,7 +681,7 @@
     IOException downloadFailure = new IOException("downloadMinimal failed");
 
     ActionResult success = ActionResult.newBuilder().setExitCode(0).build();
-    when(remoteCache.getCachedActionResult(any())).thenReturn(success);
+    when(remoteCache.downloadActionResult(any())).thenReturn(success);
     when(remoteCache.downloadMinimal(any(), anyCollection(), any(), any(), any(), any(), any()))
         .thenThrow(downloadFailure);
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index b1b9cff..79c4380 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -64,7 +64,7 @@
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.exec.util.FakeOwner;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -114,7 +114,7 @@
   private RemoteOptions remoteOptions;
   private RemoteRetrier retrier;
 
-  @Mock private GrpcRemoteCache cache;
+  @Mock private RemoteExecutionCache cache;
 
   @Mock private GrpcRemoteExecutor executor;
 
@@ -187,7 +187,7 @@
     assertThat(requestCaptor.getValue().getExecutionPolicy().getPriority()).isEqualTo(2);
     // TODO(olaola): verify that the uploaded action has the doNotCache set.
 
-    verify(cache, never()).getCachedActionResult(any(ActionKey.class));
+    verify(cache, never()).downloadActionResult(any(ActionKey.class));
     verify(cache, never()).upload(any(), any(), any(), any(), any(), any());
     verifyZeroInteractions(localRunner);
   }
@@ -298,7 +298,7 @@
     remoteOptions.remoteUploadLocalResults = true;
 
     ActionResult failedAction = ActionResult.newBuilder().setExitCode(1).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(failedAction);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(failedAction);
 
     RemoteSpawnRunner runner = spy(newSpawnRunner());
     // Throw an IOException to trigger the local fallback.
@@ -338,7 +338,7 @@
     // remotely
 
     ActionResult failedAction = ActionResult.newBuilder().setExitCode(1).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(failedAction);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(failedAction);
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
@@ -377,8 +377,7 @@
     SpawnExecutionContext policy =
         new FakeSpawnExecutionContext(spawn, fakeFileCache, execRoot, outErr);
 
-    when(cache.getCachedActionResult(any(ActionKey.class)))
-        .thenThrow(new IOException("cache down"));
+    when(cache.downloadActionResult(any(ActionKey.class))).thenThrow(new IOException("cache down"));
 
     doThrow(new IOException("cache down"))
         .when(cache)
@@ -419,7 +418,7 @@
     SpawnExecutionContext policy =
         new FakeSpawnExecutionContext(spawn, fakeFileCache, execRoot, outErr);
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
 
     IOException err = new IOException("local execution error");
     when(localRunner.exec(eq(spawn), eq(policy))).thenThrow(err);
@@ -445,7 +444,7 @@
     SpawnExecutionContext policy =
         new FakeSpawnExecutionContext(spawn, fakeFileCache, execRoot, outErr);
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenThrow(new IOException());
+    when(cache.downloadActionResult(any(ActionKey.class))).thenThrow(new IOException());
 
     IOException err = new IOException("local execution error");
     when(localRunner.exec(eq(spawn), eq(policy))).thenThrow(err);
@@ -462,7 +461,7 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
 
     Spawn spawn = newSimpleSpawn();
@@ -594,7 +593,7 @@
     RemoteSpawnRunner runner = newSpawnRunner();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(cachedResult);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(cachedResult);
     Exception downloadFailure = new CacheNotFoundException(Digest.getDefaultInstance());
     doThrow(downloadFailure)
         .when(cache)
@@ -623,7 +622,7 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
     ActionResult execResult = ActionResult.newBuilder().setExitCode(31).build();
     ExecuteResponse cachedResponse =
@@ -665,7 +664,7 @@
     RemoteSpawnRunner runner = newSpawnRunner();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     ExecuteResponse resp =
         ExecuteResponse.newBuilder()
             .setResult(cachedResult)
@@ -699,7 +698,7 @@
     RemoteSpawnRunner runner = newSpawnRunner();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     ExecuteResponse resp =
         ExecuteResponse.newBuilder()
             .setResult(cachedResult)
@@ -731,7 +730,7 @@
     RemoteSpawnRunner runner = newSpawnRunner();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     ExecuteResponse failed =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(33).build())
@@ -761,7 +760,7 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(null);
     when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException("reasons"));
 
     Spawn spawn = newSimpleSpawn();
@@ -782,7 +781,7 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenThrow(new IOException("reasons"));
+    when(cache.downloadActionResult(any(ActionKey.class))).thenThrow(new IOException("reasons"));
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy =
@@ -862,7 +861,7 @@
     remoteOptions.remoteOutputsMode = RemoteOutputsMode.MINIMAL;
 
     ActionResult succeededAction = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(succeededAction);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(succeededAction);
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
@@ -914,7 +913,7 @@
     remoteOptions.remoteOutputsMode = RemoteOutputsMode.MINIMAL;
 
     ActionResult succeededAction = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(succeededAction);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(succeededAction);
     IOException downloadFailure = new IOException("downloadMinimal failed");
     when(cache.downloadMinimal(any(), anyCollection(), any(), any(), any(), any(), any()))
         .thenThrow(downloadFailure);
@@ -946,7 +945,7 @@
         ActionsTestUtil.createArtifact(outputRoot, outputRoot.getRoot().getRelative("foo.bin"));
 
     ActionResult succeededAction = ActionResult.newBuilder().setExitCode(0).build();
-    when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(succeededAction);
+    when(cache.downloadActionResult(any(ActionKey.class))).thenReturn(succeededAction);
 
     RemoteSpawnRunner runner = newSpawnRunner(ImmutableSet.of(topLevelOutput));
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
deleted file mode 100644
index b44ca79..0000000
--- a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
+++ /dev/null
@@ -1,514 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
-import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import build.bazel.remote.execution.v2.Action;
-import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Command;
-import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
-import build.bazel.remote.execution.v2.DirectoryNode;
-import build.bazel.remote.execution.v2.FileNode;
-import build.bazel.remote.execution.v2.Tree;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.actions.ActionInputHelper;
-import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
-import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
-import com.google.devtools.build.lib.remote.util.Utils;
-import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.DigestHashFunction;
-import com.google.devtools.build.lib.vfs.FileSystem;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
-import com.google.devtools.common.options.Options;
-import com.google.protobuf.ByteString;
-import io.grpc.Context;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link SimpleBlobStoreActionCache}. */
-@RunWith(JUnit4.class)
-public class SimpleBlobStoreActionCacheTest {
-  private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);
-
-  private FileSystem fs;
-  private Path execRoot;
-  private FakeActionInputFileCache fakeFileCache;
-  private Context withEmptyMetadata;
-  private Context prevContext;
-
-  private static ListeningScheduledExecutorService retryService;
-
-  @BeforeClass
-  public static void beforeEverything() {
-    retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
-  }
-
-  @Before
-  public final void setUp() throws Exception {
-    Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
-    fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
-    execRoot = fs.getPath("/exec/root");
-    FileSystemUtils.createDirectoryAndParents(execRoot);
-    fakeFileCache = new FakeActionInputFileCache(execRoot);
-    Path stdout = fs.getPath("/tmp/stdout");
-    Path stderr = fs.getPath("/tmp/stderr");
-    FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
-    FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
-    withEmptyMetadata =
-        TracingMetadataUtils.contextWithMetadata(
-            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
-    prevContext = withEmptyMetadata.attach();
-  }
-
-  @After
-  public void tearDown() {
-    withEmptyMetadata.detach(prevContext);
-  }
-
-  @AfterClass
-  public static void afterEverything() {
-    retryService.shutdownNow();
-  }
-
-  private SimpleBlobStoreActionCache newClient() {
-    return newClient(new ConcurrentHashMap<>());
-  }
-
-  private SimpleBlobStoreActionCache newClient(ConcurrentMap<String, byte[]> map) {
-    return new SimpleBlobStoreActionCache(
-        Options.getDefaults(RemoteOptions.class),
-        new ConcurrentMapBlobStore(map),
-        DIGEST_UTIL);
-  }
-
-  @Test
-  public void testDownloadEmptyBlob() throws Exception {
-    SimpleBlobStoreActionCache client = newClient();
-    Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
-    // Will not call the mock Bytestream interface at all.
-    assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
-  }
-
-  @Test
-  public void testDownloadBlob() throws Exception {
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
-    map.put(digest.getHash(), "abcdefg".getBytes(Charsets.UTF_8));
-    final SimpleBlobStoreActionCache client = newClient(map);
-    assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
-  }
-
-  @Test
-  public void testDownloadAllResults() throws Exception {
-    Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
-    Digest barDigest = DIGEST_UTIL.computeAsUtf8("bar-contents");
-    Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    map.put(fooDigest.getHash(), "foo-contents".getBytes(Charsets.UTF_8));
-    map.put(barDigest.getHash(), "bar-contents".getBytes(Charsets.UTF_8));
-    SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult.Builder result = ActionResult.newBuilder();
-    result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
-    result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest);
-    result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest);
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar"))).isEqualTo(barDigest);
-    assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue();
-  }
-
-  @Test
-  public void testDownloadDirectory() throws Exception {
-    Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
-    Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
-    Tree barTreeMessage =
-        Tree.newBuilder()
-            .setRoot(
-                Directory.newBuilder()
-                    .addFiles(
-                        FileNode.newBuilder()
-                            .setName("qux")
-                            .setDigest(quxDigest)
-                            .setIsExecutable(true)))
-            .build();
-    Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    map.put(fooDigest.getHash(), "foo-contents".getBytes(Charsets.UTF_8));
-    map.put(barTreeDigest.getHash(), barTreeMessage.toByteArray());
-    map.put(quxDigest.getHash(), "qux-contents".getBytes(Charsets.UTF_8));
-    SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult.Builder result = ActionResult.newBuilder();
-    result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
-    result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
-
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/qux"))).isEqualTo(quxDigest);
-    assertThat(execRoot.getRelative("a/bar/qux").isExecutable()).isTrue();
-  }
-
-  @Test
-  public void testDownloadDirectoryEmpty() throws Exception {
-    Tree barTreeMessage = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
-    Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    map.put(barTreeDigest.getHash(), barTreeMessage.toByteArray());
-    SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult.Builder result = ActionResult.newBuilder();
-    result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
-
-    assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue();
-  }
-
-  @Test
-  public void testDownloadDirectoryNested() throws Exception {
-    Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
-    Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
-    Directory wobbleDirMessage =
-        Directory.newBuilder()
-            .addFiles(FileNode.newBuilder().setName("qux").setDigest(quxDigest))
-            .build();
-    Digest wobbleDirDigest = DIGEST_UTIL.compute(wobbleDirMessage);
-    Tree barTreeMessage =
-        Tree.newBuilder()
-            .setRoot(
-                Directory.newBuilder()
-                    .addFiles(
-                        FileNode.newBuilder()
-                            .setName("qux")
-                            .setDigest(quxDigest)
-                            .setIsExecutable(true))
-                    .addDirectories(
-                        DirectoryNode.newBuilder().setName("wobble").setDigest(wobbleDirDigest)))
-            .addChildren(wobbleDirMessage)
-            .build();
-    Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    map.put(fooDigest.getHash(), "foo-contents".getBytes(Charsets.UTF_8));
-    map.put(barTreeDigest.getHash(), barTreeMessage.toByteArray());
-    map.put(quxDigest.getHash(), "qux-contents".getBytes(Charsets.UTF_8));
-    SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult.Builder result = ActionResult.newBuilder();
-    result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
-    result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
-
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/wobble/qux"))).isEqualTo(quxDigest);
-    assertThat(execRoot.getRelative("a/bar/wobble/qux").isExecutable()).isFalse();
-  }
-
-  @Test
-  public void testDownloadDirectoriesWithSameHash() throws Exception {
-    // Test that downloading an output directory works when two Directory
-    // protos have the same hash i.e. because they have the same name and contents or are empty.
-
-    /*
-     * /bar/foo/file
-     * /foo/file
-     */
-    Digest fileDigest = DIGEST_UTIL.computeAsUtf8("file");
-    FileNode file =
-        FileNode.newBuilder().setName("file").setDigest(fileDigest).build();
-    Directory fooDir = Directory.newBuilder().addFiles(file).build();
-    Digest fooDigest = DIGEST_UTIL.compute(fooDir);
-    DirectoryNode fooDirNode =
-        DirectoryNode.newBuilder().setName("foo").setDigest(fooDigest).build();
-    Directory barDir = Directory.newBuilder().addDirectories(fooDirNode).build();
-    Digest barDigest = DIGEST_UTIL.compute(barDir);
-    DirectoryNode barDirNode =
-        DirectoryNode.newBuilder().setName("bar").setDigest(barDigest).build();
-    Directory rootDir =
-        Directory.newBuilder().addDirectories(fooDirNode).addDirectories(barDirNode).build();
-
-    Tree tree = Tree.newBuilder()
-        .setRoot(rootDir)
-        .addChildren(barDir)
-        .addChildren(fooDir)
-        .addChildren(fooDir)
-        .build();
-    Digest treeDigest = DIGEST_UTIL.compute(tree);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    map.put(fileDigest.getHash(), "file".getBytes(Charsets.UTF_8));
-    map.put(treeDigest.getHash(), tree.toByteArray());
-    SimpleBlobStoreActionCache client = newClient(map);
-    ActionResult.Builder result = ActionResult.newBuilder();
-    result.addOutputDirectoriesBuilder().setPath("a/").setTreeDigest(treeDigest);
-    client.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {});
-
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/foo/file"))).isEqualTo(fileDigest);
-    assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo/file"))).isEqualTo(fileDigest);
-  }
-
-  @Test
-  public void testUploadDirectory() throws Exception {
-    final Digest fooDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
-    final Digest quxDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
-    final Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(
-            ActionInputHelper.fromPath("bar"),
-            Tree.newBuilder()
-                .setRoot(
-                    Directory.newBuilder()
-                        .addFiles(
-                            FileNode.newBuilder()
-                                .setIsExecutable(true)
-                                .setName("qux")
-                                .setDigest(quxDigest)
-                                .build())
-                        .build())
-                .build());
-    final Path fooFile = execRoot.getRelative("a/foo");
-    final Path quxFile = execRoot.getRelative("bar/qux");
-    quxFile.setExecutable(true);
-    final Path barDir = execRoot.getRelative("bar");
-    Command cmd = Command.newBuilder().addOutputFiles("bla").build();
-    final Digest cmdDigest = DIGEST_UTIL.compute(cmd);
-    Action action = Action.newBuilder().setCommandDigest(cmdDigest).build();
-    final Digest actionDigest = DIGEST_UTIL.compute(action);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    final SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult result =
-        client.upload(
-            DIGEST_UTIL.asActionKey(actionDigest),
-            action,
-            cmd,
-            execRoot,
-            ImmutableList.of(fooFile, barDir),
-            new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-
-    assertThat(map.keySet())
-        .containsAtLeast(
-            fooDigest.getHash(),
-            quxDigest.getHash(),
-            barDigest.getHash(),
-            cmdDigest.getHash(),
-            actionDigest.getHash());
- }
-
-  @Test
-  public void testUploadDirectoryEmpty() throws Exception {
-    final Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(
-            ActionInputHelper.fromPath("bar"),
-            Tree.newBuilder().setRoot(Directory.newBuilder().build()).build());
-    final Path barDir = execRoot.getRelative("bar");
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    final SimpleBlobStoreActionCache client = newClient(map);
-
-    ActionResult result = uploadDirectory(client, ImmutableList.<Path>of(barDir));
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-
-    assertThat(map.keySet()).contains(barDigest.getHash());
-  }
-
-  @Test
-  public void testUploadDirectoryNested() throws Exception {
-    final Digest wobbleDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
-    final Digest quxDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
-    final Directory testDirMessage =
-        Directory.newBuilder()
-            .addFiles(FileNode.newBuilder().setName("wobble").setDigest(wobbleDigest).build())
-            .build();
-    final Digest testDigest = DIGEST_UTIL.compute(testDirMessage);
-    final Tree barTree =
-        Tree.newBuilder()
-            .setRoot(
-                Directory.newBuilder()
-                    .addFiles(
-                        FileNode.newBuilder()
-                            .setIsExecutable(true)
-                            .setName("qux")
-                            .setDigest(quxDigest))
-                    .addDirectories(
-                        DirectoryNode.newBuilder().setName("test").setDigest(testDigest)))
-            .addChildren(testDirMessage)
-            .build();
-    final Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), barTree);
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    final SimpleBlobStoreActionCache client = newClient(map);
-
-    final Path quxFile = execRoot.getRelative("bar/qux");
-    quxFile.setExecutable(true);
-    final Path barDir = execRoot.getRelative("bar");
-
-    ActionResult result = uploadDirectory(client, ImmutableList.<Path>of(barDir));
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-
-    assertThat(map.keySet())
-        .containsAtLeast(wobbleDigest.getHash(), quxDigest.getHash(), barDigest.getHash());
-  }
-
-  private ActionResult uploadDirectory(SimpleBlobStoreActionCache client, List<Path> outputs)
-      throws Exception {
-    Action action = Action.getDefaultInstance();
-    ActionKey actionKey = DIGEST_UTIL.computeActionKey(action);
-    Command cmd = Command.getDefaultInstance();
-    return client.upload(
-        actionKey,
-        action,
-        cmd,
-        execRoot,
-        outputs,
-        new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-  }
-
-  @Test
-  public void testDownloadFailsOnDigestMismatch() {
-    // Test that the download fails when a blob/file has a different content hash than expected.
-
-    final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
-    Digest digest = DIGEST_UTIL.computeAsUtf8("hello");
-    // Store content that doesn't match its digest
-    map.put(digest.getHash(), "world".getBytes(Charsets.UTF_8));
-    final SimpleBlobStoreActionCache client = newClient(map);
-
-    IOException e =
-        assertThrows(IOException.class, () -> getFromFuture(client.downloadBlob(digest)));
-    assertThat(e).hasMessageThat().contains(digest.getHash());
-
-    e =
-        assertThrows(
-            IOException.class,
-            () -> getFromFuture(client.downloadFile(fs.getPath("/exec/root/foo"), digest)));
-    assertThat(e).hasMessageThat().contains(digest.getHash());
-  }
-
-  private static class ConcurrentMapBlobStore implements SimpleBlobStore {
-    private final ConcurrentMap<String, byte[]> map;
-    private static final String ACTION_KEY_PREFIX = "ac_";
-
-    public ConcurrentMapBlobStore(ConcurrentMap<String, byte[]> map) {
-      this.map = map;
-    }
-
-    private ListenableFuture<Void> get(String key, OutputStream out, Digest digest) {
-      byte[] data = map.get(key);
-      if (data == null) {
-        return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
-      } else {
-        try {
-          out.write(data);
-        } catch (IOException e) {
-          return Futures.immediateFailedFuture(e);
-        }
-      }
-      return Futures.immediateFuture(null);
-    }
-
-    @Override
-    public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
-      return get(digest.getHash(), out, digest);
-    }
-
-    @Override
-    public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
-      return Utils.downloadAsActionResult(
-          actionKey, (digest, out) -> get(ACTION_KEY_PREFIX + digest.getHash(), out, digest));
-    }
-
-    @Override
-    public void uploadActionResult(ActionKey actionKey, ActionResult actionResult) {
-      map.put(ACTION_KEY_PREFIX + actionKey.getDigest().getHash(), actionResult.toByteArray());
-    }
-
-    @Override
-    public void close() {}
-
-    @Override
-    public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
-      try (InputStream in = file.getInputStream()) {
-        upload(digest.getHash(), digest.getSizeBytes(), in);
-      } catch (IOException e) {
-        return Futures.immediateFailedFuture(e);
-      }
-      return Futures.immediateFuture(null);
-    }
-
-    @Override
-    public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
-      try (InputStream in = data.newInput()) {
-        upload(digest.getHash(), digest.getSizeBytes(), in);
-      } catch (IOException e) {
-        return Futures.immediateFailedFuture(e);
-      }
-      return Futures.immediateFuture(null);
-    }
-
-    private void upload(String key, long length, InputStream in) throws IOException {
-      byte[] value = ByteStreams.toByteArray(in);
-      Preconditions.checkState(value.length == length);
-      map.put(key, value);
-    }
-  }
-}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpBlobStoreTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
similarity index 78%
rename from src/test/java/com/google/devtools/build/lib/remote/http/HttpBlobStoreTest.java
rename to src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
index b959c43..f01f93b 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpBlobStoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
@@ -33,11 +33,13 @@
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.testutil.MoreAsserts.ThrowingRunnable;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
 import com.google.devtools.build.remote.worker.http.HttpCacheServerHandler;
 import com.google.protobuf.ByteString;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -71,6 +73,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -92,12 +95,12 @@
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
 
-/** Tests for {@link HttpBlobStore}. */
+/** Tests for {@link HttpCacheClient}. */
 @RunWith(Parameterized.class)
-public class HttpBlobStoreTest {
+public class HttpCacheClientTest {
 
   private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);
-  private static final Digest DIGEST = DIGEST_UTIL.computeAsUtf8("foo");
+  private static final Digest DIGEST = DIGEST_UTIL.computeAsUtf8("File Contents");
 
   private static ServerChannel createServer(
       Class<? extends ServerChannel> serverChannelClass,
@@ -136,6 +139,7 @@
   }
 
   interface TestServer {
+
     ServerChannel start(ChannelInboundHandler handler);
 
     void stop(ServerChannel serverChannel);
@@ -241,35 +245,53 @@
 
   private final TestServer testServer;
 
-  public HttpBlobStoreTest(TestServer testServer) {
+  public HttpCacheClientTest(TestServer testServer) {
     this.testServer = testServer;
   }
 
-  private HttpBlobStore createHttpBlobStore(
-      ServerChannel serverChannel, int timeoutSeconds, @Nullable final Credentials creds)
+  private HttpCacheClient createHttpBlobStore(
+      ServerChannel serverChannel,
+      int timeoutSeconds,
+      boolean remoteVerifyDownloads,
+      @Nullable final Credentials creds)
       throws Exception {
     SocketAddress socketAddress = serverChannel.localAddress();
     if (socketAddress instanceof DomainSocketAddress) {
       DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress;
       URI uri = new URI("http://localhost");
-      return HttpBlobStore.create(
+      return HttpCacheClient.create(
           domainSocketAddress,
           uri,
           timeoutSeconds,
           /* remoteMaxConnections= */ 0,
+          remoteVerifyDownloads,
           ImmutableList.of(),
+          DIGEST_UTIL,
           creds);
     } else if (socketAddress instanceof InetSocketAddress) {
       InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
       URI uri = new URI("http://localhost:" + inetSocketAddress.getPort());
-      return HttpBlobStore.create(
-          uri, timeoutSeconds, /* remoteMaxConnections= */ 0, ImmutableList.of(), creds);
+      return HttpCacheClient.create(
+          uri,
+          timeoutSeconds,
+          /* remoteMaxConnections= */ 0,
+          remoteVerifyDownloads,
+          ImmutableList.of(),
+          DIGEST_UTIL,
+          creds);
     } else {
       throw new IllegalStateException(
           "unsupported socket address class " + socketAddress.getClass());
     }
   }
 
+  private HttpCacheClient createHttpBlobStore(
+      ServerChannel serverChannel, int timeoutSeconds, @Nullable final Credentials creds)
+      throws Exception {
+    return createHttpBlobStore(
+        serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds);
+  }
+
   @Test
   public void testUploadAtMostOnce() throws Exception {
     ServerChannel server = null;
@@ -277,7 +299,7 @@
       ConcurrentHashMap<String, byte[]> cacheContents = new ConcurrentHashMap<>();
       server = testServer.start(new HttpCacheServerHandler(cacheContents));
 
-      HttpBlobStore blobStore =
+      HttpCacheClient blobStore =
           createHttpBlobStore(server, /* timeoutSeconds= */ 1, /* credentials= */ null);
 
       ByteString data = ByteString.copyFrom("foo bar", StandardCharsets.UTF_8);
@@ -308,7 +330,7 @@
     testServer.stop(server);
 
     Credentials credentials = newCredentials();
-    HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+    HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
     getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
 
     fail("Exception expected");
@@ -329,7 +351,7 @@
               });
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
       getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
       fail("Exception expected");
@@ -353,7 +375,7 @@
               });
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
       fail("Exception expected");
     } finally {
@@ -385,7 +407,7 @@
               });
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       ByteString data = ByteString.copyFrom("File Contents", Charsets.US_ASCII);
       IOException e =
           assertThrows(
@@ -400,21 +422,95 @@
   }
 
   @Test
+  public void testDownloadFailsOnDigestMismatch() throws Exception {
+    // Test that the download fails when a blob/file has a different content hash than expected.
+
+    ServerChannel server = null;
+    try {
+      server =
+          testServer.start(
+              new SimpleChannelInboundHandler<FullHttpRequest>() {
+                @Override
+                protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+                  ByteBuf data = ctx.alloc().buffer();
+                  ByteBufUtil.writeUtf8(data, "bar");
+                  DefaultFullHttpResponse response =
+                      new DefaultFullHttpResponse(
+                          HttpVersion.HTTP_1_1, HttpResponseStatus.OK, data);
+                  HttpUtil.setContentLength(response, data.readableBytes());
+
+                  ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+                }
+              });
+
+      Credentials credentials = newCredentials();
+      HttpCacheClient blobStore =
+          createHttpBlobStore(
+              server, /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ true, credentials);
+      Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8));
+      try (OutputStream out = new ByteArrayOutputStream()) {
+        ThrowingRunnable download = () -> getFromFuture(blobStore.downloadBlob(fooDigest, out));
+        IOException e = assertThrows(IOException.class, download);
+        assertThat(e).hasMessageThat().contains(fooDigest.getHash());
+        assertThat(e).hasMessageThat().contains(DIGEST_UTIL.computeAsUtf8("bar").getHash());
+      }
+    } finally {
+      testServer.stop(server);
+    }
+  }
+
+  @Test
+  public void testDisablingDigestVerification() throws Exception {
+    // Test that when digest verification is disabled a corrupted download works.
+
+    ServerChannel server = null;
+    try {
+      server =
+          testServer.start(
+              new SimpleChannelInboundHandler<FullHttpRequest>() {
+                @Override
+                protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+                  ByteBuf data = ctx.alloc().buffer();
+                  ByteBufUtil.writeUtf8(data, "bar");
+                  DefaultFullHttpResponse response =
+                      new DefaultFullHttpResponse(
+                          HttpVersion.HTTP_1_1, HttpResponseStatus.OK, data);
+                  HttpUtil.setContentLength(response, data.readableBytes());
+
+                  ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+                }
+              });
+
+      Credentials credentials = newCredentials();
+      HttpCacheClient blobStore =
+          createHttpBlobStore(
+              server, /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ false, credentials);
+      Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8));
+      try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+        getFromFuture(blobStore.downloadBlob(fooDigest, out));
+        assertThat(out.toByteArray()).isEqualTo("bar".getBytes(Charsets.UTF_8));
+      }
+    } finally {
+      testServer.stop(server);
+    }
+  }
+
+  @Test
   public void expiredAuthTokensShouldBeRetried_get() throws Exception {
     expiredAuthTokensShouldBeRetried_get(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.UNAUTHORIZED);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.UNAUTHORIZED);
     expiredAuthTokensShouldBeRetried_get(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INVALID_TOKEN);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INVALID_TOKEN);
   }
 
   private void expiredAuthTokensShouldBeRetried_get(
-      HttpBlobStoreTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
+      HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
     ServerChannel server = null;
     try {
       server = testServer.start(new NotAuthorizedHandler(errorType));
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
       getFromFuture(blobStore.downloadBlob(DIGEST, out));
       assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
@@ -432,19 +528,19 @@
   @Test
   public void expiredAuthTokensShouldBeRetried_put() throws Exception {
     expiredAuthTokensShouldBeRetried_put(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.UNAUTHORIZED);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.UNAUTHORIZED);
     expiredAuthTokensShouldBeRetried_put(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INVALID_TOKEN);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INVALID_TOKEN);
   }
 
   private void expiredAuthTokensShouldBeRetried_put(
-      HttpBlobStoreTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
+      HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) throws Exception {
     ServerChannel server = null;
     try {
       server = testServer.start(new NotAuthorizedHandler(errorType));
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
       blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)).get();
       verify(credentials, times(1)).refresh();
@@ -459,19 +555,19 @@
   @Test
   public void errorCodesThatShouldNotBeRetried_get() {
     errorCodeThatShouldNotBeRetried_get(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INSUFFICIENT_SCOPE);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INSUFFICIENT_SCOPE);
     errorCodeThatShouldNotBeRetried_get(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INVALID_REQUEST);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INVALID_REQUEST);
   }
 
   private void errorCodeThatShouldNotBeRetried_get(
-      HttpBlobStoreTest.NotAuthorizedHandler.ErrorType errorType) {
+      HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) {
     ServerChannel server = null;
     try {
       server = testServer.start(new NotAuthorizedHandler(errorType));
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream()));
       fail("Exception expected.");
     } catch (Exception e) {
@@ -486,19 +582,19 @@
   @Test
   public void errorCodesThatShouldNotBeRetried_put() {
     errorCodeThatShouldNotBeRetried_put(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INSUFFICIENT_SCOPE);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INSUFFICIENT_SCOPE);
     errorCodeThatShouldNotBeRetried_put(
-        HttpBlobStoreTest.NotAuthorizedHandler.ErrorType.INVALID_REQUEST);
+        HttpCacheClientTest.NotAuthorizedHandler.ErrorType.INVALID_REQUEST);
   }
 
   private void errorCodeThatShouldNotBeRetried_put(
-      HttpBlobStoreTest.NotAuthorizedHandler.ErrorType errorType) {
+      HttpCacheClientTest.NotAuthorizedHandler.ErrorType errorType) {
     ServerChannel server = null;
     try {
       server = testServer.start(new NotAuthorizedHandler(errorType));
 
       Credentials credentials = newCredentials();
-      HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
+      HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] oneByte = new byte[] {0};
       getFromFuture(
           blobStore.uploadBlob(DIGEST_UTIL.compute(oneByte), ByteString.copyFrom(oneByte)));
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
index 1a6edd1..72a74d3 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -20,8 +20,10 @@
         "//src/main/java/com/google/devtools/build/lib:io",
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/remote",
+        "//src/main/java/com/google/devtools/build/lib/remote/common",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//third_party:guava",
         "//third_party/protobuf:protobuf_java",
+        "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
     ],
 )
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
new file mode 100644
index 0000000..2633e81
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
@@ -0,0 +1,141 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Digest;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** A {@link RemoteCacheClient} that stores its contents in memory. */
+public class InMemoryCacheClient implements RemoteCacheClient {
+
+  private final ConcurrentMap<Digest, Exception> downloadFailures = new ConcurrentHashMap<>();
+  private final ConcurrentMap<ActionKey, ActionResult> ac = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Digest, byte[]> cas;
+
+  private final AtomicInteger numSuccess = new AtomicInteger(); // successful downloadBlob calls
+  private final AtomicInteger numFailures = new AtomicInteger(); // injected or write failures
+
+  /**
+   * Creates a client whose CAS starts with the entries of {@code casEntries} (arrays are shared).
+   */
+  public InMemoryCacheClient(Map<Digest, byte[]> casEntries) {
+    this.cas = new ConcurrentHashMap<>(casEntries);
+  }
+
+  public InMemoryCacheClient() {
+    this.cas = new ConcurrentHashMap<>();
+  }
+
+  public void addDownloadFailure(Digest digest, Exception e) {
+    downloadFailures.put(digest, e);
+  }
+
+  public int getNumSuccessfulDownloads() {
+    return numSuccess.get();
+  }
+
+  public int getNumFailedDownloads() {
+    return numFailures.get();
+  }
+
+  @Override
+  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+    Exception failure = downloadFailures.get(digest);
+    if (failure != null) {
+      numFailures.incrementAndGet();
+      return Futures.immediateFailedFuture(failure);
+    }
+
+    byte[] data = cas.get(digest);
+    if (data == null) {
+      return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
+    }
+
+    try {
+      out.write(data);
+      out.flush();
+    } catch (IOException e) {
+      numFailures.incrementAndGet();
+      return Futures.immediateFailedFuture(e);
+    }
+    numSuccess.incrementAndGet();
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+    ActionResult actionResult = ac.get(actionKey);
+    if (actionResult == null) {
+      return Futures.immediateFailedFuture(new CacheNotFoundException(actionKey.getDigest()));
+    }
+    return Futures.immediateFuture(actionResult);
+  }
+
+  @Override
+  public void uploadActionResult(ActionKey actionKey, ActionResult actionResult) {
+    ac.put(actionKey, actionResult);
+  }
+
+  @Override
+  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+    try (InputStream in = file.getInputStream()) {
+      cas.put(digest, ByteStreams.toByteArray(in));
+    } catch (IOException e) {
+      return Futures.immediateFailedFuture(e);
+    }
+    return Futures.immediateFuture(null);
+  }
+
+  /**
+   * Stores the bytes of {@code data} in the in-memory CAS under {@code digest}. The copy is made
+   * with {@link ByteString#toByteArray()}, so no input stream has to be opened or closed here.
+   */
+  @Override
+  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+    cas.put(digest, data.toByteArray());
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+    ImmutableSet.Builder<Digest> missingBuilder = ImmutableSet.builder();
+    for (Digest digest : digests) {
+      if (!cas.containsKey(digest)) {
+        missingBuilder.add(digest);
+      }
+    }
+    return Futures.immediateFuture(missingBuilder.build());
+  }
+
+  @Override
+  public void close() {
+    cas.clear();
+    ac.clear();
+  }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
index ade8363..3ac4588 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
@@ -20,8 +20,7 @@
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.GetActionResultRequest;
 import build.bazel.remote.execution.v2.UpdateActionResultRequest;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import io.grpc.stub.StreamObserver;
 import java.util.logging.Logger;
@@ -30,10 +29,10 @@
 final class ActionCacheServer extends ActionCacheImplBase {
   private static final Logger logger = Logger.getLogger(ActionCacheImplBase.class.getName());
 
-  private final SimpleBlobStoreActionCache cache;
+  private final OnDiskBlobStoreCache cache;
   private final DigestUtil digestUtil;
 
-  public ActionCacheServer(SimpleBlobStoreActionCache cache, DigestUtil digestUtil) {
+  public ActionCacheServer(OnDiskBlobStoreCache cache, DigestUtil digestUtil) {
     this.cache = cache;
     this.digestUtil = digestUtil;
   }
@@ -43,7 +42,7 @@
       GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
     try {
       ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest());
-      ActionResult result = cache.getCachedActionResult(actionKey);
+      ActionResult result = cache.downloadActionResult(actionKey);
 
       if (result == null) {
         responseObserver.onError(StatusUtils.notFoundError(request.getActionDigest()));
@@ -63,7 +62,7 @@
       UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
     try {
       ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest());
-      cache.setCachedActionResult(actionKey, request.getActionResult());
+      cache.uploadActionResult(actionKey, request.getActionResult());
       responseObserver.onNext(request.getActionResult());
       responseObserver.onCompleted();
     } catch (Exception e) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index 2a509c4..8bbcd3d 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -26,7 +26,6 @@
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
         "//src/main/java/com/google/devtools/build/lib/remote",
-        "//src/main/java/com/google/devtools/build/lib/remote/blobstore",
         "//src/main/java/com/google/devtools/build/lib/remote/common",
         "//src/main/java/com/google/devtools/build/lib/remote/disk",
         "//src/main/java/com/google/devtools/build/lib/remote/options",
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
index 79a52b5..4a5b30b 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
@@ -41,7 +41,7 @@
 /** A basic implementation of a {@link ByteStreamImplBase} service. */
 final class ByteStreamServer extends ByteStreamImplBase {
   private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName());
-  private final OnDiskBlobStoreActionCache cache;
+  private final OnDiskBlobStoreCache cache;
   private final Path workPath;
   private final DigestUtil digestUtil;
 
@@ -60,7 +60,7 @@
     }
   }
 
-  public ByteStreamServer(OnDiskBlobStoreActionCache cache, Path workPath, DigestUtil digestUtil) {
+  public ByteStreamServer(OnDiskBlobStoreCache cache, Path workPath, DigestUtil digestUtil) {
     this.cache = cache;
     this.workPath = workPath;
     this.digestUtil = digestUtil;
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
index 6cd6949..2097bd3 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
@@ -28,9 +28,9 @@
 /** A basic implementation of a {@link ContentAddressableStorageImplBase} service. */
 final class CasServer extends ContentAddressableStorageImplBase {
   static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 4;
-  private final OnDiskBlobStoreActionCache cache;
+  private final OnDiskBlobStoreCache cache;
 
-  public CasServer(OnDiskBlobStoreActionCache cache) {
+  public CasServer(OnDiskBlobStoreCache cache) {
     this.cache = cache;
   }
 
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index 3036a80..538a25a 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -38,9 +38,8 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.remote.ExecutionStatusException;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.shell.AbnormalTerminationException;
@@ -97,7 +96,7 @@
   private final Path workPath;
   private final Path sandboxPath;
   private final RemoteWorkerOptions workerOptions;
-  private final SimpleBlobStoreActionCache cache;
+  private final OnDiskBlobStoreCache cache;
   private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
   private final ListeningExecutorService executorService;
   private final DigestUtil digestUtil;
@@ -106,7 +105,7 @@
       Path workPath,
       Path sandboxPath,
       RemoteWorkerOptions workerOptions,
-      SimpleBlobStoreActionCache cache,
+      OnDiskBlobStoreCache cache,
       ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache,
       DigestUtil digestUtil) {
     this.workPath = workPath;
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreActionCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreActionCache.java
deleted file mode 100644
index 3f34329..0000000
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreActionCache.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Copyright 2019 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.remote.worker;
-
-import build.bazel.remote.execution.v2.Digest;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.remote.disk.OnDiskBlobStore;
-import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.vfs.Path;
-
-/** A {@link SimpleBlobStoreActionCache} backed by an {@link OnDiskBlobStore}. */
-class OnDiskBlobStoreActionCache extends SimpleBlobStoreActionCache {
-
-  public OnDiskBlobStoreActionCache(RemoteOptions options, Path cacheDir, DigestUtil digestUtil) {
-    super(options, new OnDiskBlobStore(cacheDir), digestUtil);
-  }
-
-  public boolean containsKey(Digest digest) {
-    return ((OnDiskBlobStore) blobStore).contains(digest);
-  }
-}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
new file mode 100644
index 0000000..424d805
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
@@ -0,0 +1,77 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.remote.worker;
+
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.DirectoryNode;
+import build.bazel.remote.execution.v2.FileNode;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.RemoteCache;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
+import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
+import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.Utils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+
+/** A {@link RemoteCache} backed by a {@link DiskCacheClient}. */
+class OnDiskBlobStoreCache extends RemoteCache {
+
+  public OnDiskBlobStoreCache(RemoteOptions options, Path cacheDir, DigestUtil digestUtil) {
+    super(
+        new DiskCacheClient(cacheDir, /* verifyDownloads= */ true, digestUtil),
+        options,
+        digestUtil);
+  }
+
+  public boolean containsKey(Digest digest) {
+    return ((DiskCacheClient) cacheProtocol).contains(digest); // presumably the constructor's client
+  }
+
+  @SuppressWarnings("ProtoParseWithRegistry")
+  public void downloadTree(Digest rootDigest, Path rootLocation)
+      throws IOException, InterruptedException {
+    rootLocation.createDirectoryAndParents(); // done first, so empty directories still get created
+    Directory directory = Directory.parseFrom(Utils.getFromFuture(downloadBlob(rootDigest)));
+    for (FileNode file : directory.getFilesList()) {
+      Path dst = rootLocation.getRelative(file.getName());
+      Utils.getFromFuture(downloadFile(dst, file.getDigest()));
+      dst.setExecutable(file.getIsExecutable()); // mirror the FileNode's executable flag
+    }
+    for (DirectoryNode child : directory.getDirectoriesList()) {
+      downloadTree(child.getDigest(), rootLocation.getRelative(child.getName())); // recurse
+    }
+  }
+
+  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+    return cacheProtocol.uploadFile(digest, file); // plain delegation to the cache client
+  }
+
+  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+    return cacheProtocol.uploadBlob(digest, data); // plain delegation to the cache client
+  }
+
+  public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
+      throws IOException, InterruptedException {
+    cacheProtocol.uploadActionResult(actionKey, actionResult);
+  }
+
+  public DigestUtil getDigestUtil() {
+    return digestUtil;
+  }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index 6abf699..e4bd8de 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -111,7 +111,7 @@
   public RemoteWorker(
       FileSystem fs,
       RemoteWorkerOptions workerOptions,
-      OnDiskBlobStoreActionCache cache,
+      OnDiskBlobStoreCache cache,
       Path sandboxPath,
       DigestUtil digestUtil)
       throws IOException {
@@ -254,8 +254,7 @@
     Path casPath =
         remoteWorkerOptions.casPath != null ? fs.getPath(remoteWorkerOptions.casPath) : null;
     DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction());
-    OnDiskBlobStoreActionCache cache =
-        new OnDiskBlobStoreActionCache(remoteOptions, casPath, digestUtil);
+    OnDiskBlobStoreCache cache = new OnDiskBlobStoreCache(remoteOptions, casPath, digestUtil);
     ListeningScheduledExecutorService retryService =
         MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
     RemoteWorker worker = new RemoteWorker(fs, remoteWorkerOptions, cache, sandboxPath, digestUtil);