blob: 9e6ca94bb3789055fcec3ff83f7881e2280b354a [file]
// Copyright 2026 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import build.bazel.remote.execution.v2.Digest;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
import com.google.devtools.build.lib.remote.chunking.FastCdcChunker;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/** Tests for {@link ChunkedBlobUploader}. */
@RunWith(JUnit4.class)
public class ChunkedBlobUploaderTest {
private static final DigestUtil DIGEST_UTIL =
new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
private static final int MAX_IN_FLIGHT_CHUNK_UPLOADS = 16;
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
@Mock private GrpcCacheClient grpcCacheClient;
@Mock private CombinedCache combinedCache;
@Mock private RemoteActionExecutionContext context;
private FileSystem fs;
private Path execRoot;
private ChunkedBlobUploader uploader;
@Before
public void setUp() throws Exception {
fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
execRoot = fs.getPath("/execroot");
execRoot.createDirectoryAndParents();
ChunkingConfig config = new ChunkingConfig(1024, 2, 0);
uploader = new ChunkedBlobUploader(grpcCacheClient, combinedCache, config, DIGEST_UTIL);
}
@Test
public void getChunkingThreshold_returnsConfiguredValue() {
ChunkingConfig config = new ChunkingConfig(512, 2, 0);
ChunkedBlobUploader uploader =
new ChunkedBlobUploader(grpcCacheClient, combinedCache, config, DIGEST_UTIL);
assertThat(uploader.getChunkingThreshold()).isEqualTo(512 * 4);
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_allChunksMissing_uploadsAllChunks() throws Exception {
Path file = execRoot.getRelative("test.txt");
byte[] data = new byte[8192];
new Random(42).nextBytes(data);
writeFile(file, data);
Digest blobDigest = DIGEST_UTIL.compute(data);
ArgumentCaptor<List<Digest>> digestsCaptor = ArgumentCaptor.forClass(List.class);
when(grpcCacheClient.findMissingDigests(any(), digestsCaptor.capture()))
.thenAnswer(
invocation -> {
List<Digest> digests = invocation.getArgument(1);
return immediateFuture(ImmutableSet.copyOf(digests));
});
when(combinedCache.uploadBlob(any(), any(Digest.class), any(Blob.class)))
.thenReturn(immediateVoidFuture());
when(grpcCacheClient.spliceBlob(any(), any(), any())).thenReturn(immediateVoidFuture());
uploader.uploadChunked(context, blobDigest, file);
List<Digest> chunkDigests = digestsCaptor.getValue();
assertThat(chunkDigests.size()).isGreaterThan(1);
long totalSize = chunkDigests.stream().mapToLong(Digest::getSizeBytes).sum();
assertThat(totalSize).isEqualTo(data.length);
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_noChunksMissing_skipsChunkUpload() throws Exception {
Path file = execRoot.getRelative("test.txt");
byte[] data = new byte[8192];
new Random(42).nextBytes(data);
writeFile(file, data);
Digest blobDigest = DIGEST_UTIL.compute(data);
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.of()));
when(grpcCacheClient.spliceBlob(any(), any(), any())).thenReturn(immediateVoidFuture());
uploader.uploadChunked(context, blobDigest, file);
verify(combinedCache, never()).uploadBlob(any(), any(Digest.class), any(Blob.class));
verify(grpcCacheClient).spliceBlob(any(), eq(blobDigest), any());
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_someChunksMissing_uploadsOnlyMissingWithCorrectData() throws Exception {
Path file = execRoot.getRelative("test_partial.txt");
byte[] fileData = new byte[16384];
new Random(42).nextBytes(fileData);
writeFile(file, fileData);
Digest blobDigest = DIGEST_UTIL.compute(fileData);
ChunkingConfig config = new ChunkingConfig(1024, 2, 0);
FastCdcChunker testChunker = new FastCdcChunker(config, DIGEST_UTIL);
List<Digest> allChunkDigests;
try (InputStream input = file.getInputStream()) {
allChunkDigests = testChunker.chunkToDigests(input);
}
assertThat(allChunkDigests.size()).isAtLeast(5);
Set<Digest> digestsToReportMissing = new LinkedHashSet<>();
for (int i = 0; i < allChunkDigests.size(); i++) {
boolean isFirst = i == 0;
boolean isLast = i == allChunkDigests.size() - 1;
boolean isOdd = i % 2 == 1;
if (isFirst || isLast || isOdd) {
digestsToReportMissing.add(allChunkDigests.get(i));
}
}
Map<Digest, ByteString> expectedChunkData = new LinkedHashMap<>();
try (InputStream input = file.getInputStream()) {
for (Digest digest : allChunkDigests) {
byte[] chunkBytes = input.readNBytes((int) digest.getSizeBytes());
if (digestsToReportMissing.contains(digest)) {
expectedChunkData.put(digest, ByteString.copyFrom(chunkBytes));
}
}
}
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.copyOf(digestsToReportMissing)));
Map<Digest, ByteString> actualUploads = new HashMap<>();
when(combinedCache.uploadBlob(any(), any(Digest.class), any(Blob.class)))
.thenAnswer(
invocation -> {
Digest d = invocation.getArgument(1);
Blob blob = invocation.getArgument(2);
try (InputStream in = blob.get()) {
actualUploads.put(d, ByteString.readFrom(in));
}
return immediateVoidFuture();
});
when(grpcCacheClient.spliceBlob(any(), any(), any())).thenReturn(immediateVoidFuture());
uploader.uploadChunked(context, blobDigest, file);
assertThat(actualUploads.keySet()).isEqualTo(expectedChunkData.keySet());
for (Map.Entry<Digest, ByteString> entry : expectedChunkData.entrySet()) {
assertThat(actualUploads.get(entry.getKey())).isEqualTo(entry.getValue());
}
verify(grpcCacheClient).spliceBlob(any(), eq(blobDigest), eq(allChunkDigests));
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_windowRefillsAfterOneChunkCompletes() throws Exception {
Path file = execRoot.getRelative("test_window.txt");
byte[] data = new byte[262144];
new Random(42).nextBytes(data);
writeFile(file, data);
Digest blobDigest = DIGEST_UTIL.compute(data);
FastCdcChunker testChunker = new FastCdcChunker(new ChunkingConfig(1024, 2, 0), DIGEST_UTIL);
List<Digest> chunkDigests;
try (InputStream input = file.getInputStream()) {
chunkDigests = testChunker.chunkToDigests(input);
}
List<Digest> uniqueChunkDigests = new ArrayList<>();
Set<Digest> seen = new HashSet<>();
for (Digest chunkDigest : chunkDigests) {
if (seen.add(chunkDigest)) {
uniqueChunkDigests.add(chunkDigest);
}
if (uniqueChunkDigests.size() == MAX_IN_FLIGHT_CHUNK_UPLOADS + 1) {
break;
}
}
assertThat(uniqueChunkDigests).hasSize(MAX_IN_FLIGHT_CHUNK_UPLOADS + 1);
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.copyOf(uniqueChunkDigests)));
when(grpcCacheClient.spliceBlob(any(), any(), any())).thenReturn(immediateVoidFuture());
List<SettableFuture<Void>> uploads = new ArrayList<>(uniqueChunkDigests.size());
for (int i = 0; i < uniqueChunkDigests.size(); i++) {
uploads.add(SettableFuture.create());
}
CountDownLatch firstWindowRequested = new CountDownLatch(MAX_IN_FLIGHT_CHUNK_UPLOADS);
CountDownLatch overflowUploadRequested = new CountDownLatch(1);
when(combinedCache.uploadBlob(any(), any(Digest.class), any(Blob.class)))
.thenAnswer(
invocation -> {
Digest digest = invocation.getArgument(1);
int chunkIndex = uniqueChunkDigests.indexOf(digest);
if (chunkIndex < MAX_IN_FLIGHT_CHUNK_UPLOADS) {
firstWindowRequested.countDown();
} else if (chunkIndex == MAX_IN_FLIGHT_CHUNK_UPLOADS) {
overflowUploadRequested.countDown();
}
return uploads.get(chunkIndex);
});
Thread uploadThread =
Thread.ofVirtual()
.unstarted(
() -> {
try {
uploader.uploadChunked(context, blobDigest, file);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
});
uploadThread.start();
assertThat(firstWindowRequested.await(1, TimeUnit.SECONDS)).isTrue();
assertThat(overflowUploadRequested.await(100, TimeUnit.MILLISECONDS)).isFalse();
uploads.get(1).set(null);
assertThat(overflowUploadRequested.await(1, TimeUnit.SECONDS)).isTrue();
for (SettableFuture<Void> upload : uploads) {
if (!upload.isDone()) {
upload.set(null);
}
}
uploadThread.join(TimeUnit.SECONDS.toMillis(1));
assertThat(uploadThread.isAlive()).isFalse();
verify(grpcCacheClient).spliceBlob(any(), eq(blobDigest), eq(chunkDigests));
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_chunkFails_cancelsOtherInFlightUploads() throws Exception {
Path file = execRoot.getRelative("test_failure.txt");
byte[] data = new byte[16384];
new Random(42).nextBytes(data);
writeFile(file, data);
Digest blobDigest = DIGEST_UTIL.compute(data);
FastCdcChunker testChunker = new FastCdcChunker(new ChunkingConfig(1024, 2, 0), DIGEST_UTIL);
List<Digest> chunkDigests;
try (InputStream input = file.getInputStream()) {
chunkDigests = testChunker.chunkToDigests(input);
}
List<Digest> uniqueChunkDigests = new ArrayList<>();
Set<Digest> seen = new HashSet<>();
for (Digest chunkDigest : chunkDigests) {
if (seen.add(chunkDigest)) {
uniqueChunkDigests.add(chunkDigest);
}
if (uniqueChunkDigests.size() == 2) {
break;
}
}
assertThat(uniqueChunkDigests).hasSize(2);
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.copyOf(uniqueChunkDigests)));
SettableFuture<Void> failedUpload = SettableFuture.create();
SettableFuture<Void> cancelledUpload = SettableFuture.create();
CountDownLatch uploadsStarted = new CountDownLatch(2);
when(combinedCache.uploadBlob(any(), any(Digest.class), any(Blob.class)))
.thenAnswer(
invocation -> {
Digest digest = invocation.getArgument(1);
uploadsStarted.countDown();
if (digest.equals(uniqueChunkDigests.get(0))) {
return failedUpload;
}
if (digest.equals(uniqueChunkDigests.get(1))) {
return cancelledUpload;
}
return immediateVoidFuture();
});
Thread uploadThread =
Thread.ofVirtual()
.unstarted(
() -> {
try {
uploader.uploadChunked(context, blobDigest, file);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
});
uploadThread.start();
assertThat(uploadsStarted.await(1, TimeUnit.SECONDS)).isTrue();
failedUpload.setException(new IOException("upload failed"));
uploadThread.join(TimeUnit.SECONDS.toMillis(1));
assertThat(uploadThread.isAlive()).isFalse();
assertThat(cancelledUpload.isCancelled()).isTrue();
verify(grpcCacheClient, never()).spliceBlob(any(), any(), any());
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_cancelledUpload_throwsInterruptedException() throws Exception {
Path file = execRoot.getRelative("test_cancelled.txt");
byte[] data = new byte[8192];
new Random(42).nextBytes(data);
writeFile(file, data);
Digest blobDigest = DIGEST_UTIL.compute(data);
FastCdcChunker testChunker = new FastCdcChunker(new ChunkingConfig(1024, 2, 0), DIGEST_UTIL);
List<Digest> chunkDigests;
try (InputStream input = file.getInputStream()) {
chunkDigests = testChunker.chunkToDigests(input);
}
Digest firstChunkDigest = chunkDigests.get(0);
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.of(firstChunkDigest)));
SettableFuture<Void> cancelledUpload = SettableFuture.create();
cancelledUpload.cancel(/* mayInterruptIfRunning= */ true);
when(combinedCache.uploadBlob(any(), eq(firstChunkDigest), any(Blob.class)))
.thenReturn(cancelledUpload);
assertThrows(
InterruptedException.class, () -> uploader.uploadChunked(context, blobDigest, file));
verify(grpcCacheClient, never()).spliceBlob(any(), any(), any());
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_failedUploadDuringPendingChunks_surfacesBeforeOpeningChunkStream()
throws Exception {
byte[] data = new byte[16384];
new Random(42).nextBytes(data);
Digest blobDigest = DIGEST_UTIL.compute(data);
FastCdcChunker testChunker = new FastCdcChunker(new ChunkingConfig(1024, 2, 0), DIGEST_UTIL);
List<Digest> chunkDigests;
try (InputStream input = new ByteArrayInputStream(data)) {
chunkDigests = testChunker.chunkToDigests(input);
}
assertThat(chunkDigests.size()).isAtLeast(2);
Path file = mock(Path.class);
when(file.getInputStream()).thenReturn(new ByteArrayInputStream(data));
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.of(chunkDigests.get(0))));
SettableFuture<Void> failedUpload = SettableFuture.create();
failedUpload.setException(new IOException("upload failed"));
when(combinedCache.uploadBlob(any(), eq(chunkDigests.get(0)), any(Blob.class)))
.thenReturn(failedUpload);
Thread uploadThread =
Thread.ofVirtual()
.unstarted(
() -> {
try {
uploader.uploadChunked(context, blobDigest, file);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
});
uploadThread.start();
uploadThread.join(TimeUnit.SECONDS.toMillis(1));
assertThat(uploadThread.isAlive()).isFalse();
verify(file, times(1)).getInputStream();
verify(grpcCacheClient, never()).spliceBlob(any(), any(), any());
}
@Test
@SuppressWarnings("unchecked")
public void uploadChunked_fileTruncatedBeforeChunkUpload_reportsConcurrentModification()
throws Exception {
byte[] data = new byte[8192];
new Random(42).nextBytes(data);
Digest blobDigest = DIGEST_UTIL.compute(data);
FastCdcChunker testChunker = new FastCdcChunker(new ChunkingConfig(1024, 2, 0), DIGEST_UTIL);
List<Digest> chunkDigests;
try (InputStream input = new ByteArrayInputStream(data)) {
chunkDigests = testChunker.chunkToDigests(input);
}
assertThat(chunkDigests.size()).isAtLeast(2);
Digest secondChunkDigest = chunkDigests.get(1);
Path file = mock(Path.class);
when(file.getInputStream())
.thenReturn(new ByteArrayInputStream(data), new ByteArrayInputStream(new byte[0]));
when(grpcCacheClient.findMissingDigests(any(), any()))
.thenReturn(immediateFuture(ImmutableSet.of(secondChunkDigest)));
when(combinedCache.uploadBlob(any(), eq(secondChunkDigest), any(Blob.class)))
.thenAnswer(
invocation -> {
Blob blob = invocation.getArgument(2);
try (InputStream in = blob.get()) {
ByteString unused = ByteString.readFrom(in);
}
return immediateVoidFuture();
});
IOException e =
assertThrows(IOException.class, () -> uploader.uploadChunked(context, blobDigest, file));
assertThat(e).hasMessageThat().contains("file was concurrently modified during upload");
assertThat(e).hasCauseThat().isInstanceOf(EOFException.class);
verify(grpcCacheClient, never()).spliceBlob(any(), any(), any());
}
private void writeFile(Path path, byte[] data) throws IOException {
try (var out = path.getOutputStream()) {
out.write(data);
}
}
}