Only request grpc write when not complete
If a queryWriteStatus yields a committedSize which leaves no content
remaining to be uploaded, immediately succeed a blob upload. This can
easily occur if a competing blob write completes asynchronously between
abnormal write termination and a query.
Closes #10284.
PiperOrigin-RevId: 281944912
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 91f9f50..90c9e45 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -353,7 +353,12 @@
AtomicLong committedOffset = new AtomicLong(0);
return Futures.transformAsync(
retrier.executeAsync(
- () -> ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)),
+ () -> {
+ if (committedOffset.get() < chunker.getSize()) {
+ return ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
+ }
+ return Futures.immediateFuture(null);
+ },
progressiveBackoff),
(result) -> {
long committedSize = committedOffset.get();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index cda7a3a..8904e59 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -218,7 +218,7 @@
return new Chunk(blob, offsetBefore);
}
- private long bytesLeft() {
+ public long bytesLeft() {
return getSize() - getOffset();
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index cd28f56..2b47d94 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -334,6 +334,65 @@
}
@Test
+ public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
+ // Test that after an upload has failed and the QueryWriteStatus call returns
+ // that the upload has completed that we'll not retry the upload.
+ Context prevContext = withEmptyMetadata.attach();
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 1, retrier);
+
+ byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+ new Random().nextBytes(blob);
+
+ Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+ HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+
+ AtomicInteger numWriteCalls = new AtomicInteger(0);
+
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numWriteCalls.getAndIncrement();
+ streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest writeRequest) {}
+
+ @Override
+ public void onError(Throwable throwable) {}
+
+ @Override
+ public void onCompleted() {}
+ };
+ }
+
+ @Override
+ public void queryWriteStatus(
+ QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+ response.onNext(
+ QueryWriteStatusResponse.newBuilder()
+ .setCommittedSize(blob.length)
+ .setComplete(true)
+ .build());
+ response.onCompleted();
+ }
+ });
+
+ uploader.uploadBlob(hash, chunker, true);
+
+ // This test should not have triggered any retries.
+ assertThat(numWriteCalls.get()).isEqualTo(1);
+
+ blockUntilInternalStateConsistent(uploader);
+
+ withEmptyMetadata.detach(prevContext);
+ }
+
+ @Test
public void unimplementedQueryShouldRestartUpload() throws Exception {
Context prevContext = withEmptyMetadata.attach();
Mockito.when(mockBackoff.getRetryAttempts()).thenReturn(0);