Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 4)

Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext.

PiperOrigin-RevId: 354472775
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index eec073a..29cd41e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -36,6 +36,9 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TestUtils;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -86,9 +89,7 @@
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-/**
- * Tests for {@link ByteStreamUploader}.
- */
+/** Tests for {@link ByteStreamUploader}. */
 @RunWith(JUnit4.class)
 public class ByteStreamUploaderTest {
 
@@ -102,6 +103,7 @@
 
   private Server server;
   private ManagedChannel channel;
+  private RemoteActionExecutionContext context;
   private Context withEmptyMetadata;
   private Context prevContext;
 
@@ -112,12 +114,19 @@
     MockitoAnnotations.initMocks(this);
 
     String serverName = "Server for " + this.getClass();
-    server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
-        .build().start();
+    server =
+        InProcessServerBuilder.forName(serverName)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .build()
+            .start();
     channel = InProcessChannelBuilder.forName(serverName).build();
-    withEmptyMetadata =
-        TracingMetadataUtils.contextWithMetadata(
-            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(
+            "none",
+            "none",
+            DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+    context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+    withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
 
     retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
 
@@ -161,7 +170,8 @@
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
           public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
             return new StreamObserver<WriteRequest>() {
@@ -183,8 +193,8 @@
 
                 ByteString data = writeRequest.getData();
 
-                System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset,
-                    data.size());
+                System.arraycopy(
+                    data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
 
                 nextOffset += data.size();
                 boolean lastWrite = blob.length == nextOffset;
@@ -210,7 +220,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verifyZeroInteractions(mockBackoff);
@@ -328,7 +338,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
@@ -392,7 +402,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     assertThat(numWriteCalls.get()).isEqualTo(1);
@@ -466,7 +476,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should have triggered a single retry, because it made
     // no progress.
@@ -508,7 +518,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verifyZeroInteractions(mockBackoff);
@@ -549,7 +559,7 @@
         });
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       // expected
@@ -592,7 +602,7 @@
 
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    uploader.uploadBlobs(chunkers, true);
+    uploader.uploadBlobs(context, chunkers, true);
 
     blockUntilInternalStateConsistent(uploader);
 
@@ -690,16 +700,19 @@
 
     for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
       Digest actionDigest = chunkerEntry.getKey();
-      Context ctx =
-          TracingMetadataUtils.contextWithMetadata(
-              "build-req-id", "command-id", DIGEST_UTIL.asActionKey(actionDigest));
-      ctx.run(
-          () ->
-              uploads.add(
-                  uploader.uploadBlobAsync(
-                      HashCode.fromString(actionDigest.getHash()),
-                      chunkerEntry.getValue(),
-                      /* forceUpload=*/ true)));
+      RequestMetadata metadata =
+          TracingMetadataUtils.buildMetadata(
+              "build-req-id",
+              "command-id",
+              DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash());
+      RemoteActionExecutionContext remoteActionExecutionContext =
+          new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+      uploads.add(
+          uploader.uploadBlobAsync(
+              remoteActionExecutionContext,
+              actionDigest,
+              chunkerEntry.getValue(),
+              /* forceUpload= */ true));
     }
 
     for (ListenableFuture<Void> upload : uploads) {
@@ -776,7 +789,7 @@
               }
             }));
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
   }
 
   @Test
@@ -796,47 +809,48 @@
 
     byte[] blob = new byte[CHUNK_SIZE * 10];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     AtomicInteger numWriteCalls = new AtomicInteger();
     CountDownLatch blocker = new CountDownLatch(1);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        numWriteCalls.incrementAndGet();
-        try {
-          // Ensures that the first upload does not finish, before the second upload is started.
-          blocker.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-
-        return new StreamObserver<WriteRequest>() {
-
-          private long bytesReceived;
-
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
-          public void onNext(WriteRequest writeRequest) {
-            bytesReceived += writeRequest.getData().size();
-          }
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            numWriteCalls.incrementAndGet();
+            try {
+              // Ensures that the first upload does not finish, before the second upload is started.
+              blocker.await();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
 
-          @Override
-          public void onError(Throwable throwable) {
-            fail("onError should never be called.");
-          }
+            return new StreamObserver<WriteRequest>() {
 
-          @Override
-          public void onCompleted() {
-            response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
-            response.onCompleted();
-          }
-        };
-      }
-    });
+              private long bytesReceived;
 
-    Future<?> upload1 = uploader.uploadBlobAsync(hash, chunker, true);
-    Future<?> upload2 = uploader.uploadBlobAsync(hash, chunker, true);
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                bytesReceived += writeRequest.getData().size();
+              }
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
+                response.onCompleted();
+              }
+            };
+          }
+        });
+
+    Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
+    Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
 
     blocker.countDown();
 
@@ -866,16 +880,17 @@
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        response.onError(Status.INTERNAL.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            response.onError(Status.INTERNAL.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -926,14 +941,14 @@
 
     byte[] blob1 = new byte[CHUNK_SIZE];
     Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash1 = HashCode.fromString(DIGEST_UTIL.compute(blob1).getHash());
+    Digest digest1 = DIGEST_UTIL.compute(blob1);
 
     byte[] blob2 = new byte[CHUNK_SIZE + 1];
     Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash2 = HashCode.fromString(DIGEST_UTIL.compute(blob2).getHash());
+    Digest digest2 = DIGEST_UTIL.compute(blob2);
 
-    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(hash1, chunker1, true);
-    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(hash2, chunker2, true);
+    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
+    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
 
     assertThat(uploader.uploadsInProgress()).isTrue();
 
@@ -964,14 +979,15 @@
             /* callTimeoutSecs= */ 60,
             retrier);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        // Immediately fail the call, so that it is retried.
-        response.onError(Status.ABORTED.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            // Immediately fail the call, so that it is retried.
+            response.onError(Status.ABORTED.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     retryService.shutdownNow();
     // Random very high timeout, as the test will timeout by itself.
@@ -982,7 +998,7 @@
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1004,35 +1020,34 @@
             /* callTimeoutSecs= */ 60,
             retrier);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        return new StreamObserver<WriteRequest>() {
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
-          public void onNext(WriteRequest writeRequest) {
-            // Test that the resource name doesn't start with an instance name.
-            assertThat(writeRequest.getResourceName()).startsWith("uploads/");
-          }
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                // Test that the resource name doesn't start with an instance name.
+                assertThat(writeRequest.getResourceName()).startsWith("uploads/");
+              }
 
-          @Override
-          public void onError(Throwable throwable) {
+              @Override
+              public void onError(Throwable throwable) {}
 
+              @Override
+              public void onCompleted() {
+                response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
+                response.onCompleted();
+              }
+            };
           }
-
-          @Override
-          public void onCompleted() {
-            response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
-            response.onCompleted();
-          }
-        };
-      }
-    });
+        });
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     withEmptyMetadata.detach(prevContext);
   }
@@ -1053,21 +1068,22 @@
 
     AtomicInteger numCalls = new AtomicInteger();
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        numCalls.incrementAndGet();
-        response.onError(Status.INTERNAL.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            numCalls.incrementAndGet();
+            response.onError(Status.INTERNAL.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(numCalls.get()).isEqualTo(1);
@@ -1139,7 +1155,7 @@
     StatusRuntimeException expected = null;
     try {
       // This should fail
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
     } catch (IOException e) {
       if (e.getCause() instanceof StatusRuntimeException) {
         expected = (StatusRuntimeException) e.getCause();
@@ -1148,7 +1164,7 @@
     assertThat(expected).isNotNull();
     assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
     // This should trigger an upload.
-    uploader.uploadBlob(hash, chunker, false);
+    uploader.uploadBlob(context, hash, chunker, false);
 
     assertThat(numUploads.get()).isEqualTo(2);
 
@@ -1177,42 +1193,43 @@
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
     AtomicInteger numUploads = new AtomicInteger();
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-        numUploads.incrementAndGet();
-        return new StreamObserver<WriteRequest>() {
-
-          long nextOffset = 0;
-
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
-          public void onNext(WriteRequest writeRequest) {
-            nextOffset += writeRequest.getData().size();
-            boolean lastWrite = blob.length == nextOffset;
-            assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numUploads.incrementAndGet();
+            return new StreamObserver<WriteRequest>() {
+
+              long nextOffset = 0;
+
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                nextOffset += writeRequest.getData().size();
+                boolean lastWrite = blob.length == nextOffset;
+                assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+              }
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                assertThat(nextOffset).isEqualTo(blob.length);
+
+                WriteResponse response =
+                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+                streamObserver.onNext(response);
+                streamObserver.onCompleted();
+              }
+            };
           }
+        });
 
-          @Override
-          public void onError(Throwable throwable) {
-            fail("onError should never be called.");
-          }
-
-          @Override
-          public void onCompleted() {
-            assertThat(nextOffset).isEqualTo(blob.length);
-
-            WriteResponse response =
-                WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-            streamObserver.onNext(response);
-            streamObserver.onCompleted();
-          }
-        };
-      }
-    });
-
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
     // This should not trigger an upload.
-    uploader.uploadBlob(hash, chunker, false);
+    uploader.uploadBlob(context, hash, chunker, false);
 
     assertThat(numUploads.get()).isEqualTo(1);
 
@@ -1271,11 +1288,7 @@
           }
         });
 
-    assertThrows(
-        IOException.class,
-        () -> {
-          uploader.uploadBlob(hash, chunker, true);
-        });
+    assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
@@ -1363,7 +1376,7 @@
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
@@ -1378,16 +1391,13 @@
 
   private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
     @Override
-    public void onNext(WriteRequest writeRequest) {
-    }
+    public void onNext(WriteRequest writeRequest) {}
 
     @Override
-    public void onError(Throwable throwable) {
-    }
+    public void onError(Throwable throwable) {}
 
     @Override
-    public void onCompleted() {
-    }
+    public void onCompleted() {}
   }
 
   static class FixedBackoff implements Retrier.Backoff {