Guard parseActionResultMetadata with bulk wrapper

Failures to download an orphaned output directory of an ActionResult
must be wrapped in a BulkTransferException in order for the action to
be eligible for re-execution. This change prevents an unavoidable build
failure, introduced with #10029, that occurs when an output directory
tree specified in an action result is missing from the CAS. It also
adds testing that would detect such a regression.

Closes #11140.

PiperOrigin-RevId: 307027914
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 4f567c1..6c6000c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -38,7 +38,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -246,10 +245,6 @@
         () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
   }
 
-  private static String digestToString(Digest digest) {
-    return digest.getHash() + "/" + digest.getSizeBytes();
-  }
-
   @Override
   public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
       throws IOException, InterruptedException {
@@ -273,11 +268,6 @@
     if (digest.getSizeBytes() == 0) {
       return Futures.immediateFuture(null);
     }
-    String resourceName = "";
-    if (!options.remoteInstanceName.isEmpty()) {
-      resourceName += options.remoteInstanceName + "/";
-    }
-    resourceName += "blobs/" + digestToString(digest);
 
     @Nullable Supplier<HashCode> hashSupplier = null;
     if (options.remoteVerifyDownloads) {
@@ -286,29 +276,10 @@
       out = hashOut;
     }
 
-    SettableFuture<Void> outerF = SettableFuture.create();
-    Futures.addCallback(
-        downloadBlob(resourceName, digest, out, hashSupplier),
-        new FutureCallback<Void>() {
-          @Override
-          public void onSuccess(Void result) {
-            outerF.set(null);
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            if (t instanceof StatusRuntimeException) {
-              t = new IOException(t);
-            }
-            outerF.setException(t);
-          }
-        },
-        Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
-    return outerF;
+    return downloadBlob(digest, out, hashSupplier);
   }
 
   private ListenableFuture<Void> downloadBlob(
-      String resourceName,
       Digest digest,
       OutputStream out,
       @Nullable Supplier<HashCode> hashSupplier) {
@@ -318,23 +289,28 @@
     return Futures.catchingAsync(
         retrier.executeAsync(
             () ->
-                ctx.call(
-                    () ->
-                        requestRead(
-                            resourceName, offset, progressiveBackoff, digest, out, hashSupplier)),
+                ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)),
             progressiveBackoff),
         StatusRuntimeException.class,
         (e) -> Futures.immediateFailedFuture(new IOException(e)),
         MoreExecutors.directExecutor());
   }
 
+  public static String getResourceName(String instanceName, Digest digest) {
+    String resourceName = "";
+    if (!instanceName.isEmpty()) {
+      resourceName += instanceName + "/";
+    }
+    return resourceName + "blobs/" + DigestUtil.toString(digest);
+  }
+
   private ListenableFuture<Void> requestRead(
-      String resourceName,
       AtomicLong offset,
       ProgressiveBackoff progressiveBackoff,
       Digest digest,
       OutputStream out,
       @Nullable Supplier<HashCode> hashSupplier) {
+    String resourceName = getResourceName(options.remoteInstanceName, digest);
     SettableFuture<Void> future = SettableFuture.create();
     bsAsyncStub()
         .read(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index caba306..e104671 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -213,7 +213,7 @@
     for (ListenableFuture<T> transfer : transfers) {
       try {
         if (interruptedException == null) {
-          // Wait for all downloads to finish.
+          // Wait for all transfers to finish.
           getFromFuture(transfer);
         } else {
           transfer.cancel(true);
@@ -680,6 +680,8 @@
               directExecutor()));
     }
 
+    waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true);
+
     ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder();
     for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload :
         dirMetadataDownloads.entrySet()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index abf3170..c6b6357 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -55,16 +55,17 @@
     try {
       return f.get();
     } catch (ExecutionException e) {
-      if (e.getCause() instanceof InterruptedException) {
-        throw (InterruptedException) e.getCause();
+      Throwable cause = e.getCause();
+      if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
       }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
       }
-      if (e.getCause() instanceof RuntimeException) {
-        throw (RuntimeException) e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
       }
-      throw new IOException(e.getCause());
+      throw new IOException(cause);
     }
   }