Automated rollback of commit 915fb3e861dd28e16f42072101adf498242d26d0.
*** Reason for rollback ***
Might cause build to hang forever.
b/320630578
*** Original change description ***
Optimize prefetchInputs.
Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of RxJava code with Futures to avoid RxJava overheads.
This improves the perfromance of prefetchInputs on a large set of inputs from ~400ms to ~16ms.
Fixes #20555.
Closes #20557.
PiperOrigin-RevId: 599135847
Change-Id: Idae6a1c57e634d16091e31e097b16ca97a67e62d
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
index 319110d..bbe91c8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
@@ -16,20 +16,17 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
-import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
+import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
-import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
@@ -47,6 +44,8 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
+import com.google.devtools.build.lib.remote.util.RxUtils;
+import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -54,6 +53,8 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -282,10 +283,6 @@
files.add(input);
}
- if (files.isEmpty()) {
- return immediateVoidFuture();
- }
-
// Collect the set of directories whose output permissions must be set at the end of this call.
// This responsibility cannot lie with the downloading of an individual file, because multiple
// files may be concurrently downloaded into the same directory within a single call to
@@ -294,38 +291,30 @@
// it must still synchronize on the output permissions having been set.
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();
- // Using plain futures to avoid RxJava overheads.
- List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
- try (var s = Profiler.instance().profile("compose prefetches")) {
- for (var file : files) {
- transfers.add(
- prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
- }
- }
+ Completable prefetch =
+ mergeBulkTransfer(
+ Flowable.fromIterable(files)
+ .flatMapSingle(
+ input ->
+ prefetchFile(
+ action,
+ dirsWithOutputPermissions,
+ metadataSupplier,
+ input,
+ priority)))
+ .doOnComplete(
+ // Set output permissions on tree artifact subdirectories, matching the behavior of
+ // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
+ () -> {
+ for (Path dir : dirsWithOutputPermissions) {
+ directoryTracker.setOutputPermissions(dir);
+ }
+ });
- ListenableFuture<Void> mergedTransfer;
- try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
- mergedTransfer = mergeBulkTransfer(transfers);
- }
-
- return Futures.transformAsync(
- mergedTransfer,
- unused -> {
- try {
- // Set output permissions on tree artifact subdirectories, matching the behavior of
- // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
- for (Path dir : dirsWithOutputPermissions) {
- directoryTracker.setOutputPermissions(dir);
- }
- } catch (IOException e) {
- return immediateFailedFuture(e);
- }
- return immediateVoidFuture();
- },
- directExecutor());
+ return toListenableFuture(prefetch);
}
- private ListenableFuture<Void> prefetchFile(
+ private Single<TransferResult> prefetchFile(
ActionExecutionMetadata action,
Set<Path> dirsWithOutputPermissions,
MetadataSupplier metadataSupplier,
@@ -334,14 +323,14 @@
try {
if (input instanceof VirtualActionInput) {
prefetchVirtualActionInput((VirtualActionInput) input);
- return immediateVoidFuture();
+ return Single.just(TransferResult.ok());
}
PathFragment execPath = input.getExecPath();
FileArtifactValue metadata = metadataSupplier.getMetadata(input);
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
- return immediateVoidFuture();
+ return Single.just(TransferResult.ok());
}
@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
@@ -368,9 +357,11 @@
result = result.andThen(plantSymlink(symlink));
}
- return toListenableFuture(result);
- } catch (IOException | InterruptedException e) {
- return immediateFailedFuture(e);
+ return RxUtils.toTransferResult(result);
+ } catch (IOException e) {
+ return Single.just(TransferResult.error(e));
+ } catch (InterruptedException e) {
+ return Single.just(TransferResult.interrupted());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index 61c4c4f..3e8adc3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -13,12 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
-import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
-import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.stream.Collectors.joining;
import build.bazel.remote.execution.v2.Action;
@@ -33,6 +29,7 @@
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.build.lib.actions.Spawn;
@@ -420,11 +417,11 @@
try {
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
} catch (InvalidProtocolBufferException e) {
- return immediateFailedFuture(e);
+ return Futures.immediateFailedFuture(e);
}
},
- directExecutor())
- .catching(CacheNotFoundException.class, (e) -> null, directExecutor());
+ MoreExecutors.directExecutor())
+ .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
}
public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
@@ -486,15 +483,15 @@
*/
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
- checkNotNull(call);
- checkNotNull(callCredentialsProvider);
+ Preconditions.checkNotNull(call);
+ Preconditions.checkNotNull(callCredentialsProvider);
try {
return Futures.catchingAsync(
call.call(),
Throwable.class,
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
- directExecutor());
+ MoreExecutors.directExecutor());
} catch (Throwable t) {
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
}
@@ -514,15 +511,15 @@
}
}
- return immediateFailedFuture(t);
+ return Futures.immediateFailedFuture(t);
}
/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
public static <V> V refreshIfUnauthenticated(
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
throws IOException, InterruptedException {
- checkNotNull(call);
- checkNotNull(callCredentialsProvider);
+ Preconditions.checkNotNull(call);
+ Preconditions.checkNotNull(callCredentialsProvider);
try {
return call.call();
@@ -621,49 +618,4 @@
throw bulkTransferException;
}
}
-
- public static ListenableFuture<Void> mergeBulkTransfer(
- Iterable<ListenableFuture<Void>> transfers) {
- return Futures.whenAllComplete(transfers)
- .callAsync(
- () -> {
- BulkTransferException bulkTransferException = null;
-
- for (var transfer : transfers) {
- IOException error = null;
- try {
- transfer.get();
- } catch (CancellationException e) {
- return immediateFailedFuture(new InterruptedException());
- } catch (InterruptedException e) {
- return immediateFailedFuture(e);
- } catch (ExecutionException e) {
- var cause = e.getCause();
- if (cause instanceof InterruptedException) {
- return immediateFailedFuture(cause);
- } else if (cause instanceof IOException) {
- error = (IOException) cause;
- } else {
- error = new IOException(cause);
- }
- }
-
- if (error == null) {
- continue;
- }
-
- if (bulkTransferException == null) {
- bulkTransferException = new BulkTransferException();
- }
- bulkTransferException.add(error);
- }
-
- if (bulkTransferException != null) {
- return immediateFailedFuture(bulkTransferException);
- }
-
- return immediateVoidFuture();
- },
- directExecutor());
- }
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
index b39f2cf..76b949e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
@@ -71,6 +71,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -759,7 +760,7 @@
prefetcher.prefetchFiles(
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);
- assertThrows(InterruptedException.class, () -> getFromFuture(future));
+ assertThrows(CancellationException.class, future::get);
}
@Test