Stop remote blob upload if upload is complete.
If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0136a83d303d4d88373d6e510f85a81fbb for uploads.
On bazel-6.0.0-pre.20211117.1, I observed:
```
java.lang.NullPointerException
at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156)
at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416)
at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277)
at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
```
Closes #14464.
PiperOrigin-RevId: 417795715
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index e409fdf..d933223 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -253,7 +253,7 @@
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
- return Futures.immediateFuture(null);
+ return immediateVoidFuture();
}
ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
@@ -424,7 +424,7 @@
() ->
retrier.executeAsync(
() -> {
- if (chunker.getSize() == 0) {
+ if (chunker.getSize() == committedOffset.get()) {
return immediateVoidFuture();
}
try {
@@ -452,7 +452,7 @@
if (chunker.hasNext()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
- return Futures.immediateFuture(null);
+ return immediateVoidFuture();
},
progressiveBackoff),
callCredentialsProvider);
@@ -476,7 +476,7 @@
return Futures.immediateFailedFuture(new IOException(message));
}
}
- return Futures.immediateFuture(null);
+ return immediateVoidFuture();
},
MoreExecutors.directExecutor());
}
@@ -564,7 +564,7 @@
progressiveBackoff.reset();
}
committedOffset.set(committedSize);
- return Futures.immediateFuture(null);
+ return immediateVoidFuture();
},
MoreExecutors.directExecutor());
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 4865af2..2623921 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -1606,6 +1606,69 @@
}
@Test
+ public void failureAfterUploadCompletes() throws Exception {
+ AtomicInteger numUploads = new AtomicInteger();
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(
+ () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME,
+ new ReferenceCountedChannel(channelConnectionFactory),
+ CallCredentialsProvider.NO_CREDENTIALS,
+ /* callTimeoutSecs= */ 60,
+ retrier,
+ -1);
+
+ byte[] blob = new byte[CHUNK_SIZE - 1];
+ new Random().nextBytes(blob);
+
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numUploads.incrementAndGet();
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest writeRequest) {}
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ streamObserver.onNext(
+ WriteResponse.newBuilder().setCommittedSize(blob.length).build());
+ streamObserver.onError(Status.UNAVAILABLE.asException());
+ }
+ };
+ }
+
+ @Override
+ public void queryWriteStatus(
+ QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+ response.onNext(
+ QueryWriteStatusResponse.newBuilder()
+ .setCommittedSize(blob.length)
+ .setComplete(true)
+ .build());
+ response.onCompleted();
+ }
+ });
+
+ Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+ HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+
+ uploader.uploadBlob(context, hash, chunker, true);
+
+ blockUntilInternalStateConsistent(uploader);
+
+ assertThat(numUploads.get()).isEqualTo(1);
+ }
+
+ @Test
public void testCompressedUploads() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);