Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 3)

Change RemoteCacheClient#downloadBlob to use RemoteActionExecutionContext.

PiperOrigin-RevId: 354239205
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index 85da6b0..ef031d7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -129,9 +129,11 @@
         .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
   }
 
-  private ByteStreamStub bsAsyncStub() {
+  private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
     return ByteStreamGrpc.newStub(channel)
-        .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+        .withInterceptors(
+            TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
+            new NetworkTimeInterceptor(context::getNetworkTime))
         .withCallCredentials(callCredentialsProvider.getCallCredentials())
         .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
   }
@@ -283,7 +285,8 @@
   }
 
   @Override
-  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+  public ListenableFuture<Void> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest, OutputStream out) {
     if (digest.getSizeBytes() == 0) {
       return Futures.immediateFuture(null);
     }
@@ -295,12 +298,14 @@
       out = digestOut;
     }
 
-    return downloadBlob(digest, out, digestSupplier);
+    return downloadBlob(context, digest, out, digestSupplier);
   }
 
   private ListenableFuture<Void> downloadBlob(
-      Digest digest, OutputStream out, @Nullable Supplier<Digest> digestSupplier) {
-    Context ctx = Context.current();
+      RemoteActionExecutionContext context,
+      Digest digest,
+      OutputStream out,
+      @Nullable Supplier<Digest> digestSupplier) {
     AtomicLong offset = new AtomicLong(0);
     ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
     ListenableFuture<Void> downloadFuture =
@@ -308,10 +313,8 @@
             () ->
                 retrier.executeAsync(
                     () ->
-                        ctx.call(
-                            () ->
-                                requestRead(
-                                    offset, progressiveBackoff, digest, out, digestSupplier)),
+                        requestRead(
+                            context, offset, progressiveBackoff, digest, out, digestSupplier),
                     progressiveBackoff),
             callCredentialsProvider);
 
@@ -331,6 +334,7 @@
   }
 
   private ListenableFuture<Void> requestRead(
+      RemoteActionExecutionContext context,
       AtomicLong offset,
       ProgressiveBackoff progressiveBackoff,
       Digest digest,
@@ -338,7 +342,7 @@
       @Nullable Supplier<Digest> digestSupplier) {
     String resourceName = getResourceName(options.remoteInstanceName, digest);
     SettableFuture<Void> future = SettableFuture.create();
-    bsAsyncStub()
+    bsAsyncStub(context)
         .read(
             ReadRequest.newBuilder()
                 .setResourceName(resourceName)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 6191f65..e78c168 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -33,6 +33,9 @@
 import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.remote.util.Utils;
@@ -66,15 +69,17 @@
   @GuardedBy("lock")
   final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();
 
+  private final String buildRequestId;
+  private final String commandId;
   private final RemoteCache remoteCache;
   private final Path execRoot;
-  private final RequestMetadata requestMetadata;
 
   RemoteActionInputFetcher(
-      RemoteCache remoteCache, Path execRoot, RequestMetadata requestMetadata) {
+      String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
+    this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
+    this.commandId = Preconditions.checkNotNull(commandId);
     this.remoteCache = Preconditions.checkNotNull(remoteCache);
     this.execRoot = Preconditions.checkNotNull(execRoot);
-    this.requestMetadata = Preconditions.checkNotNull(requestMetadata);
   }
 
   /**
@@ -160,13 +165,15 @@
 
       ListenableFuture<Void> download = downloadsInProgress.get(path);
       if (download == null) {
-        Context ctx =
-            TracingMetadataUtils.contextWithMetadata(
-                requestMetadata.toBuilder().setActionId(metadata.getActionId()).build());
+        RequestMetadata requestMetadata =
+            TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId());
+        RemoteActionExecutionContext remoteActionExecutionContext =
+            new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime());
+        Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata);
         Context prevCtx = ctx.attach();
         try {
           Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
-          download = remoteCache.downloadFile(path, digest);
+          download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest);
           downloadsInProgress.put(path, download);
           Futures.addCallback(
               download,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 8e68aab..3950814 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -261,14 +261,15 @@
    * @return a future that completes after the download completes (succeeds / fails). If successful,
    *     the content is stored in the future's {@code byte[]}.
    */
-  public ListenableFuture<byte[]> downloadBlob(Digest digest) {
+  public ListenableFuture<byte[]> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest) {
     if (digest.getSizeBytes() == 0) {
       return EMPTY_BYTES;
     }
     ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
     SettableFuture<byte[]> outerF = SettableFuture.create();
     Futures.addCallback(
-        cacheProtocol.downloadBlob(digest, bOut),
+        cacheProtocol.downloadBlob(context, digest, bOut),
         new FutureCallback<Void>() {
           @Override
           public void onSuccess(Void aVoid) {
@@ -305,12 +306,13 @@
    * @throws ExecException in case clean up after a failed download failed.
    */
   public void download(
+      RemoteActionExecutionContext context,
       ActionResult result,
       Path execRoot,
       FileOutErr origOutErr,
       OutputFilesLocker outputFilesLocker)
       throws ExecException, IOException, InterruptedException {
-    ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);
+    ActionResultMetadata metadata = parseActionResultMetadata(context, result, execRoot);
 
     List<ListenableFuture<FileMetadata>> downloads =
         Stream.concat(
@@ -321,7 +323,7 @@
                 (file) -> {
                   try {
                     ListenableFuture<Void> download =
-                        downloadFile(toTmpDownloadPath(file.path()), file.digest());
+                        downloadFile(context, toTmpDownloadPath(file.path()), file.digest());
                     return Futures.transform(download, (d) -> file, directExecutor());
                   } catch (IOException e) {
                     return Futures.<FileMetadata>immediateFailedFuture(e);
@@ -337,7 +339,7 @@
     if (origOutErr != null) {
       tmpOutErr = origOutErr.childOutErr();
     }
-    downloads.addAll(downloadOutErr(result, tmpOutErr));
+    downloads.addAll(downloadOutErr(context, result, tmpOutErr));
 
     try {
       waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
@@ -449,7 +451,8 @@
   }
 
   /** Downloads a file (that is not a directory). The content is fetched from the digest. */
-  public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOException {
+  public ListenableFuture<Void> downloadFile(
+      RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
     Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
     if (digest.getSizeBytes() == 0) {
       // Handle empty file locally.
@@ -472,7 +475,7 @@
 
     OutputStream out = new LazyFileOutputStream(path);
     SettableFuture<Void> outerF = SettableFuture.create();
-    ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
+    ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
     Futures.addCallback(
         f,
         new FutureCallback<Void>() {
@@ -509,7 +512,8 @@
     return outerF;
   }
 
-  private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
+  private List<ListenableFuture<FileMetadata>> downloadOutErr(
+      RemoteActionExecutionContext context, ActionResult result, OutErr outErr) {
     List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
     if (!result.getStdoutRaw().isEmpty()) {
       try {
@@ -521,7 +525,8 @@
     } else if (result.hasStdoutDigest()) {
       downloads.add(
           Futures.transform(
-              cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
+              cacheProtocol.downloadBlob(
+                  context, result.getStdoutDigest(), outErr.getOutputStream()),
               (d) -> null,
               directExecutor()));
     }
@@ -535,7 +540,8 @@
     } else if (result.hasStderrDigest()) {
       downloads.add(
           Futures.transform(
-              cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
+              cacheProtocol.downloadBlob(
+                  context, result.getStderrDigest(), outErr.getErrorStream()),
               (d) -> null,
               directExecutor()));
     }
@@ -549,6 +555,7 @@
    * <p>This method only downloads output directory metadata, stdout and stderr as well as the
    * contents of {@code inMemoryOutputPath} if specified.
    *
+   * @param context the context this action is running with
    * @param result the action result metadata of a successfully executed action (exit code = 0).
    * @param outputs the action's declared output files
    * @param inMemoryOutputPath the path of an output file whose contents should be returned in
@@ -564,6 +571,7 @@
    */
   @Nullable
   public InMemoryOutput downloadMinimal(
+      RemoteActionExecutionContext context,
       String actionId,
       ActionResult result,
       Collection<? extends ActionInput> outputs,
@@ -579,7 +587,7 @@
 
     ActionResultMetadata metadata;
     try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) {
-      metadata = parseActionResultMetadata(result, execRoot);
+      metadata = parseActionResultMetadata(context, result, execRoot);
     }
 
     if (!metadata.symlinks().isEmpty()) {
@@ -614,9 +622,10 @@
     try (SilentCloseable c = Profiler.instance().profile("Remote.download")) {
       ListenableFuture<byte[]> inMemoryOutputDownload = null;
       if (inMemoryOutput != null) {
-        inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest);
+        inMemoryOutputDownload = downloadBlob(context, inMemoryOutputDigest);
       }
-      waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true);
+      waitForBulkTransfer(
+          downloadOutErr(context, result, outErr), /* cancelRemainingOnInterrupt=*/ true);
       if (inMemoryOutputDownload != null) {
         waitForBulkTransfer(
             ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
@@ -708,7 +717,8 @@
     return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build());
   }
 
-  private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult, Path execRoot)
+  private ActionResultMetadata parseActionResultMetadata(
+      RemoteActionExecutionContext context, ActionResult actionResult, Path execRoot)
       throws IOException, InterruptedException {
     Preconditions.checkNotNull(actionResult, "actionResult");
     Map<Path, ListenableFuture<Tree>> dirMetadataDownloads =
@@ -717,7 +727,7 @@
       dirMetadataDownloads.put(
           execRoot.getRelative(dir.getPath()),
           Futures.transform(
-              downloadBlob(dir.getTreeDigest()),
+              downloadBlob(context, dir.getTreeDigest()),
               (treeBytes) -> {
                 try {
                   return Tree.parseFrom(treeBytes);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index ecb175e..dfa368b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -15,7 +15,6 @@
 package com.google.devtools.build.lib.remote;
 
 import build.bazel.remote.execution.v2.DigestFunction;
-import build.bazel.remote.execution.v2.RequestMetadata;
 import build.bazel.remote.execution.v2.ServerCapabilities;
 import com.google.auth.Credentials;
 import com.google.common.annotations.VisibleForTesting;
@@ -512,9 +511,6 @@
             requestContext,
             remoteOptions.remoteInstanceName));
 
-    Context repoContext =
-        TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "repository_rule");
-
     if (enableRemoteExecution) {
       RemoteExecutionClient remoteExecutor;
       if (remoteOptions.remoteExecutionKeepalive) {
@@ -550,7 +546,6 @@
               digestUtil,
               buildRequestId,
               invocationId,
-              "repository_rule",
               remoteOptions.remoteInstanceName,
               remoteOptions.remoteAcceptCached));
     } else {
@@ -579,10 +574,11 @@
     if (enableRemoteDownloader) {
       remoteDownloaderSupplier.set(
           new GrpcRemoteDownloader(
+              buildRequestId,
+              invocationId,
               downloaderChannel.retain(),
               Optional.ofNullable(credentials),
               retrier,
-              repoContext,
               cacheClient,
               remoteOptions));
       downloaderChannel.release();
@@ -855,14 +851,12 @@
             env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
     RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
     if (!remoteOutputsMode.downloadAllOutputs()) {
-      RequestMetadata requestMetadata =
-          RequestMetadata.newBuilder()
-              .setCorrelatedInvocationsId(env.getBuildRequestId())
-              .setToolInvocationId(env.getCommandId().toString())
-              .build();
       actionInputFetcher =
           new RemoteActionInputFetcher(
-              actionContextProvider.getRemoteCache(), env.getExecRoot(), requestMetadata);
+              env.getBuildRequestId(),
+              env.getCommandId().toString(),
+              actionContextProvider.getRemoteCache(),
+              env.getExecRoot());
       builder.setActionInputPrefetcher(actionInputFetcher);
       remoteOutputService.setActionInputFetcher(actionInputFetcher);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
index e66897b..5d89161 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
@@ -20,6 +20,7 @@
 import build.bazel.remote.execution.v2.ExecuteRequest;
 import build.bazel.remote.execution.v2.ExecuteResponse;
 import build.bazel.remote.execution.v2.Platform;
+import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
@@ -55,7 +56,6 @@
   private final DigestUtil digestUtil;
   private final String buildRequestId;
   private final String commandId;
-  private final String actionId;
 
   private final String remoteInstanceName;
   private final boolean acceptCached;
@@ -66,7 +66,6 @@
       DigestUtil digestUtil,
       String buildRequestId,
       String commandId,
-      String actionId,
       String remoteInstanceName,
       boolean acceptCached) {
     this.remoteCache = remoteCache;
@@ -74,12 +73,11 @@
     this.digestUtil = digestUtil;
     this.buildRequestId = buildRequestId;
     this.commandId = commandId;
-    this.actionId = actionId;
     this.remoteInstanceName = remoteInstanceName;
     this.acceptCached = acceptCached;
   }
 
-  private ExecutionResult downloadOutErr(ActionResult result)
+  private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, ActionResult result)
       throws IOException, InterruptedException {
     try (SilentCloseable c =
         Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "download stdout/stderr")) {
@@ -87,14 +85,14 @@
       if (!result.getStdoutRaw().isEmpty()) {
         stdout = result.getStdoutRaw().toByteArray();
       } else if (result.hasStdoutDigest()) {
-        stdout = Utils.getFromFuture(remoteCache.downloadBlob(result.getStdoutDigest()));
+        stdout = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStdoutDigest()));
       }
 
       byte[] stderr = new byte[0];
       if (!result.getStderrRaw().isEmpty()) {
         stderr = result.getStderrRaw().toByteArray();
       } else if (result.hasStderrDigest()) {
-        stderr = Utils.getFromFuture(remoteCache.downloadBlob(result.getStderrDigest()));
+        stderr = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStderrDigest()));
       }
 
       return new ExecutionResult(result.getExitCode(), stdout, stderr);
@@ -110,8 +108,9 @@
       String workingDirectory,
       Duration timeout)
       throws IOException, InterruptedException {
-    Context requestCtx =
-        TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionId);
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "repository_rule");
+    Context requestCtx = TracingMetadataUtils.contextWithMetadata(metadata);
     Context prev = requestCtx.attach();
     try {
       Platform platform = PlatformUtils.buildPlatformProto(executionProperties);
@@ -130,9 +129,7 @@
       Digest actionDigest = digestUtil.compute(action);
       ActionKey actionKey = new ActionKey(actionDigest);
       RemoteActionExecutionContext remoteActionExecutionContext =
-          new RemoteActionExecutionContextImpl(
-              TracingMetadataUtils.buildMetadata(buildRequestId, commandId, actionId),
-              new NetworkTime());
+          new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
       ActionResult actionResult;
       try (SilentCloseable c =
           Profiler.instance().profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
@@ -164,7 +161,7 @@
           actionResult = response.getResult();
         }
       }
-      return downloadOutErr(actionResult);
+      return downloadOutErr(remoteActionExecutionContext, actionResult);
     } finally {
       requestCtx.detach(prev);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
index 70ee4d9..a67cf85 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java
@@ -26,7 +26,6 @@
   private final DigestUtil digestUtil;
   private final String buildRequestId;
   private final String commandId;
-  private final String actionId;
 
   private final String remoteInstanceName;
   private final boolean acceptCached;
@@ -37,7 +36,6 @@
       DigestUtil digestUtil,
       String buildRequestId,
       String commandId,
-      String actionId,
       String remoteInstanceName,
       boolean acceptCached) {
     this.remoteExecutionCache = remoteExecutionCache;
@@ -45,7 +43,6 @@
     this.digestUtil = digestUtil;
     this.buildRequestId = buildRequestId;
     this.commandId = commandId;
-    this.actionId = actionId;
     this.remoteInstanceName = remoteInstanceName;
     this.acceptCached = acceptCached;
   }
@@ -58,7 +55,6 @@
         digestUtil,
         buildRequestId,
         commandId,
-        actionId,
         remoteInstanceName,
         acceptCached);
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index a7bc727..d213bee 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -191,7 +191,11 @@
             try (SilentCloseable c =
                 prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs")) {
               remoteCache.download(
-                  result, execRoot, context.getFileOutErr(), context::lockOutputFiles);
+                  remoteActionExecutionContext,
+                  result,
+                  execRoot,
+                  context.getFileOutErr(),
+                  context::lockOutputFiles);
             }
           } else {
             PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
@@ -200,6 +204,7 @@
                 prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs minimal")) {
               inMemoryOutput =
                   remoteCache.downloadMinimal(
+                      remoteActionExecutionContext,
                       actionKey.getDigest().getHash(),
                       result,
                       spawn.getOutputFiles(),
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fdce7c5..fa99c63 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -277,6 +277,7 @@
           } else {
             try {
               return downloadAndFinalizeSpawnResult(
+                  remoteActionExecutionContext,
                   actionKey.getDigest().getHash(),
                   cachedResult,
                   /* cacheHit= */ true,
@@ -366,11 +367,12 @@
               spawnMetricsAccounting(spawnMetrics, actionResult.getExecutionMetadata());
 
               try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download server logs")) {
-                maybeDownloadServerLogs(reply, actionKey);
+                maybeDownloadServerLogs(remoteActionExecutionContext, reply, actionKey);
               }
 
               try {
                 return downloadAndFinalizeSpawnResult(
+                    remoteActionExecutionContext,
                     actionKey.getDigest().getHash(),
                     actionResult,
                     reply.getCachedResult(),
@@ -452,6 +454,7 @@
   }
 
   private SpawnResult downloadAndFinalizeSpawnResult(
+      RemoteActionExecutionContext remoteActionExecutionContext,
       String actionId,
       ActionResult actionResult,
       boolean cacheHit,
@@ -473,7 +476,11 @@
     if (downloadOutputs) {
       try (SilentCloseable c = Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs")) {
         remoteCache.download(
-            actionResult, execRoot, context.getFileOutErr(), context::lockOutputFiles);
+            remoteActionExecutionContext,
+            actionResult,
+            execRoot,
+            context.getFileOutErr(),
+            context::lockOutputFiles);
       }
     } else {
       PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
@@ -481,6 +488,7 @@
           Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs minimal")) {
         inMemoryOutput =
             remoteCache.downloadMinimal(
+                remoteActionExecutionContext,
                 actionId,
                 actionResult,
                 spawn.getOutputFiles(),
@@ -532,7 +540,8 @@
     }
   }
 
-  private void maybeDownloadServerLogs(ExecuteResponse resp, ActionKey actionKey)
+  private void maybeDownloadServerLogs(
+      RemoteActionExecutionContext context, ExecuteResponse resp, ActionKey actionKey)
       throws InterruptedException {
     ActionResult result = resp.getResult();
     if (resp.getServerLogsCount() > 0
@@ -545,7 +554,7 @@
           logPath = parent.getRelative(e.getKey());
           logCount++;
           try {
-            getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest()));
+            getFromFuture(remoteCache.downloadFile(context, logPath, e.getValue().getDigest()));
           } catch (IOException ex) {
             reportOnce(Event.warn("Failed downloading server logs from the remote cache."));
           }
@@ -598,11 +607,16 @@
           command,
           uploadLocalResults);
     }
-    return handleError(cause, context.getFileOutErr(), actionKey, context);
+    return handleError(
+        remoteActionExecutionContext, cause, context.getFileOutErr(), actionKey, context);
   }
 
   private SpawnResult handleError(
-      IOException exception, FileOutErr outErr, ActionKey actionKey, SpawnExecutionContext context)
+      RemoteActionExecutionContext remoteActionExecutionContext,
+      IOException exception,
+      FileOutErr outErr,
+      ActionKey actionKey,
+      SpawnExecutionContext context)
       throws ExecException, InterruptedException, IOException {
     boolean remoteCacheFailed =
         BulkTransferException.isOnlyCausedByCacheNotFoundException(exception);
@@ -610,11 +624,16 @@
       ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
       if (e.getResponse() != null) {
         ExecuteResponse resp = e.getResponse();
-        maybeDownloadServerLogs(resp, actionKey);
+        maybeDownloadServerLogs(remoteActionExecutionContext, resp, actionKey);
         if (resp.hasResult()) {
           try {
             // We try to download all (partial) results even on server error, for debuggability.
-            remoteCache.download(resp.getResult(), execRoot, outErr, context::lockOutputFiles);
+            remoteCache.download(
+                remoteActionExecutionContext,
+                resp.getResult(),
+                execRoot,
+                outErr,
+                context::lockOutputFiles);
           } catch (BulkTransferException bulkTransferEx) {
             exception.addSuppressed(bulkTransferEx);
           }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index 2f3c74b..caf497f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -66,6 +66,7 @@
   /**
    * Downloads an action result for the {@code actionKey}.
    *
+   * @param context the context for the action.
    * @param actionKey The digest of the {@link Action} that generated the action result.
    * @param inlineOutErr A hint to the server to inline the stdout and stderr in the {@code
    *     ActionResult} message.
@@ -78,6 +79,7 @@
   /**
    * Uploads an action result for the {@code actionKey}.
    *
+   * @param context the context for the action.
    * @param actionKey The digest of the {@link Action} that generated the action result.
    * @param actionResult The action result to associate with the {@code actionKey}.
    * @throws IOException If there is an error uploading the action result.
@@ -92,10 +94,12 @@
    *
    * <p>It's the callers responsibility to close {@code out}.
    *
+   * @param context the context for the action.
    * @return A Future representing pending completion of the download. If a BLOB for {@code digest}
    *     does not exist in the cache the Future fails with a {@link CacheNotFoundException}.
    */
-  ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
+  ListenableFuture<Void> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest, OutputStream out);
 
   /**
    * Uploads a {@code file} to the CAS.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
index 74c19bd..040a7d4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
@@ -129,9 +129,10 @@
   }
 
   @Override
-  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+  public ListenableFuture<Void> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest, OutputStream out) {
     if (diskCache.contains(digest)) {
-      return diskCache.downloadBlob(digest, out);
+      return diskCache.downloadBlob(context, digest, out);
     }
 
     Path tempPath = newTempPath();
@@ -144,21 +145,19 @@
 
     if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteAcceptCached) {
       ListenableFuture<Void> download =
-          closeStreamOnError(remoteCache.downloadBlob(digest, tempOut), tempOut);
-      ListenableFuture<Void> saveToDiskAndTarget =
-          Futures.transformAsync(
-              download,
-              (unused) -> {
-                try {
-                  tempOut.close();
-                  diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
-                } catch (IOException e) {
-                  return Futures.immediateFailedFuture(e);
-                }
-                return diskCache.downloadBlob(digest, out);
-              },
-              MoreExecutors.directExecutor());
-      return saveToDiskAndTarget;
+          closeStreamOnError(remoteCache.downloadBlob(context, digest, tempOut), tempOut);
+      return Futures.transformAsync(
+          download,
+          (unused) -> {
+            try {
+              tempOut.close();
+              diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
+            } catch (IOException e) {
+              return Futures.immediateFailedFuture(e);
+            }
+            return diskCache.downloadBlob(context, digest, out);
+          },
+          MoreExecutors.directExecutor());
     } else {
       return Futures.immediateFuture(null);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index 9cef499..0278913 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -81,7 +81,8 @@
   }
 
   @Override
-  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+  public ListenableFuture<Void> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest, OutputStream out) {
     @Nullable
     DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null;
     return Futures.transformAsync(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
index 37b6399..672207b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
@@ -20,6 +20,7 @@
 import build.bazel.remote.asset.v1.FetchGrpc.FetchBlockingStub;
 import build.bazel.remote.asset.v1.Qualifier;
 import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.devtools.build.lib.bazel.repository.downloader.Checksum;
@@ -28,6 +29,9 @@
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.ReferenceCountedChannel;
 import com.google.devtools.build.lib.remote.RemoteRetrier;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -36,7 +40,6 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import io.grpc.CallCredentials;
-import io.grpc.Context;
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -58,10 +61,11 @@
  */
 public class GrpcRemoteDownloader implements AutoCloseable, Downloader {
 
+  private final String buildRequestId;
+  private final String commandId;
   private final ReferenceCountedChannel channel;
   private final Optional<CallCredentials> credentials;
   private final RemoteRetrier retrier;
-  private final Context requestCtx;
   private final RemoteCacheClient cacheClient;
   private final RemoteOptions options;
 
@@ -75,17 +79,19 @@
   private static final String QUALIFIER_AUTH_HEADERS = "bazel.auth_headers";
 
   public GrpcRemoteDownloader(
+      String buildRequestId,
+      String commandId,
       ReferenceCountedChannel channel,
       Optional<CallCredentials> credentials,
       RemoteRetrier retrier,
-      Context requestCtx,
       RemoteCacheClient cacheClient,
       RemoteOptions options) {
+    this.buildRequestId = buildRequestId;
+    this.commandId = commandId;
     this.channel = channel;
     this.credentials = credentials;
     this.retrier = retrier;
     this.cacheClient = cacheClient;
-    this.requestCtx = requestCtx;
     this.options = options;
   }
 
@@ -109,22 +115,26 @@
       Map<String, String> clientEnv,
       com.google.common.base.Optional<String> type)
       throws IOException, InterruptedException {
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "remote_downloader");
+    RemoteActionExecutionContext remoteActionExecutionContext =
+        new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+
     final FetchBlobRequest request =
         newFetchBlobRequest(options.remoteInstanceName, urls, authHeaders, checksum, canonicalId);
     try {
       FetchBlobResponse response =
-          retrier.execute(() -> requestCtx.call(() -> fetchBlockingStub().fetchBlob(request)));
+          retrier.execute(() -> fetchBlockingStub(remoteActionExecutionContext).fetchBlob(request));
       final Digest blobDigest = response.getBlobDigest();
 
       retrier.execute(
-          () ->
-              requestCtx.call(
-                  () -> {
-                    try (OutputStream out = newOutputStream(destination, checksum)) {
-                      Utils.getFromFuture(cacheClient.downloadBlob(blobDigest, out));
-                    }
-                    return null;
-                  }));
+          () -> {
+            try (OutputStream out = newOutputStream(destination, checksum)) {
+              Utils.getFromFuture(
+                  cacheClient.downloadBlob(remoteActionExecutionContext, blobDigest, out));
+            }
+            return null;
+          });
     } catch (StatusRuntimeException e) {
       throw new IOException(e);
     }
@@ -164,9 +174,10 @@
     return requestBuilder.build();
   }
 
-  private FetchBlockingStub fetchBlockingStub() {
+  private FetchBlockingStub fetchBlockingStub(RemoteActionExecutionContext context) {
     return FetchGrpc.newBlockingStub(channel)
-        .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+        .withInterceptors(
+            TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
         .withInterceptors(TracingMetadataUtils.newDownloaderHeadersInterceptor(options))
         .withCallCredentials(credentials.orElse(null))
         .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 9a55733..7a67bb5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -438,7 +438,8 @@
   }
 
   @Override
-  public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+  public ListenableFuture<Void> downloadBlob(
+      RemoteActionExecutionContext context, Digest digest, OutputStream out) {
     final DigestOutputStream digestOut =
         verifyDownloads ? digestUtil.newDigestOutputStream(out) : null;
     return Futures.transformAsync(