Blobs are only uploaded when the upload succeeds
Avoid recording that a blob has been uploaded when an upload has failed
Closes #7019.
PiperOrigin-RevId: 228560183
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 303cfdd..b2e5f56 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -45,6 +45,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -602,6 +603,83 @@
}
@Test
+ public void failedUploadsShouldNotDeduplicate() throws Exception {
+ Context prevContext = withEmptyMetadata.attach();
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ () -> Retrier.RETRIES_DISABLED, (e) -> false, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 3, retrier);
+
+ byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+ new Random().nextBytes(blob);
+
+ Chunker chunker = Chunker.builder(DIGEST_UTIL).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+
+ AtomicInteger numUploads = new AtomicInteger();
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ boolean failRequest = true;
+
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numUploads.incrementAndGet();
+ return new StreamObserver<WriteRequest>() {
+ long nextOffset = 0;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ if (failRequest) {
+ streamObserver.onError(Status.UNKNOWN.asException());
+ failRequest = false;
+ } else {
+ nextOffset += writeRequest.getData().size();
+ boolean lastWrite = blob.length == nextOffset;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ assertThat(nextOffset).isEqualTo(blob.length);
+
+ WriteResponse response =
+ WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
+ }
+ });
+
+ StatusRuntimeException expected = null;
+ try {
+ // This should fail
+ uploader.uploadBlob(chunker, true);
+ } catch (RetryException e) {
+ if (e.getCause() instanceof StatusRuntimeException) {
+ expected = (StatusRuntimeException) e.getCause();
+ }
+ }
+ assertThat(expected).isNotNull();
+ assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
+ // This should trigger an upload.
+ uploader.uploadBlob(chunker, false);
+
+ assertThat(numUploads.get()).isEqualTo(2);
+
+ blockUntilInternalStateConsistent(uploader);
+
+ withEmptyMetadata.detach(prevContext);
+ }
+
+ @Test
public void deduplicationOfUploadsShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =