Record a blob as uploaded only when the upload succeeds

Avoid recording a blob as uploaded (for deduplication purposes) when its upload has failed

Closes #7019.

PiperOrigin-RevId: 228560183
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 303cfdd..b2e5f56 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -45,6 +45,7 @@
 import io.grpc.ServerServiceDefinition;
 import io.grpc.Status;
 import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.StreamObserver;
@@ -602,6 +603,83 @@
   }
 
   @Test
+  public void failedUploadsShouldNotDeduplicate() throws Exception {
+    Context prevContext = withEmptyMetadata.attach();
+    RemoteRetrier retrier =
+        new RemoteRetrier(
+            () -> Retrier.RETRIES_DISABLED, (e) -> false, retryService, Retrier.ALLOW_ALL_CALLS);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 3, retrier);
+
+    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker = Chunker.builder(DIGEST_UTIL).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+
+    AtomicInteger numUploads = new AtomicInteger();
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          boolean failRequest = true;
+
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numUploads.incrementAndGet();
+            return new StreamObserver<WriteRequest>() {
+              long nextOffset = 0;
+
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                if (failRequest) {
+                  streamObserver.onError(Status.UNKNOWN.asException());
+                  failRequest = false;
+                } else {
+                  nextOffset += writeRequest.getData().size();
+                  boolean lastWrite = blob.length == nextOffset;
+                  assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+                }
+              }
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                assertThat(nextOffset).isEqualTo(blob.length);
+
+                WriteResponse response =
+                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+                streamObserver.onNext(response);
+                streamObserver.onCompleted();
+              }
+            };
+          }
+        });
+
+    StatusRuntimeException expected = null;
+    try {
+      // This should fail
+      uploader.uploadBlob(chunker, true);
+    } catch (RetryException e) {
+      if (e.getCause() instanceof StatusRuntimeException) {
+        expected = (StatusRuntimeException) e.getCause();
+      }
+    }
+    assertThat(expected).isNotNull();
+    assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
+    // This should trigger an upload.
+    uploader.uploadBlob(chunker, false);
+
+    assertThat(numUploads.get()).isEqualTo(2);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    withEmptyMetadata.detach(prevContext);
+  }
+
+  @Test
   public void deduplicationOfUploadsShouldWork() throws Exception {
     Context prevContext = withEmptyMetadata.attach();
     RemoteRetrier retrier =