Ensure Chunkers are always reset (closed). Chunkers hold open files and memory. Normally, they are closed when the input is fully read. However, we want to make sure resources are freed in error cases, too. Closes #15416. PiperOrigin-RevId: 448212195
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 6f7c442..b54b6d2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -450,7 +450,7 @@ @Override public ListenableFuture<Void> uploadFile( RemoteActionExecutionContext context, Digest digest, Path path) { - return uploader.uploadBlobAsync( + return uploadChunker( context, digest, Chunker.builder() @@ -462,7 +462,7 @@ @Override public ListenableFuture<Void> uploadBlob( RemoteActionExecutionContext context, Digest digest, ByteString data) { - return uploader.uploadBlobAsync( + return uploadChunker( context, digest, Chunker.builder() @@ -470,4 +470,20 @@ .setCompressed(options.cacheCompression) .build()); } + + ListenableFuture<Void> uploadChunker( + RemoteActionExecutionContext context, Digest digest, Chunker chunker) { + ListenableFuture<Void> f = uploader.uploadBlobAsync(context, digest, chunker); + f.addListener( + () -> { + try { + chunker.reset(); + } catch (IOException e) { + logger.atWarning().withCause(e).log( + "failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); + } + }, + MoreExecutors.directExecutor()); + return f; + } }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 735d24e..3839ed4 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -70,10 +70,12 @@ import io.grpc.Status; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -183,6 +185,56 @@ } @Test + public void testChunkerResetAfterError() throws Exception { + // arrange + GrpcCacheClient client = newClient(); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver<WriteRequest> write( + StreamObserver<WriteResponse> responseObserver) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + responseObserver.onError(Status.DATA_LOSS.asRuntimeException()); + } + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) {} + }; + } + }); + byte[] data = new byte[20]; + Digest digest = DIGEST_UTIL.compute(data); + AtomicBoolean closed = new AtomicBoolean(); + Chunker chunker = + new Chunker( + () -> + new ByteArrayInputStream(data) { + + @Override + public void close() throws IOException { + super.close(); + closed.set(true); + } + }, + data.length, + 2, + false); + + // act + Throwable t = + assertThrows(ExecutionException.class, client.uploadChunker(context, digest, chunker)::get); + + // assert + assertThat(Status.fromThrowable(t.getCause()).getCode()).isEqualTo(Status.Code.DATA_LOSS); + assertThat(closed.get()).isTrue(); + } + + @Test public void testDownloadEmptyBlob() throws Exception { GrpcCacheClient client = newClient(); Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);