Remote: Async upload (Part 4)

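Remove RemoteCache#upload. RemoteExecutionService#uploadOutputs now builds the
UploadManifest itself (through the new, test-visible buildUploadManifest helper)
and calls UploadManifest#upload directly. uploadOutputs additionally takes the
SpawnResult of the local execution and refuses to upload the outputs of an
action that did not finish with status SUCCESS and exit code 0.

The directory-upload tests that exercised RemoteCache#upload move from
RemoteCacheTests to RemoteExecutionServiceTest.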

Part of https://github.com/bazelbuild/bazel/pull/13655.

PiperOrigin-RevId: 394606309
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 8f815d8..b2b2ab4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -19,9 +19,7 @@
 import static com.google.devtools.build.lib.remote.util.Utils.bytesCountToDisplayString;
 import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 
-import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Command;
 import build.bazel.remote.execution.v2.Digest;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -31,7 +29,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.concurrent.ThreadSafety;
 import com.google.devtools.build.lib.exec.SpawnProgressEvent;
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
@@ -42,13 +39,11 @@
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
-import com.google.devtools.build.lib.remote.common.RemotePathResolver;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
-import com.google.devtools.build.lib.util.io.FileOutErr;
 import com.google.devtools.build.lib.util.io.OutErr;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.Path;
@@ -57,7 +52,6 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
@@ -136,35 +130,6 @@
     return cacheProtocol.uploadBlob(context, digest, data);
   }
 
-  /**
-   * Upload the result of a locally executed action to the remote cache.
-   *
-   * @throws IOException if there was an error uploading to the remote cache
-   * @throws ExecException if uploading any of the action outputs is not supported
-   */
-  public ActionResult upload(
-      RemoteActionExecutionContext context,
-      RemotePathResolver remotePathResolver,
-      ActionKey actionKey,
-      Action action,
-      Command command,
-      Collection<Path> outputs,
-      FileOutErr outErr)
-      throws ExecException, IOException, InterruptedException {
-    UploadManifest manifest =
-        UploadManifest.create(
-            options,
-            digestUtil,
-            remotePathResolver,
-            actionKey,
-            action,
-            command,
-            outputs,
-            outErr,
-            /* exitCode= */ 0);
-    return manifest.upload(context, this);
-  }
-
   public static void waitForBulkTransfer(
       Iterable<? extends ListenableFuture<?>> transfers, boolean cancelRemainingOnInterrupt)
       throws BulkTransferException, InterruptedException {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
index 3ce4d78..36659dc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
@@ -52,6 +52,7 @@
 import build.bazel.remote.execution.v2.RequestMetadata;
 import build.bazel.remote.execution.v2.SymlinkNode;
 import build.bazel.remote.execution.v2.Tree;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -70,6 +71,7 @@
 import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
 import com.google.devtools.build.lib.actions.ForbiddenActionInputException;
 import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.SpawnResult;
 import com.google.devtools.build.lib.actions.Spawns;
 import com.google.devtools.build.lib.actions.UserExecException;
 import com.google.devtools.build.lib.actions.cache.MetadataInjector;
@@ -999,23 +1001,37 @@
     return null;
   }
 
-  /** Upload outputs of a remote action which was executed locally to remote cache. */
-  public void uploadOutputs(RemoteAction action)
-      throws InterruptedException, IOException, ExecException {
-    checkState(shouldUploadLocalResults(action.spawn), "spawn shouldn't upload local result");
-
+  @VisibleForTesting
+  UploadManifest buildUploadManifest(RemoteAction action, SpawnResult spawnResult)
+      throws ExecException, IOException {
     Collection<Path> outputFiles =
         action.spawn.getOutputFiles().stream()
             .map((inp) -> execRoot.getRelative(inp.getExecPath()))
             .collect(ImmutableList.toImmutableList());
-    remoteCache.upload(
-        action.remoteActionExecutionContext,
+
+    return UploadManifest.create(
+        remoteOptions,
+        digestUtil,
         remotePathResolver,
         action.actionKey,
         action.action,
         action.command,
         outputFiles,
-        action.spawnExecutionContext.getFileOutErr());
+        action.spawnExecutionContext.getFileOutErr(),
+        /* exitCode= */ 0);
+  }
+
+  /** Upload outputs of a remote action which was executed locally to the remote cache. */
+  public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
+      throws InterruptedException, IOException, ExecException {
+    checkState(shouldUploadLocalResults(action.spawn), "spawn shouldn't upload local result");
+    checkState(
+        SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0,
+        "shouldn't upload outputs of failed local action");
+
+    UploadManifest manifest = buildUploadManifest(action, spawnResult);
+
+    manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index d72bb34..f09b5fc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -198,7 +198,7 @@
           }
 
           try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
-            remoteExecutionService.uploadOutputs(action);
+            remoteExecutionService.uploadOutputs(action, result);
           } catch (IOException e) {
             String errorMessage;
             if (!verboseFailures) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 31ed039..8fc35bc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -123,6 +123,11 @@
     this.remoteExecutionService = remoteExecutionService;
   }
 
+  @VisibleForTesting
+  RemoteExecutionService getRemoteExecutionService() {
+    return remoteExecutionService;
+  }
+
   @Override
   public String getName() {
     return "remote";
@@ -566,7 +571,7 @@
     }
 
     try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
-      remoteExecutionService.uploadOutputs(action);
+      remoteExecutionService.uploadOutputs(action, result);
     } catch (IOException e) {
       if (verboseFailures) {
         report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
index 1645009..ee3043a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -337,6 +337,11 @@
     throw new UserExecException(failureDetail);
   }
 
+  @VisibleForTesting
+  ActionResult getActionResult() {
+    return result.build();
+  }
+
   /** Uploads outputs and action result (if exit code is 0) to remote cache. */
   public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
       throws IOException, InterruptedException {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index 3faa645..06d9951 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -556,12 +556,33 @@
     assertThat(result).isEqualTo(expectedResult.build());
   }
 
+  private ActionResult upload(
+      RemoteCache remoteCache,
+      ActionKey actionKey,
+      Action action,
+      Command command,
+      List<Path> outputs)
+      throws Exception {
+    UploadManifest uploadManifest =
+        UploadManifest.create(
+            remoteCache.options,
+            remoteCache.digestUtil,
+            remotePathResolver,
+            actionKey,
+            action,
+            command,
+            outputs,
+            outErr,
+            0);
+    return uploadManifest.upload(context, remoteCache);
+  }
+
   private ActionResult uploadDirectory(RemoteCache remoteCache, List<Path> outputs)
       throws Exception {
     Action action = Action.getDefaultInstance();
     ActionKey actionKey = DIGEST_UTIL.computeActionKey(action);
     Command cmd = Command.getDefaultInstance();
-    return remoteCache.upload(context, remotePathResolver, actionKey, action, cmd, outputs, outErr);
+    return upload(remoteCache, actionKey, action, cmd, outputs);
   }
 
   @Test
@@ -686,14 +707,12 @@
         });
 
     ActionResult result =
-        remoteCache.upload(
-            context,
-            remotePathResolver,
+        upload(
+            remoteCache,
             DIGEST_UTIL.asActionKey(actionDigest),
             action,
             command,
-            ImmutableList.of(fooFile, barFile),
-            outErr);
+            ImmutableList.of(fooFile, barFile));
     ActionResult.Builder expectedResult = ActionResult.newBuilder();
     expectedResult.setStdoutDigest(stdoutDigest);
     expectedResult.setStderrDigest(stderrDigest);
@@ -749,14 +768,12 @@
         });
 
     ActionResult result =
-        remoteCache.upload(
-            context,
-            remotePathResolver,
+        upload(
+            remoteCache,
             DIGEST_UTIL.asActionKey(actionDigest),
             action,
             command,
-            ImmutableList.of(fooFile, barFile),
-            outErr);
+            ImmutableList.of(fooFile, barFile));
     ActionResult.Builder expectedResult = ActionResult.newBuilder();
     expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
     expectedResult
@@ -898,14 +915,12 @@
                 }))
         .when(mockByteStreamImpl)
         .queryWriteStatus(any(), any());
-    remoteCache.upload(
-        context,
-        remotePathResolver,
+    upload(
+        remoteCache,
         actionKey,
         Action.getDefaultInstance(),
         Command.getDefaultInstance(),
-        ImmutableList.<Path>of(fooFile, barFile, bazFile),
-        outErr);
+        ImmutableList.<Path>of(fooFile, barFile, bazFile));
     // 4 times for the errors, 3 times for the successful uploads.
     Mockito.verify(mockByteStreamImpl, Mockito.times(7))
         .write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index b55c75a..054a730 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -16,16 +16,9 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 
-import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Command;
 import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
-import build.bazel.remote.execution.v2.DirectoryNode;
-import build.bazel.remote.execution.v2.FileNode;
 import build.bazel.remote.execution.v2.RequestMetadata;
-import build.bazel.remote.execution.v2.Tree;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -34,8 +27,6 @@
 import com.google.devtools.build.lib.actions.ArtifactRoot.RootType;
 import com.google.devtools.build.lib.clock.JavaClock;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
-import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
-import com.google.devtools.build.lib.remote.common.RemotePathResolver;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -66,7 +57,6 @@
 public class RemoteCacheTests {
 
   private RemoteActionExecutionContext context;
-  private RemotePathResolver remotePathResolver;
   private FileSystem fs;
   private Path execRoot;
   ArtifactRoot artifactRoot;
@@ -84,7 +74,6 @@
     fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
     execRoot = fs.getPath("/execroot/main");
     execRoot.createDirectoryAndParents();
-    remotePathResolver = RemotePathResolver.createDefault(execRoot);
     fakeFileCache = new FakeActionInputFileCache(execRoot);
     artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.Output, "outputs");
     artifactRoot.getRoot().asPath().createDirectoryAndParents();
@@ -167,60 +156,6 @@
   }
 
   @Test
-  public void testUploadDirectory() throws Exception {
-    // Test that uploading a directory works.
-
-    // arrange
-    Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
-    Digest quxDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
-    Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(
-            ActionInputHelper.fromPath("bar"),
-            Tree.newBuilder()
-                .setRoot(
-                    Directory.newBuilder()
-                        .addFiles(
-                            FileNode.newBuilder()
-                                .setIsExecutable(true)
-                                .setName("qux")
-                                .setDigest(quxDigest)
-                                .build())
-                        .build())
-                .build());
-    Path fooFile = execRoot.getRelative("a/foo");
-    Path quxFile = execRoot.getRelative("bar/qux");
-    quxFile.setExecutable(true);
-    Path barDir = execRoot.getRelative("bar");
-    Command cmd = Command.newBuilder().addOutputFiles("bla").build();
-    Digest cmdDigest = digestUtil.compute(cmd);
-    Action action = Action.newBuilder().setCommandDigest(cmdDigest).build();
-    Digest actionDigest = digestUtil.compute(action);
-
-    // act
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    ActionResult result =
-        remoteCache.upload(
-            context,
-            remotePathResolver,
-            digestUtil.asActionKey(actionDigest),
-            action,
-            cmd,
-            ImmutableList.of(fooFile, barDir),
-            new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-
-    // assert
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-
-    ImmutableList<Digest> toQuery =
-        ImmutableList.of(fooDigest, quxDigest, barDigest, cmdDigest, actionDigest);
-    assertThat(getFromFuture(remoteCache.findMissingDigests(context, toQuery))).isEmpty();
-  }
-
-  @Test
   public void upload_emptyBlobAndFile_doNotPerformUpload() throws Exception {
     // Test that uploading an empty BLOB/file does not try to perform an upload.
     InMemoryRemoteCache remoteCache = newRemoteCache();
@@ -236,127 +171,6 @@
         .containsExactly(emptyDigest);
   }
 
-  @Test
-  public void upload_emptyOutputs_doNotPerformUpload() throws Exception {
-    // Test that uploading an empty output does not try to perform an upload.
-
-    // arrange
-    Digest emptyDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "");
-    Path file = execRoot.getRelative("bar/test/wobble");
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    Action action = Action.getDefaultInstance();
-    ActionKey actionDigest = digestUtil.computeActionKey(action);
-    Command cmd = Command.getDefaultInstance();
-
-    // act
-    remoteCache.upload(
-        context,
-        remotePathResolver,
-        actionDigest,
-        action,
-        cmd,
-        ImmutableList.of(file),
-        new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-
-    // assert
-    assertThat(getFromFuture(remoteCache.findMissingDigests(context, ImmutableSet.of(emptyDigest))))
-        .containsExactly(emptyDigest);
-  }
-
-  @Test
-  public void testUploadEmptyDirectory() throws Exception {
-    // Test that uploading an empty directory works.
-
-    // arrange
-    final Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(
-            ActionInputHelper.fromPath("bar"),
-            Tree.newBuilder().setRoot(Directory.newBuilder().build()).build());
-    final Path barDir = execRoot.getRelative("bar");
-    Action action = Action.getDefaultInstance();
-    ActionKey actionDigest = digestUtil.computeActionKey(action);
-    Command cmd = Command.getDefaultInstance();
-
-    // act
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    ActionResult result =
-        remoteCache.upload(
-            context,
-            remotePathResolver,
-            actionDigest,
-            action,
-            cmd,
-            ImmutableList.of(barDir),
-            new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-
-    // assert
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-    assertThat(getFromFuture(remoteCache.findMissingDigests(context, ImmutableList.of(barDigest))))
-        .isEmpty();
-  }
-
-  @Test
-  public void testUploadNestedDirectory() throws Exception {
-    // Test that uploading a nested directory works.
-
-    // arrange
-    final Digest wobbleDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
-    final Digest quxDigest =
-        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
-    final Directory testDirMessage =
-        Directory.newBuilder()
-            .addFiles(FileNode.newBuilder().setName("wobble").setDigest(wobbleDigest).build())
-            .build();
-    final Digest testDigest = digestUtil.compute(testDirMessage);
-    final Tree barTree =
-        Tree.newBuilder()
-            .setRoot(
-                Directory.newBuilder()
-                    .addFiles(
-                        FileNode.newBuilder()
-                            .setIsExecutable(true)
-                            .setName("qux")
-                            .setDigest(quxDigest))
-                    .addDirectories(
-                        DirectoryNode.newBuilder().setName("test").setDigest(testDigest)))
-            .addChildren(testDirMessage)
-            .build();
-    final Digest barDigest =
-        fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), barTree);
-
-    final Path quxFile = execRoot.getRelative("bar/qux");
-    quxFile.setExecutable(true);
-    final Path barDir = execRoot.getRelative("bar");
-
-    Action action = Action.getDefaultInstance();
-    ActionKey actionDigest = digestUtil.computeActionKey(action);
-    Command cmd = Command.getDefaultInstance();
-
-    // act
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    ActionResult result =
-        remoteCache.upload(
-            context,
-            remotePathResolver,
-            actionDigest,
-            action,
-            cmd,
-            ImmutableList.of(barDir),
-            new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")));
-
-    // assert
-    ActionResult.Builder expectedResult = ActionResult.newBuilder();
-    expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
-    assertThat(result).isEqualTo(expectedResult.build());
-
-    ImmutableList<Digest> toQuery = ImmutableList.of(wobbleDigest, quxDigest, barDigest);
-    assertThat(getFromFuture(remoteCache.findMissingDigests(context, toQuery))).isEmpty();
-  }
-
   private InMemoryRemoteCache newRemoteCache() {
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     return new InMemoryRemoteCache(options, digestUtil);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
index aa3ea86..db11adc 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
@@ -16,6 +16,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.devtools.build.lib.actions.ExecutionRequirements.REMOTE_EXECUTION_INLINE_OUTPUTS;
 import static com.google.devtools.build.lib.remote.util.DigestUtil.toBinaryDigest;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 import static com.google.devtools.build.lib.vfs.FileSystemUtils.readContent;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertThrows;
@@ -46,6 +47,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
 import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
@@ -55,6 +57,7 @@
 import com.google.devtools.build.lib.actions.ResourceSet;
 import com.google.devtools.build.lib.actions.SimpleSpawn;
 import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.SpawnResult;
 import com.google.devtools.build.lib.actions.cache.MetadataInjector;
 import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
 import com.google.devtools.build.lib.clock.JavaClock;
@@ -1064,6 +1067,197 @@
     verify(injector, never()).injectFile(eq(a1), remoteFileMatchingDigest(d1));
   }
 
+  @Test
+  public void uploadOutputs_uploadDirectory_works() throws Exception {
+    // Test that uploading a directory works.
+
+    // arrange
+    Digest fooDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/a/foo"), "xyz");
+    Digest quxDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/bar/qux"), "abc");
+    Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(
+            ActionInputHelper.fromPath("outputs/bar"),
+            Tree.newBuilder()
+                .setRoot(
+                    Directory.newBuilder()
+                        .addFiles(
+                            FileNode.newBuilder()
+                                .setIsExecutable(true)
+                                .setName("qux")
+                                .setDigest(quxDigest)
+                                .build())
+                        .build())
+                .build());
+    Path fooFile = execRoot.getRelative("outputs/a/foo");
+    Path quxFile = execRoot.getRelative("outputs/bar/qux");
+    quxFile.setExecutable(true);
+    Path barDir = execRoot.getRelative("outputs/bar");
+    Artifact outputFile = ActionsTestUtil.createArtifact(artifactRoot, fooFile);
+    Artifact outputDirectory =
+        ActionsTestUtil.createTreeArtifactWithGeneratingAction(
+            artifactRoot, barDir.relativeTo(execRoot));
+    RemoteExecutionService service = newRemoteExecutionService();
+    Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputFile, outputDirectory));
+    FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn);
+    RemoteAction action = service.buildRemoteAction(spawn, context);
+    SpawnResult spawnResult =
+        new SpawnResult.Builder()
+            .setExitCode(0)
+            .setStatus(SpawnResult.Status.SUCCESS)
+            .setRunnerName("test")
+            .build();
+
+    // act
+    UploadManifest manifest = service.buildUploadManifest(action, spawnResult);
+    service.uploadOutputs(action, spawnResult);
+
+    // assert
+    ActionResult.Builder expectedResult = ActionResult.newBuilder();
+    expectedResult.addOutputFilesBuilder().setPath("outputs/a/foo").setDigest(fooDigest);
+    expectedResult.addOutputDirectoriesBuilder().setPath("outputs/bar").setTreeDigest(barDigest);
+    assertThat(manifest.getActionResult()).isEqualTo(expectedResult.build());
+
+    ImmutableList<Digest> toQuery = ImmutableList.of(fooDigest, quxDigest, barDigest);
+    assertThat(getFromFuture(cache.findMissingDigests(remoteActionExecutionContext, toQuery)))
+        .isEmpty();
+  }
+
+  @Test
+  public void uploadOutputs_uploadEmptyDirectory_works() throws Exception {
+    // Test that uploading an empty directory works.
+
+    // arrange
+    Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(
+            ActionInputHelper.fromPath("outputs/bar"),
+            Tree.newBuilder().setRoot(Directory.getDefaultInstance()).build());
+    Path barDir = execRoot.getRelative("outputs/bar");
+    Artifact outputDirectory =
+        ActionsTestUtil.createTreeArtifactWithGeneratingAction(
+            artifactRoot, barDir.relativeTo(execRoot));
+    RemoteExecutionService service = newRemoteExecutionService();
+    Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputDirectory));
+    FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn);
+    RemoteAction action = service.buildRemoteAction(spawn, context);
+    SpawnResult spawnResult =
+        new SpawnResult.Builder()
+            .setExitCode(0)
+            .setStatus(SpawnResult.Status.SUCCESS)
+            .setRunnerName("test")
+            .build();
+
+    // act
+    UploadManifest manifest = service.buildUploadManifest(action, spawnResult);
+    service.uploadOutputs(action, spawnResult);
+
+    // assert
+    ActionResult.Builder expectedResult = ActionResult.newBuilder();
+    expectedResult.addOutputDirectoriesBuilder().setPath("outputs/bar").setTreeDigest(barDigest);
+    assertThat(manifest.getActionResult()).isEqualTo(expectedResult.build());
+    assertThat(
+            getFromFuture(
+                cache.findMissingDigests(
+                    remoteActionExecutionContext, ImmutableList.of(barDigest))))
+        .isEmpty();
+  }
+
+  @Test
+  public void uploadOutputs_uploadNestedDirectory_works() throws Exception {
+    // Test that uploading a nested directory works.
+
+    // arrange
+    final Digest wobbleDigest =
+        fakeFileCache.createScratchInput(
+            ActionInputHelper.fromPath("outputs/bar/test/wobble"), "xyz");
+    final Digest quxDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/bar/qux"), "abc");
+    final Directory testDirMessage =
+        Directory.newBuilder()
+            .addFiles(FileNode.newBuilder().setName("wobble").setDigest(wobbleDigest).build())
+            .build();
+    final Digest testDigest = digestUtil.compute(testDirMessage);
+    final Tree barTree =
+        Tree.newBuilder()
+            .setRoot(
+                Directory.newBuilder()
+                    .addFiles(
+                        FileNode.newBuilder()
+                            .setIsExecutable(true)
+                            .setName("qux")
+                            .setDigest(quxDigest))
+                    .addDirectories(
+                        DirectoryNode.newBuilder().setName("test").setDigest(testDigest)))
+            .addChildren(testDirMessage)
+            .build();
+    final Digest barDigest =
+        fakeFileCache.createScratchInputDirectory(
+            ActionInputHelper.fromPath("outputs/bar"), barTree);
+
+    final Path quxFile = execRoot.getRelative("outputs/bar/qux");
+    quxFile.setExecutable(true);
+    final Path barDir = execRoot.getRelative("outputs/bar");
+
+    Artifact outputDirectory =
+        ActionsTestUtil.createTreeArtifactWithGeneratingAction(
+            artifactRoot, barDir.relativeTo(execRoot));
+    RemoteExecutionService service = newRemoteExecutionService();
+    Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputDirectory));
+    FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn);
+    RemoteAction action = service.buildRemoteAction(spawn, context);
+    SpawnResult spawnResult =
+        new SpawnResult.Builder()
+            .setExitCode(0)
+            .setStatus(SpawnResult.Status.SUCCESS)
+            .setRunnerName("test")
+            .build();
+
+    // act
+    UploadManifest manifest = service.buildUploadManifest(action, spawnResult);
+    service.uploadOutputs(action, spawnResult);
+
+    // assert
+    ActionResult.Builder expectedResult = ActionResult.newBuilder();
+    expectedResult.addOutputDirectoriesBuilder().setPath("outputs/bar").setTreeDigest(barDigest);
+    assertThat(manifest.getActionResult()).isEqualTo(expectedResult.build());
+
+    ImmutableList<Digest> toQuery = ImmutableList.of(wobbleDigest, quxDigest, barDigest);
+    assertThat(getFromFuture(cache.findMissingDigests(remoteActionExecutionContext, toQuery)))
+        .isEmpty();
+  }
+
+  @Test
+  public void uploadOutputs_emptyOutputs_doNotPerformUpload() throws Exception {
+    // Test that uploading an empty output does not try to perform an upload.
+
+    // arrange
+    Digest emptyDigest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/bar/test/wobble"), "");
+    Path file = execRoot.getRelative("outputs/bar/test/wobble");
+    Artifact outputFile = ActionsTestUtil.createArtifact(artifactRoot, file);
+    RemoteExecutionService service = newRemoteExecutionService();
+    Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputFile));
+    FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn);
+    RemoteAction action = service.buildRemoteAction(spawn, context);
+    SpawnResult spawnResult =
+        new SpawnResult.Builder()
+            .setExitCode(0)
+            .setStatus(SpawnResult.Status.SUCCESS)
+            .setRunnerName("test")
+            .build();
+
+    // act
+    service.uploadOutputs(action, spawnResult);
+
+    // assert
+    assertThat(
+            getFromFuture(
+                cache.findMissingDigests(
+                    remoteActionExecutionContext, ImmutableSet.of(emptyDigest))))
+        .containsExactly(emptyDigest);
+  }
+
   private Spawn newSpawnFromResult(RemoteActionResult result) {
     return newSpawnFromResult(ImmutableMap.of(), result);
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
index 7a073e0..5f2b175 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
@@ -17,6 +17,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -24,9 +25,7 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Command;
 import build.bazel.remote.execution.v2.Digest;
 import build.bazel.remote.execution.v2.OutputFile;
 import build.bazel.remote.execution.v2.RequestMetadata;
@@ -84,7 +83,6 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import org.junit.Before;
@@ -130,8 +128,7 @@
         public void prefetchInputs() {}
 
         @Override
-        public void lockOutputFiles() {
-        }
+        public void lockOutputFiles() {}
 
         @Override
         public boolean speculating() {
@@ -247,7 +244,6 @@
     fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().getSingleton(), "xyz");
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void cacheHit() throws Exception {
     // arrange
@@ -293,15 +289,7 @@
     verify(service)
         .downloadOutputs(
             any(), eq(RemoteActionResult.createFromCache(CachedActionResult.remote(actionResult))));
-    verify(remoteCache, never())
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            any(Collection.class),
-            any(FileOutErr.class));
+    verify(service, never()).uploadOutputs(any(), any());
     assertThat(result.setupSuccess()).isTrue();
     assertThat(result.exitCode()).isEqualTo(0);
     assertThat(result.isCacheHit()).isTrue();
@@ -314,6 +302,7 @@
   @Test
   public void cacheMiss() throws Exception {
     RemoteSpawnCache cache = createRemoteSpawnCache();
+    RemoteExecutionService service = cache.getRemoteExecutionService();
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
     assertThat(entry.hasResult()).isFalse();
     SpawnResult result =
@@ -322,37 +311,9 @@
             .setStatus(Status.SUCCESS)
             .setRunnerName("test")
             .build();
-    ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
-    doAnswer(
-            new Answer<Void>() {
-              @Override
-              public Void answer(InvocationOnMock invocation) {
-                RemoteActionExecutionContext context = invocation.getArgument(0);
-                RequestMetadata meta = context.getRequestMetadata();
-                assertThat(meta.getCorrelatedInvocationsId()).isEqualTo(BUILD_REQUEST_ID);
-                assertThat(meta.getToolInvocationId()).isEqualTo(COMMAND_ID);
-                return null;
-              }
-            })
-        .when(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    doNothing().when(service).uploadOutputs(any(), any());
     entry.store(result);
-    verify(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    verify(service).uploadOutputs(any(), any());
     assertThat(progressUpdates)
         .containsExactly(
             SpawnCheckingCacheEvent.create("remote-cache"),
@@ -493,6 +454,7 @@
   public void failedActionsAreNotUploaded() throws Exception {
     // Only successful action results are uploaded to the remote cache.
     RemoteSpawnCache cache = createRemoteSpawnCache();
+    RemoteExecutionService service = cache.getRemoteExecutionService();
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
     verify(remoteCache)
         .downloadActionResult(
@@ -510,17 +472,8 @@
                     .build())
             .setRunnerName("test")
             .build();
-    ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
     entry.store(result);
-    verify(remoteCache, never())
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    verify(service, never()).uploadOutputs(any(), any());
     assertThat(progressUpdates)
         .containsExactly(
             SpawnCheckingCacheEvent.create("remote-cache"),
@@ -530,6 +483,7 @@
   @Test
   public void printWarningIfUploadFails() throws Exception {
     RemoteSpawnCache cache = createRemoteSpawnCache();
+    RemoteExecutionService service = cache.getRemoteExecutionService();
     CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy);
     assertThat(entry.hasResult()).isFalse();
     SpawnResult result =
@@ -538,29 +492,11 @@
             .setStatus(Status.SUCCESS)
             .setRunnerName("test")
             .build();
-    ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
 
-    doThrow(new IOException("cache down"))
-        .when(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    doThrow(new IOException("cache down")).when(service).uploadOutputs(any(), any());
 
     entry.store(result);
-    verify(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    verify(service).uploadOutputs(any(), eq(result));
 
     assertThat(eventHandler.getEvents()).hasSize(1);
     Event evt = eventHandler.getEvents().get(0);
@@ -575,6 +511,7 @@
   @Test
   public void printWarningIfDownloadFails() throws Exception {
     RemoteSpawnCache cache = createRemoteSpawnCache();
+    RemoteExecutionService service = cache.getRemoteExecutionService();
     doThrow(new IOException(io.grpc.Status.UNAVAILABLE.asRuntimeException()))
         .when(remoteCache)
         .downloadActionResult(
@@ -590,38 +527,10 @@
             .setStatus(Status.SUCCESS)
             .setRunnerName("test")
             .build();
-    ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
 
-    doAnswer(
-            new Answer<Void>() {
-              @Override
-              public Void answer(InvocationOnMock invocation) {
-                RemoteActionExecutionContext context = invocation.getArgument(0);
-                RequestMetadata meta = context.getRequestMetadata();
-                assertThat(meta.getCorrelatedInvocationsId()).isEqualTo(BUILD_REQUEST_ID);
-                assertThat(meta.getToolInvocationId()).isEqualTo(COMMAND_ID);
-                return null;
-              }
-            })
-        .when(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    doNothing().when(service).uploadOutputs(any(), any());
     entry.store(result);
-    verify(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    verify(service).uploadOutputs(any(), eq(result));
 
     assertThat(eventHandler.getEvents()).hasSize(1);
     Event evt = eventHandler.getEvents().get(0);
@@ -636,6 +545,7 @@
   @Test
   public void orphanedCachedResultIgnored() throws Exception {
     RemoteSpawnCache cache = createRemoteSpawnCache();
+    RemoteExecutionService service = cache.getRemoteExecutionService();
     Digest digest = digestUtil.computeAsUtf8("bla");
     ActionResult actionResult =
         ActionResult.newBuilder()
@@ -657,7 +567,7 @@
               }
             });
     doThrow(new CacheNotFoundException(digest))
-        .when(cache.getRemoteExecutionService())
+        .when(service)
         .downloadOutputs(
             any(), eq(RemoteActionResult.createFromCache(CachedActionResult.remote(actionResult))));
 
@@ -669,38 +579,10 @@
             .setStatus(Status.SUCCESS)
             .setRunnerName("test")
             .build();
-    ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
 
-    doAnswer(
-            new Answer<Void>() {
-              @Override
-              public Void answer(InvocationOnMock invocation) {
-                RemoteActionExecutionContext context = invocation.getArgument(0);
-                RequestMetadata meta = context.getRequestMetadata();
-                assertThat(meta.getCorrelatedInvocationsId()).isEqualTo(BUILD_REQUEST_ID);
-                assertThat(meta.getToolInvocationId()).isEqualTo(COMMAND_ID);
-                return null;
-              }
-            })
-        .when(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    doNothing().when(service).uploadOutputs(any(), any());
     entry.store(result);
-    verify(remoteCache)
-        .upload(
-            any(RemoteActionExecutionContext.class),
-            any(RemotePathResolver.class),
-            any(ActionKey.class),
-            any(Action.class),
-            any(Command.class),
-            eq(outputFiles),
-            eq(outErr));
+    verify(service).uploadOutputs(any(), eq(result));
     assertThat(progressUpdates)
         .containsExactly(
             SpawnCheckingCacheEvent.create("remote-cache"),
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index a234a711..eec46d9 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -18,6 +18,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
@@ -148,8 +149,6 @@
 
   @Mock private SpawnRunner localRunner;
 
-  private RemoteExecutionService service;
-
   // The action key of the Spawn returned by newSimpleSpawn().
   private final String simpleActionId =
       "eb45b20cc979d504f96b9efc9a08c48103c6f017afa09c0df5c70a5f92a98ea8";
@@ -194,6 +193,7 @@
     remoteOptions.remoteExecutionPriority = 2;
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
@@ -222,7 +222,7 @@
     // TODO(olaola): verify that the uploaded action has the doNotCache set.
 
     verify(service, never()).lookupCache(any());
-    verify(cache, never()).upload(any(), any(), any(), any(), any(), any(), any());
+    verify(service, never()).uploadOutputs(any(), any());
     verifyNoMoreInteractions(localRunner);
   }
 
@@ -273,6 +273,8 @@
     remoteOptions.remoteUploadLocalResults = true;
 
     RemoteSpawnRunner runner = spy(newSpawnRunner());
+    RemoteExecutionService service = runner.getRemoteExecutionService();
+    doNothing().when(service).uploadOutputs(any(), any());
 
     // Throw an IOException to trigger the local fallback.
     when(executor.executeRemotely(
@@ -298,7 +300,7 @@
     verify(localRunner).exec(eq(spawn), eq(policy));
     verify(runner)
         .execLocallyAndUpload(any(), eq(spawn), eq(policy), /* uploadLocalResults= */ eq(true));
-    verify(cache).upload(any(), any(), any(), any(), any(), any(), any());
+    verify(service).uploadOutputs(any(), eq(res));
   }
 
   @Test
@@ -309,6 +311,7 @@
     remoteOptions.remoteUploadLocalResults = true;
 
     RemoteSpawnRunner runner = spy(newSpawnRunner());
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     // Throw an IOException to trigger the local fallback.
     when(executor.executeRemotely(
@@ -330,7 +333,7 @@
     verify(localRunner).exec(eq(spawn), eq(policy));
     verify(runner)
         .execLocallyAndUpload(any(), eq(spawn), eq(policy), /* uploadLocalResults= */ eq(true));
-    verify(cache, never()).upload(any(), any(), any(), any(), any(), any(), any());
+    verify(service, never()).uploadOutputs(any(), any());
   }
 
   @Test
@@ -350,12 +353,14 @@
         .thenReturn(failedAction);
 
     RemoteSpawnRunner runner = spy(newSpawnRunner());
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     // Throw an IOException to trigger the local fallback.
     when(executor.executeRemotely(
             any(RemoteActionExecutionContext.class),
             any(ExecuteRequest.class),
             any(OperationObserver.class)))
         .thenThrow(IOException.class);
+    doNothing().when(service).uploadOutputs(any(), any());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -368,12 +373,12 @@
             .build();
     when(localRunner.exec(eq(spawn), eq(policy))).thenReturn(succeeded);
 
-    runner.exec(spawn, policy);
+    SpawnResult result = runner.exec(spawn, policy);
 
     verify(localRunner).exec(eq(spawn), eq(policy));
     verify(runner)
         .execLocallyAndUpload(any(), eq(spawn), eq(policy), /* uploadLocalResults= */ eq(true));
-    verify(service).uploadOutputs(any());
+    verify(service).uploadOutputs(any(), eq(result));
     verify(service, never()).downloadOutputs(any(), any());
   }
 
@@ -391,6 +396,7 @@
         .thenReturn(failedAction);
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
@@ -421,6 +427,8 @@
     reporter.addHandler(eventHandler);
 
     RemoteSpawnRunner runner = newSpawnRunner(reporter);
+    RemoteExecutionService service = runner.getRemoteExecutionService();
+
     // Trigger local fallback
     when(executor.executeRemotely(
             any(RemoteActionExecutionContext.class),
@@ -431,15 +439,8 @@
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
 
-    when(cache.downloadActionResult(
-            any(RemoteActionExecutionContext.class),
-            any(ActionKey.class),
-            /* inlineOutErr= */ eq(false)))
-        .thenThrow(new IOException("cache down"));
-
-    doThrow(new IOException("cache down"))
-        .when(cache)
-        .upload(any(), any(), any(), any(), any(), any(), any());
+    doThrow(new IOException("cache down")).when(service).lookupCache(any());
+    doThrow(new IOException("cache down")).when(service).uploadOutputs(any(), any());
 
     SpawnResult res =
         new SpawnResult.Builder()
@@ -559,6 +560,7 @@
   @Test
   public void testHumanReadableServerLogsSavedForFailingAction() throws Exception {
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
     ExecuteResponse resp =
@@ -596,6 +598,7 @@
   public void testHumanReadableServerLogsSavedForFailingActionWithSiblingRepositoryLayout()
       throws Exception {
     RemoteSpawnRunner runner = newSpawnRunner(new SiblingRepositoryLayoutResolver(execRoot));
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     Path logPath =
         logDir
@@ -635,6 +638,7 @@
   @Test
   public void testHumanReadableServerLogsSavedForFailingActionWithStatus() throws Exception {
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
     com.google.rpc.Status timeoutStatus =
@@ -674,6 +678,7 @@
   public void testNonHumanReadableServerLogsNotSaved() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     ActionResult result = ActionResult.newBuilder().setExitCode(31).build();
@@ -710,6 +715,7 @@
   @Test
   public void testServerLogsNotSavedForSuccessfulAction() throws Exception {
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     ActionResult result = ActionResult.newBuilder().setExitCode(0).build();
@@ -748,6 +754,7 @@
 
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     CachedActionResult cachedResult =
         CachedActionResult.remote(ActionResult.newBuilder().setExitCode(0).build());
@@ -797,6 +804,7 @@
 
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     when(cache.downloadActionResult(
             any(RemoteActionExecutionContext.class),
@@ -854,6 +862,7 @@
     remoteOptions.remoteLocalFallback = false;
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
     when(cache.downloadActionResult(
@@ -898,6 +907,7 @@
     remoteOptions.remoteLocalFallback = true;
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
     when(cache.downloadActionResult(
@@ -940,6 +950,7 @@
     remoteOptions.remoteLocalFallback = true;
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     when(cache.downloadActionResult(
             any(RemoteActionExecutionContext.class),
@@ -1137,9 +1148,8 @@
         RemoteActionResult.createFromCache(CachedActionResult.remote(succeededAction));
 
     RemoteSpawnRunner runner = newSpawnRunner();
-    doReturn(RemoteActionResult.createFromCache(CachedActionResult.remote(succeededAction)))
-        .when(service)
-        .lookupCache(any());
+    RemoteExecutionService service = runner.getRemoteExecutionService();
+    doReturn(actionResult).when(service).lookupCache(any());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -1167,6 +1177,7 @@
         .thenReturn(succeeded);
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     Spawn spawn = newSimpleSpawn();
     FakeSpawnExecutionContext policy = getSpawnContext(spawn);
@@ -1194,6 +1205,7 @@
     IOException downloadFailure = new IOException("downloadMinimal failed");
 
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
 
     doReturn(RemoteActionResult.createFromCache(CachedActionResult.remote(succeededAction)))
         .when(service)
@@ -1226,6 +1238,7 @@
             CachedActionResult.remote(ActionResult.newBuilder().setExitCode(0).build()));
 
     RemoteSpawnRunner runner = newSpawnRunner(ImmutableSet.of(topLevelOutput));
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     doReturn(cachedActionResult).when(service).lookupCache(any());
 
     Spawn spawn = newSimpleSpawn(topLevelOutput);
@@ -1289,6 +1302,7 @@
   public void shouldReportCheckingCacheBeforeScheduling() throws Exception {
     // Arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1331,6 +1345,7 @@
   public void shouldReportExecutingStatusWithoutMetadata() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1373,6 +1388,7 @@
   public void shouldReportExecutingStatusAfterGotExecutingStageFromMetadata() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1432,6 +1448,7 @@
   public void shouldIgnoreInvalidMetadata() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1479,6 +1496,7 @@
   public void shouldReportExecutingStatusIfNoExecutingStatusFromMetadata() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1527,6 +1545,7 @@
   public void shouldReportExecutingStatusEvenNoOperationFromServer() throws Exception {
     // arrange
     RemoteSpawnRunner runner = newSpawnRunner();
+    RemoteExecutionService service = runner.getRemoteExecutionService();
     ExecuteResponse succeeded =
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
@@ -1616,7 +1635,7 @@
       @Nullable Reporter reporter,
       ImmutableSet<ActionInput> topLevelOutputs,
       RemotePathResolver remotePathResolver) {
-    service =
+    RemoteExecutionService service =
         spy(
             new RemoteExecutionService(
                 execRoot,