Remote: Async upload (Part 6) Add RxUtils#mergeBulkTransfer which is similar to waitForBulkTransfer but is used with Rx. Add UploadManifest#uploadAsync and update UploadManifest#upload to use it. Part of https://github.com/bazelbuild/bazel/pull/13655. PiperOrigin-RevId: 395018504
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java index ee3043a..28b622a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java +++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -13,8 +13,12 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.devtools.build.lib.remote.RemoteCache.waitForBulkTransfer; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; +import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle; +import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; +import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; import build.bazel.remote.execution.v2.Action; import build.bazel.remote.execution.v2.ActionResult; @@ -24,9 +28,6 @@ import build.bazel.remote.execution.v2.Tree; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; @@ -45,6 +46,9 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.Symlinks; import com.google.protobuf.ByteString; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -54,9 +58,7 @@ import java.util.Map; import javax.annotation.Nullable; -/** - * UploadManifest adds output metadata to a {@link ActionResult}. - */ +/** UploadManifest adds output metadata to a {@link ActionResult}. */ public class UploadManifest { private final DigestUtil digestUtil; @@ -140,7 +142,7 @@ * non-directory descendant files. */ @VisibleForTesting - public void addFiles(Collection<Path> files) throws ExecException, IOException { + void addFiles(Collection<Path> files) throws ExecException, IOException { for (Path file : files) { // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and // rely on the local spawn runner to stat the files, instead of statting here. @@ -199,9 +201,7 @@ digestToBlobs.put(action.getCommandDigest(), command.toByteString()); } - /** - * Map of digests to file paths to upload. - */ + /** Map of digests to file paths to upload. */ public Map<Digest, Path> getDigestToFile() { return digestToFile; } @@ -292,15 +292,11 @@ FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW); if (statFollow == null) { throw new IOException( - String.format( - "Action output %s is a dangling symbolic link to %s ", child, target)); + String.format("Action output %s is a dangling symbolic link to %s ", child, target)); } if (statFollow.isFile() && !statFollow.isSpecialFile()) { Digest digest = digestUtil.compute(child); - b.addFilesBuilder() - .setName(name) - .setDigest(digest) - .setIsExecutable(child.isExecutable()); + b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable()); digestToFile.put(digest, child); } else if (statFollow.isDirectory()) { Directory dir = computeDirectory(child, tree); @@ -330,10 +326,11 @@ + "Change the file type or use --remote_allow_symlink_upload.", remotePathResolver.localPathToOutputPath(what), kind); - FailureDetail failureDetail = FailureDetail.newBuilder() - .setMessage(message) - .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT)) - .build(); + FailureDetail failureDetail = + FailureDetail.newBuilder() + .setMessage(message) + .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT)) + .build(); throw new UserExecException(failureDetail); } @@ -345,36 +342,56 @@ /** Uploads outputs and action result (if exit code is 0) to remote cache. */ public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache) throws IOException, InterruptedException { - Map<Digest, Path> digestToFile = getDigestToFile(); - Map<Digest, ByteString> digestToBlobs = getDigestToBlobs(); + try { + return uploadAsync(context, remoteCache).blockingGet(); + } catch (RuntimeException e) { + throwIfInstanceOf(e.getCause(), InterruptedException.class); + throwIfInstanceOf(e.getCause(), IOException.class); + throw e; + } + } + + private Completable upload( + RemoteActionExecutionContext context, RemoteCache remoteCache, Digest digest) { + Path file = digestToFile.get(digest); + if (file != null) { + return toCompletable(() -> remoteCache.uploadFile(context, digest, file), directExecutor()); + } + + ByteString blob = digestToBlobs.get(digest); + if (blob == null) { + String message = "FindMissingBlobs call returned an unknown digest: " + digest; + return Completable.error(new IOException(message)); + } + + return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor()); + } + + /** + * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit + * code is 0) to remote cache. + */ + public Single<ActionResult> uploadAsync( + RemoteActionExecutionContext context, RemoteCache remoteCache) { Collection<Digest> digests = new ArrayList<>(); digests.addAll(digestToFile.keySet()); digests.addAll(digestToBlobs.keySet()); - ImmutableSet<Digest> digestsToUpload = - getFromFuture(remoteCache.findMissingDigests(context, digests)); - ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder(); - for (Digest digest : digestsToUpload) { - Path file = digestToFile.get(digest); - if (file != null) { - uploads.add(remoteCache.uploadFile(context, digest, file)); - } else { - ByteString blob = digestToBlobs.get(digest); - if (blob == null) { - String message = "FindMissingBlobs call returned an unknown digest: " + digest; - throw new IOException(message); - } - uploads.add(remoteCache.uploadBlob(context, digest, blob)); - } - } - - waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false); + Completable uploadOutputs = + mergeBulkTransfer( + toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor()) + .flatMapPublisher(Flowable::fromIterable) + .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest)))); ActionResult actionResult = result.build(); + Completable uploadActionResult = Completable.complete(); if (actionResult.getExitCode() == 0 && actionKey != null) { - getFromFuture(remoteCache.uploadActionResult(context, actionKey, actionResult)); + uploadActionResult = + toCompletable( + () -> remoteCache.uploadActionResult(context, actionKey, actionResult), + directExecutor()); } - return actionResult; + return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult); } }