Remote: Async upload (Part 6)

Add RxUtils#mergeBulkTransfer which is similar to waitForBulkTransfer but is used with Rx.

Add UploadManifest#uploadAsync and update UploadManifest#upload to use it.

Part of https://github.com/bazelbuild/bazel/pull/13655.

PiperOrigin-RevId: 395018504
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
index ee3043a..28b622a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -13,8 +13,12 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote;
 
-import static com.google.devtools.build.lib.remote.RemoteCache.waitForBulkTransfer;
-import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+import static com.google.common.base.Throwables.throwIfInstanceOf;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
+import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
+import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
+import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
 
 import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
@@ -24,9 +28,6 @@
 import build.bazel.remote.execution.v2.Tree;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.UserExecException;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
@@ -45,6 +46,9 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.Symlinks;
 import com.google.protobuf.ByteString;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Single;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -54,9 +58,7 @@
 import java.util.Map;
 import javax.annotation.Nullable;
 
-/**
- * UploadManifest adds output metadata to a {@link ActionResult}.
- */
+/** UploadManifest adds output metadata to a {@link ActionResult}. */
 public class UploadManifest {
 
   private final DigestUtil digestUtil;
@@ -140,7 +142,7 @@
    * non-directory descendant files.
    */
   @VisibleForTesting
-  public void addFiles(Collection<Path> files) throws ExecException, IOException {
+  void addFiles(Collection<Path> files) throws ExecException, IOException {
     for (Path file : files) {
       // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
       // rely on the local spawn runner to stat the files, instead of statting here.
@@ -199,9 +201,7 @@
     digestToBlobs.put(action.getCommandDigest(), command.toByteString());
   }
 
-  /**
-   * Map of digests to file paths to upload.
-   */
+  /** Map of digests to file paths to upload. */
   public Map<Digest, Path> getDigestToFile() {
     return digestToFile;
   }
@@ -292,15 +292,11 @@
         FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW);
         if (statFollow == null) {
           throw new IOException(
-              String.format(
-                  "Action output %s is a dangling symbolic link to %s ", child, target));
+              String.format("Action output %s is a dangling symbolic link to %s ", child, target));
         }
         if (statFollow.isFile() && !statFollow.isSpecialFile()) {
           Digest digest = digestUtil.compute(child);
-          b.addFilesBuilder()
-              .setName(name)
-              .setDigest(digest)
-              .setIsExecutable(child.isExecutable());
+          b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable());
           digestToFile.put(digest, child);
         } else if (statFollow.isDirectory()) {
           Directory dir = computeDirectory(child, tree);
@@ -330,10 +326,11 @@
                 + "Change the file type or use --remote_allow_symlink_upload.",
             remotePathResolver.localPathToOutputPath(what), kind);
 
-    FailureDetail failureDetail = FailureDetail.newBuilder()
-        .setMessage(message)
-        .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT))
-        .build();
+    FailureDetail failureDetail =
+        FailureDetail.newBuilder()
+            .setMessage(message)
+            .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT))
+            .build();
     throw new UserExecException(failureDetail);
   }
 
@@ -345,36 +342,56 @@
   /** Uploads outputs and action result (if exit code is 0) to remote cache. */
   public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
       throws IOException, InterruptedException {
-    Map<Digest, Path> digestToFile = getDigestToFile();
-    Map<Digest, ByteString> digestToBlobs = getDigestToBlobs();
+    try {
+      return uploadAsync(context, remoteCache).blockingGet();
+    } catch (RuntimeException e) {
+      throwIfInstanceOf(e.getCause(), InterruptedException.class);
+      throwIfInstanceOf(e.getCause(), IOException.class);
+      throw e;
+    }
+  }
+
+  private Completable upload(
+      RemoteActionExecutionContext context, RemoteCache remoteCache, Digest digest) {
+    Path file = digestToFile.get(digest);
+    if (file != null) {
+      return toCompletable(() -> remoteCache.uploadFile(context, digest, file), directExecutor());
+    }
+
+    ByteString blob = digestToBlobs.get(digest);
+    if (blob == null) {
+      String message = "FindMissingBlobs call returned an unknown digest: " + digest;
+      return Completable.error(new IOException(message));
+    }
+
+    return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
+  }
+
+  /**
+   * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
+   * code is 0) to remote cache.
+   */
+  public Single<ActionResult> uploadAsync(
+      RemoteActionExecutionContext context, RemoteCache remoteCache) {
     Collection<Digest> digests = new ArrayList<>();
     digests.addAll(digestToFile.keySet());
     digests.addAll(digestToBlobs.keySet());
 
-    ImmutableSet<Digest> digestsToUpload =
-        getFromFuture(remoteCache.findMissingDigests(context, digests));
-    ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
-    for (Digest digest : digestsToUpload) {
-      Path file = digestToFile.get(digest);
-      if (file != null) {
-        uploads.add(remoteCache.uploadFile(context, digest, file));
-      } else {
-        ByteString blob = digestToBlobs.get(digest);
-        if (blob == null) {
-          String message = "FindMissingBlobs call returned an unknown digest: " + digest;
-          throw new IOException(message);
-        }
-        uploads.add(remoteCache.uploadBlob(context, digest, blob));
-      }
-    }
-
-    waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false);
+    Completable uploadOutputs =
+        mergeBulkTransfer(
+            toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
+                .flatMapPublisher(Flowable::fromIterable)
+                .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
 
     ActionResult actionResult = result.build();
+    Completable uploadActionResult = Completable.complete();
     if (actionResult.getExitCode() == 0 && actionKey != null) {
-      getFromFuture(remoteCache.uploadActionResult(context, actionKey, actionResult));
+      uploadActionResult =
+          toCompletable(
+              () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
+              directExecutor());
     }
 
-    return actionResult;
+    return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
   }
 }