Remote API v2 migration.
Major differences between v1test and v2 that are implemented here:
- Execute call streams Operation updates
- WaitExecution call replaces Watcher API
- Action is no longer part of the Execute request, and must be uploaded separately
- output spec and platform moved from Action to Command
Also, adding retries to operations lost on the server, resolving a TODO.
TESTED=unit tests, LRE, RBE.
TYPE_CHANGE_OK=this is a breaking change by design, will update proto parsing tool as well
TAG_CHANGE_OK=this way give us flexibility to update the parsing tool to support both versions
PiperOrigin-RevId: 208990450
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
index ccc6de6..3c73c4e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
@@ -13,6 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.DirectoryNode;
+import build.bazel.remote.execution.v2.FileNode;
+import build.bazel.remote.execution.v2.OutputDirectory;
+import build.bazel.remote.execution.v2.OutputFile;
+import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
@@ -33,16 +43,6 @@
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.Symlinks;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
-import com.google.devtools.remoteexecution.v1test.DirectoryNode;
-import com.google.devtools.remoteexecution.v1test.FileNode;
-import com.google.devtools.remoteexecution.v1test.OutputDirectory;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
-import com.google.devtools.remoteexecution.v1test.Tree;
-import com.google.protobuf.ByteString;
import io.grpc.Context;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -94,7 +94,7 @@
* documented that it cannot be used for remote execution.
*/
public abstract void ensureInputsPresent(
- TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
+ TreeNodeRepository repository, Path execRoot, TreeNode root, Action action, Command command)
throws IOException, InterruptedException;
/**
@@ -116,6 +116,8 @@
*/
abstract void upload(
DigestUtil.ActionKey actionKey,
+ Action action,
+ Command command,
Path execRoot,
Collection<Path> files,
FileOutErr outErr,
@@ -178,7 +180,7 @@
Path path = execRoot.getRelative(file.getPath());
ListenableFuture<Void> download =
retrier.executeAsync(
- () -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent())));
+ () -> ctx.call(() -> downloadFile(path, file.getDigest())));
fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
}
@@ -324,7 +326,7 @@
downloads.add(
new FuturePathBooleanTuple(
retrier.executeAsync(
- () -> ctx.call(() -> downloadFile(childPath, child.getDigest(), null))),
+ () -> ctx.call(() -> downloadFile(childPath, child.getDigest()))),
childPath,
child.getIsExecutable()));
}
@@ -349,12 +351,8 @@
return downloads;
}
- /**
- * Download a file (that is not a directory). If the {@code content} is not given, the content is
- * fetched from the digest.
- */
- public ListenableFuture<Void> downloadFile(Path path, Digest digest, @Nullable ByteString content)
- throws IOException {
+ /** Download a file (that is not a directory). The content is fetched from the digest. */
+ public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOException {
Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
@@ -362,13 +360,6 @@
return COMPLETED_SUCCESS;
}
- if (content != null && !content.isEmpty()) {
- try (OutputStream stream = path.getOutputStream()) {
- content.writeTo(stream);
- }
- return COMPLETED_SUCCESS;
- }
-
OutputStream out = new LazyFileOutputStream(path);
SettableFuture<Void> outerF = SettableFuture.create();
ListenableFuture<Void> f = downloadBlob(digest, out);
@@ -487,6 +478,19 @@
}
}
+ /**
+ * Adds an action and command protos to upload. They need to be uploaded as part of the action
+ * result.
+ */
+ public void addAction(Action action, Command command) throws IOException {
+ for (byte[] blob : new byte[][]{action.toByteArray(), command.toByteArray()}) {
+ Digest digest = digestUtil.compute(blob);
+ Chunker chunker =
+ Chunker.builder(digestUtil).setInput(digest, blob).setChunkSize(blob.length).build();
+ digestToChunkers.put(digest, chunker);
+ }
+ }
+
/** Map of digests to file paths to upload. */
public Map<Digest, Path> getDigestToFile() {
return digestToFile;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 5c4ef45..bbfed55 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -43,12 +43,10 @@
"//third_party/protobuf:protobuf_java_util",
"@googleapis//:google_bytestream_bytestream_java_grpc",
"@googleapis//:google_bytestream_bytestream_java_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_error_details_java_proto",
"@googleapis//:google_rpc_status_java_proto",
- "@googleapis//:google_watch_v1_java_grpc",
- "@googleapis//:google_watch_v1_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index 2cbe3e6..b02546f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -25,7 +26,6 @@
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Context;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 42129a4..24b3101 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -20,6 +20,7 @@
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
+import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
@@ -33,7 +34,6 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
-import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
index 717d5f5..03a0fee 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
@@ -14,8 +14,8 @@
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Digest;
import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.IOException;
/**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index 6d313dc..4d32664 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import build.bazel.remote.execution.v2.Digest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@@ -25,7 +26,6 @@
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExecutionStatusException.java b/src/main/java/com/google/devtools/build/lib/remote/ExecutionStatusException.java
index 63839a7..031d2d1 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ExecutionStatusException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ExecutionStatusException.java
@@ -13,7 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import build.bazel.remote.execution.v2.ExecuteResponse;
import com.google.rpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 39f443f..a5eff1a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -14,6 +14,19 @@
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionCacheGrpc;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
@@ -35,18 +48,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import io.grpc.CallCredentials;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -141,14 +142,16 @@
*/
@Override
public void ensureInputsPresent(
- TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
+ TreeNodeRepository repository, Path execRoot, TreeNode root, Action action, Command command)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
+ Digest actionDigest = digestUtil.compute(action);
Digest commandDigest = digestUtil.compute(command);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<Digest> missingDigests =
getMissingDigests(
- Iterables.concat(repository.getAllDigests(root), ImmutableList.of(commandDigest)));
+ Iterables.concat(
+ repository.getAllDigests(root), ImmutableList.of(actionDigest, commandDigest)));
List<Chunker> toUpload = new ArrayList<>();
// Only upload data that was missing from the cache.
@@ -156,8 +159,13 @@
Map<Digest, Directory> missingTreeNodes = new HashMap<>();
HashSet<Digest> missingTreeDigests = new HashSet<>(missingDigests);
missingTreeDigests.remove(commandDigest);
+ missingTreeDigests.remove(actionDigest);
repository.getDataFromDigests(missingTreeDigests, missingActionInputs, missingTreeNodes);
+ if (missingDigests.contains(actionDigest)) {
+ toUpload.add(
+ Chunker.builder(digestUtil).setInput(actionDigest, action.toByteArray()).build());
+ }
if (missingDigests.contains(commandDigest)) {
toUpload.add(
Chunker.builder(digestUtil).setInput(commandDigest, command.toByteArray()).build());
@@ -241,13 +249,15 @@
@Override
public void upload(
ActionKey actionKey,
+ Action action,
+ Command command,
Path execRoot,
Collection<Path> files,
FileOutErr outErr,
boolean uploadAction)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
- upload(execRoot, files, outErr, result);
+ upload(execRoot, action, command, files, outErr, uploadAction, result);
if (!uploadAction) {
return;
}
@@ -270,11 +280,21 @@
}
}
- void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
+ void upload(
+ Path execRoot,
+ Action action,
+ Command command,
+ Collection<Path> files,
+ FileOutErr outErr,
+ boolean uploadAction,
+ ActionResult.Builder result)
throws ExecException, IOException, InterruptedException {
UploadManifest manifest =
new UploadManifest(digestUtil, result, execRoot, options.allowSymlinkUpload);
manifest.addFiles(files);
+ if (uploadAction) {
+ manifest.addAction(action, command);
+ }
List<Chunker> filesToUpload = new ArrayList<>();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 858f574..c86b63c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -14,30 +14,25 @@
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecuteResponse;
+import build.bazel.remote.execution.v2.ExecutionGrpc;
+import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionBlockingStub;
+import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
import com.google.longrunning.Operation;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;
-import com.google.watcher.v1.Change;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
-import com.google.watcher.v1.WatcherGrpc;
-import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
-import io.grpc.protobuf.StatusProto;
import java.io.IOException;
import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@@ -46,33 +41,20 @@
private final ManagedChannel channel;
private final CallCredentials callCredentials;
- private final int callTimeoutSecs;
private final RemoteRetrier retrier;
private final AtomicBoolean closed = new AtomicBoolean();
public GrpcRemoteExecutor(
- ManagedChannel channel,
- @Nullable CallCredentials callCredentials,
- int callTimeoutSecs,
- RemoteRetrier retrier) {
- Preconditions.checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
+ ManagedChannel channel, @Nullable CallCredentials callCredentials, RemoteRetrier retrier) {
this.channel = channel;
this.callCredentials = callCredentials;
- this.callTimeoutSecs = callTimeoutSecs;
this.retrier = retrier;
}
private ExecutionBlockingStub execBlockingStub() {
return ExecutionGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
- .withCallCredentials(callCredentials)
- .withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS);
- }
-
- private WatcherBlockingStub watcherBlockingStub() {
- return WatcherGrpc.newBlockingStub(channel)
- .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(callCredentials);
}
@@ -110,103 +92,94 @@
return null;
}
- /* Execute has two components: the execute call and the watch call.
+ /* Execute has two components: the Execute call and (optionally) the WaitExecution call.
* This is the simple flow without any errors:
*
- * - A call to execute returns an Operation object.
- * - That Operation may already have an inlined result; if so, we return that result.
- * - Otherwise, we call watch on that operation to receive a stream of Changes to the Operation
- * object, until the first such change is an Operation with a result, which we return.
+ * - A call to Execute returns streamed updates on an Operation object.
+ * - We wait until the Operation is finished.
*
* Error possibilities:
- * - Any Operation object can have an error field instead of a result. Such Operations are
- * completed and failed; however, some of these errors may be retriable. These errors should
- * trigger a retry of the full execute+watch call, resulting in a new Operation.
- * - An execute call may fail with a retriable error (raise a StatusRuntimeException). We then
- * retry that call.
- * - A watch call may fail with a retriable error (either raise a StatusRuntimeException, or
- * return an ERROR in the ChangeBatch field). In that case, we retry the watch call only on the
- * same operation object.
+ * - An Execute call may fail with a retriable error (raise a StatusRuntimeException).
+ * - If the failure occurred before the first Operation is returned, we retry the call.
+ * - Otherwise, we call WaitExecution on the Operation.
+ * - A WaitExecution call may fail with a retriable error (raise a StatusRuntimeException).
+ * In that case, we retry the WaitExecution call on the same operation object.
+ * - A WaitExecution call may fail with a NOT_FOUND error (raise a StatusRuntimeException).
+ * That means the Operation was lost on the server, and we will retry to Execute.
+ * - Any call can return an Operation object with an error status in the result. Such Operations
+ * are completed and failed; however, some of these errors may be retriable. These errors should
+ * trigger a retry of the Execute call, resulting in a new Operation.
* */
public ExecuteResponse executeRemotely(ExecuteRequest request)
throws IOException, InterruptedException {
- // The only errors retried here are transient failures of the Action itself on the server, not
- // any gRPC errors that occurred during the call.
+ // Execute has two components: the Execute call and (optionally) the WaitExecution call.
+ // This is the simple flow without any errors:
+ //
+ // - A call to Execute returns streamed updates on an Operation object.
+ // - We wait until the Operation is finished.
+ //
+ // Error possibilities:
+ // - An Execute call may fail with a retriable error (raise a StatusRuntimeException).
+ // - If the failure occurred before the first Operation is returned, we retry the call.
+ // - Otherwise, we call WaitExecution on the Operation.
+ // - A WaitExecution call may fail with a retriable error (raise a StatusRuntimeException).
+ // In that case, we retry the WaitExecution call on the same operation object.
+ // - A WaitExecution call may fail with a NOT_FOUND error (raise a StatusRuntimeException).
+ // That means the Operation was lost on the server, and we will retry to Execute.
+ // - Any call can return an Operation object with an error status in the result. Such Operations
+ // are completed and failed; however, some of these errors may be retriable. These errors
+ // should trigger a retry of the Execute call, resulting in a new Operation.
+
+ // Will be modified by the retried handler.
+ final AtomicReference<Operation> operation =
+ new AtomicReference<>(Operation.getDefaultInstance());
+ final AtomicBoolean waitExecution =
+ new AtomicBoolean(false); // Whether we should call WaitExecution.
return retrier.execute(
() -> {
- // Here all transient gRPC errors will be retried.
- Operation op = retrier.execute(() -> execBlockingStub().execute(request));
- ExecuteResponse resp = getOperationResponse(op);
- if (resp != null) {
- return resp;
+ final Iterator<Operation> replies;
+ if (waitExecution.get()) {
+ WaitExecutionRequest wr =
+ WaitExecutionRequest.newBuilder().setName(operation.get().getName()).build();
+ replies = execBlockingStub().waitExecution(wr);
+ } else {
+ replies = execBlockingStub().execute(request);
}
- Request wr = Request.newBuilder().setTarget(op.getName()).build();
- // Here all transient gRPC errors will be retried, while transient failures of the Action
- // itself will be propagated.
- return retrier.execute(
- () -> {
- Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
- try {
- while (replies.hasNext()) {
- ChangeBatch cb = replies.next();
- for (Change ch : cb.getChangesList()) {
- switch (ch.getState()) {
- case INITIAL_STATE_SKIPPED:
- continue;
- case ERROR:
- try {
- throw StatusProto.toStatusRuntimeException(
- ch.getData().unpack(Status.class));
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- case DOES_NOT_EXIST:
- // TODO(olaola): either make this retriable, or use a different exception.
- throw new IOException(
- String.format(
- "Operation %s lost on the remote server.", op.getName()));
- case EXISTS:
- Operation o;
- try {
- o = ch.getData().unpack(Operation.class);
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- try {
- ExecuteResponse r = getOperationResponse(o);
- if (r != null) {
- return r;
- }
- } catch (StatusRuntimeException e) {
- // Pass through the Watch retry and retry the whole execute+watch call.
- throw new RemoteRetrier.PassThroughException(e);
- }
- continue;
- default:
- // This can only happen if the enum gets unexpectedly extended.
- throw new IOException(
- String.format("Illegal change state: %s", ch.getState()));
- }
- }
- }
- } finally {
- // The blocking streaming call closes correctly only when trailers and a Status
- // are received from the server so that onClose() is called on this call's
- // CallListener. Under normal circumstances (no cancel/errors), these are
- // guaranteed to be sent by the server only if replies.hasNext() has been called
- // after all replies from the stream have been consumed.
- try {
- while (replies.hasNext()) {
- replies.next();
- }
- } catch (StatusRuntimeException e) {
- // Cleanup: ignore exceptions, because the meaningful errors have already been
- // propagated.
- }
- }
- throw new IOException(
- String.format("Watch request for %s terminated with no result.", op.getName()));
- });
+ try {
+ while (replies.hasNext()) {
+ Operation o = replies.next();
+ operation.set(o);
+ waitExecution.set(!operation.get().getDone());
+ ExecuteResponse r = getOperationResponse(o);
+ if (r != null) {
+ return r;
+ }
+ }
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Code.NOT_FOUND) {
+ // Operation was lost on the server. Retry Execute.
+ waitExecution.set(false);
+ }
+ throw e;
+ } finally {
+ // The blocking streaming call closes correctly only when trailers and a Status
+ // are received from the server so that onClose() is called on this call's
+ // CallListener. Under normal circumstances (no cancel/errors), these are
+ // guaranteed to be sent by the server only if replies.hasNext() has been called
+ // after all replies from the stream have been consumed.
+ try {
+ while (replies.hasNext()) {
+ replies.next();
+ }
+ } catch (StatusRuntimeException e) {
+ // Cleanup: ignore exceptions, because the meaningful errors have already been
+ // propagated.
+ }
+ }
+ throw new IOException(
+ String.format(
+ "Remote server error: execution request for %s terminated with no result.",
+ operation.get().getName()));
});
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 2b44f48..7bb24c7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -110,6 +110,12 @@
String strategyName = remoteOptions.remoteLocalFallbackStrategy;
for (ActionContext context : usedContexts) {
+ if (context instanceof RemoteSpawnStrategy && cache == null) {
+ throw new ExecutorInitException(
+ "--remote_cache or --remote_executor should be initialized when using "
+ + "--spawn_strategy=remote",
+ ExitCode.COMMAND_LINE_ERROR);
+ }
if (context instanceof AbstractSpawnStrategy) {
ExecutionStrategy annotation = context.getClass().getAnnotation(ExecutionStrategy.class);
if (annotation != null) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index ff7bbac..4ce2376 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -233,14 +233,13 @@
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions,
- RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
executor =
new GrpcRemoteExecutor(
channel,
GoogleAuthUtils.newCallCredentials(authAndTlsOptions),
- remoteOptions.remoteTimeout,
retrier);
} else {
executor = null;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 11d2481..c0d4906 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -75,6 +75,14 @@
}
};
+ public static final Predicate<? super Exception> RETRIABLE_GRPC_EXEC_ERRORS =
+ e -> {
+ if (RETRIABLE_GRPC_ERRORS.test(e)) {
+ return true;
+ }
+ return RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND);
+ };
+
public RemoteRetrier(
RemoteOptions options,
Predicate<? super Exception> shouldRetry,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java
index 70c73fb..5235141 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java
@@ -22,11 +22,14 @@
/** Methods useful when using the {@link RemoteRetrier}. */
public final class RemoteRetrierUtils {
- public static boolean causedByStatus(RetryException e, Status.Code expected) {
- if (e.getCause() instanceof StatusRuntimeException) {
- return ((StatusRuntimeException) e.getCause()).getStatus().getCode() == expected;
- } else if (e.getCause() instanceof StatusException) {
- return ((StatusException) e.getCause()).getStatus().getCode() == expected;
+ public static boolean causedByStatus(Throwable e, Status.Code expected) {
+ if (e instanceof RetryException) {
+ e = e.getCause();
+ }
+ if (e instanceof StatusRuntimeException) {
+ return ((StatusRuntimeException) e).getStatus().getCode() == expected;
+ } else if (e instanceof StatusException) {
+ return ((StatusException) e).getStatus().getCode() == expected;
}
return false;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index a97d277..7c6cc50 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -15,6 +15,9 @@
import static com.google.common.base.Strings.isNullOrEmpty;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.ExecutionStrategy;
@@ -36,9 +39,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
import io.grpc.Context;
import java.io.IOException;
import java.util.Collection;
@@ -99,13 +99,16 @@
SortedMap<PathFragment, ActionInput> inputMap = context.getInputMapping();
TreeNode inputRoot = repository.buildFromActionInputs(inputMap);
repository.computeMerkleDigests(inputRoot);
- Command command = RemoteSpawnRunner.buildCommand(spawn.getArguments(), spawn.getEnvironment());
+ Command command =
+ RemoteSpawnRunner.buildCommand(
+ spawn.getOutputFiles(),
+ spawn.getArguments(),
+ spawn.getEnvironment(),
+ spawn.getExecutionPlatform());
Action action =
RemoteSpawnRunner.buildAction(
- spawn.getOutputFiles(),
digestUtil.compute(command),
repository.getMerkleDigest(inputRoot),
- spawn.getExecutionPlatform(),
context.getTimeout(),
Spawns.mayBeCached(spawn));
// Look up action cache, and reuse the action output if it is found.
@@ -183,7 +186,8 @@
Collection<Path> files =
RemoteSpawnRunner.resolveActionInputs(execRoot, spawn.getOutputFiles());
try {
- remoteCache.upload(actionKey, execRoot, files, context.getFileOutErr(), uploadAction);
+ remoteCache.upload(
+ actionKey, action, command, execRoot, files, context.getFileOutErr(), uploadAction);
} catch (IOException e) {
String errorMsg = e.getMessage();
if (isNullOrEmpty(errorMsg)) {
@@ -205,12 +209,9 @@
continue;
}
FileArtifactValue metadata = context.getMetadataProvider().getMetadata(input);
- if (metadata instanceof FileArtifactValue) {
- FileArtifactValue artifactValue = (FileArtifactValue) metadata;
- Path path = execRoot.getRelative(input.getExecPath());
- if (artifactValue.wasModifiedSinceDigest(path)) {
- throw new IOException(path + " was modified during execution");
- }
+ Path path = execRoot.getRelative(input.getExecPath());
+ if (metadata.wasModifiedSinceDigest(path)) {
+ throw new IOException(path + " was modified during execution");
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index e99cc6e..65570b6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -16,6 +16,14 @@
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecuteResponse;
+import build.bazel.remote.execution.v2.LogFile;
+import build.bazel.remote.execution.v2.Platform;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -48,14 +56,6 @@
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.LogFile;
-import com.google.devtools.remoteexecution.v1test.Platform;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.Context;
@@ -136,7 +136,7 @@
public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
throws ExecException, InterruptedException, IOException {
if (!Spawns.mayBeExecutedRemotely(spawn) || remoteCache == null) {
- return fallbackRunner.get().exec(spawn, context);
+ return execLocally(spawn, context);
}
context.report(ProgressStatus.EXECUTING, getName());
@@ -147,13 +147,15 @@
TreeNode inputRoot = repository.buildFromActionInputs(inputMap);
repository.computeMerkleDigests(inputRoot);
maybeWriteParamFilesLocally(spawn);
- Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment());
+ Command command = buildCommand(
+ spawn.getOutputFiles(),
+ spawn.getArguments(),
+ spawn.getEnvironment(),
+ spawn.getExecutionPlatform());
Action action =
buildAction(
- spawn.getOutputFiles(),
digestUtil.compute(command),
repository.getMerkleDigest(inputRoot),
- spawn.getExecutionPlatform(),
context.getTimeout(),
Spawns.mayBeCached(spawn));
@@ -190,26 +192,27 @@
}
}
} catch (IOException e) {
- return execLocallyOrFail(spawn, context, inputMap, actionKey, uploadLocalResults, e);
+ return execLocallyAndUploadOrFail(
+ spawn, context, inputMap, actionKey, action, command, uploadLocalResults, e);
}
if (remoteExecutor == null) {
// Remote execution is disabled and so execute the spawn on the local machine.
- return execLocally(spawn, context, inputMap, uploadLocalResults, remoteCache, actionKey);
- }
+ return execLocallyAndUpload(
+ spawn, context, inputMap, remoteCache, actionKey, action, command, uploadLocalResults);
+ }
ExecuteRequest request =
ExecuteRequest.newBuilder()
.setInstanceName(remoteOptions.remoteInstanceName)
- .setAction(action)
+ .setActionDigest(actionKey.getDigest())
.setSkipCacheLookup(!acceptCachedResult)
.build();
try {
return retrier.execute(
() -> {
// Upload the command and all the inputs into the remote cache.
- remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command);
-
+ remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, action, command);
ExecuteResponse reply = remoteExecutor.executeRemotely(request);
maybeDownloadServerLogs(reply, actionKey);
@@ -219,8 +222,10 @@
.build();
});
} catch (IOException e) {
- return execLocallyOrFail(spawn, context, inputMap, actionKey, uploadLocalResults, e);
+ return execLocallyAndUploadOrFail(
+ spawn, context, inputMap, actionKey, action, command, uploadLocalResults, e);
}
+
} finally {
withMetadata.detach(previous);
}
@@ -258,7 +263,7 @@
logPath = parent.getRelative(e.getKey());
logCount++;
try {
- getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest(), null));
+ getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest()));
} catch (IOException ex) {
reportOnce(Event.warn("Failed downloading server logs from the remote cache."));
}
@@ -280,11 +285,18 @@
.setExitCode(exitCode);
}
- private SpawnResult execLocallyOrFail(
+ private SpawnResult execLocally(Spawn spawn, SpawnExecutionContext context)
+ throws ExecException, InterruptedException, IOException {
+ return fallbackRunner.get().exec(spawn, context);
+ }
+
+ private SpawnResult execLocallyAndUploadOrFail(
Spawn spawn,
SpawnExecutionContext context,
SortedMap<PathFragment, ActionInput> inputMap,
ActionKey actionKey,
+ Action action,
+ Command command,
boolean uploadLocalResults,
IOException cause)
throws ExecException, InterruptedException, IOException {
@@ -296,7 +308,8 @@
if (remoteOptions.remoteLocalFallback
&& !(cause instanceof RetryException
&& RemoteRetrierUtils.causedByExecTimeout((RetryException) cause))) {
- return execLocally(spawn, context, inputMap, uploadLocalResults, remoteCache, actionKey);
+ return execLocallyAndUpload(
+ spawn, context, inputMap, remoteCache, actionKey, action, command, uploadLocalResults);
}
return handleError(cause, context.getFileOutErr(), actionKey);
}
@@ -343,38 +356,14 @@
}
static Action buildAction(
- Collection<? extends ActionInput> outputs,
Digest command,
Digest inputRoot,
- @Nullable PlatformInfo executionPlatform,
Duration timeout,
boolean cacheable) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
- ArrayList<String> outputPaths = new ArrayList<>();
- ArrayList<String> outputDirectoryPaths = new ArrayList<>();
- for (ActionInput output : outputs) {
- String pathString = output.getExecPathString();
- if (output instanceof Artifact && ((Artifact) output).isTreeArtifact()) {
- outputDirectoryPaths.add(pathString);
- } else {
- outputPaths.add(pathString);
- }
- }
- Collections.sort(outputPaths);
- Collections.sort(outputDirectoryPaths);
- action.addAllOutputFiles(outputPaths);
- action.addAllOutputDirectories(outputDirectoryPaths);
-
- // Get the remote platform properties.
- if (executionPlatform != null) {
- Platform platform =
- parsePlatform(executionPlatform.label(), executionPlatform.remoteExecutionProperties());
- action.setPlatform(platform);
- }
-
if (!timeout.isZero()) {
action.setTimeout(com.google.protobuf.Duration.newBuilder().setSeconds(timeout.getSeconds()));
}
@@ -399,8 +388,33 @@
return platformBuilder.build();
}
- static Command buildCommand(List<String> arguments, ImmutableMap<String, String> env) {
+ static Command buildCommand(
+ Collection<? extends ActionInput> outputs,
+ List<String> arguments,
+ ImmutableMap<String, String> env,
+ @Nullable PlatformInfo executionPlatform) {
Command.Builder command = Command.newBuilder();
+ ArrayList<String> outputFiles = new ArrayList<>();
+ ArrayList<String> outputDirectories = new ArrayList<>();
+ for (ActionInput output : outputs) {
+ String pathString = output.getExecPathString();
+ if (output instanceof Artifact && ((Artifact) output).isTreeArtifact()) {
+ outputDirectories.add(pathString);
+ } else {
+ outputFiles.add(pathString);
+ }
+ }
+ Collections.sort(outputFiles);
+ Collections.sort(outputDirectories);
+ command.addAllOutputFiles(outputFiles);
+ command.addAllOutputDirectories(outputDirectories);
+
+ // Get the remote platform properties.
+ if (executionPlatform != null) {
+ Platform platform =
+ parsePlatform(executionPlatform.label(), executionPlatform.remoteExecutionProperties());
+ command.setPlatform(platform);
+ }
command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(env.keySet());
@@ -430,35 +444,19 @@
return ctimes;
}
- /**
- * Execute a {@link Spawn} locally, using {@link #fallbackRunner}.
- *
- * <p>If possible also upload the {@link SpawnResult} to a remote cache.
- */
- private SpawnResult execLocally(
- Spawn spawn,
- SpawnExecutionContext context,
- SortedMap<PathFragment, ActionInput> inputMap,
- boolean uploadToCache,
- @Nullable AbstractRemoteActionCache remoteCache,
- @Nullable ActionKey actionKey)
- throws ExecException, IOException, InterruptedException {
- if (uploadToCache && remoteCache != null && actionKey != null) {
- return execLocallyAndUpload(spawn, context, inputMap, remoteCache, actionKey);
- }
- return fallbackRunner.get().exec(spawn, context);
- }
-
@VisibleForTesting
SpawnResult execLocallyAndUpload(
Spawn spawn,
SpawnExecutionContext context,
SortedMap<PathFragment, ActionInput> inputMap,
AbstractRemoteActionCache remoteCache,
- ActionKey actionKey)
+ ActionKey actionKey,
+ Action action,
+ Command command,
+ boolean uploadLocalResults)
throws ExecException, IOException, InterruptedException {
Map<Path, Long> ctimesBefore = getInputCtimes(inputMap);
- SpawnResult result = fallbackRunner.get().exec(spawn, context);
+ SpawnResult result = execLocally(spawn, context);
Map<Path, Long> ctimesAfter = getInputCtimes(inputMap);
for (Map.Entry<Path, Long> e : ctimesBefore.entrySet()) {
// Skip uploading to remote cache, because an input was modified during execution.
@@ -466,13 +464,17 @@
return result;
}
}
+ if (!uploadLocalResults) {
+ return result;
+ }
boolean uploadAction =
Spawns.mayBeCached(spawn)
&& Status.SUCCESS.equals(result.status())
&& result.exitCode() == 0;
Collection<Path> outputFiles = resolveActionInputs(execRoot, spawn.getOutputFiles());
try {
- remoteCache.upload(actionKey, execRoot, outputFiles, context.getFileOutErr(), uploadAction);
+ remoteCache.upload(
+ actionKey, action, command, execRoot, outputFiles, context.getFileOutErr(), uploadAction);
} catch (IOException e) {
if (verboseFailures) {
report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 8e6269a..af2232f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,6 +14,13 @@
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.DirectoryNode;
+import build.bazel.remote.execution.v2.FileNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +37,6 @@
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
-import com.google.devtools.remoteexecution.v1test.DirectoryNode;
-import com.google.devtools.remoteexecution.v1test.FileNode;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayInputStream;
@@ -68,9 +69,10 @@
@Override
public void ensureInputsPresent(
- TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
+ TreeNodeRepository repository, Path execRoot, TreeNode root, Action action, Command command)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
+ uploadBlob(action.toByteArray());
uploadBlob(command.toByteArray());
for (Directory directory : repository.treeToDirectories(root)) {
uploadBlob(directory.toByteArray());
@@ -87,7 +89,7 @@
Directory directory = Directory.parseFrom(getFromFuture(downloadBlob(rootDigest)));
for (FileNode file : directory.getFilesList()) {
Path dst = rootLocation.getRelative(file.getName());
- getFromFuture(downloadFile(dst, file.getDigest(), null));
+ getFromFuture(downloadFile(dst, file.getDigest()));
dst.setExecutable(file.getIsExecutable());
}
for (DirectoryNode child : directory.getDirectoriesList()) {
@@ -116,14 +118,16 @@
@Override
public void upload(
- ActionKey actionKey,
+ DigestUtil.ActionKey actionKey,
+ Action action,
+ Command command,
Path execRoot,
Collection<Path> files,
FileOutErr outErr,
boolean uploadAction)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
- upload(result, execRoot, files);
+ upload(result, action, command, execRoot, files, uploadAction);
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
@@ -137,11 +141,20 @@
}
}
- public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files)
+ public void upload(
+ ActionResult.Builder result,
+ Action action,
+ Command command,
+ Path execRoot,
+ Collection<Path> files,
+ boolean uploadAction)
throws ExecException, IOException, InterruptedException {
UploadManifest manifest =
new UploadManifest(digestUtil, result, execRoot, options.allowSymlinkUpload);
manifest.addFiles(files);
+ if (uploadAction) {
+ manifest.addAction(action, command);
+ }
for (Map.Entry<Digest, Path> entry : manifest.getDigestToFile().entrySet()) {
try (InputStream in = entry.getValue().getInputStream()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
index 6f4366e..5d21fe6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
@@ -16,6 +16,8 @@
import static java.nio.charset.StandardCharsets.US_ASCII;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableCollection;
@@ -39,8 +41,6 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Symlinks;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 85cbc87..6340e04 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -26,6 +26,7 @@
/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
public final class ConcurrentMapBlobStore implements SimpleBlobStore {
private final ConcurrentMap<String, byte[]> map;
+ static final String ACTION_KEY_PREFIX = "ac_";
public ConcurrentMapBlobStore(ConcurrentMap<String, byte[]> map) {
this.map = map;
@@ -56,7 +57,7 @@
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return getFromFuture(get(key, out));
+ return getFromFuture(get(ACTION_KEY_PREFIX + key, out));
}
@Override
@@ -68,7 +69,7 @@
@Override
public void putActionResult(String key, byte[] in) {
- map.put(key, in);
+ map.put(ACTION_KEY_PREFIX + key, in);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 6556746..1358cda 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -28,6 +28,7 @@
/** A on-disk store for the remote action cache. */
public final class OnDiskBlobStore implements SimpleBlobStore {
private final Path root;
+ static final String ACTION_KEY_PREFIX = "ac_";
public OnDiskBlobStore(Path root) {
this.root = root;
@@ -58,7 +59,7 @@
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return getFromFuture(get(key, out));
+ return getFromFuture(get(ACTION_KEY_PREFIX + key, out));
}
@Override
@@ -80,7 +81,7 @@
@Override
public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
- put(key, in.length, new ByteArrayInputStream(in));
+ put(ACTION_KEY_PREFIX + key, in.length, new ByteArrayInputStream(in));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
index 0a1d806..b2f5371 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
@@ -20,11 +20,9 @@
"//third_party/protobuf:protobuf_java",
"@googleapis//:google_bytestream_bytestream_java_grpc",
"@googleapis//:google_bytestream_bytestream_java_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_status_java_proto",
- "@googleapis//:google_watch_v1_java_grpc",
- "@googleapis//:google_watch_v1_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/ExecuteHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/ExecuteHandler.java
index 52ad59c..665e89b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/ExecuteHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/ExecuteHandler.java
@@ -14,9 +14,9 @@
package com.google.devtools.build.lib.remote.logging;
+import build.bazel.remote.execution.v2.ExecuteRequest;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ExecuteDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.longrunning.Operation;
/** LoggingHandler for google.devtools.remoteexecution.v1test.Execution.Execute gRPC call. */
@@ -35,7 +35,7 @@
@Override
public void handleResp(Operation message) {
- builder.setResponse(message);
+ builder.addResponses(message);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/FindMissingBlobsHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/FindMissingBlobsHandler.java
index 9a69d2f..7aea4b2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/FindMissingBlobsHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/FindMissingBlobsHandler.java
@@ -14,10 +14,10 @@
package com.google.devtools.build.lib.remote.logging;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.FindMissingBlobsDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
/**
* LoggingHandler for {@link
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/GetActionResultHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/GetActionResultHandler.java
index c5d421f..eafbc7c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/GetActionResultHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/GetActionResultHandler.java
@@ -14,10 +14,10 @@
package com.google.devtools.build.lib.remote.logging;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
/**
* LoggingHandler for {@link google.devtools.remoteexecution.v1test.ActionCache.GetActionResult}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
index bf1b393..4d0e1ea 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
@@ -14,17 +14,16 @@
package com.google.devtools.build.lib.remote.logging;
+import build.bazel.remote.execution.v2.ActionCacheGrpc;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
+import build.bazel.remote.execution.v2.ExecutionGrpc;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.bytestream.ByteStreamGrpc;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.protobuf.Timestamp;
-import com.google.watcher.v1.WatcherGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -59,8 +58,8 @@
MethodDescriptor<ReqT, RespT> method) {
if (method == ExecutionGrpc.getExecuteMethod()) {
return new ExecuteHandler();
- } else if (method == WatcherGrpc.getWatchMethod()) {
- return new WatchHandler();
+ } else if (method == ExecutionGrpc.getWaitExecutionMethod()) {
+ return new WaitExecutionHandler();
} else if (method == ActionCacheGrpc.getGetActionResultMethod()) {
return new GetActionResultHandler();
} else if (method == ContentAddressableStorageGrpc.getFindMissingBlobsMethod()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/WatchHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/WaitExecutionHandler.java
similarity index 62%
rename from src/main/java/com/google/devtools/build/lib/remote/logging/WatchHandler.java
rename to src/main/java/com/google/devtools/build/lib/remote/logging/WaitExecutionHandler.java
index 3dab899..2d99137 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/WatchHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/WaitExecutionHandler.java
@@ -14,27 +14,27 @@
package com.google.devtools.build.lib.remote.logging;
+import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
-import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WaitExecutionDetails;
+import com.google.longrunning.Operation;
-/** LoggingHandler for {@link google.watcher.v1.Watch} gRPC call. */
-public class WatchHandler implements LoggingHandler<Request, ChangeBatch> {
- private final WatchDetails.Builder builder = WatchDetails.newBuilder();
+/** LoggingHandler for {@link build.bazel.remote.execution.v2.WaitExecution} gRPC call. */
+public class WaitExecutionHandler implements LoggingHandler<WaitExecutionRequest, Operation> {
+ private final WaitExecutionDetails.Builder builder = WaitExecutionDetails.newBuilder();
@Override
- public void handleReq(Request message) {
+ public void handleReq(WaitExecutionRequest message) {
builder.setRequest(message);
}
@Override
- public void handleResp(ChangeBatch message) {
+ public void handleResp(Operation message) {
builder.addResponses(message);
}
@Override
public RpcCallDetails getDetails() {
- return RpcCallDetails.newBuilder().setWatch(builder).build();
+ return RpcCallDetails.newBuilder().setWaitExecution(builder).build();
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
index 4183437..307858c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -17,6 +17,6 @@
"//third_party:guava",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index bbe1da1..55b9af4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -15,6 +15,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
@@ -26,8 +28,6 @@
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
index 5955d5d..4740a00 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
@@ -13,12 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
+import build.bazel.remote.execution.v2.RequestMetadata;
+import build.bazel.remote.execution.v2.ToolDetails;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
-import com.google.devtools.remoteexecution.v1test.ToolDetails;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
diff --git a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
index 49dfba6..c8da1fd 100644
--- a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
@@ -37,6 +37,6 @@
"//third_party:guava",
"//third_party:jsr305",
"//third_party/protobuf:protobuf_java",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/sandbox/DockerSandboxedSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/sandbox/DockerSandboxedSpawnRunner.java
index 61c91b2..9b2f468 100644
--- a/src/main/java/com/google/devtools/build/lib/sandbox/DockerSandboxedSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/sandbox/DockerSandboxedSpawnRunner.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.sandbox;
+import build.bazel.remote.execution.v2.Platform;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -38,7 +39,6 @@
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
-import com.google.devtools.remoteexecution.v1test.Platform;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import java.io.ByteArrayInputStream;
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 55c8300..f7037ce 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -151,10 +151,9 @@
deps = [
"@com_google_protobuf//:timestamp_proto",
"@googleapis//:google_bytestream_bytestream_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_proto",
"@googleapis//:google_longrunning_operations_proto",
"@googleapis//:google_rpc_status_proto",
- "@googleapis//:google_watch_v1_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_proto",
],
)
diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto
index 32d7694..7bfa4a6 100644
--- a/src/main/protobuf/remote_execution_log.proto
+++ b/src/main/protobuf/remote_execution_log.proto
@@ -16,19 +16,18 @@
package remote_logging;
+import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/protobuf/timestamp.proto";
import "google/bytestream/bytestream.proto";
-import "google/devtools/remoteexecution/v1test/remote_execution.proto";
import "google/longrunning/operations.proto";
import "google/rpc/status.proto";
-import "google/watcher/v1/watch.proto";
option java_package = "com.google.devtools.build.lib.remote.logging";
// A single log entry for gRPC calls related to remote execution.
message LogEntry {
// Request metadata included in call.
- google.devtools.remoteexecution.v1test.RequestMetadata metadata = 1;
+ build.bazel.remote.execution.v2.RequestMetadata metadata = 1;
// Status of the call on close.
google.rpc.Status status = 2;
@@ -49,47 +48,46 @@
}
// Details for a call to
-// google.devtools.remoteexecution.v1test.Execution.Execute.
+// build.bazel.remote.execution.v2.Execution.Execute.
message ExecuteDetails {
- // The google.devtools.remoteexecution.v1test.ExecuteRequest sent by the
+ // The build.bazel.remote.execution.v2.ExecuteRequest sent by the
// call.
- google.devtools.remoteexecution.v1test.ExecuteRequest request = 1;
+ build.bazel.remote.execution.v2.ExecuteRequest request = 1;
- // The google.longrunning.Operation received by the Execute call.
- google.longrunning.Operation response = 2;
+ // Each google.longrunning.Operation received by the Execute call in order.
+ repeated google.longrunning.Operation responses = 2;
}
// Details for a call to
-// google.devtools.remoteexecution.v1test.ActionCache.GetActionResult.
+// build.bazel.remote.execution.v2.ActionCache.GetActionResult.
message GetActionResultDetails {
- // The google.devtools.remoteexecution.v1test.GetActionResultRequest sent by
+ // The build.bazel.remote.execution.v2.GetActionResultRequest sent by
// the call.
- google.devtools.remoteexecution.v1test.GetActionResultRequest request = 1;
+ build.bazel.remote.execution.v2.GetActionResultRequest request = 1;
- // The received google.devtools.remoteexecution.v1test.ActionResult.
- google.devtools.remoteexecution.v1test.ActionResult response = 2;
+ // The received build.bazel.remote.execution.v2.ActionResult.
+ build.bazel.remote.execution.v2.ActionResult response = 2;
}
-// Details for a call to google.watcher.v1.Watch.
-message WatchDetails {
+// Details for a call to build.bazel.remote.execution.v2.WaitExecution.
+message WaitExecutionDetails {
// The google.watcher.v1.Request sent by the Watch call.
- google.watcher.v1.Request request = 1;
+ build.bazel.remote.execution.v2.WaitExecutionRequest request = 1;
- // Each google.watcher.v1.ChangeBatch response received from the
- // Watch call in order.
- repeated google.watcher.v1.ChangeBatch responses = 2;
+ // Each google.longrunning.Operation received by the call in order.
+ repeated google.longrunning.Operation responses = 2;
}
// Details for a call to
-// google.devtools.remoteexecution.v1test.ContentAddressableStorage.FindMissingBlobs.
+// build.bazel.remote.execution.v2.ContentAddressableStorage.FindMissingBlobs.
message FindMissingBlobsDetails {
- // The google.devtools.remoteexecution.v1test.FindMissingBlobsRequest request
+ // The build.bazel.remote.execution.v2.FindMissingBlobsRequest request
// sent.
- google.devtools.remoteexecution.v1test.FindMissingBlobsRequest request = 1;
+ build.bazel.remote.execution.v2.FindMissingBlobsRequest request = 1;
- // The google.devtools.remoteexecution.v1test.FindMissingBlobsResponse
+ // The build.bazel.remote.execution.v2.FindMissingBlobsResponse
// received.
- google.devtools.remoteexecution.v1test.FindMissingBlobsResponse response = 2;
+ build.bazel.remote.execution.v2.FindMissingBlobsResponse response = 2;
}
// Details for a call to google.bytestream.Read.
@@ -124,11 +122,12 @@
// Contains details for specific types of calls.
message RpcCallDetails {
+ reserved 1 to 4;
oneof details {
- ExecuteDetails execute = 1;
- GetActionResultDetails get_action_result = 2;
- WatchDetails watch = 3;
- FindMissingBlobsDetails find_missing_blobs = 4;
+ ExecuteDetails execute = 7;
+ GetActionResultDetails get_action_result = 8;
+ WaitExecutionDetails wait_execution = 9;
+ FindMissingBlobsDetails find_missing_blobs = 10;
ReadDetails read = 5;
WriteDetails write = 6;
}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index a4941a7..a9d39d8 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1275,13 +1275,11 @@
"//third_party/protobuf:protobuf_java",
"@googleapis//:google_bytestream_bytestream_java_grpc",
"@googleapis//:google_bytestream_bytestream_java_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_code_java_proto",
"@googleapis//:google_rpc_status_java_proto",
- "@googleapis//:google_watch_v1_java_grpc",
- "@googleapis//:google_watch_v1_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java
index 7afe9b8..080d385 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java
@@ -17,6 +17,11 @@
import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
import static org.junit.Assert.fail;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.OutputFile;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
@@ -38,10 +43,6 @@
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -217,8 +218,9 @@
}
@Override
- public void ensureInputsPresent(TreeNodeRepository repository, Path execRoot, TreeNode root,
- Command command) throws IOException, InterruptedException {
+ public void ensureInputsPresent(
+ TreeNodeRepository repository, Path execRoot, TreeNode root, Action action, Command command)
+ throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@@ -230,8 +232,15 @@
}
@Override
- void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr,
- boolean uploadAction) throws ExecException, IOException, InterruptedException {
+ void upload(
+ ActionKey actionKey,
+ Action action,
+ Command command,
+ Path execRoot,
+ Collection<Path> files,
+ FileOutErr outErr,
+ boolean uploadAction)
+ throws ExecException, IOException, InterruptedException {
throw new UnsupportedOperationException();
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index f45e3bc..69c107b 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -17,6 +17,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.io.BaseEncoding;
@@ -35,7 +36,6 @@
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
-import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Server;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 0e142de..303cfdd 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -17,6 +17,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.WriteRequest;
@@ -30,8 +32,6 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.Context;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
index a157260..3c3d4bc 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -16,10 +16,10 @@
import static com.google.common.truth.Truth.assertThat;
import static junit.framework.TestCase.fail;
+import build.bazel.remote.execution.v2.Digest;
import com.google.devtools.build.lib.remote.Chunker.Chunk;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
-import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
index 0a67d62..3d21a51 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
@@ -13,6 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Tree;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -26,8 +28,6 @@
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.Symlinks;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Tree;
import java.io.IOException;
/** A fake implementation of the {@link MetadataProvider} interface. */
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
index f8d8fe5..30e7033 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
@@ -15,11 +15,11 @@
import static com.google.common.truth.Truth.assertThat;
+import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.common.collect.ImmutableMap;
-import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index a2391db..21f3a25 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -19,6 +19,20 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.DirectoryNode;
+import build.bazel.remote.execution.v2.FileNode;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.Tree;
+import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
@@ -49,20 +63,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
-import com.google.devtools.remoteexecution.v1test.DirectoryNode;
-import com.google.devtools.remoteexecution.v1test.FileNode;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.Tree;
-import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
@@ -297,7 +297,12 @@
});
// Upload all missing inputs (that is, the virtual action input from above)
- client.ensureInputsPresent(treeNodeRepository, execRoot, root, Command.getDefaultInstance());
+ client.ensureInputsPresent(
+ treeNodeRepository,
+ execRoot,
+ root,
+ Action.getDefaultInstance(),
+ Command.getDefaultInstance());
assertThat(writeOccurred.get()).named("WriteOccurred").isTrue();
}
@@ -655,7 +660,8 @@
});
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(execRoot, ImmutableList.<Path>of(fooFile, barDir), outErr, result);
+ client.upload(
+ execRoot, null, null, ImmutableList.<Path>of(fooFile, barDir), outErr, false, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
@@ -684,7 +690,7 @@
});
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(execRoot, ImmutableList.<Path>of(barDir), outErr, result);
+ client.upload(execRoot, null, null, ImmutableList.<Path>of(barDir), outErr, false, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
@@ -735,7 +741,7 @@
});
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(execRoot, ImmutableList.<Path>of(barDir), outErr, result);
+ client.upload(execRoot, null, null, ImmutableList.<Path>of(barDir), outErr, false, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
@@ -751,13 +757,18 @@
final Path fooFile = execRoot.getRelative("a/foo");
final Path barFile = execRoot.getRelative("bar");
barFile.setExecutable(true);
+ Command command = Command.newBuilder().addOutputFiles("a/foo").build();
+ final Digest cmdDigest = DIGEST_UTIL.compute(command.toByteArray());
+ Action action = Action.newBuilder().setCommandDigest(cmdDigest).build();
+ final Digest actionDigest = DIGEST_UTIL.compute(action.toByteArray());
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
- assertThat(request.getBlobDigestsList()).containsExactly(fooDigest, barDigest);
+ assertThat(request.getBlobDigestsList())
+ .containsExactly(cmdDigest, actionDigest, fooDigest, barDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
@@ -765,7 +776,8 @@
});
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, result);
+ client.upload(
+ execRoot, action, command, ImmutableList.<Path>of(fooFile, barFile), outErr, true, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
expectedResult
@@ -807,7 +819,14 @@
ActionKey emptyKey = DIGEST_UTIL.computeActionKey(Action.getDefaultInstance());
Path fooFile = execRoot.getRelative("a/foo");
Path barFile = execRoot.getRelative("bar");
- client.upload(emptyKey, execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, false);
+ client.upload(
+ emptyKey,
+ Action.getDefaultInstance(),
+ Command.getDefaultInstance(),
+ execRoot,
+ ImmutableList.<Path>of(fooFile, barFile),
+ outErr,
+ false);
}
@Test
@@ -833,7 +852,7 @@
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
if (numErrors-- <= 0) {
- // Everything is missing.
+ // All outputs are missing.
responseObserver.onNext(
FindMissingBlobsResponse.newBuilder()
.addMissingBlobDigests(fooDigest)
@@ -925,7 +944,13 @@
}
});
client.upload(
- actionKey, execRoot, ImmutableList.<Path>of(fooFile, barFile, bazFile), outErr, true);
+ actionKey,
+ Action.getDefaultInstance(),
+ Command.getDefaultInstance(),
+ execRoot,
+ ImmutableList.<Path>of(fooFile, barFile, bazFile),
+ outErr,
+ true);
// 4 times for the errors, 3 times for the successful uploads.
Mockito.verify(mockByteStreamImpl, Mockito.times(7))
.write(Mockito.<StreamObserver<WriteResponse>>anyObject());
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index 93d1cdd..84db066 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -18,6 +18,20 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecuteResponse;
+import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.OutputFile;
+import build.bazel.remote.execution.v2.RequestMetadata;
+import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
@@ -57,27 +71,10 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
-import com.google.watcher.v1.Change;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.BindableService;
import io.grpc.CallCredentials;
import io.grpc.Metadata;
@@ -97,6 +94,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -129,6 +127,8 @@
private SimpleSpawn simpleSpawn;
private FakeActionInputFileCache fakeFileCache;
private Digest inputDigest;
+ private Digest cmdDigest;
+ private Command command;
private RemoteSpawnRunner client;
private FileOutErr outErr;
private Server fakeServer;
@@ -254,13 +254,13 @@
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions,
- RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryService,
Retrier.ALLOW_ALL_CALLS);
ReferenceCountedChannel channel =
new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor().build());
GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(channel.retain(), null, remoteOptions.remoteTimeout, retrier);
+ new GrpcRemoteExecutor(channel.retain(), null, retrier);
CallCredentials creds =
GoogleAuthUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class));
ByteStreamUploader uploader =
@@ -284,6 +284,17 @@
DIGEST_UTIL,
logDir);
inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
+ command =
+ Command.newBuilder()
+ .addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
+ .addEnvironmentVariables(
+ Command.EnvironmentVariable.newBuilder()
+ .setName("VARIABLE")
+ .setValue("value")
+ .build())
+ .addAllOutputFiles(ImmutableList.of("bar", "foo"))
+ .build();
+ cmdDigest = DIGEST_UTIL.compute(command);
channel.release();
}
@@ -550,10 +561,6 @@
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
- // Check that the output files are sorted.
- assertThat(request.getAction().getOutputFilesList())
- .containsExactly("bar", "foo")
- .inOrder();
responseObserver.onNext(
Operation.newBuilder()
.setDone(true)
@@ -565,32 +572,17 @@
};
serviceRegistry.addService(
ServerInterceptors.intercept(execService, new RequestHeadersValidator()));
- final Command command =
- Command.newBuilder()
- .addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
- .addEnvironmentVariables(
- Command.EnvironmentVariable.newBuilder()
- .setName("VARIABLE")
- .setValue("value")
- .build())
- .build();
- final Digest cmdDigest = DIGEST_UTIL.compute(command);
BindableService cas =
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
- FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder();
final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList());
- if (requested.contains(cmdDigest)) {
- b.addMissingBlobDigests(cmdDigest);
- } else if (requested.contains(inputDigest)) {
- b.addMissingBlobDigests(inputDigest);
- } else {
- fail("Unexpected call to findMissingBlobs: " + request);
- }
- responseObserver.onNext(b.build());
+ assertThat(requested).contains(cmdDigest);
+ assertThat(requested).contains(inputDigest);
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(inputDigest).build());
responseObserver.onCompleted();
}
};
@@ -598,7 +590,6 @@
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
- .thenAnswer(blobWriteAnswer(command.toByteArray()))
.thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8)));
serviceRegistry.addService(
ServerInterceptors.intercept(mockByteStreamImpl, new RequestHeadersValidator()));
@@ -609,10 +600,28 @@
assertThat(result.isCacheHit()).isFalse();
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ Mockito.verify(mockByteStreamImpl).write(Mockito.<StreamObserver<WriteResponse>>anyObject());
+ }
+
+ private Answer<Void> answerWith(@Nullable Operation op, Status status) {
+ return invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ StreamObserver<Operation> responseObserver =
+ (StreamObserver<Operation>) invocationOnMock.getArguments()[1];
+ if (op != null) {
+ responseObserver.onNext(op);
+ }
+ if (status.isOk()) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(status.asRuntimeException());
+ }
+ return null;
+ };
}
@Test
- public void remotelyExecuteWithWatchAndRetries() throws Exception {
+ public void remotelyExecuteRetries() throws Exception {
serviceRegistry.addService(
new ActionCacheImplBase() {
private int numErrors = 4;
@@ -633,40 +642,6 @@
.build();
final String opName = "operations/xyz";
- ExecutionImplBase mockExecutionImpl = Mockito.mock(ExecutionImplBase.class);
- Answer<Void> successAnswer =
- invocationOnMock -> {
- @SuppressWarnings("unchecked") StreamObserver<Operation> responseObserver =
- (StreamObserver<Operation>) invocationOnMock.getArguments()[1];
- responseObserver.onNext(Operation.newBuilder().setName(opName).build());
- responseObserver.onCompleted();
- return null;
- };
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked") StreamObserver<Operation> responseObserver =
- (StreamObserver<Operation>) invocationOnMock.getArguments()[1];
- responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
- return null;
- })
- .doAnswer(successAnswer)
- .doAnswer(successAnswer)
- .when(mockExecutionImpl)
- .execute(
- Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
- serviceRegistry.addService(mockExecutionImpl);
-
- WatcherImplBase mockWatcherImpl = Mockito.mock(WatcherImplBase.class);
- Operation operationWithError =
- Operation.newBuilder()
- .setName(opName)
- .setError(com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.getNumber()).build())
- .build();
- Change chOperationWithError =
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(Any.pack(operationWithError))
- .build();
ExecuteResponse executeResponseWithError =
ExecuteResponse.newBuilder()
.setStatus(
@@ -678,95 +653,37 @@
.setDone(true)
.setResponse(Any.pack(executeResponseWithError))
.build();
- Change chOperationWithExecuteError =
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(Any.pack(operationWithExecuteError))
- .build();
+ Operation unfinishedOperation = Operation.newBuilder().setName(opName).build();
Operation opSuccess =
Operation.newBuilder()
.setName(opName)
.setDone(true)
.setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
.build();
- Change chSuccess =
- Change.newBuilder().setState(Change.State.EXISTS).setData(Any.pack(opSuccess)).build();
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- StreamObserver<ChangeBatch> responseObserver =
- (StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
- // Retry the execution call as well as the watch call.
- responseObserver.onNext(
- ChangeBatch.newBuilder().addChanges(chOperationWithError).build());
- responseObserver.onCompleted();
- return null;
- })
- .doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- StreamObserver<ChangeBatch> responseObserver =
- (StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
- // Retry the execution call as well as the watch call.
- responseObserver.onNext(
- ChangeBatch.newBuilder().addChanges(chOperationWithExecuteError).build());
- responseObserver.onCompleted();
- return null;
- })
- .doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- StreamObserver<ChangeBatch> responseObserver =
- (StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
- // Retry the watch call.
- responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
- return null;
- })
- .doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- StreamObserver<ChangeBatch> responseObserver =
- (StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
- // Some optional initial state.
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED).build())
- .build());
- // Still executing.
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(Any.pack(Operation.newBuilder().setName(opName).build()))
- .build())
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(Any.pack(Operation.newBuilder().setName(opName).build()))
- .build())
- .build());
- // Finished executing.
- responseObserver.onNext(ChangeBatch.newBuilder().addChanges(chSuccess).build());
- responseObserver.onCompleted();
- return null;
- })
- .when(mockWatcherImpl)
- .watch(Mockito.<Request>anyObject(), Mockito.<StreamObserver<ChangeBatch>>anyObject());
- serviceRegistry.addService(
- ServerInterceptors.intercept(mockWatcherImpl, new RequestHeadersValidator()));
- final Command command =
- Command.newBuilder()
- .addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
- .addEnvironmentVariables(
- Command.EnvironmentVariable.newBuilder()
- .setName("VARIABLE")
- .setValue("value")
- .build())
- .build();
- final Digest cmdDigest = DIGEST_UTIL.compute(command);
+ ExecutionImplBase mockExecutionImpl = Mockito.mock(ExecutionImplBase.class);
+ // Flow of this test:
+ // - call execute, get retriable gRPC error
+ // - retry: call execute, get retriable Operation error
+ // - retry: call execute, get an Operation, then a retriable gRPC error
+ // - retry: call waitExecute, get a retriable gRPC error
+ // - retry: call waitExecute, get retriable Operation error
+ // - retry: call execute, get successful operation, ignore further errors.
+ Mockito.doAnswer(answerWith(null, Status.UNAVAILABLE))
+ .doAnswer(answerWith(operationWithExecuteError, Status.OK))
+ .doAnswer(answerWith(unfinishedOperation, Status.UNAVAILABLE))
+ .doAnswer(answerWith(opSuccess, Status.UNAVAILABLE)) // last status should be ignored.
+ .when(mockExecutionImpl)
+ .execute(
+ Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
+ Mockito.doAnswer(answerWith(null, Status.UNAVAILABLE))
+ .doAnswer(answerWith(operationWithExecuteError, Status.OK))
+ .when(mockExecutionImpl)
+ .waitExecution(
+ Mockito.<WaitExecutionRequest>anyObject(),
+ Mockito.<StreamObserver<Operation>>anyObject());
+ serviceRegistry.addService(mockExecutionImpl);
+
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
private int numErrors = 4;
@@ -780,24 +697,17 @@
return;
}
- FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder();
final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList());
- if (requested.contains(cmdDigest)) {
- b.addMissingBlobDigests(cmdDigest);
- } else if (requested.contains(inputDigest)) {
- b.addMissingBlobDigests(inputDigest);
- } else {
- fail("Unexpected call to findMissingBlobs: " + request);
- }
- responseObserver.onNext(b.build());
+ assertThat(requested).contains(cmdDigest);
+ assertThat(requested).contains(inputDigest);
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(inputDigest).build());
responseObserver.onCompleted();
}
});
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
- .thenAnswer(blobWriteAnswerError()) // Error on command upload.
- .thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully.
.thenAnswer(blobWriteAnswerError()) // Error on the input file.
.thenAnswer(blobWriteAnswerError()) // Error on the input file again.
.thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); // Upload input file successfully.
@@ -832,11 +742,116 @@
Mockito.verify(mockExecutionImpl, Mockito.times(4))
.execute(
Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
- Mockito.verify(mockWatcherImpl, Mockito.times(4))
- .watch(
- Mockito.<Request>anyObject(), Mockito.<StreamObserver<ChangeBatch>>anyObject());
+ Mockito.verify(mockExecutionImpl, Mockito.times(2))
+ .waitExecution(
+ Mockito.<WaitExecutionRequest>anyObject(),
+ Mockito.<StreamObserver<Operation>>anyObject());
Mockito.verify(mockByteStreamImpl, Mockito.times(2))
.read(Mockito.<ReadRequest>anyObject(), Mockito.<StreamObserver<ReadResponse>>anyObject());
+ Mockito.verify(mockByteStreamImpl, Mockito.times(3))
+ .write(Mockito.<StreamObserver<WriteResponse>>anyObject());
+ }
+
+ @Test
+ public void remotelyExecuteRetriesWaitResult() throws Exception {
+ // This test's flow is similar to the previous, except the result
+ // will eventually be returned by the waitExecute function.
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ }
+ });
+ final Digest resultDigest = DIGEST_UTIL.compute("bla".getBytes(UTF_8));
+ final ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .addOutputFiles(OutputFile.newBuilder().setPath("foo").setDigest(resultDigest).build())
+ .build();
+ final String opName = "operations/xyz";
+
+ Operation unfinishedOperation = Operation.newBuilder().setName(opName).build();
+ Operation opSuccess =
+ Operation.newBuilder()
+ .setName(opName)
+ .setDone(true)
+ .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
+ .build();
+
+ ExecutionImplBase mockExecutionImpl = Mockito.mock(ExecutionImplBase.class);
+ // Flow of this test:
+ // - call execute, get an Operation, then a retriable gRPC error
+ // - retry: call waitExecute, get NOT_FOUND (operation lost)
+ // - retry: call execute, get NOT_FOUND (operation lost)
+ // - retry: call execute, get an Operation, then a retriable gRPC error
+ // - retry: call waitExecute, get successful operation, ignore further errors.
+ Mockito.doAnswer(answerWith(unfinishedOperation, Status.UNAVAILABLE))
+ .doAnswer(answerWith(unfinishedOperation, Status.NOT_FOUND))
+ .doAnswer(answerWith(unfinishedOperation, Status.UNAVAILABLE))
+ .when(mockExecutionImpl)
+ .execute(
+ Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
+ Mockito.doAnswer(answerWith(unfinishedOperation, Status.NOT_FOUND))
+ .doAnswer(answerWith(opSuccess, Status.UNAVAILABLE)) // This error is ignored.
+ .when(mockExecutionImpl)
+ .waitExecution(
+ Mockito.<WaitExecutionRequest>anyObject(),
+ Mockito.<StreamObserver<Operation>>anyObject());
+ serviceRegistry.addService(mockExecutionImpl);
+
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList());
+ assertThat(requested).contains(cmdDigest);
+ assertThat(requested).contains(inputDigest);
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(inputDigest).build());
+ responseObserver.onCompleted();
+ }
+ });
+
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); // Upload input file successfully.
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ StreamObserver<ReadResponse> responseObserver =
+ (StreamObserver<ReadResponse>) invocationOnMock.getArguments()[1];
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("bla")).build());
+ responseObserver.onCompleted();
+ return null;
+ })
+ .when(mockByteStreamImpl)
+ .read(Mockito.<ReadRequest>anyObject(), Mockito.<StreamObserver<ReadResponse>>anyObject());
+ serviceRegistry.addService(mockByteStreamImpl);
+
+ SpawnResult result = client.exec(simpleSpawn, simplePolicy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(result.isCacheHit()).isFalse();
+ assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+ assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ Mockito.verify(mockExecutionImpl, Mockito.times(3))
+ .execute(
+ Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
+ Mockito.verify(mockExecutionImpl, Mockito.times(2))
+ .waitExecution(
+ Mockito.<WaitExecutionRequest>anyObject(),
+ Mockito.<StreamObserver<Operation>>anyObject());
+ Mockito.verify(mockByteStreamImpl)
+ .read(Mockito.<ReadRequest>anyObject(), Mockito.<StreamObserver<ReadResponse>>anyObject());
+ Mockito.verify(mockByteStreamImpl, Mockito.times(1))
+ .write(Mockito.<StreamObserver<WriteResponse>>anyObject());
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
index f432ffd..b3387bd 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
@@ -21,6 +21,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
@@ -57,9 +61,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -231,10 +232,13 @@
any(TreeNodeRepository.class),
any(Path.class),
any(TreeNode.class),
+ any(Action.class),
any(Command.class));
verify(remoteCache, never())
.upload(
any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
any(Path.class),
any(Collection.class),
any(FileOutErr.class),
@@ -271,10 +275,24 @@
}
})
.when(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(true));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(true));
entry.store(result);
verify(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(true));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(true));
assertThat(progressUpdates)
.containsExactly(Pair.of(ProgressStatus.CHECKING_CACHE, "remote-cache"));
}
@@ -306,7 +324,14 @@
entry.store(result);
ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
verify(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(false));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(false));
assertThat(progressUpdates).containsExactly();
}
@@ -326,7 +351,14 @@
ImmutableList<Path> outputFiles = ImmutableList.of(fs.getPath("/random/file"));
entry.store(result);
verify(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(false));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(false));
assertThat(progressUpdates)
.containsExactly(Pair.of(ProgressStatus.CHECKING_CACHE, "remote-cache"));
}
@@ -345,11 +377,25 @@
doThrow(new IOException("cache down"))
.when(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(true));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(true));
entry.store(result);
verify(remoteCache)
- .upload(any(ActionKey.class), any(Path.class), eq(outputFiles), eq(outErr), eq(true));
+ .upload(
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ any(Path.class),
+ eq(outputFiles),
+ eq(outErr),
+ eq(true));
assertThat(eventHandler.getEvents()).hasSize(1);
Event evt = eventHandler.getEvents().get(0);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 12d77f2..4b6d312 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -26,6 +26,13 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecuteResponse;
+import build.bazel.remote.execution.v2.LogFile;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
@@ -68,12 +75,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.LogFile;
-import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import java.io.IOException;
import java.io.InputStream;
@@ -200,13 +201,15 @@
ArgumentCaptor<ExecuteRequest> requestCaptor = ArgumentCaptor.forClass(ExecuteRequest.class);
verify(executor).executeRemotely(requestCaptor.capture());
assertThat(requestCaptor.getValue().getSkipCacheLookup()).isTrue();
- assertThat(requestCaptor.getValue().getAction().getDoNotCache()).isTrue();
+ // TODO(olaola): verify that the uploaded action has the doNotCache set.
verify(cache, never())
.getCachedActionResult(any(ActionKey.class));
verify(cache, never())
.upload(
any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
any(Path.class),
any(Collection.class),
any(FileOutErr.class),
@@ -263,6 +266,8 @@
verify(cache)
.upload(
any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
any(Path.class),
any(Collection.class),
any(FileOutErr.class),
@@ -305,11 +310,21 @@
assertThat(runner.exec(spawn, policy)).isSameAs(res);
verify(localRunner).exec(eq(spawn), eq(policy));
- verify(runner).execLocallyAndUpload(eq(spawn), eq(policy), any(SortedMap.class), eq(cache),
- any(ActionKey.class));
+ verify(runner)
+ .execLocallyAndUpload(
+ eq(spawn),
+ eq(policy),
+ any(SortedMap.class),
+ eq(cache),
+ any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
+ eq(true));
verify(cache)
.upload(
any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
any(Path.class),
any(Collection.class),
any(FileOutErr.class),
@@ -391,6 +406,8 @@
.when(cache)
.upload(
any(ActionKey.class),
+ any(Action.class),
+ any(Command.class),
any(Path.class),
any(Collection.class),
any(FileOutErr.class),
@@ -567,7 +584,7 @@
.build());
SettableFuture<Void> completed = SettableFuture.create();
completed.set(null);
- when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
+ when(cache.downloadFile(eq(logPath), eq(logDigest))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -576,7 +593,7 @@
assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest));
}
@Test
@@ -612,7 +629,7 @@
"", 1, new ExecutionStatusException(resp.getStatus(), resp)));
SettableFuture<Void> completed = SettableFuture.create();
completed.set(null);
- when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
+ when(cache.downloadFile(eq(logPath), eq(logDigest))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -621,7 +638,7 @@
assertThat(res.status()).isEqualTo(Status.TIMEOUT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest));
}
@Test
@@ -661,7 +678,7 @@
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class));
}
@Test
@@ -701,7 +718,7 @@
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class));
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
index 81c7f7a..691f888 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
@@ -17,6 +17,14 @@
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import build.bazel.remote.execution.v2.DirectoryNode;
+import build.bazel.remote.execution.v2.FileNode;
+import build.bazel.remote.execution.v2.Tree;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -33,12 +41,6 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
-import com.google.devtools.remoteexecution.v1test.DirectoryNode;
-import com.google.devtools.remoteexecution.v1test.FileNode;
-import com.google.devtools.remoteexecution.v1test.Tree;
import io.grpc.Context;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -334,20 +336,29 @@
final Path quxFile = execRoot.getRelative("bar/qux");
quxFile.setExecutable(true);
final Path barDir = execRoot.getRelative("bar");
+ Command cmd = Command.newBuilder().addOutputFiles("bla").build();
+ final Digest cmdDigest = DIGEST_UTIL.compute(cmd);
+ Action action = Action.newBuilder().setCommandDigest(cmdDigest).build();
+ final Digest actionDigest = DIGEST_UTIL.compute(action);
final ConcurrentMap<String, byte[]> map = new ConcurrentHashMap<>();
final SimpleBlobStoreActionCache client = newClient(map);
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(result, execRoot, ImmutableList.<Path>of(fooFile, barDir));
+ client.upload(result, action, cmd, execRoot, ImmutableList.<Path>of(fooFile, barDir), true);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
assertThat(map.keySet())
- .containsExactly(fooDigest.getHash(), quxDigest.getHash(), barDigest.getHash());
- }
+ .containsExactly(
+ fooDigest.getHash(),
+ quxDigest.getHash(),
+ barDigest.getHash(),
+ cmdDigest.getHash(),
+ actionDigest.getHash());
+ }
@Test
public void testUploadDirectoryEmpty() throws Exception {
@@ -361,7 +372,7 @@
final SimpleBlobStoreActionCache client = newClient(map);
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(result, execRoot, ImmutableList.<Path>of(barDir));
+ client.upload(result, null, null, execRoot, ImmutableList.<Path>of(barDir), false);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
@@ -404,7 +415,7 @@
final Path barDir = execRoot.getRelative("bar");
ActionResult.Builder result = ActionResult.newBuilder();
- client.upload(result, execRoot, ImmutableList.<Path>of(barDir));
+ client.upload(result, null, null, execRoot, ImmutableList.<Path>of(barDir), false);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
diff --git a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
index e31a337..eba2ebb 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
@@ -15,6 +15,8 @@
import static com.google.common.truth.Truth.assertThat;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
import com.google.common.collect.ImmutableCollection;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -31,8 +33,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Root;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.Directory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
index c459978..ef137fa 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
@@ -22,6 +22,22 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import build.bazel.remote.execution.v2.ActionCacheGrpc;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecutionGrpc;
+import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.OutputFile;
+import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
@@ -36,37 +52,15 @@
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
-import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WaitExecutionDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.testutil.ManualClock;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
-import com.google.watcher.v1.Change;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
-import com.google.watcher.v1.WatcherGrpc;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.MethodDescriptor;
@@ -357,32 +351,50 @@
ExecuteRequest request =
ExecuteRequest.newBuilder()
.setInstanceName("test-instance")
- .setAction(Action.newBuilder().addOutputFiles("somefile"))
+ .setActionDigest(DigestUtil.buildDigest("test", 8))
.build();
- Operation response = Operation.newBuilder().setName("test-operation").build();
+ Operation response1 = Operation.newBuilder().setName("test-name").build();
+ Operation response2 =
+ Operation.newBuilder()
+ .setName("test-name")
+ .setDone(true)
+ .setResponse(Any.pack(request))
+ .build();
+
serviceRegistry.addService(
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
- responseObserver.onNext(response);
- clock.advanceMillis(100);
+ responseObserver.onNext(response1);
+ clock.advanceMillis(2200);
+ responseObserver.onNext(response2);
+ clock.advanceMillis(1100);
responseObserver.onCompleted();
}
});
- ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
- clock.advanceMillis(15000);
- stub.execute(request);
+ clock.advanceMillis(50000);
+ Iterator<Operation> replies =
+ ExecutionGrpc.newBlockingStub(loggedChannel).execute(request);
+
+ // Read both responses.
+ while (replies.hasNext()) {
+ replies.next();
+ }
+
LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
.setExecute(
- ExecuteDetails.newBuilder().setRequest(request).setResponse(response)))
+ ExecuteDetails.newBuilder()
+ .setRequest(request)
+ .addResponses(response1)
+ .addResponses(response2)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
- .setStartTime(Timestamp.newBuilder().setSeconds(15))
- .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000))
+ .setStartTime(Timestamp.newBuilder().setSeconds(50))
+ .setEndTime(Timestamp.newBuilder().setSeconds(53).setNanos(300000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -392,7 +404,7 @@
ExecuteRequest request =
ExecuteRequest.newBuilder()
.setInstanceName("test-instance")
- .setAction(Action.newBuilder().addOutputFiles("somefile"))
+ .setActionDigest(DigestUtil.buildDigest("test", 8))
.build();
Status error = Status.NOT_FOUND.withDescription("not found");
serviceRegistry.addService(
@@ -403,9 +415,15 @@
responseObserver.onError(error.asRuntimeException());
}
});
- ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
clock.advanceMillis(20000000000001L);
- assertThrows(StatusRuntimeException.class, () -> stub.execute(request));
+ Iterator<Operation> replies = ExecutionGrpc.newBlockingStub(loggedChannel).execute(request);
+ assertThrows(
+ StatusRuntimeException.class,
+ () -> {
+ while (replies.hasNext()) {
+ replies.next();
+ }
+ });
LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
@@ -511,22 +529,21 @@
}
@Test
- public void testWatchCallOk() {
- Request request = Request.newBuilder().setTarget("test-target").build();
- ChangeBatch response1 =
- ChangeBatch.newBuilder()
- .addChanges(Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED))
- .build();
- ChangeBatch response2 =
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder().setState(Change.State.EXISTS).setData(Any.pack(request)))
+ public void testWaitExecutionCallOk() {
+ WaitExecutionRequest request = WaitExecutionRequest.newBuilder().setName("test-name").build();
+ Operation response1 = Operation.newBuilder().setName("test-name").build();
+ Operation response2 =
+ Operation.newBuilder()
+ .setName("test-name")
+ .setDone(true)
+ .setResponse(Any.pack(request))
.build();
serviceRegistry.addService(
- new WatcherImplBase() {
+ new ExecutionImplBase() {
@Override
- public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
+ public void waitExecution(
+ WaitExecutionRequest request, StreamObserver<Operation> responseObserver) {
responseObserver.onNext(response1);
clock.advanceMillis(2200);
responseObserver.onNext(response2);
@@ -536,7 +553,8 @@
});
clock.advanceMillis(50000);
- Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
+ Iterator<Operation> replies =
+ ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(request);
// Read both responses.
while (replies.hasNext()) {
@@ -545,11 +563,11 @@
LogEntry expectedEntry =
LogEntry.newBuilder()
- .setMethodName(WatcherGrpc.getWatchMethod().getFullMethodName())
+ .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
- .setWatch(
- WatchDetails.newBuilder()
+ .setWaitExecution(
+ WaitExecutionDetails.newBuilder()
.setRequest(request)
.addResponses(response1)
.addResponses(response2)))
@@ -561,18 +579,16 @@
}
@Test
- public void testWatchCallFail() {
- Request request = Request.newBuilder().setTarget("test-target").build();
- ChangeBatch response =
- ChangeBatch.newBuilder()
- .addChanges(Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED))
- .build();
+ public void testWaitExecutionCallFail() {
+ WaitExecutionRequest request = WaitExecutionRequest.newBuilder().setName("test-name").build();
+ Operation response = Operation.newBuilder().setName("test-name").build();
Status error = Status.DEADLINE_EXCEEDED.withDescription("timed out");
serviceRegistry.addService(
- new WatcherImplBase() {
+ new ExecutionImplBase() {
@Override
- public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
+ public void waitExecution(
+ WaitExecutionRequest request, StreamObserver<Operation> responseObserver) {
clock.advanceMillis(100);
responseObserver.onNext(response);
clock.advanceMillis(100);
@@ -581,7 +597,8 @@
});
clock.advanceMillis(2000);
- Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
+ Iterator<Operation> replies =
+ ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(request);
assertThrows(
StatusRuntimeException.class,
() -> {
@@ -592,10 +609,13 @@
LogEntry expectedEntry =
LogEntry.newBuilder()
- .setMethodName(WatcherGrpc.getWatchMethod().getFullMethodName())
+ .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
- .setWatch(WatchDetails.newBuilder().setRequest(request).addResponses(response)))
+ .setWaitExecution(
+ WaitExecutionDetails.newBuilder()
+ .setRequest(request)
+ .addResponses(response)))
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(error.getCode().value())
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
index dc66af0..61c009d 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ActionCacheServer.java
@@ -16,13 +16,13 @@
import static java.util.logging.Level.WARNING;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.GetActionResultRequest;
+import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import io.grpc.stub.StreamObserver;
import java.util.logging.Logger;
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index 9abb6af..9739b05 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -33,13 +33,11 @@
"//third_party/protobuf:protobuf_java_util",
"@googleapis//:google_bytestream_bytestream_java_grpc",
"@googleapis//:google_bytestream_bytestream_java_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_code_java_proto",
"@googleapis//:google_rpc_error_details_java_proto",
"@googleapis//:google_rpc_status_java_proto",
- "@googleapis//:google_watch_v1_java_grpc",
- "@googleapis//:google_watch_v1_java_proto",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
+ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
index 08c4163..57e1012 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
@@ -18,6 +18,7 @@
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
+import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
@@ -29,7 +30,6 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
index 77325c5..6ce576f 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
@@ -14,14 +14,13 @@
package com.google.devtools.build.remote.worker;
+import build.bazel.remote.execution.v2.BatchUpdateBlobsRequest;
+import build.bazel.remote.execution.v2.BatchUpdateBlobsResponse;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
+import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest;
import com.google.rpc.Code;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
@@ -62,14 +61,14 @@
public void batchUpdateBlobs(
BatchUpdateBlobsRequest request, StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
- for (UpdateBlobRequest r : request.getRequestsList()) {
+ for (BatchUpdateBlobsRequest.Request r : request.getRequestsList()) {
BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
try {
Digest digest = cache.uploadBlob(r.getData().toByteArray());
- if (!r.getContentDigest().equals(digest)) {
+ if (!r.getDigest().equals(digest)) {
String err =
- "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest;
- resp.setStatus(StatusUtils.invalidArgumentStatus("content_digest", err));
+ "Upload digest " + r.getDigest() + " did not match data digest: " + digest;
+ resp.setStatus(StatusUtils.invalidArgumentStatus("digest", err));
continue;
}
resp.getStatusBuilder().setCode(Code.OK.getNumber());
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index 19556dc..d464ec8 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -20,6 +20,17 @@
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Command.EnvironmentVariable;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteRequest;
+import build.bazel.remote.execution.v2.ExecuteResponse;
+import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
+import build.bazel.remote.execution.v2.Platform;
+import build.bazel.remote.execution.v2.RequestMetadata;
+import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -33,26 +44,19 @@
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.shell.AbnormalTerminationException;
-import com.google.devtools.build.lib.shell.Command;
import com.google.devtools.build.lib.shell.CommandException;
import com.google.devtools.build.lib.shell.CommandResult;
import com.google.devtools.build.lib.shell.FutureCommandResult;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.devtools.remoteexecution.v1test.Platform;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.Context;
import io.grpc.StatusException;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -66,6 +70,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -76,7 +81,6 @@
final class ExecutionServer extends ExecutionImplBase {
private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName());
-
// The name of the container image entry in the Platform proto
// (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
// experimental_remote_platform_override in
@@ -110,32 +114,101 @@
this.cache = cache;
this.operationsCache = operationsCache;
this.digestUtil = digestUtil;
- ThreadPoolExecutor realExecutor = new ThreadPoolExecutor(
- // This is actually the max number of concurrent jobs.
- workerOptions.jobs,
- // Since we use an unbounded queue, the executor ignores this value, but it still checks
- // that it is greater or equal to the value above.
- workerOptions.jobs,
- // Shut down idle threads after one minute. Threads aren't all that expensive, but we also
- // don't need to keep them around if we don't need them.
- 1, TimeUnit.MINUTES,
- // We use an unbounded queue for now.
- // TODO(ulfjack): We need to reject work eventually.
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat("subprocess-handler-%d").build());
+ ThreadPoolExecutor realExecutor =
+ new ThreadPoolExecutor(
+ // This is actually the max number of concurrent jobs.
+ workerOptions.jobs,
+ // Since we use an unbounded queue, the executor ignores this value, but it still checks
+ // that it is greater or equal to the value above.
+ workerOptions.jobs,
+ // Shut down idle threads after one minute. Threads aren't all that expensive, but we
+ // also
+ // don't need to keep them around if we don't need them.
+ 1,
+ TimeUnit.MINUTES,
+ // We use an unbounded queue for now.
+ // TODO(ulfjack): We need to reject work eventually.
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("subprocess-handler-%d").build());
// Allow the core threads to die.
realExecutor.allowCoreThreadTimeOut(true);
this.executorService = MoreExecutors.listeningDecorator(realExecutor);
}
@Override
+ public void waitExecution(WaitExecutionRequest wr, StreamObserver<Operation> responseObserver) {
+ final String opName = wr.getName();
+ ListenableFuture<ActionResult> future = operationsCache.get(opName);
+ if (future == null) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.NOT_FOUND.getNumber())
+ .setMessage("Operation not found: " + opName)
+ .build()));
+ return;
+ }
+ waitExecution(opName, future, responseObserver);
+ }
+
+ private void waitExecution(
+ String opName,
+ ListenableFuture<ActionResult> future,
+ StreamObserver<Operation> responseObserver) {
+ future.addListener(
+ () -> {
+ try {
+ try {
+ ActionResult result = future.get();
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setName(opName)
+ .setDone(true)
+ .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))
+ .build());
+ responseObserver.onCompleted();
+ } catch (ExecutionException e) {
+ Throwables.throwIfUnchecked(e.getCause());
+ throw (Exception) e.getCause();
+ }
+ } catch (Exception e) {
+ ExecuteResponse resp;
+ if (e instanceof ExecutionStatusException) {
+ resp = ((ExecutionStatusException) e).getResponse();
+ } else {
+ logger.log(Level.SEVERE, "Work failed: " + opName, e);
+ resp =
+ ExecuteResponse.newBuilder()
+ .setStatus(StatusUtils.internalErrorStatus(e))
+ .build();
+ }
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setName(opName)
+ .setDone(true)
+ .setResponse(Any.pack(resp))
+ .build());
+ responseObserver.onCompleted();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ operationsCache.remove(opName);
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
final String opName = UUID.randomUUID().toString();
ListenableFuture<ActionResult> future =
executorService.submit(Context.current().wrap(() -> execute(request, opName)));
operationsCache.put(opName, future);
+ // Send the first operation.
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
- responseObserver.onCompleted();
+ // When the operation completes, send the result.
+ waitExecution(opName, future, responseObserver);
}
private ActionResult execute(ExecuteRequest request, String id)
@@ -150,7 +223,7 @@
"build-request-id: %s command-id: %s action-id: %s",
meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId());
logger.log(FINE, "Received work for: {0}", workDetails);
- ActionResult result = execute(request.getAction(), tempRoot);
+ ActionResult result = execute(request.getActionDigest(), tempRoot);
logger.log(FINE, "Completed {0}.", workDetails);
return result;
} catch (Exception e) {
@@ -163,7 +236,8 @@
try {
FileSystemUtils.deleteTree(tempRoot);
} catch (IOException e) {
- logger.log(SEVERE,
+ logger.log(
+ SEVERE,
String.format(
"Failed to delete tmp directory %s: %s",
tempRoot, Throwables.getStackTraceAsString(e)));
@@ -172,20 +246,20 @@
}
}
- private ActionResult execute(Action action, Path execRoot)
+ private ActionResult execute(Digest actionDigest, Path execRoot)
throws IOException, InterruptedException, StatusException {
- com.google.devtools.remoteexecution.v1test.Command command = null;
+ Command command = null;
+ Action action = null;
try {
- command =
- com.google.devtools.remoteexecution.v1test.Command.parseFrom(
- getFromFuture(cache.downloadBlob(action.getCommandDigest())));
+ action = Action.parseFrom(getFromFuture(cache.downloadBlob(actionDigest)));
+ command = Command.parseFrom(getFromFuture(cache.downloadBlob(action.getCommandDigest())));
cache.downloadTree(action.getInputRootDigest(), execRoot);
} catch (CacheNotFoundException e) {
throw StatusUtils.notFoundError(e.getMissingDigest());
}
- List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
- for (String output : action.getOutputFilesList()) {
+ List<Path> outputs = new ArrayList<>(command.getOutputFilesList().size());
+ for (String output : command.getOutputFilesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output file already exists: " + file);
@@ -193,7 +267,7 @@
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
- for (String output : action.getOutputDirectoriesList()) {
+ for (String output : command.getOutputDirectoriesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output directory/file already exists: " + file);
@@ -204,12 +278,7 @@
// TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
// implementation instead of copying it.
- Command cmd =
- getCommand(
- action,
- command.getArgumentsList(),
- getEnvironmentVariables(command),
- execRoot.getPathString());
+ com.google.devtools.build.lib.shell.Command cmd = getCommand(command, execRoot.getPathString());
long startTime = System.currentTimeMillis();
CommandResult cmdResult = null;
@@ -257,8 +326,9 @@
}
ActionResult.Builder result = ActionResult.newBuilder();
+ boolean setResult = exitCode == 0 && !action.getDoNotCache();
try {
- cache.upload(result, execRoot, outputs);
+ cache.upload(result, action, command, execRoot, outputs, setResult);
} catch (ExecException e) {
if (errStatus == null) {
errStatus =
@@ -276,7 +346,7 @@
if (errStatus != null) {
resp.setStatus(errStatus);
throw new ExecutionStatusException(errStatus, resp.build());
- } else if (exitCode == 0 && !action.getDoNotCache()) {
+ } else if (setResult) {
ActionKey actionKey = digestUtil.computeActionKey(action);
cache.setCachedActionResult(actionKey, finalResult);
}
@@ -292,8 +362,7 @@
return timeoutMillis > 0 && wallTimeMillis > timeoutMillis;
}
- private Map<String, String> getEnvironmentVariables(
- com.google.devtools.remoteexecution.v1test.Command command) {
+ private Map<String, String> getEnvironmentVariables(Command command) {
HashMap<String, String> result = new HashMap<>();
for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
result.put(v.getName(), v.getValue());
@@ -308,11 +377,11 @@
// only a small handful of cases where uid is vital (e.g., if strict permissions are set on the
// output files), so most use cases would work without setting uid.
private long getUid() {
- Command cmd =
- new Command(
+ com.google.devtools.build.lib.shell.Command cmd =
+ new com.google.devtools.build.lib.shell.Command(
new String[] {"id", "-u"},
- /*environmentVariables=*/null,
- /*workingDirectory=*/null,
+ /*environmentVariables=*/ null,
+ /*workingDirectory=*/ null,
uidTimeout);
try {
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
@@ -328,9 +397,9 @@
// Checks Action for docker container definition. If no docker container specified, returns
// null. Otherwise returns docker container name from the parameters.
- private String dockerContainer(Action action) throws StatusException {
+ private String dockerContainer(Command cmd) throws StatusException {
String result = null;
- for (Platform.Property property : action.getPlatform().getPropertiesList()) {
+ for (Platform.Property property : cmd.getPlatform().getPropertiesList()) {
if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
if (result != null) {
// Multiple container name entries
@@ -354,19 +423,17 @@
return result;
}
- // Takes an Action and parameters that can be used to create a Command. Returns the Command.
- // If no docker container is specified inside Action, creates a Command straight from the
+ // Converts the Command proto into the shell Command object.
+ // If no docker container is specified, creates a Command straight from the
// arguments. Otherwise, returns a Command that would run the specified command inside the
// specified docker container.
- private Command getCommand(
- Action action,
- List<String> commandLineElements,
- Map<String, String> environmentVariables,
- String pathString) throws StatusException {
- String container = dockerContainer(action);
+ private com.google.devtools.build.lib.shell.Command getCommand(Command cmd, String pathString)
+ throws StatusException {
+ Map<String, String> environmentVariables = getEnvironmentVariables(cmd);
+ String container = dockerContainer(cmd);
if (container != null) {
// Run command inside a docker container.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
+ ArrayList<String> newCommandLineElements = new ArrayList<>(cmd.getArgumentsCount());
newCommandLineElements.add("docker");
newCommandLineElements.add("run");
@@ -396,12 +463,13 @@
newCommandLineElements.add(container);
- newCommandLineElements.addAll(commandLineElements);
+ newCommandLineElements.addAll(cmd.getArgumentsList());
- return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString));
+ return new com.google.devtools.build.lib.shell.Command(
+ newCommandLineElements.toArray(new String[0]), null, new File(pathString));
} else if (sandboxPath != null) {
// Run command with sandboxing.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
+ ArrayList<String> newCommandLineElements = new ArrayList<>(cmd.getArgumentsCount());
newCommandLineElements.add(sandboxPath.getPathString());
if (workerOptions.sandboxingBlockNetwork) {
newCommandLineElements.add("-N");
@@ -415,15 +483,17 @@
newCommandLineElements.add(tmpfsDir);
}
newCommandLineElements.add("--");
- newCommandLineElements.addAll(commandLineElements);
- return new Command(
+ newCommandLineElements.addAll(cmd.getArgumentsList());
+ return new com.google.devtools.build.lib.shell.Command(
newCommandLineElements.toArray(new String[0]),
environmentVariables,
new File(pathString));
} else {
// Just run the command.
- return new Command(
- commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString));
+ return new com.google.devtools.build.lib.shell.Command(
+ cmd.getArgumentsList().toArray(new String[0]),
+ environmentVariables,
+ new File(pathString));
}
}
}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index e8d8318..e209823 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -19,6 +19,10 @@
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -52,11 +56,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.common.options.OptionsParser;
import com.google.devtools.common.options.OptionsParsingException;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
@@ -95,7 +94,6 @@
private final ActionCacheImplBase actionCacheServer;
private final ByteStreamImplBase bsServer;
private final ContentAddressableStorageImplBase casServer;
- private final WatcherImplBase watchServer;
private final ExecutionImplBase execServer;
static FileSystem getFileSystem() {
@@ -142,12 +140,10 @@
ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache =
new ConcurrentHashMap<>();
FileSystemUtils.createDirectoryAndParents(workPath);
- watchServer = new WatcherServer(operationsCache);
execServer =
new ExecutionServer(
workPath, sandboxPath, workerOptions, cache, operationsCache, digestUtil);
} else {
- watchServer = null;
execServer = null;
}
}
@@ -162,7 +158,6 @@
if (execServer != null) {
b.addService(ServerInterceptors.intercept(execServer, headersInterceptor));
- b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor));
} else {
logger.info("Execution disabled, only serving cache requests.");
}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/StatusUtils.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/StatusUtils.java
index 46288e6..59dee38 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/StatusUtils.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/StatusUtils.java
@@ -14,7 +14,7 @@
package com.google.devtools.build.remote.worker;
-import com.google.devtools.remoteexecution.v1test.Digest;
+import build.bazel.remote.execution.v2.Digest;
import com.google.protobuf.Any;
import com.google.rpc.BadRequest;
import com.google.rpc.BadRequest.FieldViolation;
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/WatcherServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/WatcherServer.java
deleted file mode 100644
index 7d3db46..0000000
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/WatcherServer.java
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote.worker;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.remote.ExecutionStatusException;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.longrunning.Operation;
-import com.google.protobuf.Any;
-import com.google.rpc.Code;
-import com.google.rpc.Status;
-import com.google.watcher.v1.Change;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import io.grpc.protobuf.StatusProto;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** A basic implementation of an {@link WatcherImplBase} service. */
-final class WatcherServer extends WatcherImplBase {
- private static final Logger logger = Logger.getLogger(WatcherServer.class.getName());
-
- private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
-
- public WatcherServer(ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) {
- this.operationsCache = operationsCache;
- }
-
- @Override
- public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) {
- final String opName = wr.getTarget();
- ListenableFuture<ActionResult> future = operationsCache.get(opName);
- if (future == null) {
- responseObserver.onError(
- StatusProto.toStatusRuntimeException(
- Status.newBuilder()
- .setCode(Code.NOT_FOUND.getNumber())
- .setMessage("Operation not found: " + opName)
- .build()));
- return;
- }
-
- future.addListener(
- () -> {
- try {
- try {
- ActionResult result = future.get();
- responseObserver.onNext(
- packExists(
- Operation.newBuilder()
- .setName(opName)
- .setDone(true)
- .setResponse(
- Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))));
- responseObserver.onCompleted();
- } catch (ExecutionException e) {
- Throwables.throwIfUnchecked(e.getCause());
- throw (Exception) e.getCause();
- }
- } catch (Exception e) {
- ExecuteResponse resp;
- if (e instanceof ExecutionStatusException) {
- resp = ((ExecutionStatusException) e).getResponse();
- } else {
- logger.log(Level.SEVERE, "Work failed: " + opName, e);
- resp =
- ExecuteResponse.newBuilder()
- .setStatus(StatusUtils.internalErrorStatus(e))
- .build();
- }
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(
- Operation.newBuilder()
- .setName(opName)
- .setDone(true)
- .setResponse(Any.pack(resp))
- .build()))
- .build())
- .build());
- responseObserver.onCompleted();
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- } finally {
- operationsCache.remove(opName);
- }
- },
- MoreExecutors.directExecutor());
- }
-
- /** Constructs a ChangeBatch with an exists state change that contains the given operation. */
- private ChangeBatch packExists(Operation.Builder message) {
- return ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(message.build())))
- .build();
- }
-}