Automated rollback of commit 915fb3e861dd28e16f42072101adf498242d26d0.

*** Reason for rollback ***

Might cause build to hang forever.

b/320630578

*** Original change description ***

Optimize prefetchInputs.

Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of the RxJava code with Futures to avoid RxJava overhead.

This improves the performance of prefetchInputs on a large set of inputs from ~400ms to ~16ms.

Fixes #20555.

Closes #20557.

PiperOrigin-RevId: 599135847
Change-Id: Idae6a1c57e634d16091e31e097b16ca97a67e62d
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
index 319110d..bbe91c8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
@@ -16,20 +16,17 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
-import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
 import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
+import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
 import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
-import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.flogger.GoogleLogger;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.Action;
 import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
@@ -47,6 +44,8 @@
 import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
 import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
+import com.google.devtools.build.lib.remote.util.RxUtils;
+import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
 import com.google.devtools.build.lib.remote.util.TempPathGenerator;
 import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -54,6 +53,8 @@
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Single;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -282,10 +283,6 @@
       files.add(input);
     }
 
-    if (files.isEmpty()) {
-      return immediateVoidFuture();
-    }
-
     // Collect the set of directories whose output permissions must be set at the end of this call.
     // This responsibility cannot lie with the downloading of an individual file, because multiple
     // files may be concurrently downloaded into the same directory within a single call to
@@ -294,38 +291,30 @@
     // it must still synchronize on the output permissions having been set.
     Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();
 
-    // Using plain futures to avoid RxJava overheads.
-    List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
-    try (var s = Profiler.instance().profile("compose prefetches")) {
-      for (var file : files) {
-        transfers.add(
-            prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
-      }
-    }
+    Completable prefetch =
+        mergeBulkTransfer(
+                Flowable.fromIterable(files)
+                    .flatMapSingle(
+                        input ->
+                            prefetchFile(
+                                action,
+                                dirsWithOutputPermissions,
+                                metadataSupplier,
+                                input,
+                                priority)))
+            .doOnComplete(
+                // Set output permissions on tree artifact subdirectories, matching the behavior of
+                // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
+                () -> {
+                  for (Path dir : dirsWithOutputPermissions) {
+                    directoryTracker.setOutputPermissions(dir);
+                  }
+                });
 
-    ListenableFuture<Void> mergedTransfer;
-    try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
-      mergedTransfer = mergeBulkTransfer(transfers);
-    }
-
-    return Futures.transformAsync(
-        mergedTransfer,
-        unused -> {
-          try {
-            // Set output permissions on tree artifact subdirectories, matching the behavior of
-            // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
-            for (Path dir : dirsWithOutputPermissions) {
-              directoryTracker.setOutputPermissions(dir);
-            }
-          } catch (IOException e) {
-            return immediateFailedFuture(e);
-          }
-          return immediateVoidFuture();
-        },
-        directExecutor());
+    return toListenableFuture(prefetch);
   }
 
-  private ListenableFuture<Void> prefetchFile(
+  private Single<TransferResult> prefetchFile(
       ActionExecutionMetadata action,
       Set<Path> dirsWithOutputPermissions,
       MetadataSupplier metadataSupplier,
@@ -334,14 +323,14 @@
     try {
       if (input instanceof VirtualActionInput) {
         prefetchVirtualActionInput((VirtualActionInput) input);
-        return immediateVoidFuture();
+        return Single.just(TransferResult.ok());
       }
 
       PathFragment execPath = input.getExecPath();
 
       FileArtifactValue metadata = metadataSupplier.getMetadata(input);
       if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
-        return immediateVoidFuture();
+        return Single.just(TransferResult.ok());
       }
 
       @Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
@@ -368,9 +357,11 @@
         result = result.andThen(plantSymlink(symlink));
       }
 
-      return toListenableFuture(result);
-    } catch (IOException | InterruptedException e) {
-      return immediateFailedFuture(e);
+      return RxUtils.toTransferResult(result);
+    } catch (IOException e) {
+      return Single.just(TransferResult.error(e));
+    } catch (InterruptedException e) {
+      return Single.just(TransferResult.interrupted());
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index 61c4c4f..3e8adc3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -13,12 +13,8 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote.util;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.base.Throwables.getStackTraceAsString;
-import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
-import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static java.util.stream.Collectors.joining;
 
 import build.bazel.remote.execution.v2.Action;
@@ -33,6 +29,7 @@
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.ExecutionRequirements;
 import com.google.devtools.build.lib.actions.Spawn;
@@ -420,11 +417,11 @@
               try {
                 return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
               } catch (InvalidProtocolBufferException e) {
-                return immediateFailedFuture(e);
+                return Futures.immediateFailedFuture(e);
               }
             },
-            directExecutor())
-        .catching(CacheNotFoundException.class, (e) -> null, directExecutor());
+            MoreExecutors.directExecutor())
+        .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
   }
 
   public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
@@ -486,15 +483,15 @@
    */
   public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
       AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
-    checkNotNull(call);
-    checkNotNull(callCredentialsProvider);
+    Preconditions.checkNotNull(call);
+    Preconditions.checkNotNull(callCredentialsProvider);
 
     try {
       return Futures.catchingAsync(
           call.call(),
           Throwable.class,
           (e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
-          directExecutor());
+          MoreExecutors.directExecutor());
     } catch (Throwable t) {
       return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
     }
@@ -514,15 +511,15 @@
       }
     }
 
-    return immediateFailedFuture(t);
+    return Futures.immediateFailedFuture(t);
   }
 
   /** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
   public static <V> V refreshIfUnauthenticated(
       Callable<V> call, CallCredentialsProvider callCredentialsProvider)
       throws IOException, InterruptedException {
-    checkNotNull(call);
-    checkNotNull(callCredentialsProvider);
+    Preconditions.checkNotNull(call);
+    Preconditions.checkNotNull(callCredentialsProvider);
 
     try {
       return call.call();
@@ -621,49 +618,4 @@
       throw bulkTransferException;
     }
   }
-
-  public static ListenableFuture<Void> mergeBulkTransfer(
-      Iterable<ListenableFuture<Void>> transfers) {
-    return Futures.whenAllComplete(transfers)
-        .callAsync(
-            () -> {
-              BulkTransferException bulkTransferException = null;
-
-              for (var transfer : transfers) {
-                IOException error = null;
-                try {
-                  transfer.get();
-                } catch (CancellationException e) {
-                  return immediateFailedFuture(new InterruptedException());
-                } catch (InterruptedException e) {
-                  return immediateFailedFuture(e);
-                } catch (ExecutionException e) {
-                  var cause = e.getCause();
-                  if (cause instanceof InterruptedException) {
-                    return immediateFailedFuture(cause);
-                  } else if (cause instanceof IOException) {
-                    error = (IOException) cause;
-                  } else {
-                    error = new IOException(cause);
-                  }
-                }
-
-                if (error == null) {
-                  continue;
-                }
-
-                if (bulkTransferException == null) {
-                  bulkTransferException = new BulkTransferException();
-                }
-                bulkTransferException.add(error);
-              }
-
-              if (bulkTransferException != null) {
-                return immediateFailedFuture(bulkTransferException);
-              }
-
-              return immediateVoidFuture();
-            },
-            directExecutor());
-  }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
index b39f2cf..76b949e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
@@ -71,6 +71,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -759,7 +760,7 @@
         prefetcher.prefetchFiles(
             action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);
 
-    assertThrows(InterruptedException.class, () -> getFromFuture(future));
+    assertThrows(CancellationException.class, future::get);
   }
 
   @Test