Remote: Clean up ByteStreamUploader.
During previous work on improving the BES uploader, we updated it to use `RemoteCache` directly instead of referencing `ByteStreamUploader`. This makes the following cleanups possible:
1. `ByteStreamUploader` is no longer reference counted. It was reference counted mainly because it needed to be shared between remote execution and BES. Now the only place that references it is `RemoteCache`.
2. `ByteStreamUploader` no longer deduplicates uploads, since `RemoteCache` already supports deduplication (similar to #14441; see the single-flight sketch below the list). This should reduce the memory footprint when using remote execution and/or BES.
3. Deprecated methods of `ByteStreamUploader` are removed (see the signature summary below).
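
For reference, the entry points exercised by the test diff below change roughly as follows. The signatures are inferred from the updated call sites in the diff, so parameter names and checked exceptions are approximate:

    // Before: keyed by HashCode, with a forceUpload flag on every call
    // (the deprecated overloads removed by this change).
    void uploadBlob(RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload)
    void uploadBlobs(RemoteActionExecutionContext context, Map<HashCode, Chunker> chunkers, boolean forceUpload)

    // After: keyed by Digest, no forceUpload flag; dedup is RemoteCache's job.
    void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker)
    void uploadBlobs(RemoteActionExecutionContext context, Map<Digest, Chunker> chunkers)
    ListenableFuture<Void> uploadBlobAsync(RemoteActionExecutionContext context, Digest digest, Chunker chunker)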
Closes #14680.
PiperOrigin-RevId: 425887231
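
The deduplication dropped from `ByteStreamUploader` is the single-flight pattern: concurrent uploads of the same digest share one in-flight future. The following is a minimal, self-contained sketch of that pattern, not Bazel's actual `RemoteCache` code; the class `UploadDeduplicator` and its method names are hypothetical. Entries are removed once an upload settles, preserving the property the deleted `failedUploadsShouldNotDeduplicate` test checked: a failed upload must not deduplicate later retries.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    /** Hypothetical single-flight helper: concurrent uploads of one digest share a future. */
    final class UploadDeduplicator<K> {
      private final Map<K, CompletableFuture<Void>> inProgress = new ConcurrentHashMap<>();

      CompletableFuture<Void> upload(K digest, Supplier<CompletableFuture<Void>> start) {
        // Start a new upload only if none for this digest is already in flight;
        // start.get() merely kicks off the upload and returns its future.
        CompletableFuture<Void> f = inProgress.computeIfAbsent(digest, d -> start.get());
        // Drop the entry once the upload settles, whether it succeeded or failed.
        // remove(key, value) is conditional, so a newer in-flight future is never evicted.
        f.whenComplete((ok, err) -> inProgress.remove(digest, f));
        return f;
      }
    }

A real cache could additionally remember successfully uploaded digests so repeat uploads are skipped outright; the sketch only covers sharing of in-flight uploads.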
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index adcc14a..feb39e7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -24,7 +24,6 @@
import build.bazel.remote.execution.v2.RequestMetadata;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
-import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
@@ -52,11 +51,9 @@
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
-import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
-import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
@@ -76,9 +73,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -174,16 +169,14 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -204,7 +197,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -294,13 +287,11 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -322,7 +313,7 @@
Chunker chunker =
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
while (chunker.hasNext()) {
chunker.next();
@@ -419,13 +410,11 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -447,7 +436,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicInteger numWriteCalls = new AtomicInteger(0);
@@ -481,12 +470,10 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
assertThat(numWriteCalls.get()).isEqualTo(1);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -507,7 +494,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -553,13 +540,11 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should have triggered a single retry, because it made
// no progress.
Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -581,7 +566,7 @@
InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE);
Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -593,12 +578,10 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -618,7 +601,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -645,7 +628,7 @@
});
try {
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
// expected
@@ -653,8 +636,6 @@
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -674,7 +655,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -686,8 +667,7 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
- blockUntilInternalStateConsistent(uploader);
+ uploader.uploadBlob(context, digest, chunker);
}
@Test
@@ -705,23 +685,21 @@
int numUploads = 10;
Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
- Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+ Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- chunkers.put(hash, chunker);
- blobsByHash.put(hash, blob);
+ Digest digest = DIGEST_UTIL.compute(blob);
+ chunkers.put(digest, chunker);
+ blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
}
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers, true);
-
- blockUntilInternalStateConsistent(uploader);
+ uploader.uploadBlobs(context, chunkers);
}
@Test
@@ -738,15 +716,15 @@
/*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Mockito.mock(Chunker.class);
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicLong committedOffset = new AtomicLong(0);
Mockito.doThrow(new IOException("Too many open files"))
.when(chunker)
.seek(committedOffset.get());
- Mockito.when(chunker.getSize()).thenReturn(1L);
+ Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes());
try {
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
String newMessage =
@@ -779,7 +757,7 @@
CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
int numUploads = 1000;
Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
- Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+ Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
@@ -787,16 +765,14 @@
rand.nextBytes(blob);
Chunker chunker =
TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- chunkers.put(hash, chunker);
- blobsByHash.put(hash, blob);
+ Digest digest = DIGEST_UTIL.compute(blob);
+ chunkers.put(digest, chunker);
+ blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
}
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers, true);
-
- blockUntilInternalStateConsistent(uploader);
+ uploader.uploadBlobs(context, chunkers);
assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
}
@@ -817,22 +793,21 @@
int numUploads = 10;
Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
- Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+ Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- chunkers.put(hash, chunker);
- blobsByHash.put(hash, blob);
+ Digest digest = DIGEST_UTIL.compute(blob);
+ chunkers.put(digest, chunker);
+ blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
}
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers, true);
- blockUntilInternalStateConsistent(uploader);
+ uploader.uploadBlobs(context, chunkers);
assertThat(uploader.getOpenedFilePermits()).isNull();
}
@@ -937,17 +912,12 @@
RemoteActionExecutionContext.create(metadata);
uploads.add(
uploader.uploadBlobAsync(
- remoteActionExecutionContext,
- actionDigest,
- chunkerEntry.getValue(),
- /* forceUpload= */ true));
+ remoteActionExecutionContext, actionDigest, chunkerEntry.getValue()));
}
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -985,7 +955,7 @@
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
ServerInterceptors.intercept(
@@ -1028,75 +998,7 @@
}
}));
- uploader.uploadBlob(context, hash, chunker, true);
- }
-
- @Test
- public void sameBlobShouldNotBeUploadedTwice() throws Exception {
- // Test that uploading the same file concurrently triggers only one file upload.
- RemoteRetrier retrier =
- TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
- ByteStreamUploader uploader =
- new ByteStreamUploader(
- INSTANCE_NAME,
- new ReferenceCountedChannel(channelConnectionFactory),
- CallCredentialsProvider.NO_CREDENTIALS,
- /* callTimeoutSecs= */ 60,
- retrier,
- /*maximumOpenFiles=*/ -1);
-
- byte[] blob = new byte[CHUNK_SIZE * 10];
- Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- Digest digest = DIGEST_UTIL.compute(blob);
-
- AtomicInteger numWriteCalls = new AtomicInteger();
- CountDownLatch blocker = new CountDownLatch(1);
-
- serviceRegistry.addService(
- new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- numWriteCalls.incrementAndGet();
- try {
- // Ensures that the first upload does not finish, before the second upload is started.
- blocker.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- return new StreamObserver<WriteRequest>() {
-
- private long bytesReceived;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- bytesReceived += writeRequest.getData().size();
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
- response.onCompleted();
- }
- };
- }
- });
-
- Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
- Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
-
- blocker.countDown();
-
- assertThat(upload1).isSameInstanceAs(upload2);
-
- upload1.get();
-
- assertThat(numWriteCalls.get()).isEqualTo(1);
+ uploader.uploadBlob(context, digest, chunker);
}
@Test
@@ -1114,7 +1016,7 @@
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1126,7 +1028,7 @@
});
try {
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -1134,69 +1036,6 @@
}
@Test
- public void shutdownShouldCancelOngoingUploads() throws Exception {
- RemoteRetrier retrier =
- TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, retryService);
- ByteStreamUploader uploader =
- new ByteStreamUploader(
- INSTANCE_NAME,
- new ReferenceCountedChannel(channelConnectionFactory),
- CallCredentialsProvider.NO_CREDENTIALS,
- /* callTimeoutSecs= */ 60,
- retrier,
- /*maximumOpenFiles=*/ -1);
-
- CountDownLatch cancellations = new CountDownLatch(2);
-
- ServerServiceDefinition service =
- ServerServiceDefinition.builder(ByteStreamGrpc.SERVICE_NAME)
- .addMethod(
- ByteStreamGrpc.getWriteMethod(),
- new ServerCallHandler<WriteRequest, WriteResponse>() {
- @Override
- public Listener<WriteRequest> startCall(
- ServerCall<WriteRequest, WriteResponse> call, Metadata headers) {
- // Don't request() any messages from the client, so that the client will be
- // blocked
- // on flow control and thus the call will sit there idle long enough to receive
- // the
- // cancellation.
- return new Listener<WriteRequest>() {
- @Override
- public void onCancel() {
- cancellations.countDown();
- }
- };
- }
- })
- .build();
-
- serviceRegistry.addService(service);
-
- byte[] blob1 = new byte[CHUNK_SIZE];
- Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
- Digest digest1 = DIGEST_UTIL.compute(blob1);
-
- byte[] blob2 = new byte[CHUNK_SIZE + 1];
- Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
- Digest digest2 = DIGEST_UTIL.compute(blob2);
-
- ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
- ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
-
- assertThat(uploader.uploadsInProgress()).isTrue();
-
- uploader.shutdown();
-
- cancellations.await();
-
- assertThat(f1.isCancelled()).isTrue();
- assertThat(f2.isCancelled()).isTrue();
-
- blockUntilInternalStateConsistent(uploader);
- }
-
- @Test
public void failureInRetryExecutorShouldBeHandled() throws Exception {
ListeningScheduledExecutorService retryService =
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@@ -1228,9 +1067,9 @@
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
try {
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1275,9 +1114,9 @@
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
}
@Test
@@ -1308,10 +1147,10 @@
byte[] blob = new byte[1];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
try {
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(numCalls.get()).isEqualTo(1);
@@ -1319,151 +1158,6 @@
}
@Test
- public void failedUploadsShouldNotDeduplicate() throws Exception {
- RemoteRetrier retrier =
- TestUtils.newRemoteRetrier(() -> Retrier.RETRIES_DISABLED, (e) -> false, retryService);
- ByteStreamUploader uploader =
- new ByteStreamUploader(
- INSTANCE_NAME,
- new ReferenceCountedChannel(channelConnectionFactory),
- CallCredentialsProvider.NO_CREDENTIALS,
- /* callTimeoutSecs= */ 60,
- retrier,
- /*maximumOpenFiles=*/ -1);
-
- byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
- new Random().nextBytes(blob);
-
- Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-
- AtomicInteger numUploads = new AtomicInteger();
- serviceRegistry.addService(
- new ByteStreamImplBase() {
- boolean failRequest = true;
-
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- numUploads.incrementAndGet();
- return new StreamObserver<WriteRequest>() {
- long nextOffset = 0;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- if (failRequest) {
- streamObserver.onError(Status.UNKNOWN.asException());
- failRequest = false;
- } else {
- nextOffset += writeRequest.getData().size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
-
- StatusRuntimeException expected = null;
- try {
- // This should fail
- uploader.uploadBlob(context, hash, chunker, true);
- } catch (IOException e) {
- if (e.getCause() instanceof StatusRuntimeException) {
- expected = (StatusRuntimeException) e.getCause();
- }
- }
- assertThat(expected).isNotNull();
- assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
- // This should trigger an upload.
- uploader.uploadBlob(context, hash, chunker, false);
-
- assertThat(numUploads.get()).isEqualTo(2);
-
- blockUntilInternalStateConsistent(uploader);
- }
-
- @Test
- public void deduplicationOfUploadsShouldWork() throws Exception {
- RemoteRetrier retrier =
- TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
- ByteStreamUploader uploader =
- new ByteStreamUploader(
- INSTANCE_NAME,
- new ReferenceCountedChannel(channelConnectionFactory),
- CallCredentialsProvider.NO_CREDENTIALS,
- /* callTimeoutSecs= */ 60,
- retrier,
- /*maximumOpenFiles=*/ -1);
-
- byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
- new Random().nextBytes(blob);
-
- Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-
- AtomicInteger numUploads = new AtomicInteger();
- serviceRegistry.addService(
- new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- numUploads.incrementAndGet();
- return new StreamObserver<WriteRequest>() {
-
- long nextOffset = 0;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- nextOffset += writeRequest.getData().size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
-
- uploader.uploadBlob(context, hash, chunker, true);
- // This should not trigger an upload.
- uploader.uploadBlob(context, hash, chunker, false);
-
- assertThat(numUploads.get()).isEqualTo(1);
-
- // This test should not have triggered any retries.
- Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
- }
-
- @Test
public void unauthenticatedErrorShouldNotBeRetried() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(
@@ -1496,7 +1190,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicInteger numUploads = new AtomicInteger();
serviceRegistry.addService(
@@ -1510,15 +1204,13 @@
}
});
- assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
+ assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker));
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -1554,7 +1246,7 @@
new Random().nextBytes(blob);
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
AtomicInteger numUploads = new AtomicInteger();
serviceRegistry.addService(
@@ -1596,15 +1288,13 @@
}
});
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
-
- blockUntilInternalStateConsistent(uploader);
}
@Test
@@ -1661,11 +1351,9 @@
});
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, hash, chunker, true);
-
- blockUntilInternalStateConsistent(uploader);
+ uploader.uploadBlob(context, digest, chunker);
assertThat(numUploads.get()).isEqualTo(1);
}
@@ -1752,15 +1440,13 @@
Chunker chunker =
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
- HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, hash, chunker, true);
+ uploader.uploadBlob(context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
- blockUntilInternalStateConsistent(uploader);
-
assertThat(numUploads.get()).isEqualTo(1);
}
@@ -1883,14 +1569,6 @@
}
}
- private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception {
- // Poll until all upload futures have been removed from the internal hash map. The polling is
- // necessary, as listeners are executed after Future.get() calls are notified about completion.
- while (uploader.uploadsInProgress()) {
- Thread.sleep(1);
- }
- }
-
/* Custom Chunker used to track number of open files */
private static class TestChunker extends Chunker {
@@ -1911,6 +1589,8 @@
@Override
public Chunker.Builder setInput(byte[] existingData) {
+ checkState(this.inputStream == null);
+ this.size = existingData.length;
return setInputSupplier(
() -> new TestByteArrayInputStream(existingData, customFileTracker));
}