Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 4)
Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext.
PiperOrigin-RevId: 354472775
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index eec073a..29cd41e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -36,6 +36,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -86,9 +89,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-/**
- * Tests for {@link ByteStreamUploader}.
- */
+/** Tests for {@link ByteStreamUploader}. */
@RunWith(JUnit4.class)
public class ByteStreamUploaderTest {
@@ -102,6 +103,7 @@
private Server server;
private ManagedChannel channel;
+ private RemoteActionExecutionContext context;
private Context withEmptyMetadata;
private Context prevContext;
@@ -112,12 +114,19 @@
MockitoAnnotations.initMocks(this);
String serverName = "Server for " + this.getClass();
- server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
- .build().start();
+ server =
+ InProcessServerBuilder.forName(serverName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .build()
+ .start();
channel = InProcessChannelBuilder.forName(serverName).build();
- withEmptyMetadata =
- TracingMetadataUtils.contextWithMetadata(
- "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(
+ "none",
+ "none",
+ DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+ context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+ withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@@ -161,7 +170,8 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(new ByteStreamImplBase() {
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@@ -183,8 +193,8 @@
ByteString data = writeRequest.getData();
- System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset,
- data.size());
+ System.arraycopy(
+ data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
nextOffset += data.size();
boolean lastWrite = blob.length == nextOffset;
@@ -210,7 +220,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -328,7 +338,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
@@ -392,7 +402,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
assertThat(numWriteCalls.get()).isEqualTo(1);
@@ -466,7 +476,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should have triggered a single retry, because it made
// no progress.
@@ -508,7 +518,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -549,7 +559,7 @@
});
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
// expected
@@ -592,7 +602,7 @@
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(chunkers, true);
+ uploader.uploadBlobs(context, chunkers, true);
blockUntilInternalStateConsistent(uploader);
@@ -690,16 +700,19 @@
for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
Digest actionDigest = chunkerEntry.getKey();
- Context ctx =
- TracingMetadataUtils.contextWithMetadata(
- "build-req-id", "command-id", DIGEST_UTIL.asActionKey(actionDigest));
- ctx.run(
- () ->
- uploads.add(
- uploader.uploadBlobAsync(
- HashCode.fromString(actionDigest.getHash()),
- chunkerEntry.getValue(),
- /* forceUpload=*/ true)));
+ RequestMetadata metadata =
+ TracingMetadataUtils.buildMetadata(
+ "build-req-id",
+ "command-id",
+ DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash());
+ RemoteActionExecutionContext remoteActionExecutionContext =
+ new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+ uploads.add(
+ uploader.uploadBlobAsync(
+ remoteActionExecutionContext,
+ actionDigest,
+ chunkerEntry.getValue(),
+ /* forceUpload= */ true));
}
for (ListenableFuture<Void> upload : uploads) {
@@ -776,7 +789,7 @@
}
}));
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
}
@Test
@@ -796,47 +809,48 @@
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicInteger numWriteCalls = new AtomicInteger();
CountDownLatch blocker = new CountDownLatch(1);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- numWriteCalls.incrementAndGet();
- try {
- // Ensures that the first upload does not finish, before the second upload is started.
- blocker.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- return new StreamObserver<WriteRequest>() {
-
- private long bytesReceived;
-
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- bytesReceived += writeRequest.getData().size();
- }
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ numWriteCalls.incrementAndGet();
+ try {
+ // Ensures that the first upload does not finish, before the second upload is started.
+ blocker.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
+ return new StreamObserver<WriteRequest>() {
- @Override
- public void onCompleted() {
- response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
- response.onCompleted();
- }
- };
- }
- });
+ private long bytesReceived;
- Future<?> upload1 = uploader.uploadBlobAsync(hash, chunker, true);
- Future<?> upload2 = uploader.uploadBlobAsync(hash, chunker, true);
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ bytesReceived += writeRequest.getData().size();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
+ response.onCompleted();
+ }
+ };
+ }
+ });
+
+ Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
+ Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
blocker.countDown();
@@ -866,16 +880,17 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- response.onError(Status.INTERNAL.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ response.onError(Status.INTERNAL.asException());
+ return new NoopStreamObserver();
+ }
+ });
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -926,14 +941,14 @@
byte[] blob1 = new byte[CHUNK_SIZE];
Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
- HashCode hash1 = HashCode.fromString(DIGEST_UTIL.compute(blob1).getHash());
+ Digest digest1 = DIGEST_UTIL.compute(blob1);
byte[] blob2 = new byte[CHUNK_SIZE + 1];
Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
- HashCode hash2 = HashCode.fromString(DIGEST_UTIL.compute(blob2).getHash());
+ Digest digest2 = DIGEST_UTIL.compute(blob2);
- ListenableFuture<Void> f1 = uploader.uploadBlobAsync(hash1, chunker1, true);
- ListenableFuture<Void> f2 = uploader.uploadBlobAsync(hash2, chunker2, true);
+ ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
+ ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
assertThat(uploader.uploadsInProgress()).isTrue();
@@ -964,14 +979,15 @@
/* callTimeoutSecs= */ 60,
retrier);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- // Immediately fail the call, so that it is retried.
- response.onError(Status.ABORTED.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ // Immediately fail the call, so that it is retried.
+ response.onError(Status.ABORTED.asException());
+ return new NoopStreamObserver();
+ }
+ });
retryService.shutdownNow();
// Random very high timeout, as the test will timeout by itself.
@@ -982,7 +998,7 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1004,35 +1020,34 @@
/* callTimeoutSecs= */ 60,
retrier);
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- return new StreamObserver<WriteRequest>() {
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- // Test that the resource name doesn't start with an instance name.
- assertThat(writeRequest.getResourceName()).startsWith("uploads/");
- }
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ // Test that the resource name doesn't start with an instance name.
+ assertThat(writeRequest.getResourceName()).startsWith("uploads/");
+ }
- @Override
- public void onError(Throwable throwable) {
+ @Override
+ public void onError(Throwable throwable) {}
+ @Override
+ public void onCompleted() {
+ response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
+ response.onCompleted();
+ }
+ };
}
-
- @Override
- public void onCompleted() {
- response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
- response.onCompleted();
- }
- };
- }
- });
+ });
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
withEmptyMetadata.detach(prevContext);
}
@@ -1053,21 +1068,22 @@
AtomicInteger numCalls = new AtomicInteger();
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- numCalls.incrementAndGet();
- response.onError(Status.INTERNAL.asException());
- return new NoopStreamObserver();
- }
- });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ numCalls.incrementAndGet();
+ response.onError(Status.INTERNAL.asException());
+ return new NoopStreamObserver();
+ }
+ });
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
try {
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(numCalls.get()).isEqualTo(1);
@@ -1139,7 +1155,7 @@
StatusRuntimeException expected = null;
try {
// This should fail
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
} catch (IOException e) {
if (e.getCause() instanceof StatusRuntimeException) {
expected = (StatusRuntimeException) e.getCause();
@@ -1148,7 +1164,7 @@
assertThat(expected).isNotNull();
assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
// This should trigger an upload.
- uploader.uploadBlob(hash, chunker, false);
+ uploader.uploadBlob(context, hash, chunker, false);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1177,42 +1193,43 @@
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
AtomicInteger numUploads = new AtomicInteger();
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- numUploads.incrementAndGet();
- return new StreamObserver<WriteRequest>() {
-
- long nextOffset = 0;
-
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
@Override
- public void onNext(WriteRequest writeRequest) {
- nextOffset += writeRequest.getData().size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numUploads.incrementAndGet();
+ return new StreamObserver<WriteRequest>() {
+
+ long nextOffset = 0;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ nextOffset += writeRequest.getData().size();
+ boolean lastWrite = blob.length == nextOffset;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ assertThat(nextOffset).isEqualTo(blob.length);
+
+ WriteResponse response =
+ WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
}
+ });
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
-
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
// This should not trigger an upload.
- uploader.uploadBlob(hash, chunker, false);
+ uploader.uploadBlob(context, hash, chunker, false);
assertThat(numUploads.get()).isEqualTo(1);
@@ -1271,11 +1288,7 @@
}
});
- assertThrows(
- IOException.class,
- () -> {
- uploader.uploadBlob(hash, chunker, true);
- });
+ assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1363,7 +1376,7 @@
}
});
- uploader.uploadBlob(hash, chunker, true);
+ uploader.uploadBlob(context, hash, chunker, true);
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1378,16 +1391,13 @@
private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
@Override
- public void onNext(WriteRequest writeRequest) {
- }
+ public void onNext(WriteRequest writeRequest) {}
@Override
- public void onError(Throwable throwable) {
- }
+ public void onError(Throwable throwable) {}
@Override
- public void onCompleted() {
- }
+ public void onCompleted() {}
}
static class FixedBackoff implements Retrier.Backoff {