Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 3)
Change RemoteCacheClient#downloadBlob to use RemoteActionExecutionContext.
PiperOrigin-RevId: 354239205
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 85da6b0..ef031d7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -129,9 +129,11 @@
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}
- private ByteStreamStub bsAsyncStub() {
+ private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
return ByteStreamGrpc.newStub(channel)
- .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+ .withInterceptors(
+ TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
+ new NetworkTimeInterceptor(context::getNetworkTime))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}
@@ -283,7 +285,8 @@
}
@Override
- public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ public ListenableFuture<Void> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
@@ -295,12 +298,14 @@
out = digestOut;
}
- return downloadBlob(digest, out, digestSupplier);
+ return downloadBlob(context, digest, out, digestSupplier);
}
private ListenableFuture<Void> downloadBlob(
- Digest digest, OutputStream out, @Nullable Supplier<Digest> digestSupplier) {
- Context ctx = Context.current();
+ RemoteActionExecutionContext context,
+ Digest digest,
+ OutputStream out,
+ @Nullable Supplier<Digest> digestSupplier) {
AtomicLong offset = new AtomicLong(0);
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
ListenableFuture<Void> downloadFuture =
@@ -308,10 +313,8 @@
() ->
retrier.executeAsync(
() ->
- ctx.call(
- () ->
- requestRead(
- offset, progressiveBackoff, digest, out, digestSupplier)),
+ requestRead(
+ context, offset, progressiveBackoff, digest, out, digestSupplier),
progressiveBackoff),
callCredentialsProvider);
@@ -331,6 +334,7 @@
}
private ListenableFuture<Void> requestRead(
+ RemoteActionExecutionContext context,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
@@ -338,7 +342,7 @@
@Nullable Supplier<Digest> digestSupplier) {
String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
- bsAsyncStub()
+ bsAsyncStub(context)
.read(
ReadRequest.newBuilder()
.setResourceName(resourceName)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 6191f65..e78c168 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -33,6 +33,9 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
@@ -66,15 +69,17 @@
@GuardedBy("lock")
final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();
+ private final String buildRequestId;
+ private final String commandId;
private final RemoteCache remoteCache;
private final Path execRoot;
- private final RequestMetadata requestMetadata;
RemoteActionInputFetcher(
- RemoteCache remoteCache, Path execRoot, RequestMetadata requestMetadata) {
+ String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
+ this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
+ this.commandId = Preconditions.checkNotNull(commandId);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
this.execRoot = Preconditions.checkNotNull(execRoot);
- this.requestMetadata = Preconditions.checkNotNull(requestMetadata);
}
/**
@@ -160,13 +165,15 @@
ListenableFuture<Void> download = downloadsInProgress.get(path);
if (download == null) {
- Context ctx =
- TracingMetadataUtils.contextWithMetadata(
- requestMetadata.toBuilder().setActionId(metadata.getActionId()).build());
+ RequestMetadata requestMetadata =
+ TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId());
+ RemoteActionExecutionContext remoteActionExecutionContext =
+ new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime());
+ Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata);
Context prevCtx = ctx.attach();
try {
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
- download = remoteCache.downloadFile(path, digest);
+ download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest);
downloadsInProgress.put(path, download);
Futures.addCallback(
download,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 8e68aab..3950814 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -261,14 +261,15 @@
* @return a future that completes after the download completes (succeeds / fails). If successful,
* the content is stored in the future's {@code byte[]}.
*/
- public ListenableFuture<byte[]> downloadBlob(Digest digest) {
+ public ListenableFuture<byte[]> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest) {
if (digest.getSizeBytes() == 0) {
return EMPTY_BYTES;
}
ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
SettableFuture<byte[]> outerF = SettableFuture.create();
Futures.addCallback(
- cacheProtocol.downloadBlob(digest, bOut),
+ cacheProtocol.downloadBlob(context, digest, bOut),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
@@ -305,12 +306,13 @@
* @throws ExecException in case clean up after a failed download failed.
*/
public void download(
+ RemoteActionExecutionContext context,
ActionResult result,
Path execRoot,
FileOutErr origOutErr,
OutputFilesLocker outputFilesLocker)
throws ExecException, IOException, InterruptedException {
- ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);
+ ActionResultMetadata metadata = parseActionResultMetadata(context, result, execRoot);
List<ListenableFuture<FileMetadata>> downloads =
Stream.concat(
@@ -321,7 +323,7 @@
(file) -> {
try {
ListenableFuture<Void> download =
- downloadFile(toTmpDownloadPath(file.path()), file.digest());
+ downloadFile(context, toTmpDownloadPath(file.path()), file.digest());
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
@@ -337,7 +339,7 @@
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
- downloads.addAll(downloadOutErr(result, tmpOutErr));
+ downloads.addAll(downloadOutErr(context, result, tmpOutErr));
try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
@@ -449,7 +451,8 @@
}
/** Downloads a file (that is not a directory). The content is fetched from the digest. */
- public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOException {
+ public ListenableFuture<Void> downloadFile(
+ RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
@@ -472,7 +475,7 @@
OutputStream out = new LazyFileOutputStream(path);
SettableFuture<Void> outerF = SettableFuture.create();
- ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
+ ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
Futures.addCallback(
f,
new FutureCallback<Void>() {
@@ -509,7 +512,8 @@
return outerF;
}
- private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
+ private List<ListenableFuture<FileMetadata>> downloadOutErr(
+ RemoteActionExecutionContext context, ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
try {
@@ -521,7 +525,8 @@
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
- cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
+ cacheProtocol.downloadBlob(
+ context, result.getStdoutDigest(), outErr.getOutputStream()),
(d) -> null,
directExecutor()));
}
@@ -535,7 +540,8 @@
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
- cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
+ cacheProtocol.downloadBlob(
+ context, result.getStderrDigest(), outErr.getErrorStream()),
(d) -> null,
directExecutor()));
}
@@ -549,6 +555,7 @@
* <p>This method only downloads output directory metadata, stdout and stderr as well as the
* contents of {@code inMemoryOutputPath} if specified.
*
+ * @param context the context this action is running with
* @param result the action result metadata of a successfully executed action (exit code = 0).
* @param outputs the action's declared output files
* @param inMemoryOutputPath the path of an output file whose contents should be returned in
@@ -564,6 +571,7 @@
*/
@Nullable
public InMemoryOutput downloadMinimal(
+ RemoteActionExecutionContext context,
String actionId,
ActionResult result,
Collection<? extends ActionInput> outputs,
@@ -579,7 +587,7 @@
ActionResultMetadata metadata;
try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) {
- metadata = parseActionResultMetadata(result, execRoot);
+ metadata = parseActionResultMetadata(context, result, execRoot);
}
if (!metadata.symlinks().isEmpty()) {
@@ -614,9 +622,10 @@
try (SilentCloseable c = Profiler.instance().profile("Remote.download")) {
ListenableFuture<byte[]> inMemoryOutputDownload = null;
if (inMemoryOutput != null) {
- inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest);
+ inMemoryOutputDownload = downloadBlob(context, inMemoryOutputDigest);
}
- waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true);
+ waitForBulkTransfer(
+ downloadOutErr(context, result, outErr), /* cancelRemainingOnInterrupt=*/ true);
if (inMemoryOutputDownload != null) {
waitForBulkTransfer(
ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
@@ -708,7 +717,8 @@
return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build());
}
- private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult, Path execRoot)
+ private ActionResultMetadata parseActionResultMetadata(
+ RemoteActionExecutionContext context, ActionResult actionResult, Path execRoot)
throws IOException, InterruptedException {
Preconditions.checkNotNull(actionResult, "actionResult");
Map<Path, ListenableFuture<Tree>> dirMetadataDownloads =
@@ -717,7 +727,7 @@
dirMetadataDownloads.put(
execRoot.getRelative(dir.getPath()),
Futures.transform(
- downloadBlob(dir.getTreeDigest()),
+ downloadBlob(context, dir.getTreeDigest()),
(treeBytes) -> {
try {
return Tree.parseFrom(treeBytes);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index ecb175e..dfa368b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;
import build.bazel.remote.execution.v2.DigestFunction;
-import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.auth.Credentials;
import com.google.common.annotations.VisibleForTesting;
@@ -512,9 +511,6 @@
requestContext,
remoteOptions.remoteInstanceName));
- Context repoContext =
- TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "repository_rule");
-
if (enableRemoteExecution) {
RemoteExecutionClient remoteExecutor;
if (remoteOptions.remoteExecutionKeepalive) {
@@ -550,7 +546,6 @@
digestUtil,
buildRequestId,
invocationId,
- "repository_rule",
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
} else {
@@ -579,10 +574,11 @@
if (enableRemoteDownloader) {
remoteDownloaderSupplier.set(
new GrpcRemoteDownloader(
+ buildRequestId,
+ invocationId,
downloaderChannel.retain(),
Optional.ofNullable(credentials),
retrier,
- repoContext,
cacheClient,
remoteOptions));
downloaderChannel.release();
@@ -855,14 +851,12 @@
env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
if (!remoteOutputsMode.downloadAllOutputs()) {
- RequestMetadata requestMetadata =
- RequestMetadata.newBuilder()
- .setCorrelatedInvocationsId(env.getBuildRequestId())
- .setToolInvocationId(env.getCommandId().toString())
- .build();
actionInputFetcher =
new RemoteActionInputFetcher(
- actionContextProvider.getRemoteCache(), env.getExecRoot(), requestMetadata);
+ env.getBuildRequestId(),
+ env.getCommandId().toString(),
+ actionContextProvider.getRemoteCache(),
+ env.getExecRoot());
builder.setActionInputPrefetcher(actionInputFetcher);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
index e66897b..5d89161 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
@@ -20,6 +20,7 @@
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecuteResponse;
import build.bazel.remote.execution.v2.Platform;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
@@ -55,7 +56,6 @@
private final DigestUtil digestUtil;
private final String buildRequestId;
private final String commandId;
- private final String actionId;
private final String remoteInstanceName;
private final boolean acceptCached;
@@ -66,7 +66,6 @@
DigestUtil digestUtil,
String buildRequestId,
String commandId,
- String actionId,
String remoteInstanceName,
boolean acceptCached) {
this.remoteCache = remoteCache;
@@ -74,12 +73,11 @@
this.digestUtil = digestUtil;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
- this.actionId = actionId;
this.remoteInstanceName = remoteInstanceName;
this.acceptCached = acceptCached;
}
- private ExecutionResult downloadOutErr(ActionResult result)
+ private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, ActionResult result)
throws IOException, InterruptedException {
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "download stdout/stderr")) {
@@ -87,14 +85,14 @@
if (!result.getStdoutRaw().isEmpty()) {
stdout = result.getStdoutRaw().toByteArray();
} else if (result.hasStdoutDigest()) {
- stdout = Utils.getFromFuture(remoteCache.downloadBlob(result.getStdoutDigest()));
+ stdout = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStdoutDigest()));
}
byte[] stderr = new byte[0];
if (!result.getStderrRaw().isEmpty()) {
stderr = result.getStderrRaw().toByteArray();
} else if (result.hasStderrDigest()) {
- stderr = Utils.getFromFuture(remoteCache.downloadBlob(result.getStderrDigest()));
+ stderr = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStderrDigest()));
}
return new ExecutionResult(result.getExitCode(), stdout, stderr);
@@ -110,8 +108,9 @@
String workingDirectory,
Duration timeout)
throws IOException, InterruptedException {
- Context requestCtx =
- TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionId);
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "repository_rule");
+ Context requestCtx = TracingMetadataUtils.contextWithMetadata(metadata);
Context prev = requestCtx.attach();
try {
Platform platform = PlatformUtils.buildPlatformProto(executionProperties);
@@ -130,9 +129,7 @@
Digest actionDigest = digestUtil.compute(action);
ActionKey actionKey = new ActionKey(actionDigest);
RemoteActionExecutionContext remoteActionExecutionContext =
- new RemoteActionExecutionContextImpl(
- TracingMetadataUtils.buildMetadata(buildRequestId, commandId, actionId),
- new NetworkTime());
+ new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
ActionResult actionResult;
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
@@ -164,7 +161,7 @@
actionResult = response.getResult();
}
}
- return downloadOutErr(actionResult);
+ return downloadOutErr(remoteActionExecutionContext, actionResult);
} finally {
requestCtx.detach(prev);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
index 70ee4d9..a67cf85 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
@@ -26,7 +26,6 @@
private final DigestUtil digestUtil;
private final String buildRequestId;
private final String commandId;
- private final String actionId;
private final String remoteInstanceName;
private final boolean acceptCached;
@@ -37,7 +36,6 @@
DigestUtil digestUtil,
String buildRequestId,
String commandId,
- String actionId,
String remoteInstanceName,
boolean acceptCached) {
this.remoteExecutionCache = remoteExecutionCache;
@@ -45,7 +43,6 @@
this.digestUtil = digestUtil;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
- this.actionId = actionId;
this.remoteInstanceName = remoteInstanceName;
this.acceptCached = acceptCached;
}
@@ -58,7 +55,6 @@
digestUtil,
buildRequestId,
commandId,
- actionId,
remoteInstanceName,
acceptCached);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index a7bc727..d213bee 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -191,7 +191,11 @@
try (SilentCloseable c =
prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(
- result, execRoot, context.getFileOutErr(), context::lockOutputFiles);
+ remoteActionExecutionContext,
+ result,
+ execRoot,
+ context.getFileOutErr(),
+ context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
@@ -200,6 +204,7 @@
prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs minimal")) {
inMemoryOutput =
remoteCache.downloadMinimal(
+ remoteActionExecutionContext,
actionKey.getDigest().getHash(),
result,
spawn.getOutputFiles(),
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fdce7c5..fa99c63 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -277,6 +277,7 @@
} else {
try {
return downloadAndFinalizeSpawnResult(
+ remoteActionExecutionContext,
actionKey.getDigest().getHash(),
cachedResult,
/* cacheHit= */ true,
@@ -366,11 +367,12 @@
spawnMetricsAccounting(spawnMetrics, actionResult.getExecutionMetadata());
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download server logs")) {
- maybeDownloadServerLogs(reply, actionKey);
+ maybeDownloadServerLogs(remoteActionExecutionContext, reply, actionKey);
}
try {
return downloadAndFinalizeSpawnResult(
+ remoteActionExecutionContext,
actionKey.getDigest().getHash(),
actionResult,
reply.getCachedResult(),
@@ -452,6 +454,7 @@
}
private SpawnResult downloadAndFinalizeSpawnResult(
+ RemoteActionExecutionContext remoteActionExecutionContext,
String actionId,
ActionResult actionResult,
boolean cacheHit,
@@ -473,7 +476,11 @@
if (downloadOutputs) {
try (SilentCloseable c = Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(
- actionResult, execRoot, context.getFileOutErr(), context::lockOutputFiles);
+ remoteActionExecutionContext,
+ actionResult,
+ execRoot,
+ context.getFileOutErr(),
+ context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
@@ -481,6 +488,7 @@
Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs minimal")) {
inMemoryOutput =
remoteCache.downloadMinimal(
+ remoteActionExecutionContext,
actionId,
actionResult,
spawn.getOutputFiles(),
@@ -532,7 +540,8 @@
}
}
- private void maybeDownloadServerLogs(ExecuteResponse resp, ActionKey actionKey)
+ private void maybeDownloadServerLogs(
+ RemoteActionExecutionContext context, ExecuteResponse resp, ActionKey actionKey)
throws InterruptedException {
ActionResult result = resp.getResult();
if (resp.getServerLogsCount() > 0
@@ -545,7 +554,7 @@
logPath = parent.getRelative(e.getKey());
logCount++;
try {
- getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest()));
+ getFromFuture(remoteCache.downloadFile(context, logPath, e.getValue().getDigest()));
} catch (IOException ex) {
reportOnce(Event.warn("Failed downloading server logs from the remote cache."));
}
@@ -598,11 +607,16 @@
command,
uploadLocalResults);
}
- return handleError(cause, context.getFileOutErr(), actionKey, context);
+ return handleError(
+ remoteActionExecutionContext, cause, context.getFileOutErr(), actionKey, context);
}
private SpawnResult handleError(
- IOException exception, FileOutErr outErr, ActionKey actionKey, SpawnExecutionContext context)
+ RemoteActionExecutionContext remoteActionExecutionContext,
+ IOException exception,
+ FileOutErr outErr,
+ ActionKey actionKey,
+ SpawnExecutionContext context)
throws ExecException, InterruptedException, IOException {
boolean remoteCacheFailed =
BulkTransferException.isOnlyCausedByCacheNotFoundException(exception);
@@ -610,11 +624,16 @@
ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
if (e.getResponse() != null) {
ExecuteResponse resp = e.getResponse();
- maybeDownloadServerLogs(resp, actionKey);
+ maybeDownloadServerLogs(remoteActionExecutionContext, resp, actionKey);
if (resp.hasResult()) {
try {
// We try to download all (partial) results even on server error, for debuggability.
- remoteCache.download(resp.getResult(), execRoot, outErr, context::lockOutputFiles);
+ remoteCache.download(
+ remoteActionExecutionContext,
+ resp.getResult(),
+ execRoot,
+ outErr,
+ context::lockOutputFiles);
} catch (BulkTransferException bulkTransferEx) {
exception.addSuppressed(bulkTransferEx);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index 2f3c74b..caf497f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -66,6 +66,7 @@
/**
* Downloads an action result for the {@code actionKey}.
*
+ * @param context the context for the action.
* @param actionKey The digest of the {@link Action} that generated the action result.
* @param inlineOutErr A hint to the server to inline the stdout and stderr in the {@code
* ActionResult} message.
@@ -78,6 +79,7 @@
/**
* Uploads an action result for the {@code actionKey}.
*
+ * @param context the context for the action.
* @param actionKey The digest of the {@link Action} that generated the action result.
* @param actionResult The action result to associate with the {@code actionKey}.
* @throws IOException If there is an error uploading the action result.
@@ -92,10 +94,12 @@
*
* <p>It's the callers responsibility to close {@code out}.
*
+ * @param context the context for the action.
* @return A Future representing pending completion of the download. If a BLOB for {@code digest}
* does not exist in the cache the Future fails with a {@link CacheNotFoundException}.
*/
- ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
+ ListenableFuture<Void> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest, OutputStream out);
/**
* Uploads a {@code file} to the CAS.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
index 74c19bd..040a7d4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
@@ -129,9 +129,10 @@
}
@Override
- public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ public ListenableFuture<Void> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (diskCache.contains(digest)) {
- return diskCache.downloadBlob(digest, out);
+ return diskCache.downloadBlob(context, digest, out);
}
Path tempPath = newTempPath();
@@ -144,21 +145,19 @@
if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteAcceptCached) {
ListenableFuture<Void> download =
- closeStreamOnError(remoteCache.downloadBlob(digest, tempOut), tempOut);
- ListenableFuture<Void> saveToDiskAndTarget =
- Futures.transformAsync(
- download,
- (unused) -> {
- try {
- tempOut.close();
- diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
- }
- return diskCache.downloadBlob(digest, out);
- },
- MoreExecutors.directExecutor());
- return saveToDiskAndTarget;
+ closeStreamOnError(remoteCache.downloadBlob(context, digest, tempOut), tempOut);
+ return Futures.transformAsync(
+ download,
+ (unused) -> {
+ try {
+ tempOut.close();
+ diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ return diskCache.downloadBlob(context, digest, out);
+ },
+ MoreExecutors.directExecutor());
} else {
return Futures.immediateFuture(null);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index 9cef499..0278913 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -81,7 +81,8 @@
}
@Override
- public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ public ListenableFuture<Void> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest, OutputStream out) {
@Nullable
DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null;
return Futures.transformAsync(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
index 37b6399..672207b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
@@ -20,6 +20,7 @@
import build.bazel.remote.asset.v1.FetchGrpc.FetchBlockingStub;
import build.bazel.remote.asset.v1.Qualifier;
import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.devtools.build.lib.bazel.repository.downloader.Checksum;
@@ -28,6 +29,9 @@
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.ReferenceCountedChannel;
import com.google.devtools.build.lib.remote.RemoteRetrier;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -36,7 +40,6 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.grpc.CallCredentials;
-import io.grpc.Context;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.io.OutputStream;
@@ -58,10 +61,11 @@
*/
public class GrpcRemoteDownloader implements AutoCloseable, Downloader {
+ private final String buildRequestId;
+ private final String commandId;
private final ReferenceCountedChannel channel;
private final Optional<CallCredentials> credentials;
private final RemoteRetrier retrier;
- private final Context requestCtx;
private final RemoteCacheClient cacheClient;
private final RemoteOptions options;
@@ -75,17 +79,19 @@
private static final String QUALIFIER_AUTH_HEADERS = "bazel.auth_headers";
public GrpcRemoteDownloader(
+ String buildRequestId,
+ String commandId,
ReferenceCountedChannel channel,
Optional<CallCredentials> credentials,
RemoteRetrier retrier,
- Context requestCtx,
RemoteCacheClient cacheClient,
RemoteOptions options) {
+ this.buildRequestId = buildRequestId;
+ this.commandId = commandId;
this.channel = channel;
this.credentials = credentials;
this.retrier = retrier;
this.cacheClient = cacheClient;
- this.requestCtx = requestCtx;
this.options = options;
}
@@ -109,22 +115,26 @@
Map<String, String> clientEnv,
com.google.common.base.Optional<String> type)
throws IOException, InterruptedException {
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "remote_downloader");
+ RemoteActionExecutionContext remoteActionExecutionContext =
+ new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+
final FetchBlobRequest request =
newFetchBlobRequest(options.remoteInstanceName, urls, authHeaders, checksum, canonicalId);
try {
FetchBlobResponse response =
- retrier.execute(() -> requestCtx.call(() -> fetchBlockingStub().fetchBlob(request)));
+ retrier.execute(() -> fetchBlockingStub(remoteActionExecutionContext).fetchBlob(request));
final Digest blobDigest = response.getBlobDigest();
retrier.execute(
- () ->
- requestCtx.call(
- () -> {
- try (OutputStream out = newOutputStream(destination, checksum)) {
- Utils.getFromFuture(cacheClient.downloadBlob(blobDigest, out));
- }
- return null;
- }));
+ () -> {
+ try (OutputStream out = newOutputStream(destination, checksum)) {
+ Utils.getFromFuture(
+ cacheClient.downloadBlob(remoteActionExecutionContext, blobDigest, out));
+ }
+ return null;
+ });
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
@@ -164,9 +174,10 @@
return requestBuilder.build();
}
- private FetchBlockingStub fetchBlockingStub() {
+ private FetchBlockingStub fetchBlockingStub(RemoteActionExecutionContext context) {
return FetchGrpc.newBlockingStub(channel)
- .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+ .withInterceptors(
+ TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
.withInterceptors(TracingMetadataUtils.newDownloaderHeadersInterceptor(options))
.withCallCredentials(credentials.orElse(null))
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 9a55733..7a67bb5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -438,7 +438,8 @@
}
@Override
- public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ public ListenableFuture<Void> downloadBlob(
+ RemoteActionExecutionContext context, Digest digest, OutputStream out) {
final DigestOutputStream digestOut =
verifyDownloads ? digestUtil.newDigestOutputStream(out) : null;
return Futures.transformAsync(