Guard parseActionResultMetadata with bulk wrapper
ActionResult Orphaned Output Directories must be wrapped in
BulkTransferExceptions in order to be eligible for re-execution. This
change prevents an unavoidable build failure introduced with #10029 in
the event that an output directory tree specified in an action result
is missing from the CAS, and adds testing that would detect such a
regression.
Closes #11140.
PiperOrigin-RevId: 307027914
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 4f567c1..6c6000c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -38,7 +38,6 @@
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -246,10 +245,6 @@
() -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
}
- private static String digestToString(Digest digest) {
- return digest.getHash() + "/" + digest.getSizeBytes();
- }
-
@Override
public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
@@ -273,11 +268,6 @@
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
- String resourceName = "";
- if (!options.remoteInstanceName.isEmpty()) {
- resourceName += options.remoteInstanceName + "/";
- }
- resourceName += "blobs/" + digestToString(digest);
@Nullable Supplier<HashCode> hashSupplier = null;
if (options.remoteVerifyDownloads) {
@@ -286,29 +276,10 @@
out = hashOut;
}
- SettableFuture<Void> outerF = SettableFuture.create();
- Futures.addCallback(
- downloadBlob(resourceName, digest, out, hashSupplier),
- new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- outerF.set(null);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (t instanceof StatusRuntimeException) {
- t = new IOException(t);
- }
- outerF.setException(t);
- }
- },
- Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
- return outerF;
+ return downloadBlob(digest, out, hashSupplier);
}
private ListenableFuture<Void> downloadBlob(
- String resourceName,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
@@ -318,23 +289,28 @@
return Futures.catchingAsync(
retrier.executeAsync(
() ->
- ctx.call(
- () ->
- requestRead(
- resourceName, offset, progressiveBackoff, digest, out, hashSupplier)),
+ ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)),
progressiveBackoff),
StatusRuntimeException.class,
(e) -> Futures.immediateFailedFuture(new IOException(e)),
MoreExecutors.directExecutor());
}
+ public static String getResourceName(String instanceName, Digest digest) {
+ String resourceName = "";
+ if (!instanceName.isEmpty()) {
+ resourceName += instanceName + "/";
+ }
+ return resourceName + "blobs/" + DigestUtil.toString(digest);
+ }
+
private ListenableFuture<Void> requestRead(
- String resourceName,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
+ String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
.read(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index caba306..e104671 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -213,7 +213,7 @@
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
- // Wait for all downloads to finish.
+ // Wait for all transfers to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
@@ -680,6 +680,8 @@
directExecutor()));
}
+ waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true);
+
ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder();
for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload :
dirMetadataDownloads.entrySet()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index abf3170..c6b6357 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -55,16 +55,17 @@
try {
return f.get();
} catch (ExecutionException e) {
- if (e.getCause() instanceof InterruptedException) {
- throw (InterruptedException) e.getCause();
+ Throwable cause = e.getCause();
+ if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
}
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
}
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException) e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
}
- throw new IOException(e.getCause());
+ throw new IOException(cause);
}
}