Guard parseActionResultMetadata with bulk wrapper

ActionResult Orphaned Output Directories must be wrapped in
BulkTransferExceptions in order to be eligible for re-execution. This
change prevents an unavoidable build failure introduced with #10029 in
the event that an output directory tree specified in an action result
is missing from the CAS, and adds testing that would detect such a
regression.

Closes #11140.

PiperOrigin-RevId: 307027914
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 4f567c1..6c6000c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -38,7 +38,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -246,10 +245,6 @@
         () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
   }
 
-  private static String digestToString(Digest digest) {
-    return digest.getHash() + "/" + digest.getSizeBytes();
-  }
-
   @Override
   public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
       throws IOException, InterruptedException {
@@ -273,11 +268,6 @@
     if (digest.getSizeBytes() == 0) {
       return Futures.immediateFuture(null);
     }
-    String resourceName = "";
-    if (!options.remoteInstanceName.isEmpty()) {
-      resourceName += options.remoteInstanceName + "/";
-    }
-    resourceName += "blobs/" + digestToString(digest);
 
     @Nullable Supplier<HashCode> hashSupplier = null;
     if (options.remoteVerifyDownloads) {
@@ -286,29 +276,10 @@
       out = hashOut;
     }
 
-    SettableFuture<Void> outerF = SettableFuture.create();
-    Futures.addCallback(
-        downloadBlob(resourceName, digest, out, hashSupplier),
-        new FutureCallback<Void>() {
-          @Override
-          public void onSuccess(Void result) {
-            outerF.set(null);
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            if (t instanceof StatusRuntimeException) {
-              t = new IOException(t);
-            }
-            outerF.setException(t);
-          }
-        },
-        Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
-    return outerF;
+    return downloadBlob(digest, out, hashSupplier);
   }
 
   private ListenableFuture<Void> downloadBlob(
-      String resourceName,
       Digest digest,
       OutputStream out,
       @Nullable Supplier<HashCode> hashSupplier) {
@@ -318,23 +289,28 @@
     return Futures.catchingAsync(
         retrier.executeAsync(
             () ->
-                ctx.call(
-                    () ->
-                        requestRead(
-                            resourceName, offset, progressiveBackoff, digest, out, hashSupplier)),
+                ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)),
             progressiveBackoff),
         StatusRuntimeException.class,
         (e) -> Futures.immediateFailedFuture(new IOException(e)),
         MoreExecutors.directExecutor());
   }
 
+  public static String getResourceName(String instanceName, Digest digest) {
+    String resourceName = "";
+    if (!instanceName.isEmpty()) {
+      resourceName += instanceName + "/";
+    }
+    return resourceName + "blobs/" + DigestUtil.toString(digest);
+  }
+
   private ListenableFuture<Void> requestRead(
-      String resourceName,
       AtomicLong offset,
       ProgressiveBackoff progressiveBackoff,
       Digest digest,
       OutputStream out,
       @Nullable Supplier<HashCode> hashSupplier) {
+    String resourceName = getResourceName(options.remoteInstanceName, digest);
     SettableFuture<Void> future = SettableFuture.create();
     bsAsyncStub()
         .read(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index caba306..e104671 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -213,7 +213,7 @@
     for (ListenableFuture<T> transfer : transfers) {
       try {
         if (interruptedException == null) {
-          // Wait for all downloads to finish.
+          // Wait for all transfers to finish.
           getFromFuture(transfer);
         } else {
           transfer.cancel(true);
@@ -680,6 +680,8 @@
               directExecutor()));
     }
 
+    waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true);
+
     ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder();
     for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload :
         dirMetadataDownloads.entrySet()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index abf3170..c6b6357 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -55,16 +55,17 @@
     try {
       return f.get();
     } catch (ExecutionException e) {
-      if (e.getCause() instanceof InterruptedException) {
-        throw (InterruptedException) e.getCause();
+      Throwable cause = e.getCause();
+      if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
       }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
       }
-      if (e.getCause() instanceof RuntimeException) {
-        throw (RuntimeException) e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
       }
-      throw new IOException(e.getCause());
+      throw new IOException(cause);
     }
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index 737e62b..eec57b5 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.remote;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.GrpcCacheClient.getResourceName;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.fail;
 import static org.mockito.AdditionalAnswers.answerVoid;
@@ -27,14 +28,18 @@
 import build.bazel.remote.execution.v2.Command;
 import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
 import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
 import build.bazel.remote.execution.v2.ExecuteRequest;
 import build.bazel.remote.execution.v2.ExecuteResponse;
 import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
+import build.bazel.remote.execution.v2.FileNode;
 import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
 import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
 import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.OutputDirectory;
 import build.bazel.remote.execution.v2.OutputFile;
 import build.bazel.remote.execution.v2.RequestMetadata;
+import build.bazel.remote.execution.v2.Tree;
 import build.bazel.remote.execution.v2.WaitExecutionRequest;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
 import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
@@ -127,6 +132,7 @@
   private Command command;
   private RemoteSpawnRunner client;
   private FileOutErr outErr;
+  private RemoteOptions remoteOptions;
   private Server fakeServer;
   private ListeningScheduledExecutorService retryService;
 
@@ -140,6 +146,25 @@
                   .build())
           .build();
 
+  private static final Tree DUMMY_OUTPUT_TREE =
+      Tree.newBuilder()
+          .setRoot(
+              Directory.newBuilder()
+                  .addFiles(
+                      FileNode.newBuilder()
+                          .setName(DUMMY_OUTPUT.getPath())
+                          .setDigest(DUMMY_OUTPUT.getDigest())
+                          .setIsExecutable(true)
+                          .build())
+                  .build())
+          .build();
+
+  private static final OutputDirectory DUMMY_OUTPUT_DIRECTORY =
+      OutputDirectory.newBuilder()
+          .setPath("dummy")
+          .setTreeDigest(DIGEST_UTIL.compute(DUMMY_OUTPUT_TREE))
+          .build();
+
   @Before
   public final void setUp() throws Exception {
     String fakeServerName = "fake server for " + getClass();
@@ -205,7 +230,7 @@
     FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
     FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
     outErr = new FileOutErr(stdout, stderr);
-    RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+    remoteOptions = Options.getDefaults(RemoteOptions.class);
 
     remoteOptions.remoteHeaders =
         ImmutableList.of(
@@ -1011,12 +1036,12 @@
             responseObserver.onCompleted();
           }
         });
+    String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
     serviceRegistry.addService(
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
-            assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
-                .isTrue();
+            assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
             responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
           }
         });
@@ -1072,12 +1097,12 @@
             responseObserver.onCompleted();
           }
         });
+    String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
     serviceRegistry.addService(
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
-            assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
-                .isTrue();
+            assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
             responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
           }
         });
@@ -1185,6 +1210,100 @@
   }
 
   @Test
+  public void remotelyReExecuteOrphanedDirectoryCachedActions() throws Exception {
+    final ActionResult actionResult =
+        ActionResult.newBuilder().addOutputDirectories(DUMMY_OUTPUT_DIRECTORY).build();
+    serviceRegistry.addService(
+        new ActionCacheImplBase() {
+          @Override
+          public void getActionResult(
+              GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+            responseObserver.onNext(actionResult);
+            responseObserver.onCompleted();
+          }
+        });
+    String dummyTreeResourceName =
+        getResourceName(remoteOptions.remoteInstanceName, DUMMY_OUTPUT_DIRECTORY.getTreeDigest());
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          private boolean first = true;
+
+          @Override
+          public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            String resourceName = request.getResourceName();
+            if (resourceName.equals(dummyTreeResourceName)) {
+              // First read is a cache miss, next read succeeds.
+              if (first) {
+                first = false;
+                responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+              } else {
+                responseObserver.onNext(
+                    ReadResponse.newBuilder().setData(DUMMY_OUTPUT_TREE.toByteString()).build());
+                responseObserver.onCompleted();
+              }
+            } else {
+              responseObserver.onNext(ReadResponse.getDefaultInstance());
+            }
+          }
+
+          @Override
+          public StreamObserver<WriteRequest> write(
+              StreamObserver<WriteResponse> responseObserver) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest request) {}
+
+              @Override
+              public void onCompleted() {
+                responseObserver.onCompleted();
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                fail("An error occurred: " + t);
+              }
+            };
+          }
+        });
+    AtomicInteger numExecuteCalls = new AtomicInteger();
+    serviceRegistry.addService(
+        new ExecutionImplBase() {
+          @Override
+          public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+            numExecuteCalls.incrementAndGet();
+            assertThat(request.getSkipCacheLookup()).isTrue(); // Action will be re-executed.
+            responseObserver.onNext(
+                Operation.newBuilder()
+                    .setDone(true)
+                    .setResponse(
+                        Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
+                    .build());
+            responseObserver.onCompleted();
+          }
+        });
+    serviceRegistry.addService(
+        new ContentAddressableStorageImplBase() {
+          @Override
+          public void findMissingBlobs(
+              FindMissingBlobsRequest request,
+              StreamObserver<FindMissingBlobsResponse> responseObserver) {
+            // Nothing is missing.
+            responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+            responseObserver.onCompleted();
+          }
+        });
+
+    FakeSpawnExecutionContext policy =
+        new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);
+
+    SpawnResult result = client.exec(simpleSpawn, policy);
+    assertThat(result.setupSuccess()).isTrue();
+    assertThat(result.exitCode()).isEqualTo(0);
+    assertThat(result.isCacheHit()).isFalse();
+    assertThat(numExecuteCalls.get()).isEqualTo(1);
+  }
+
+  @Test
   public void retryUploadAndExecuteOnMissingInputs() throws Exception {
     serviceRegistry.addService(
         new ActionCacheImplBase() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index 3db0bf5..af617f4 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -1045,9 +1045,9 @@
     MetadataInjector injector = mock(MetadataInjector.class);
 
     // act
-    IOException e =
+    BulkTransferException e =
         assertThrows(
-            IOException.class,
+            BulkTransferException.class,
             () ->
                 remoteCache.downloadMinimal(
                     r,
@@ -1057,7 +1057,8 @@
                     execRoot,
                     injector,
                     outputFilesLocker));
-    assertThat(e).isEqualTo(downloadTreeException);
+    assertThat(e.getSuppressed()).hasLength(1);
+    assertThat(e.getSuppressed()[0]).isEqualTo(downloadTreeException);
 
     verify(outputFilesLocker, never()).lock();
   }