Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 4)
Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext.
PiperOrigin-RevId: 354472775
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index 1b2fd7c..0ca33b6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;
import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -29,7 +30,11 @@
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.collect.ImmutableIterable;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
import io.netty.util.AbstractReferenceCounted;
@@ -50,7 +55,8 @@
implements BuildEventArtifactUploader {
private final ListeningExecutorService uploadExecutor;
- private final Context ctx;
+ private final String buildRequestId;
+ private final String commandId;
private final ByteStreamUploader uploader;
private final String remoteServerInstanceName;
private final MissingDigestsFinder missingDigestsFinder;
@@ -61,7 +67,8 @@
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
String remoteServerName,
- Context ctx,
+ String buildRequestId,
+ String commandId,
@Nullable String remoteInstanceName,
int maxUploadThreads) {
this.uploader = Preconditions.checkNotNull(uploader);
@@ -69,7 +76,8 @@
if (!Strings.isNullOrEmpty(remoteInstanceName)) {
remoteServerInstanceName += "/" + remoteInstanceName;
}
- this.ctx = ctx;
+ this.buildRequestId = buildRequestId;
+ this.commandId = commandId;
this.remoteServerInstanceName = remoteServerInstanceName;
// Limit the maximum threads number to 1000 (chosen arbitrarily)
this.uploadExecutor =
@@ -153,6 +161,8 @@
*/
private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
+ Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload");
+
List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
List<PathMetadata> filesToQuery = new ArrayList<>();
Set<Digest> digestsToQuery = new HashSet<>();
@@ -185,6 +195,11 @@
*/
private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
ImmutableIterable<PathMetadata> allPaths) {
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
+ RemoteActionExecutionContext context =
+ new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+
ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
ImmutableList.builder();
for (PathMetadata path : allPaths) {
@@ -192,12 +207,8 @@
Chunker chunker =
Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
final ListenableFuture<Void> upload;
- Context prevCtx = ctx.attach();
- try {
- upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false);
- } finally {
- ctx.detach(prevCtx);
- }
+ upload =
+ uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor));
} else {
allPathsUploaded.add(Futures.immediateFuture(path));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
index 978c10f..1c27fbe 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
@@ -18,7 +18,6 @@
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
-import io.grpc.Context;
import javax.annotation.Nullable;
/**
@@ -29,7 +28,8 @@
private final ByteStreamUploader uploader;
private final String remoteServerName;
- private final Context ctx;
+ private final String buildRequestId;
+ private final String commandId;
private final MissingDigestsFinder missingDigestsFinder;
@Nullable private final String remoteInstanceName;
@@ -37,12 +37,14 @@
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
String remoteServerName,
- Context ctx,
+ String buildRequestId,
+ String commandId,
@Nullable String remoteInstanceName) {
this.uploader = uploader;
this.missingDigestsFinder = missingDigestsFinder;
this.remoteServerName = remoteServerName;
- this.ctx = ctx;
+ this.buildRequestId = buildRequestId;
+ this.commandId = commandId;
this.remoteInstanceName = remoteInstanceName;
}
@@ -52,7 +54,8 @@
uploader.retain(),
missingDigestsFinder,
remoteServerName,
- ctx,
+ buildRequestId,
+ commandId,
remoteInstanceName,
env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index c113379..bba8593 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -36,12 +36,12 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
-import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
@@ -134,9 +134,10 @@
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
- public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
+ public void uploadBlob(
+ RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload)
throws IOException, InterruptedException {
- uploadBlobs(singletonMap(hash, chunker), forceUpload);
+ uploadBlobs(context, singletonMap(hash, chunker), forceUpload);
}
/**
@@ -156,12 +157,14 @@
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
*/
- public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
+ public void uploadBlobs(
+ RemoteActionExecutionContext context, Map<HashCode, Chunker> chunkers, boolean forceUpload)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();
for (Map.Entry<HashCode, Chunker> chunkerEntry : chunkers.entrySet()) {
- uploads.add(uploadBlobAsync(chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
+ uploads.add(
+ uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
}
try {
@@ -200,14 +203,17 @@
}
}
- /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
+ /**
+ * @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker,
+ * boolean)} instead.
+ */
@Deprecated
@VisibleForTesting
public ListenableFuture<Void> uploadBlobAsync(
- HashCode hash, Chunker chunker, boolean forceUpload) {
+ RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) {
Digest digest =
Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
- return uploadBlobAsync(digest, chunker, forceUpload);
+ return uploadBlobAsync(context, digest, chunker, forceUpload);
}
/**
@@ -227,7 +233,7 @@
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public ListenableFuture<Void> uploadBlobAsync(
- Digest digest, Chunker chunker, boolean forceUpload) {
+ RemoteActionExecutionContext context, Digest digest, Chunker chunker, boolean forceUpload) {
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
@@ -242,7 +248,7 @@
ListenableFuture<Void> uploadResult =
Futures.transform(
- startAsyncUpload(digest, chunker),
+ startAsyncUpload(context, digest, chunker),
(v) -> {
synchronized (lock) {
uploadedBlobs.add(HashCode.fromString(digest.getHash()));
@@ -294,7 +300,8 @@
}
/** Starts a file upload and returns a future representing the upload. */
- private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
+ private ListenableFuture<Void> startAsyncUpload(
+ RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
try {
chunker.reset();
} catch (IOException e) {
@@ -313,7 +320,13 @@
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
AsyncUpload newUpload =
new AsyncUpload(
- channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
+ context,
+ channel,
+ callCredentialsProvider,
+ callTimeoutSecs,
+ retrier,
+ resourceName,
+ chunker);
ListenableFuture<Void> currUpload = newUpload.start();
currUpload.addListener(
() -> {
@@ -348,6 +361,7 @@
private static class AsyncUpload {
+ private final RemoteActionExecutionContext context;
private final Channel channel;
private final CallCredentialsProvider callCredentialsProvider;
private final long callTimeoutSecs;
@@ -358,12 +372,14 @@
private ClientCall<WriteRequest, WriteResponse> call;
AsyncUpload(
+ RemoteActionExecutionContext context,
Channel channel,
CallCredentialsProvider callCredentialsProvider,
long callTimeoutSecs,
Retrier retrier,
String resourceName,
Chunker chunker) {
+ this.context = context;
this.channel = channel;
this.callCredentialsProvider = callCredentialsProvider;
this.callTimeoutSecs = callTimeoutSecs;
@@ -373,7 +389,6 @@
}
ListenableFuture<Void> start() {
- Context ctx = Context.current();
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
AtomicLong committedOffset = new AtomicLong(0);
@@ -383,8 +398,7 @@
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
- return ctx.call(
- () -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
+ return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
},
@@ -409,7 +423,8 @@
private ByteStreamFutureStub bsFutureStub() {
return ByteStreamGrpc.newFutureStub(channel)
- .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+ .withInterceptors(
+ TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(callTimeoutSecs, SECONDS);
}
@@ -420,7 +435,7 @@
call(committedOffset),
Exception.class,
(e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff),
- Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
+ MoreExecutors.directExecutor());
}
private ListenableFuture<Void> guardQueryWithSuppression(
@@ -584,7 +599,9 @@
}
}
};
- call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
+ call.start(
+ callListener,
+ TracingMetadataUtils.headersFromRequestMetadata(context.getRequestMetadata()));
call.request(1);
return uploadResult;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index ef031d7..b2aeb4a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -393,16 +393,22 @@
}
@Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path path) {
return uploader.uploadBlobAsync(
+ context,
digest,
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
/* forceUpload= */ true);
}
@Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
return uploader.uploadBlobAsync(
- digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true);
+ context,
+ digest,
+ Chunker.builder().setInput(data.toByteArray()).build(),
+ /* forceUpload= */ true);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 3950814..3496243 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -139,7 +139,7 @@
int exitCode)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
- uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
+ uploadOutputs(context, execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
resultBuilder.setExitCode(exitCode);
ActionResult result = resultBuilder.build();
if (exitCode == 0 && !action.getDoNotCache()) {
@@ -162,6 +162,7 @@
}
private void uploadOutputs(
+ RemoteActionExecutionContext context,
Path execRoot,
ActionKey actionKey,
Action action,
@@ -192,14 +193,14 @@
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
- uploads.add(cacheProtocol.uploadFile(digest, file));
+ uploads.add(cacheProtocol.uploadFile(context, digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
- uploads.add(cacheProtocol.uploadBlob(digest, blob));
+ uploads.add(cacheProtocol.uploadBlob(context, digest, blob));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
index 8886025..4398404 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
@@ -53,7 +54,10 @@
* However, remote execution uses a cache to store input files, and that may be a separate
* end-point from the executor itself, so the functionality lives here.
*/
- public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> additionalInputs)
+ public void ensureInputsPresent(
+ RemoteActionExecutionContext context,
+ MerkleTree merkleTree,
+ Map<Digest, Message> additionalInputs)
throws IOException, InterruptedException {
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
@@ -62,30 +66,33 @@
List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
for (Digest missingDigest : missingDigests) {
- uploadFutures.add(uploadBlob(missingDigest, merkleTree, additionalInputs));
+ uploadFutures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs));
}
waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
}
private ListenableFuture<Void> uploadBlob(
- Digest digest, MerkleTree merkleTree, Map<Digest, Message> additionalInputs) {
+ RemoteActionExecutionContext context,
+ Digest digest,
+ MerkleTree merkleTree,
+ Map<Digest, Message> additionalInputs) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
- return cacheProtocol.uploadBlob(digest, node.toByteString());
+ return cacheProtocol.uploadBlob(context, digest, node.toByteString());
}
PathOrBytes file = merkleTree.getFileByDigest(digest);
if (file != null) {
if (file.getBytes() != null) {
- return cacheProtocol.uploadBlob(digest, file.getBytes());
+ return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
- return cacheProtocol.uploadFile(digest, file.getPath());
+ return cacheProtocol.uploadFile(context, digest, file.getPath());
}
Message message = additionalInputs.get(digest);
if (message != null) {
- return cacheProtocol.uploadBlob(digest, message.toByteString());
+ return cacheProtocol.uploadBlob(context, digest, message.toByteString());
}
return Futures.immediateFailedFuture(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index dfa368b..443d4a8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -93,7 +93,6 @@
import com.google.devtools.common.options.OptionsParsingResult;
import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
-import io.grpc.Context;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.HashSet;
@@ -501,14 +500,13 @@
digestUtil,
uploader.retain());
uploader.release();
- Context requestContext =
- TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "bes-upload");
buildEventArtifactUploaderFactoryDelegate.init(
new ByteStreamBuildEventArtifactUploaderFactory(
uploader,
cacheClient,
cacheChannel.authority(),
- requestContext,
+ buildRequestId,
+ invocationId,
remoteOptions.remoteInstanceName));
if (enableRemoteExecution) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
index 5d89161..f4a05dc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
@@ -144,7 +144,8 @@
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);
- remoteCache.ensureInputsPresent(merkleTree, additionalInputs);
+ remoteCache.ensureInputsPresent(
+ remoteActionExecutionContext, merkleTree, additionalInputs);
}
try (SilentCloseable c =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fa99c63..3e9486c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -337,7 +337,8 @@
additionalInputs.put(commandHash, command);
Duration networkTimeStart = networkTime.getDuration();
Stopwatch uploadTime = Stopwatch.createStarted();
- remoteCache.ensureInputsPresent(merkleTree, additionalInputs);
+ remoteCache.ensureInputsPresent(
+ remoteActionExecutionContext, merkleTree, additionalInputs);
// subtract network time consumed here to ensure wall clock during upload is not
// double
// counted, and metrics time computation does not exceed total time
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index caf497f..628d214 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -104,20 +104,23 @@
/**
* Uploads a {@code file} to the CAS.
*
+ * @param context the context for the action.
* @param digest The digest of the file.
* @param file The file to upload.
* @return A future representing pending completion of the upload.
*/
- ListenableFuture<Void> uploadFile(Digest digest, Path file);
+ ListenableFuture<Void> uploadFile(RemoteActionExecutionContext context, Digest digest, Path file);
/**
* Uploads a BLOB to the CAS.
*
+ * @param context the context for the action.
* @param digest The digest of the blob.
* @param data The BLOB to upload.
* @return A future representing pending completion of the upload.
*/
- ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
+ ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data);
/** Close resources associated with the remote cache. */
void close();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
index 040a7d4..60c5016 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
@@ -65,11 +65,12 @@
}
@Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path file) {
try {
- diskCache.uploadFile(digest, file).get();
+ diskCache.uploadFile(context, digest, file).get();
if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) {
- remoteCache.uploadFile(digest, file).get();
+ remoteCache.uploadFile(context, digest, file).get();
}
} catch (ExecutionException e) {
return Futures.immediateFailedFuture(e.getCause());
@@ -80,11 +81,12 @@
}
@Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
try {
- diskCache.uploadBlob(digest, data).get();
+ diskCache.uploadBlob(context, digest, data).get();
if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) {
- remoteCache.uploadBlob(digest, data).get();
+ remoteCache.uploadBlob(context, digest, data).get();
}
} catch (ExecutionException e) {
return Futures.immediateFailedFuture(e.getCause());
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index 0278913..6778525 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -121,7 +121,8 @@
public void close() {}
@Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
saveFile(digest.getHash(), in, /* actionResult= */ false);
} catch (IOException e) {
@@ -131,7 +132,8 @@
}
@Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
saveFile(digest.getHash(), in, /* actionResult= */ false);
} catch (IOException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 7a67bb5..1d8a922 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -671,7 +671,8 @@
}
@Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path file) {
try {
return uploadAsync(
digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
@@ -682,7 +683,8 @@
}
@Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
return uploadAsync(
digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index 704bb21..165924a 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -334,7 +334,7 @@
StaticMissingDigestsFinder digestQuerier =
Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest)));
ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class);
- when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean()))
+ when(uploader.uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean()))
.thenReturn(Futures.immediateFuture(null));
ByteStreamBuildEventArtifactUploader artifactUploader =
newArtifactUploader(uploader, digestQuerier);
@@ -350,7 +350,7 @@
// assert
verify(digestQuerier).findMissingDigests(any());
- verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean());
+ verify(uploader).uploadBlobAsync(any(), eq(localDigest), any(), anyBoolean());
assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash());
assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash());
}
@@ -374,7 +374,8 @@
uploader,
missingDigestsFinder,
"localhost",
- withEmptyMetadata,
+ "none",
+ "none",
"instance",
/* maxUploadThreads= */ 100);
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index eec073a..29cd41e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -36,6 +36,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -86,9 +89,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-/**
- * Tests for {@link ByteStreamUploader}.
- */
+/** Tests for {@link ByteStreamUploader}. */
@RunWith(JUnit4.class)
public class ByteStreamUploaderTest {
@@ -102,6 +103,7 @@
private Server server;
private ManagedChannel channel;
+ private RemoteActionExecutionContext context;
private Context withEmptyMetadata;
private Context prevContext;
@@ -112,12 +114,19 @@
MockitoAnnotations.initMocks(this);
String serverName = "Server for " + this.getClass();
- server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
- .build().start();
+ server =
+ InProcessServerBuilder.forName(serverName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .build()
+ .start();
channel = InProcessChannelBuilder.forName(serverName).build();
- withEmptyMetadata =
- TracingMetadataUtils.contextWithMetadata(
- "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(
+ "none",
+ "none",
+ DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+ context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+ withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@@ -161,7 +170,8 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(new ByteStreamImplBase() {
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@@ -183,8 +193,8 @@
ByteString data = writeRequest.getData();
- System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset,
- data.size());
+ System.arraycopy(
+ data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
nextOffset += data.size();
boolean lastWrite = blob.length == nextOffset;
@@ -210,7 +220,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -328,7 +338,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
@@ -392,7 +402,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
assertThat(numWriteCalls.get()).isEqualTo(1);
@@ -466,7 +476,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should have triggered a single retry, because it made
// no progress.
@@ -508,7 +518,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -549,7 +559,7 @@
});
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
// expected
@@ -592,7 +602,7 @@
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(chunkers, true);
+ uploader.uploadBlobs(context, chunkers, true);
blockUntilInternalStateConsistent(uploader);
@@ -690,16 +700,19 @@
for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
Digest actionDigest = chunkerEntry.getKey();
- Context ctx =
- TracingMetadataUtils.contextWithMetadata(
- "build-req-id", "command-id", DIGEST_UTIL.asActionKey(actionDigest));
- ctx.run(
- () ->
- uploads.add(
- uploader.uploadBlobAsync(
- HashCode.fromString(actionDigest.getHash()),
- chunkerEntry.getValue(),
- /* forceUpload=*/ true)));
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(
+ "build-req-id",
+ "command-id",
+ DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash());
+ RemoteActionExecutionContext remoteActionExecutionContext =
+ new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+ uploads.add(
+ uploader.uploadBlobAsync(
+ remoteActionExecutionContext,
+ actionDigest,
+ chunkerEntry.getValue(),
+ /* forceUpload= */ true));
}
for (ListenableFuture<Void> upload : uploads) {
@@ -776,7 +789,7 @@
}
}));
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
}
@Test
@@ -796,47 +809,48 @@
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicInteger numWriteCalls = new AtomicInteger();
CountDownLatch blocker = new CountDownLatch(1);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- numWriteCalls.incrementAndGet();
- try {
- // Ensures that the first upload does not finish, before the second upload is started.
- blocker.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- return new StreamObserver<WriteRequest>() {
-
- private long bytesReceived;
-
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- bytesReceived += writeRequest.getData().size();
- }
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ numWriteCalls.incrementAndGet();
+ try {
+ // Ensures that the first upload does not finish, before the second upload is started.
+ blocker.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
+ return new StreamObserver<WriteRequest>() {
- @Override
- public void onCompleted() {
- response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
- response.onCompleted();
- }
- };
- }
- });
+ private long bytesReceived;
- Future<?> upload1 = uploader.uploadBlobAsync(hash, chunker, true);
- Future<?> upload2 = uploader.uploadBlobAsync(hash, chunker, true);
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ bytesReceived += writeRequest.getData().size();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
+ response.onCompleted();
+ }
+ };
+ }
+ });
+
+ Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
+ Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
blocker.countDown();
@@ -866,16 +880,17 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- response.onError(Status.INTERNAL.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ response.onError(Status.INTERNAL.asException());
+ return new NoopStreamObserver();
+ }
+ });
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -926,14 +941,14 @@
byte[] blob1 = new byte[CHUNK_SIZE];
Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
- HashCode hash1 = HashCode.fromString(DIGEST_UTIL.compute(blob1).getHash());
+ Digest digest1 = DIGEST_UTIL.compute(blob1);
byte[] blob2 = new byte[CHUNK_SIZE + 1];
Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
- HashCode hash2 = HashCode.fromString(DIGEST_UTIL.compute(blob2).getHash());
+ Digest digest2 = DIGEST_UTIL.compute(blob2);
- ListenableFuture<Void> f1 = uploader.uploadBlobAsync(hash1, chunker1, true);
- ListenableFuture<Void> f2 = uploader.uploadBlobAsync(hash2, chunker2, true);
+ ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
+ ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
assertThat(uploader.uploadsInProgress()).isTrue();
@@ -964,14 +979,15 @@
/* callTimeoutSecs= */ 60,
retrier);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- // Immediately fail the call, so that it is retried.
- response.onError(Status.ABORTED.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ // Immediately fail the call, so that it is retried.
+ response.onError(Status.ABORTED.asException());
+ return new NoopStreamObserver();
+ }
+ });
retryService.shutdownNow();
// Random very high timeout, as the test will timeout by itself.
@@ -982,7 +998,7 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1004,35 +1020,34 @@
/* callTimeoutSecs= */ 60,
retrier);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- return new StreamObserver<WriteRequest>() {
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- // Test that the resource name doesn't start with an instance name.
- assertThat(writeRequest.getResourceName()).startsWith("uploads/");
- }
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ // Test that the resource name doesn't start with an instance name.
+ assertThat(writeRequest.getResourceName()).startsWith("uploads/");
+ }
- @Override
- public void onError(Throwable throwable) {
+ @Override
+ public void onError(Throwable throwable) {}
+ @Override
+ public void onCompleted() {
+ response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
+ response.onCompleted();
+ }
+ };
}
-
- @Override
- public void onCompleted() {
- response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
- response.onCompleted();
- }
- };
- }
- });
+ });
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
withEmptyMetadata.detach(prevContext);
}
@@ -1053,21 +1068,22 @@
AtomicInteger numCalls = new AtomicInteger();
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- numCalls.incrementAndGet();
- response.onError(Status.INTERNAL.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ numCalls.incrementAndGet();
+ response.onError(Status.INTERNAL.asException());
+ return new NoopStreamObserver();
+ }
+ });
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(numCalls.get()).isEqualTo(1);
@@ -1139,7 +1155,7 @@
StatusRuntimeException expected = null;
try {
// This should fail
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
} catch (IOException e) {
if (e.getCause() instanceof StatusRuntimeException) {
expected = (StatusRuntimeException) e.getCause();
@@ -1148,7 +1164,7 @@
assertThat(expected).isNotNull();
assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
// This should trigger an upload.
- uploader.uploadBlob(hash, chunker, false);
+ uploader.uploadBlob(context, hash, chunker, false);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1177,42 +1193,43 @@
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
AtomicInteger numUploads = new AtomicInteger();
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- numUploads.incrementAndGet();
- return new StreamObserver<WriteRequest>() {
-
- long nextOffset = 0;
-
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- nextOffset += writeRequest.getData().size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numUploads.incrementAndGet();
+ return new StreamObserver<WriteRequest>() {
+
+ long nextOffset = 0;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ nextOffset += writeRequest.getData().size();
+ boolean lastWrite = blob.length == nextOffset;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ assertThat(nextOffset).isEqualTo(blob.length);
+
+ WriteResponse response =
+ WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
}
+ });
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
-
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This should not trigger an upload.
- uploader.uploadBlob(hash, chunker, false);
+ uploader.uploadBlob(context, hash, chunker, false);
assertThat(numUploads.get()).isEqualTo(1);
@@ -1271,11 +1288,7 @@
}
});
- assertThrows(
- IOException.class,
- () -> {
- uploader.uploadBlob(hash, chunker, true);
- });
+ assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1363,7 +1376,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1378,16 +1391,13 @@
private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
@Override
- public void onNext(WriteRequest writeRequest) {
- }
+ public void onNext(WriteRequest writeRequest) {}
@Override
- public void onError(Throwable throwable) {
- }
+ public void onError(Throwable throwable) {}
@Override
- public void onCompleted() {
- }
+ public void onCompleted() {}
}
static class FixedBackoff implements Retrier.Backoff {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index d22e1ed..c1f0104 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -317,7 +317,7 @@
});
// Upload all missing inputs (that is, the virtual action input from above)
- client.ensureInputsPresent(merkleTree, ImmutableMap.of());
+ client.ensureInputsPresent(remoteActionExecutionContext, merkleTree, ImmutableMap.of());
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index 92e4ca2..d3bfe63 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -577,7 +577,7 @@
Directory.newBuilder()
.addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("../foo")))
.build();
- Digest treeDigest = cache.addContents(tree.toByteArray());
+ Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest);
// Doesn't check for dangling links, hence download succeeds.
@@ -637,7 +637,7 @@
Directory.newBuilder()
.addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("/foo")))
.build();
- Digest treeDigest = cache.addContents(tree.toByteArray());
+ Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest);
IOException expected =
@@ -661,10 +661,10 @@
public void downloadFailureMaintainsDirectories() throws Exception {
InMemoryRemoteCache cache = newRemoteCache();
Tree tree = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
- Digest treeDigest = cache.addContents(tree.toByteArray());
+ Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
Digest outputFileDigest =
cache.addException("outputdir/outputfile", new IOException("download failed"));
- Digest otherFileDigest = cache.addContents("otherfile");
+ Digest otherFileDigest = cache.addContents(remoteActionExecutionContext, "otherfile");
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputDirectoriesBuilder().setPath("outputdir").setTreeDigest(treeDigest);
@@ -693,9 +693,9 @@
Path stderr = fs.getPath("/execroot/stderr");
InMemoryRemoteCache cache = newRemoteCache();
- Digest digest1 = cache.addContents("file1");
+ Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
Digest digest2 = cache.addException("file2", new IOException("download failed"));
- Digest digest3 = cache.addContents("file3");
+ Digest digest3 = cache.addContents(remoteActionExecutionContext, "file3");
ActionResult result =
ActionResult.newBuilder()
@@ -729,7 +729,7 @@
Path stderr = fs.getPath("/execroot/stderr");
InMemoryRemoteCache cache = newRemoteCache();
- Digest digest1 = cache.addContents("file1");
+ Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
Digest digest2 = cache.addException("file2", new IOException("file2 failed"));
Digest digest3 = cache.addException("file3", new IOException("file3 failed"));
@@ -764,7 +764,7 @@
Path stderr = fs.getPath("/execroot/stderr");
InMemoryRemoteCache cache = newRemoteCache();
- Digest digest1 = cache.addContents("file1");
+ Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
IOException reusedException = new IOException("reused io exception");
Digest digest2 = cache.addException("file2", reusedException);
Digest digest3 = cache.addException("file3", reusedException);
@@ -800,7 +800,7 @@
Path stderr = fs.getPath("/execroot/stderr");
InMemoryRemoteCache cache = newRemoteCache();
- Digest digest1 = cache.addContents("file1");
+ Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
InterruptedException reusedInterruption = new InterruptedException("reused interruption");
Digest digest2 = cache.addException("file2", reusedInterruption);
Digest digest3 = cache.addException("file3", reusedInterruption);
@@ -840,8 +840,8 @@
when(spyOutErr.childOutErr()).thenReturn(spyChildOutErr);
InMemoryRemoteCache cache = newRemoteCache();
- Digest digestStdout = cache.addContents("stdout");
- Digest digestStderr = cache.addContents("stderr");
+ Digest digestStdout = cache.addContents(remoteActionExecutionContext, "stdout");
+ Digest digestStderr = cache.addContents(remoteActionExecutionContext, "stderr");
ActionResult result =
ActionResult.newBuilder()
@@ -918,8 +918,8 @@
// arrange
InMemoryRemoteCache remoteCache = newRemoteCache();
- Digest d1 = remoteCache.addContents("content1");
- Digest d2 = remoteCache.addContents("content2");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+ Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
ActionResult r =
ActionResult.newBuilder()
.setExitCode(0)
@@ -950,8 +950,8 @@
// arrange
InMemoryRemoteCache remoteCache = newRemoteCache();
- Digest d1 = remoteCache.addContents("content1");
- Digest d2 = remoteCache.addContents("content2");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+ Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
ActionResult r =
ActionResult.newBuilder()
.setExitCode(0)
@@ -997,19 +997,19 @@
// Output Directory:
// dir/file1
// dir/a/file2
- Digest d1 = remoteCache.addContents("content1");
- Digest d2 = remoteCache.addContents("content2");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+ Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build();
FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build();
Directory a = Directory.newBuilder().addFiles(file2).build();
- Digest da = remoteCache.addContents(a);
+ Digest da = remoteCache.addContents(remoteActionExecutionContext, a);
Directory root =
Directory.newBuilder()
.addFiles(file1)
.addDirectories(DirectoryNode.newBuilder().setName("a").setDigest(da))
.build();
Tree t = Tree.newBuilder().setRoot(root).addChildren(a).build();
- Digest dt = remoteCache.addContents(t);
+ Digest dt = remoteCache.addContents(remoteActionExecutionContext, t);
ActionResult r =
ActionResult.newBuilder()
.setExitCode(0)
@@ -1070,12 +1070,12 @@
// Output Directory:
// dir/file1
// dir/a/file2
- Digest d1 = remoteCache.addContents("content1");
- Digest d2 = remoteCache.addContents("content2");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+ Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build();
FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build();
Directory a = Directory.newBuilder().addFiles(file2).build();
- Digest da = remoteCache.addContents(a);
+ Digest da = remoteCache.addContents(remoteActionExecutionContext, a);
Directory root =
Directory.newBuilder()
.addFiles(file1)
@@ -1127,8 +1127,8 @@
// arrange
InMemoryRemoteCache remoteCache = newRemoteCache();
- Digest dOut = remoteCache.addContents("stdout");
- Digest dErr = remoteCache.addContents("stderr");
+ Digest dOut = remoteCache.addContents(remoteActionExecutionContext, "stdout");
+ Digest dErr = remoteCache.addContents(remoteActionExecutionContext, "stderr");
ActionResult r =
ActionResult.newBuilder()
.setExitCode(0)
@@ -1168,8 +1168,8 @@
// arrange
InMemoryRemoteCache remoteCache = newRemoteCache();
- Digest d1 = remoteCache.addContents("content1");
- Digest d2 = remoteCache.addContents("content2");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+ Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
ActionResult r =
ActionResult.newBuilder()
.setExitCode(0)
@@ -1216,7 +1216,7 @@
// arrange
InMemoryRemoteCache remoteCache = newRemoteCache();
- Digest d1 = remoteCache.addContents("in-memory output");
+ Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "in-memory output");
ActionResult r = ActionResult.newBuilder().setExitCode(0).build();
Artifact a1 = ActionsTestUtil.createArtifact(artifactRoot, "file1");
MetadataInjector injector = mock(MetadataInjector.class);
@@ -1644,18 +1644,21 @@
super(new InMemoryCacheClient(), options, digestUtil);
}
- Digest addContents(String txt) throws IOException, InterruptedException {
- return addContents(txt.getBytes(UTF_8));
+ Digest addContents(RemoteActionExecutionContext context, String txt)
+ throws IOException, InterruptedException {
+ return addContents(context, txt.getBytes(UTF_8));
}
- Digest addContents(byte[] bytes) throws IOException, InterruptedException {
+ Digest addContents(RemoteActionExecutionContext context, byte[] bytes)
+ throws IOException, InterruptedException {
Digest digest = digestUtil.compute(bytes);
- Utils.getFromFuture(cacheProtocol.uploadBlob(digest, ByteString.copyFrom(bytes)));
+ Utils.getFromFuture(cacheProtocol.uploadBlob(context, digest, ByteString.copyFrom(bytes)));
return digest;
}
- Digest addContents(Message m) throws IOException, InterruptedException {
- return addContents(m.toByteArray());
+ Digest addContents(RemoteActionExecutionContext context, Message m)
+ throws IOException, InterruptedException {
+ return addContents(context, m.toByteArray());
}
Digest addException(String txt, Exception e) {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
index 7fde5bc..f313ec9 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
@@ -93,7 +93,7 @@
assertThat(executionResult.exitCode()).isEqualTo(0);
}
-
+
@Test
public void testNoneZeroExitCodeFromCache() throws IOException, InterruptedException {
// Test that an ActionResult with a none-zero exit code is not accepted as cached.
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 7cb8328..fe23a68 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -246,7 +246,7 @@
runner.exec(spawn, policy);
verify(localRunner).exec(spawn, policy);
- verify(cache).ensureInputsPresent(any(), any());
+ verify(cache).ensureInputsPresent(any(), any(), any());
verifyNoMoreInteractions(cache);
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
index 41fdd7f..2e13a37 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
@@ -25,6 +25,7 @@
import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
import build.bazel.remote.asset.v1.Qualifier;
import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
@@ -37,6 +38,9 @@
import com.google.devtools.build.lib.remote.ReferenceCountedChannel;
import com.google.devtools.build.lib.remote.RemoteRetrier;
import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -79,6 +83,7 @@
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private final String fakeServerName = "fake server for " + getClass();
private Server fakeServer;
+ private RemoteActionExecutionContext context;
private Context withEmptyMetadata;
private Context prevContext;
private ListeningScheduledExecutorService retryService;
@@ -92,9 +97,13 @@
.directExecutor()
.build()
.start();
- withEmptyMetadata =
- TracingMetadataUtils.contextWithMetadata(
- "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(
+ "none",
+ "none",
+ DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+ context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+ withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@@ -189,7 +198,7 @@
final RemoteCacheClient cacheClient = new InMemoryCacheClient();
final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
- getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content)));
+ getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content)));
final byte[] downloaded =
downloadBlob(
downloader, new URL("http://example.com/content.txt"), Optional.<Checksum>empty());
@@ -225,7 +234,7 @@
final RemoteCacheClient cacheClient = new InMemoryCacheClient();
final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
- getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content)));
+ getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content)));
final byte[] downloaded =
downloadBlob(
downloader,
@@ -263,7 +272,8 @@
final RemoteCacheClient cacheClient = new InMemoryCacheClient();
final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
- getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFromUtf8("wrong content")));
+ getFromFuture(
+ cacheClient.uploadBlob(context, contentDigest, ByteString.copyFromUtf8("wrong content")));
IOException e =
assertThrows(
diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
index 7515f32..5c091c2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
@@ -319,7 +319,7 @@
ByteString data = ByteString.copyFrom("foo bar", StandardCharsets.UTF_8);
Digest digest = DIGEST_UTIL.compute(data.toByteArray());
- blobStore.uploadBlob(digest, data).get();
+ blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get();
assertThat(cacheContents).hasSize(1);
String cacheKey = "/cas/" + digest.getHash();
@@ -329,7 +329,7 @@
// Clear the remote cache contents
cacheContents.clear();
- blobStore.uploadBlob(digest, data).get();
+ blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get();
// Nothing should have been uploaded again.
assertThat(cacheContents).isEmpty();
@@ -369,7 +369,9 @@
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
- getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
+ getFromFuture(
+ blobStore.uploadBlob(
+ remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
fail("Exception expected");
} finally {
testServer.stop(server);
@@ -432,7 +434,10 @@
IOException.class,
() ->
getFromFuture(
- blobStore.uploadBlob(DIGEST_UTIL.compute(data.toByteArray()), data)));
+ blobStore.uploadBlob(
+ remoteActionExecutionContext,
+ DIGEST_UTIL.compute(data.toByteArray()),
+ data)));
assertThat(e.getCause()).isInstanceOf(TooLongFrameException.class);
} finally {
testServer.stop(server);
@@ -564,7 +569,10 @@
Credentials credentials = newCredentials();
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
- blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)).get();
+ blobStore
+ .uploadBlob(
+ remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data))
+ .get();
verify(credentials, times(1)).refresh();
verify(credentials, times(2)).getRequestMetadata(any(URI.class));
verify(credentials, times(2)).hasRequestMetadata();
@@ -621,7 +629,10 @@
HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] oneByte = new byte[] {0};
getFromFuture(
- blobStore.uploadBlob(DIGEST_UTIL.compute(oneByte), ByteString.copyFrom(oneByte)));
+ blobStore.uploadBlob(
+ remoteActionExecutionContext,
+ DIGEST_UTIL.compute(oneByte),
+ ByteString.copyFrom(oneByte)));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
index 9c78ef0..9894f83 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
@@ -107,7 +107,8 @@
}
@Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
cas.put(digest, ByteStreams.toByteArray(in));
} catch (IOException e) {
@@ -117,7 +118,8 @@
}
@Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
cas.put(digest, data.toByteArray());
} catch (IOException e) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
index 8bf39e8..2ed32e5 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
@@ -103,6 +103,10 @@
@Override
public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
+ RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+ RemoteActionExecutionContext context =
+ new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
try {
FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
@@ -226,7 +230,7 @@
try {
Digest d = digestUtil.compute(temp);
- getFromFuture(cache.uploadFile(d, temp));
+ getFromFuture(cache.uploadFile(context, d, temp));
try {
temp.delete();
} catch (IOException e) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
index 9b3bba5..e862939 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
@@ -68,12 +68,16 @@
@Override
public void batchUpdateBlobs(
BatchUpdateBlobsRequest request, StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
+ RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+ RemoteActionExecutionContext context =
+ new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
for (BatchUpdateBlobsRequest.Request r : request.getRequestsList()) {
BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
try {
Digest digest = cache.getDigestUtil().compute(r.getData().toByteArray());
- getFromFuture(cache.uploadBlob(digest, r.getData()));
+ getFromFuture(cache.uploadBlob(context, digest, r.getData()));
if (!r.getDigest().equals(digest)) {
String err =
"Upload digest " + r.getDigest() + " did not match data digest: " + digest;
@@ -94,6 +98,7 @@
RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
RemoteActionExecutionContext context =
new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
// Directories are returned in depth-first order. We store all previously-traversed digests so
// identical subtrees having the same digest will only be traversed and returned once.
Set<Digest> seenDigests = new HashSet<>();
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
index 804f739..1f39c31 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
@@ -61,12 +61,14 @@
}
}
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
- return cacheProtocol.uploadFile(digest, file);
+ public ListenableFuture<Void> uploadFile(
+ RemoteActionExecutionContext context, Digest digest, Path file) {
+ return cacheProtocol.uploadFile(context, digest, file);
}
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
- return cacheProtocol.uploadBlob(digest, data);
+ public ListenableFuture<Void> uploadBlob(
+ RemoteActionExecutionContext context, Digest digest, ByteString data) {
+ return cacheProtocol.uploadBlob(context, digest, data);
}
public void uploadActionResult(