Only request grpc write when not complete

If a queryWriteStatus yields a committedSize which leaves no content
remaining to be uploaded, immediately succeed a blob upload. This can
easily occur if a competing blob write completes asynchronously between
abnormal write termination and a query.

Closes #10284.

PiperOrigin-RevId: 281944912
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 91f9f50..90c9e45 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -353,7 +353,12 @@
       AtomicLong committedOffset = new AtomicLong(0);
       return Futures.transformAsync(
           retrier.executeAsync(
-              () -> ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)),
+              () -> {
+                if (committedOffset.get() < chunker.getSize()) {
+                  return ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
+                }
+                return Futures.immediateFuture(null);
+              },
               progressiveBackoff),
           (result) -> {
             long committedSize = committedOffset.get();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index cda7a3a..8904e59 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -218,7 +218,7 @@
     return new Chunk(blob, offsetBefore);
   }
 
-  private long bytesLeft() {
+  public long bytesLeft() {
     return getSize() - getOffset();
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index cd28f56..2b47d94 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -334,6 +334,65 @@
   }
 
   @Test
+  public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
+    // Test that after an upload has failed and the QueryWriteStatus call returns
+    // that the upload has completed that we'll not retry the upload.
+    Context prevContext = withEmptyMetadata.attach();
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 1, retrier);
+
+    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+
+    AtomicInteger numWriteCalls = new AtomicInteger(0);
+
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numWriteCalls.getAndIncrement();
+            streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {}
+
+              @Override
+              public void onCompleted() {}
+            };
+          }
+
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+            response.onNext(
+                QueryWriteStatusResponse.newBuilder()
+                    .setCommittedSize(blob.length)
+                    .setComplete(true)
+                    .build());
+            response.onCompleted();
+          }
+        });
+
+    uploader.uploadBlob(hash, chunker, true);
+
+    // This test should not have triggered any retries.
+    assertThat(numWriteCalls.get()).isEqualTo(1);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    withEmptyMetadata.detach(prevContext);
+  }
+
+  @Test
   public void unimplementedQueryShouldRestartUpload() throws Exception {
     Context prevContext = withEmptyMetadata.attach();
     Mockito.when(mockBackoff.getRetryAttempts()).thenReturn(0);