Remote: Cleanup ByteStreamUploader.

During previous work on improving BES uploader, we updated the uploader to use `RemoteCache` directly instead of referencing `ByteStreamUploader`. This makes following cleanups possible:

1. `ByteStreamUploader` is no longer reference counted. It was reference counted mainly because it's needed to be shared between remote execution and BES. Now the only place which references it is `RemoteCache`.
2. `ByteStreamUploader` does not deduplicate uploads as `RemoteCache` already supports it. Similar to #14441. This should reduce memory footprint when using remote execution and/or BES.
3. Deprecated methods of `ByteStreamUploader` are removed.

Closes #14680.

PiperOrigin-RevId: 425887231
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index adcc14a..feb39e7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -24,7 +24,6 @@
 import build.bazel.remote.execution.v2.RequestMetadata;
 import com.github.luben.zstd.Zstd;
 import com.github.luben.zstd.ZstdInputStream;
-import com.google.bytestream.ByteStreamGrpc;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
 import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
 import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
@@ -52,11 +51,9 @@
 import io.grpc.Metadata;
 import io.grpc.Server;
 import io.grpc.ServerCall;
-import io.grpc.ServerCall.Listener;
 import io.grpc.ServerCallHandler;
 import io.grpc.ServerInterceptor;
 import io.grpc.ServerInterceptors;
-import io.grpc.ServerServiceDefinition;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
@@ -76,9 +73,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -174,16 +169,14 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -204,7 +197,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -294,13 +287,11 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
     Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -322,7 +313,7 @@
 
     Chunker chunker =
         Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     while (chunker.hasNext()) {
       chunker.next();
@@ -419,13 +410,11 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
     Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -447,7 +436,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     AtomicInteger numWriteCalls = new AtomicInteger(0);
 
@@ -481,12 +470,10 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     assertThat(numWriteCalls.get()).isEqualTo(1);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -507,7 +494,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -553,13 +540,11 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should have triggered a single retry, because it made
     // no progress.
     Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -581,7 +566,7 @@
     InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE);
 
     Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -593,12 +578,10 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -618,7 +601,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -645,7 +628,7 @@
         });
 
     try {
-      uploader.uploadBlob(context, hash, chunker, true);
+      uploader.uploadBlob(context, digest, chunker);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       // expected
@@ -653,8 +636,6 @@
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -674,7 +655,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -686,8 +667,7 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
-    blockUntilInternalStateConsistent(uploader);
+    uploader.uploadBlob(context, digest, chunker);
   }
 
   @Test
@@ -705,23 +685,21 @@
 
     int numUploads = 10;
     Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
-    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
     Random rand = new Random();
     for (int i = 0; i < numUploads; i++) {
       int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
       byte[] blob = new byte[blobSize];
       rand.nextBytes(blob);
       Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-      chunkers.put(hash, chunker);
-      blobsByHash.put(hash, blob);
+      Digest digest = DIGEST_UTIL.compute(blob);
+      chunkers.put(digest, chunker);
+      blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
     }
 
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    uploader.uploadBlobs(context, chunkers, true);
-
-    blockUntilInternalStateConsistent(uploader);
+    uploader.uploadBlobs(context, chunkers);
   }
 
   @Test
@@ -738,15 +716,15 @@
             /*maximumOpenFiles=*/ -1);
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Mockito.mock(Chunker.class);
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
     AtomicLong committedOffset = new AtomicLong(0);
     Mockito.doThrow(new IOException("Too many open files"))
         .when(chunker)
         .seek(committedOffset.get());
-    Mockito.when(chunker.getSize()).thenReturn(1L);
+    Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes());
 
     try {
-      uploader.uploadBlob(context, hash, chunker, true);
+      uploader.uploadBlob(context, digest, chunker);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       String newMessage =
@@ -779,7 +757,7 @@
     CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
     int numUploads = 1000;
     Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
-    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
     Random rand = new Random();
     for (int i = 0; i < numUploads; i++) {
       int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
@@ -787,16 +765,14 @@
       rand.nextBytes(blob);
       Chunker chunker =
           TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
-      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-      chunkers.put(hash, chunker);
-      blobsByHash.put(hash, blob);
+      Digest digest = DIGEST_UTIL.compute(blob);
+      chunkers.put(digest, chunker);
+      blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
     }
 
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    uploader.uploadBlobs(context, chunkers, true);
-
-    blockUntilInternalStateConsistent(uploader);
+    uploader.uploadBlobs(context, chunkers);
 
     assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
   }
@@ -817,22 +793,21 @@
 
     int numUploads = 10;
     Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
-    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
     Random rand = new Random();
     for (int i = 0; i < numUploads; i++) {
       int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
       byte[] blob = new byte[blobSize];
       rand.nextBytes(blob);
       Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-      chunkers.put(hash, chunker);
-      blobsByHash.put(hash, blob);
+      Digest digest = DIGEST_UTIL.compute(blob);
+      chunkers.put(digest, chunker);
+      blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
     }
 
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    uploader.uploadBlobs(context, chunkers, true);
-    blockUntilInternalStateConsistent(uploader);
+    uploader.uploadBlobs(context, chunkers);
     assertThat(uploader.getOpenedFilePermits()).isNull();
   }
 
@@ -937,17 +912,12 @@
           RemoteActionExecutionContext.create(metadata);
       uploads.add(
           uploader.uploadBlobAsync(
-              remoteActionExecutionContext,
-              actionDigest,
-              chunkerEntry.getValue(),
-              /* forceUpload= */ true));
+              remoteActionExecutionContext, actionDigest, chunkerEntry.getValue()));
     }
 
     for (ListenableFuture<Void> upload : uploads) {
       upload.get();
     }
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -985,7 +955,7 @@
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         ServerInterceptors.intercept(
@@ -1028,75 +998,7 @@
               }
             }));
 
-    uploader.uploadBlob(context, hash, chunker, true);
-  }
-
-  @Test
-  public void sameBlobShouldNotBeUploadedTwice() throws Exception {
-    // Test that uploading the same file concurrently triggers only one file upload.
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
-    ByteStreamUploader uploader =
-        new ByteStreamUploader(
-            INSTANCE_NAME,
-            new ReferenceCountedChannel(channelConnectionFactory),
-            CallCredentialsProvider.NO_CREDENTIALS,
-            /* callTimeoutSecs= */ 60,
-            retrier,
-            /*maximumOpenFiles=*/ -1);
-
-    byte[] blob = new byte[CHUNK_SIZE * 10];
-    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    Digest digest = DIGEST_UTIL.compute(blob);
-
-    AtomicInteger numWriteCalls = new AtomicInteger();
-    CountDownLatch blocker = new CountDownLatch(1);
-
-    serviceRegistry.addService(
-        new ByteStreamImplBase() {
-          @Override
-          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-            numWriteCalls.incrementAndGet();
-            try {
-              // Ensures that the first upload does not finish, before the second upload is started.
-              blocker.await();
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-            }
-
-            return new StreamObserver<WriteRequest>() {
-
-              private long bytesReceived;
-
-              @Override
-              public void onNext(WriteRequest writeRequest) {
-                bytesReceived += writeRequest.getData().size();
-              }
-
-              @Override
-              public void onError(Throwable throwable) {
-                fail("onError should never be called.");
-              }
-
-              @Override
-              public void onCompleted() {
-                response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
-                response.onCompleted();
-              }
-            };
-          }
-        });
-
-    Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
-    Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
-
-    blocker.countDown();
-
-    assertThat(upload1).isSameInstanceAs(upload2);
-
-    upload1.get();
-
-    assertThat(numWriteCalls.get()).isEqualTo(1);
+    uploader.uploadBlob(context, digest, chunker);
   }
 
   @Test
@@ -1114,7 +1016,7 @@
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1126,7 +1028,7 @@
         });
 
     try {
-      uploader.uploadBlob(context, hash, chunker, true);
+      uploader.uploadBlob(context, digest, chunker);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -1134,69 +1036,6 @@
   }
 
   @Test
-  public void shutdownShouldCancelOngoingUploads() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, retryService);
-    ByteStreamUploader uploader =
-        new ByteStreamUploader(
-            INSTANCE_NAME,
-            new ReferenceCountedChannel(channelConnectionFactory),
-            CallCredentialsProvider.NO_CREDENTIALS,
-            /* callTimeoutSecs= */ 60,
-            retrier,
-            /*maximumOpenFiles=*/ -1);
-
-    CountDownLatch cancellations = new CountDownLatch(2);
-
-    ServerServiceDefinition service =
-        ServerServiceDefinition.builder(ByteStreamGrpc.SERVICE_NAME)
-            .addMethod(
-                ByteStreamGrpc.getWriteMethod(),
-                new ServerCallHandler<WriteRequest, WriteResponse>() {
-                  @Override
-                  public Listener<WriteRequest> startCall(
-                      ServerCall<WriteRequest, WriteResponse> call, Metadata headers) {
-                    // Don't request() any messages from the client, so that the client will be
-                    // blocked
-                    // on flow control and thus the call will sit there idle long enough to receive
-                    // the
-                    // cancellation.
-                    return new Listener<WriteRequest>() {
-                      @Override
-                      public void onCancel() {
-                        cancellations.countDown();
-                      }
-                    };
-                  }
-                })
-            .build();
-
-    serviceRegistry.addService(service);
-
-    byte[] blob1 = new byte[CHUNK_SIZE];
-    Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
-    Digest digest1 = DIGEST_UTIL.compute(blob1);
-
-    byte[] blob2 = new byte[CHUNK_SIZE + 1];
-    Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
-    Digest digest2 = DIGEST_UTIL.compute(blob2);
-
-    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
-    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
-
-    assertThat(uploader.uploadsInProgress()).isTrue();
-
-    uploader.shutdown();
-
-    cancellations.await();
-
-    assertThat(f1.isCancelled()).isTrue();
-    assertThat(f2.isCancelled()).isTrue();
-
-    blockUntilInternalStateConsistent(uploader);
-  }
-
-  @Test
   public void failureInRetryExecutorShouldBeHandled() throws Exception {
     ListeningScheduledExecutorService retryService =
         MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@@ -1228,9 +1067,9 @@
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
     try {
-      uploader.uploadBlob(context, hash, chunker, true);
+      uploader.uploadBlob(context, digest, chunker);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1275,9 +1114,9 @@
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
   }
 
   @Test
@@ -1308,10 +1147,10 @@
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     try {
-      uploader.uploadBlob(context, hash, chunker, true);
+      uploader.uploadBlob(context, digest, chunker);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(numCalls.get()).isEqualTo(1);
@@ -1319,151 +1158,6 @@
   }
 
   @Test
-  public void failedUploadsShouldNotDeduplicate() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> Retrier.RETRIES_DISABLED, (e) -> false, retryService);
-    ByteStreamUploader uploader =
-        new ByteStreamUploader(
-            INSTANCE_NAME,
-            new ReferenceCountedChannel(channelConnectionFactory),
-            CallCredentialsProvider.NO_CREDENTIALS,
-            /* callTimeoutSecs= */ 60,
-            retrier,
-            /*maximumOpenFiles=*/ -1);
-
-    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
-    new Random().nextBytes(blob);
-
-    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-
-    AtomicInteger numUploads = new AtomicInteger();
-    serviceRegistry.addService(
-        new ByteStreamImplBase() {
-          boolean failRequest = true;
-
-          @Override
-          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-            numUploads.incrementAndGet();
-            return new StreamObserver<WriteRequest>() {
-              long nextOffset = 0;
-
-              @Override
-              public void onNext(WriteRequest writeRequest) {
-                if (failRequest) {
-                  streamObserver.onError(Status.UNKNOWN.asException());
-                  failRequest = false;
-                } else {
-                  nextOffset += writeRequest.getData().size();
-                  boolean lastWrite = blob.length == nextOffset;
-                  assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-                }
-              }
-
-              @Override
-              public void onError(Throwable throwable) {
-                fail("onError should never be called.");
-              }
-
-              @Override
-              public void onCompleted() {
-                assertThat(nextOffset).isEqualTo(blob.length);
-
-                WriteResponse response =
-                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-                streamObserver.onNext(response);
-                streamObserver.onCompleted();
-              }
-            };
-          }
-        });
-
-    StatusRuntimeException expected = null;
-    try {
-      // This should fail
-      uploader.uploadBlob(context, hash, chunker, true);
-    } catch (IOException e) {
-      if (e.getCause() instanceof StatusRuntimeException) {
-        expected = (StatusRuntimeException) e.getCause();
-      }
-    }
-    assertThat(expected).isNotNull();
-    assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
-    // This should trigger an upload.
-    uploader.uploadBlob(context, hash, chunker, false);
-
-    assertThat(numUploads.get()).isEqualTo(2);
-
-    blockUntilInternalStateConsistent(uploader);
-  }
-
-  @Test
-  public void deduplicationOfUploadsShouldWork() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
-    ByteStreamUploader uploader =
-        new ByteStreamUploader(
-            INSTANCE_NAME,
-            new ReferenceCountedChannel(channelConnectionFactory),
-            CallCredentialsProvider.NO_CREDENTIALS,
-            /* callTimeoutSecs= */ 60,
-            retrier,
-            /*maximumOpenFiles=*/ -1);
-
-    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
-    new Random().nextBytes(blob);
-
-    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
-
-    AtomicInteger numUploads = new AtomicInteger();
-    serviceRegistry.addService(
-        new ByteStreamImplBase() {
-          @Override
-          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-            numUploads.incrementAndGet();
-            return new StreamObserver<WriteRequest>() {
-
-              long nextOffset = 0;
-
-              @Override
-              public void onNext(WriteRequest writeRequest) {
-                nextOffset += writeRequest.getData().size();
-                boolean lastWrite = blob.length == nextOffset;
-                assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-              }
-
-              @Override
-              public void onError(Throwable throwable) {
-                fail("onError should never be called.");
-              }
-
-              @Override
-              public void onCompleted() {
-                assertThat(nextOffset).isEqualTo(blob.length);
-
-                WriteResponse response =
-                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-                streamObserver.onNext(response);
-                streamObserver.onCompleted();
-              }
-            };
-          }
-        });
-
-    uploader.uploadBlob(context, hash, chunker, true);
-    // This should not trigger an upload.
-    uploader.uploadBlob(context, hash, chunker, false);
-
-    assertThat(numUploads.get()).isEqualTo(1);
-
-    // This test should not have triggered any retries.
-    Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
-  }
-
-  @Test
   public void unauthenticatedErrorShouldNotBeRetried() throws Exception {
     RemoteRetrier retrier =
         TestUtils.newRemoteRetrier(
@@ -1496,7 +1190,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     AtomicInteger numUploads = new AtomicInteger();
     serviceRegistry.addService(
@@ -1510,15 +1204,13 @@
           }
         });
 
-    assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
+    assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker));
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -1554,7 +1246,7 @@
     new Random().nextBytes(blob);
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     AtomicInteger numUploads = new AtomicInteger();
     serviceRegistry.addService(
@@ -1596,15 +1288,13 @@
           }
         });
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
-
-    blockUntilInternalStateConsistent(uploader);
   }
 
   @Test
@@ -1661,11 +1351,9 @@
         });
 
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
-    uploader.uploadBlob(context, hash, chunker, true);
-
-    blockUntilInternalStateConsistent(uploader);
+    uploader.uploadBlob(context, digest, chunker);
 
     assertThat(numUploads.get()).isEqualTo(1);
   }
@@ -1752,15 +1440,13 @@
 
     Chunker chunker =
         Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
-    uploader.uploadBlob(context, hash, chunker, true);
+    uploader.uploadBlob(context, digest, chunker);
 
     // This test should not have triggered any retries.
     Mockito.verifyNoInteractions(mockBackoff);
 
-    blockUntilInternalStateConsistent(uploader);
-
     assertThat(numUploads.get()).isEqualTo(1);
   }
 
@@ -1883,14 +1569,6 @@
     }
   }
 
-  private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception {
-    // Poll until all upload futures have been removed from the internal hash map. The polling is
-    // necessary, as listeners are executed after Future.get() calls are notified about completion.
-    while (uploader.uploadsInProgress()) {
-      Thread.sleep(1);
-    }
-  }
-
   /* Custom Chunker used to track number of open files */
   private static class TestChunker extends Chunker {
 
@@ -1911,6 +1589,8 @@
 
       @Override
       public Chunker.Builder setInput(byte[] existingData) {
+        checkState(this.inputStream == null);
+        this.size = existingData.length;
         return setInputSupplier(
             () -> new TestByteArrayInputStream(existingData, customFileTracker));
       }