Guard parseActionResultMetadata with bulk wrapper
ActionResult Orphaned Output Directories must be wrapped in
BulkTransferExceptions in order to be eligible for re-execution. This
change prevents an unavoidable build failure introduced with #10029 in
the event that an output directory tree specified in an action result
is missing from the CAS, and adds testing that would detect such a
regression.
Closes #11140.
PiperOrigin-RevId: 307027914
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 4f567c1..6c6000c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -38,7 +38,6 @@
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -246,10 +245,6 @@
() -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
}
- private static String digestToString(Digest digest) {
- return digest.getHash() + "/" + digest.getSizeBytes();
- }
-
@Override
public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
@@ -273,11 +268,6 @@
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
- String resourceName = "";
- if (!options.remoteInstanceName.isEmpty()) {
- resourceName += options.remoteInstanceName + "/";
- }
- resourceName += "blobs/" + digestToString(digest);
@Nullable Supplier<HashCode> hashSupplier = null;
if (options.remoteVerifyDownloads) {
@@ -286,29 +276,10 @@
out = hashOut;
}
- SettableFuture<Void> outerF = SettableFuture.create();
- Futures.addCallback(
- downloadBlob(resourceName, digest, out, hashSupplier),
- new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- outerF.set(null);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (t instanceof StatusRuntimeException) {
- t = new IOException(t);
- }
- outerF.setException(t);
- }
- },
- Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
- return outerF;
+ return downloadBlob(digest, out, hashSupplier);
}
private ListenableFuture<Void> downloadBlob(
- String resourceName,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
@@ -318,23 +289,28 @@
return Futures.catchingAsync(
retrier.executeAsync(
() ->
- ctx.call(
- () ->
- requestRead(
- resourceName, offset, progressiveBackoff, digest, out, hashSupplier)),
+ ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)),
progressiveBackoff),
StatusRuntimeException.class,
(e) -> Futures.immediateFailedFuture(new IOException(e)),
MoreExecutors.directExecutor());
}
+ public static String getResourceName(String instanceName, Digest digest) {
+ String resourceName = "";
+ if (!instanceName.isEmpty()) {
+ resourceName += instanceName + "/";
+ }
+ return resourceName + "blobs/" + DigestUtil.toString(digest);
+ }
+
private ListenableFuture<Void> requestRead(
- String resourceName,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
+ String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
.read(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index caba306..e104671 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -213,7 +213,7 @@
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
- // Wait for all downloads to finish.
+ // Wait for all transfers to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
@@ -680,6 +680,8 @@
directExecutor()));
}
+ waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true);
+
ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder();
for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload :
dirMetadataDownloads.entrySet()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index abf3170..c6b6357 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -55,16 +55,17 @@
try {
return f.get();
} catch (ExecutionException e) {
- if (e.getCause() instanceof InterruptedException) {
- throw (InterruptedException) e.getCause();
+ Throwable cause = e.getCause();
+ if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
}
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
}
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException) e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
}
- throw new IOException(e.getCause());
+ throw new IOException(cause);
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index 737e62b..eec57b5 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.GrpcCacheClient.getResourceName;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.answerVoid;
@@ -27,14 +28,18 @@
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecuteResponse;
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
+import build.bazel.remote.execution.v2.FileNode;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.OutputDirectory;
import build.bazel.remote.execution.v2.OutputFile;
import build.bazel.remote.execution.v2.RequestMetadata;
+import build.bazel.remote.execution.v2.Tree;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
@@ -127,6 +132,7 @@
private Command command;
private RemoteSpawnRunner client;
private FileOutErr outErr;
+ private RemoteOptions remoteOptions;
private Server fakeServer;
private ListeningScheduledExecutorService retryService;
@@ -140,6 +146,25 @@
.build())
.build();
+ private static final Tree DUMMY_OUTPUT_TREE =
+ Tree.newBuilder()
+ .setRoot(
+ Directory.newBuilder()
+ .addFiles(
+ FileNode.newBuilder()
+ .setName(DUMMY_OUTPUT.getPath())
+ .setDigest(DUMMY_OUTPUT.getDigest())
+ .setIsExecutable(true)
+ .build())
+ .build())
+ .build();
+
+ private static final OutputDirectory DUMMY_OUTPUT_DIRECTORY =
+ OutputDirectory.newBuilder()
+ .setPath("dummy")
+ .setTreeDigest(DIGEST_UTIL.compute(DUMMY_OUTPUT_TREE))
+ .build();
+
@Before
public final void setUp() throws Exception {
String fakeServerName = "fake server for " + getClass();
@@ -205,7 +230,7 @@
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
- RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
+ remoteOptions = Options.getDefaults(RemoteOptions.class);
remoteOptions.remoteHeaders =
ImmutableList.of(
@@ -1011,12 +1036,12 @@
responseObserver.onCompleted();
}
});
+ String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
- assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
- .isTrue();
+ assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
});
@@ -1072,12 +1097,12 @@
responseObserver.onCompleted();
}
});
+ String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
- assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
- .isTrue();
+ assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
});
@@ -1185,6 +1210,100 @@
}
@Test
+ public void remotelyReExecuteOrphanedDirectoryCachedActions() throws Exception {
+ final ActionResult actionResult =
+ ActionResult.newBuilder().addOutputDirectories(DUMMY_OUTPUT_DIRECTORY).build();
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(actionResult);
+ responseObserver.onCompleted();
+ }
+ });
+ String dummyTreeResourceName =
+ getResourceName(remoteOptions.remoteInstanceName, DUMMY_OUTPUT_DIRECTORY.getTreeDigest());
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ private boolean first = true;
+
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ String resourceName = request.getResourceName();
+ if (resourceName.equals(dummyTreeResourceName)) {
+ // First read is a cache miss, next read succeeds.
+ if (first) {
+ first = false;
+ responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ } else {
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(DUMMY_OUTPUT_TREE.toByteString()).build());
+ responseObserver.onCompleted();
+ }
+ } else {
+ responseObserver.onNext(ReadResponse.getDefaultInstance());
+ }
+ }
+
+ @Override
+ public StreamObserver<WriteRequest> write(
+ StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {}
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ });
+ AtomicInteger numExecuteCalls = new AtomicInteger();
+ serviceRegistry.addService(
+ new ExecutionImplBase() {
+ @Override
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+ numExecuteCalls.incrementAndGet();
+ assertThat(request.getSkipCacheLookup()).isTrue(); // Action will be re-executed.
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setDone(true)
+ .setResponse(
+ Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ // Nothing is missing.
+ responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ });
+
+ FakeSpawnExecutionContext policy =
+ new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);
+
+ SpawnResult result = client.exec(simpleSpawn, policy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(result.isCacheHit()).isFalse();
+ assertThat(numExecuteCalls.get()).isEqualTo(1);
+ }
+
+ @Test
public void retryUploadAndExecuteOnMissingInputs() throws Exception {
serviceRegistry.addService(
new ActionCacheImplBase() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index 3db0bf5..af617f4 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -1045,9 +1045,9 @@
MetadataInjector injector = mock(MetadataInjector.class);
// act
- IOException e =
+ BulkTransferException e =
assertThrows(
- IOException.class,
+ BulkTransferException.class,
() ->
remoteCache.downloadMinimal(
r,
@@ -1057,7 +1057,8 @@
execRoot,
injector,
outputFilesLocker));
- assertThat(e).isEqualTo(downloadTreeException);
+ assertThat(e.getSuppressed()).hasLength(1);
+ assertThat(e.getSuppressed()[0]).isEqualTo(downloadTreeException);
verify(outputFilesLocker, never()).lock();
}