Stop remote blob upload if upload is complete.

If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0136a83d303d4d88373d6e510f85a81fbb for uploads.

On bazel-6.0.0-pre.20211117.1, I observed:
```
java.lang.NullPointerException
        at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156)
        at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416)
        at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277)
        at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
```

Closes #14464.

PiperOrigin-RevId: 417795715
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index e409fdf..d933223 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -253,7 +253,7 @@
       checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
 
       if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
-        return Futures.immediateFuture(null);
+        return immediateVoidFuture();
       }
 
       ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
@@ -424,7 +424,7 @@
               () ->
                   retrier.executeAsync(
                       () -> {
-                        if (chunker.getSize() == 0) {
+                        if (chunker.getSize() == committedOffset.get()) {
                           return immediateVoidFuture();
                         }
                         try {
@@ -452,7 +452,7 @@
                         if (chunker.hasNext()) {
                           return callAndQueryOnFailure(committedOffset, progressiveBackoff);
                         }
-                        return Futures.immediateFuture(null);
+                        return immediateVoidFuture();
                       },
                       progressiveBackoff),
               callCredentialsProvider);
@@ -476,7 +476,7 @@
                 return Futures.immediateFailedFuture(new IOException(message));
               }
             }
-            return Futures.immediateFuture(null);
+            return immediateVoidFuture();
           },
           MoreExecutors.directExecutor());
     }
@@ -564,7 +564,7 @@
               progressiveBackoff.reset();
             }
             committedOffset.set(committedSize);
-            return Futures.immediateFuture(null);
+            return immediateVoidFuture();
           },
           MoreExecutors.directExecutor());
     }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 4865af2..2623921 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -1606,6 +1606,69 @@
   }
 
   @Test
+  public void failureAfterUploadCompletes() throws Exception {
+    AtomicInteger numUploads = new AtomicInteger();
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(
+            () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            -1);
+
+    byte[] blob = new byte[CHUNK_SIZE - 1];
+    new Random().nextBytes(blob);
+
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numUploads.incrementAndGet();
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                streamObserver.onNext(
+                    WriteResponse.newBuilder().setCommittedSize(blob.length).build());
+                streamObserver.onError(Status.UNAVAILABLE.asException());
+              }
+            };
+          }
+
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+            response.onNext(
+                QueryWriteStatusResponse.newBuilder()
+                    .setCommittedSize(blob.length)
+                    .setComplete(true)
+                    .build());
+            response.onCompleted();
+          }
+        });
+
+    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+
+    uploader.uploadBlob(context, hash, chunker, true);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    assertThat(numUploads.get()).isEqualTo(1);
+  }
+
+  @Test
   public void testCompressedUploads() throws Exception {
     RemoteRetrier retrier =
         TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);