Perform Bulk Wait in InputFetcher for downloads
Use the bulk wait mechanism to block on input fetcher downloads, and
annotate only-CNF caused BulkTransferExceptions with the CAS eviction
advisory.
Closes #11197.
PiperOrigin-RevId: 308830754
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index b47b728..519b7f4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -32,7 +32,6 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
import java.io.IOException;
@@ -115,32 +114,25 @@
}
}
- IOException ioException = null;
- InterruptedException interruptedException = null;
- for (Map.Entry<Path, ListenableFuture<Void>> entry : downloadsToWaitFor.entrySet()) {
- try {
- Utils.getFromFuture(entry.getValue());
- } catch (IOException e) {
- if (e instanceof CacheNotFoundException) {
- e =
+ try {
+ RemoteCache.waitForBulkTransfer(
+ downloadsToWaitFor.values(), /* cancelRemainingOnInterrupt=*/ true);
+ } catch (BulkTransferException e) {
+ if (e.onlyCausedByCacheNotFoundException()) {
+ BulkTransferException bulkAnnotatedException = new BulkTransferException();
+ for (Throwable t : e.getSuppressed()) {
+ IOException annotatedException =
new IOException(
String.format(
"Failed to fetch file with hash '%s' because it does not exist remotely."
+ " --experimental_remote_outputs=minimal does not work if"
+ " your remote cache evicts files during builds.",
- ((CacheNotFoundException) e).getMissingDigest().getHash()));
+ ((CacheNotFoundException) t).getMissingDigest().getHash()));
+ bulkAnnotatedException.add(annotatedException);
}
- ioException = ioException == null ? e : ioException;
- } catch (InterruptedException e) {
- interruptedException = interruptedException == null ? e : interruptedException;
+ e = bulkAnnotatedException;
}
- }
-
- if (interruptedException != null) {
- throw interruptedException;
- }
- if (ioException != null) {
- throw ioException;
+ throw e;
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 9709e37..95df362 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -204,7 +204,7 @@
}
}
- protected static <T> void waitForBulkTransfer(
+ public static <T> void waitForBulkTransfer(
Iterable<ListenableFuture<T>> transfers, boolean cancelRemainingOnInterrupt)
throws BulkTransferException, InterruptedException {
BulkTransferException bulkTransferException = null;
@@ -579,10 +579,10 @@
if (inMemoryOutput != null) {
inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest);
}
- for (ListenableFuture<FileMetadata> download : downloadOutErr(result, outErr)) {
- getFromFuture(download);
- }
+ waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true);
if (inMemoryOutputDownload != null) {
+ waitForBulkTransfer(
+ ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
byte[] data = getFromFuture(inMemoryOutputDownload);
return new InMemoryOutput(inMemoryOutput, ByteString.copyFrom(data));
}