blob: 1017f8f02d48ae537f6b4efd9b311331507f56bc [file]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CacheCapabilities;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.FastCdc2020Params;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.common.collect.ImmutableClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactRoot;
import com.google.devtools.build.lib.actions.ArtifactRoot.RootType;
import com.google.devtools.build.lib.actions.ResourceSet;
import com.google.devtools.build.lib.actions.SimpleSpawn;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.Order;
import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.exec.util.FakeOwner;
import com.google.devtools.build.lib.exec.util.SpawnBuilder;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTreeComputer;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.FakeSpawnExecutionContext;
import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
import com.google.devtools.build.lib.remote.util.RxNoGlobalErrorsRule;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.testutil.TestConstants;
import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Deque;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.MockitoAnnotations;
/** Tests for {@link CombinedCache}. */
@RunWith(JUnit4.class)
public class CombinedCacheTest {
// Rule applied to every test in this class.
// NOTE(review): presumably fails a test when RxJava reports an undeliverable global error —
// confirm against RxNoGlobalErrorsRule.
@Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule();
// Request metadata built in setUp() and reused for the execution contexts the tests create.
private RequestMetadata metadata;
// Default execution context created in setUp(); several tests replace it with a
// metadata-only context.
private RemoteActionExecutionContext remoteActionExecutionContext;
// In-memory file system backing every path used by the tests.
private FileSystem fs;
// Exec root ("/execroot/main") created on fs in setUp().
private Path execRoot;
// Derived output root named "outputs" under execRoot, created in setUp().
ArtifactRoot artifactRoot;
// SHA-256 digest helper. The merkleTreeComputer initializer below reads it, so it must stay
// declared first.
private final DigestUtil digestUtil =
new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
// Builds the Merkle trees handed to ensureInputsPresent; constructed without a remote
// execution cache.
private final MerkleTreeComputer merkleTreeComputer =
new MerkleTreeComputer(
digestUtil,
/* remoteExecutionCache= */ null,
"buildRequestId",
"commandId",
TestConstants.WORKSPACE_NAME);
// Input file cache rooted at execRoot; tests use it to create scratch inputs and get their
// digests.
private FakeActionInputFileCache fakeFileCache;
// Single-threaded scheduled executor created in setUp() and shut down in afterEverything().
private ListeningScheduledExecutorService retryService;
/**
 * Builds the per-test fixture: an in-memory file system with an exec root and an output root,
 * a fake input file cache, a default {@link RemoteActionExecutionContext} for an empty spawn,
 * and the retry executor.
 */
@Before
public void setUp() throws Exception {
  MockitoAnnotations.initMocks(this);

  // Default execution context: an input-less, output-less spawn paired with a mocked spawn
  // execution context, so tests can verify the events reported to it.
  metadata = TracingMetadataUtils.buildMetadata("none", "none", "action-id", null);
  Spawn emptySpawn =
      new SimpleSpawn(
          new FakeOwner("foo", "bar", "//dummy:label"),
          /* arguments= */ ImmutableList.of(),
          /* environment= */ ImmutableMap.of(),
          /* executionInfo= */ ImmutableMap.of(),
          /* inputs= */ NestedSetBuilder.emptySet(Order.STABLE_ORDER),
          /* outputs= */ ImmutableSet.of(),
          ResourceSet.ZERO);
  remoteActionExecutionContext =
      RemoteActionExecutionContext.create(
          emptySpawn, mock(SpawnExecutionContext.class), metadata);

  // File system layout: /execroot/main plus a derived "outputs" root beneath it.
  fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
  execRoot = fs.getPath("/execroot/main");
  execRoot.createDirectoryAndParents();
  fakeFileCache = new FakeActionInputFileCache(execRoot);
  artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.OUTPUT, "outputs");
  artifactRoot.getRoot().asPath().createDirectoryAndParents();

  retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
}
/** Stops the retry executor and waits, with a bounded timeout, for it to terminate. */
@After
public void afterEverything() throws InterruptedException {
  retryService.shutdownNow();
  // The termination result is deliberately not asserted on.
  boolean unused =
      retryService.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
/** Downloading the empty digest yields an empty blob and leaves a zero-length file. */
@Test
public void testDownloadEmptyBlobAndFile() throws Exception {
  // Test that downloading an empty BLOB/file does not try to perform a download.
  Path target = fs.getPath("/execroot/file");
  InMemoryCombinedCache cache = newCombinedCache();
  Digest zeroLengthDigest = digestUtil.compute(new byte[0]);

  // Blob download.
  var blob = getFromFuture(cache.downloadBlob(remoteActionExecutionContext, zeroLengthDigest));
  assertThat(blob).isEmpty();

  // File download, performed while an output stream on the target file is open.
  try (OutputStream ignored = target.getOutputStream()) {
    getFromFuture(cache.downloadFile(remoteActionExecutionContext, target, zeroLengthDigest));
  }
  assertThat(target.exists()).isTrue();
  assertThat(target.getFileSize()).isEqualTo(0);
}
/** Looking up an action result reports a "remote-cache" checking event to the spawn context. */
@Test
public void downloadActionResult_reportsSpawnCheckingCacheEvent() throws Exception {
  var cache = newCombinedCache();
  var actionKey = digestUtil.asActionKey(digestUtil.computeAsUtf8("key"));

  var unused =
      cache.downloadActionResult(
          remoteActionExecutionContext,
          actionKey,
          /* inlineOutErr= */ false,
          /* inlineOutputFiles= */ ImmutableSet.of());

  // The spawn execution context is the mock installed by setUp().
  var spawnContext = remoteActionExecutionContext.getSpawnExecutionContext();
  verify(spawnContext).report(SpawnCheckingCacheEvent.create("remote-cache"));
}
/** Cancelling the future returned by downloadFile also cancels the client's download future. */
@Test
public void downloadFile_cancelled_cancelDownload() throws Exception {
  // Test that if a download future is cancelled, the download itself is also cancelled.
  // The mocked client hands back this future, which the test never completes.
  SettableFuture<Void> pendingClientDownload = SettableFuture.create();
  RemoteCacheClient client = mock(RemoteCacheClient.class);
  doAnswer(invocation -> pendingClientDownload)
      .when(client)
      .downloadBlob(any(), any(), any());
  CombinedCache cache = newCombinedCache(client);
  Digest contentDigest =
      fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content");
  Path target = execRoot.getRelative("file");

  ListenableFuture<Void> download =
      cache.downloadFile(remoteActionExecutionContext, target, contentDigest);
  download.cancel(/* mayInterruptIfRunning= */ true);

  assertThat(pendingClientDownload.isCancelled()).isTrue();
}
/** Empty stdout/stderr digests must be counted neither as successful nor as failed downloads. */
@Test
public void downloadOutErr_empty_doNotPerformDownload() throws Exception {
  // Test that downloading empty stdout/stderr does not try to perform a download.
  InMemoryCombinedCache cache = newCombinedCache();
  Digest zeroLengthDigest = digestUtil.compute(new byte[0]);
  ActionResult resultWithEmptyOutErr =
      ActionResult.newBuilder()
          .setStdoutDigest(zeroLengthDigest)
          .setStderrDigest(zeroLengthDigest)
          .build();
  FileOutErr outErr =
      new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr"));

  waitForBulkTransfer(
      cache.downloadOutErr(remoteActionExecutionContext, resultWithEmptyOutErr, outErr));

  assertThat(cache.getNumSuccessfulDownloads()).isEqualTo(0);
  assertThat(cache.getNumFailedDownloads()).isEqualTo(0);
}
/**
 * With a symlink template configured, downloadFile creates a symbolic link whose target is the
 * template with {hash} and {size_bytes} substituted, rather than a regular file.
 */
@Test
public void testDownloadFileWithSymlinkTemplate() throws Exception {
  // Test that when a symlink template is provided, we don't actually download files to disk.
  // Instead, a symbolic link should be created that points to a location where the file may
  // actually be found. That location could, for example, be backed by a FUSE file system that
  // exposes the Content Addressable Storage.
  Map<Digest, byte[]> casEntries = new ConcurrentHashMap<>();
  Digest helloDigest = digestUtil.computeAsUtf8("hello-contents");
  casEntries.put(helloDigest, "hello-contents".getBytes(UTF_8));
  InMemoryCombinedCache cache =
      newCombinedCache(casEntries, digestUtil, "/home/alice/cas/{hash}-{size_bytes}");
  Path link = fs.getPath("/execroot/symlink-to-file");

  getFromFuture(cache.downloadFile(remoteActionExecutionContext, link, helloDigest));

  PathFragment expectedTarget =
      PathFragment.create(
          "/home/alice/cas/a378b939ad2e1d470a9a28b34b0e256b189e85cb236766edc1d46ec3b6ca82e5-14");
  assertThat(link.isSymbolicLink()).isTrue();
  assertThat(link.readSymbolicLink()).isEqualTo(expectedTarget);
}
/** After uploading an empty blob and an empty file, the empty digest is still reported missing. */
@Test
public void upload_emptyBlobAndFile_doNotPerformUpload() throws Exception {
  // Test that uploading an empty BLOB/file does not try to perform an upload.
  InMemoryCombinedCache cache = newCombinedCache();
  Digest emptyDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "");
  Path emptyFile = execRoot.getRelative("file");

  // Blob upload.
  getFromFuture(cache.uploadBlob(remoteActionExecutionContext, emptyDigest, ByteString.EMPTY));
  var missingAfterBlobUpload =
      getFromFuture(
          cache.findMissingDigests(remoteActionExecutionContext, ImmutableSet.of(emptyDigest)));
  assertThat(missingAfterBlobUpload).containsExactly(emptyDigest);

  // File upload.
  getFromFuture(cache.uploadFile(remoteActionExecutionContext, emptyDigest, emptyFile));
  var missingAfterFileUpload =
      getFromFuture(
          cache.findMissingDigests(remoteActionExecutionContext, ImmutableSet.of(emptyDigest)));
  assertThat(missingAfterFileUpload).containsExactly(emptyDigest);
}
/** Two uploadFile calls for the same digest reach the client's uploadBlobImpl only once. */
@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void upload_deduplicationWorks() throws IOException {
  // Count the calls that reach the client and keep each upload pending forever.
  AtomicInteger clientUploadCalls = new AtomicInteger();
  RemoteCacheClient client = spy(new InMemoryCacheClient());
  doAnswer(
          invocation -> {
            clientUploadCalls.incrementAndGet();
            return SettableFuture.create();
          })
      .when(client)
      .uploadBlobImpl(any(), any(), any());
  CombinedCache cache = newCombinedCache(client);
  Digest contentDigest =
      fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content");
  Path source = execRoot.getRelative("file");

  var unusedFirst = cache.uploadFile(remoteActionExecutionContext, contentDigest, source);
  var unusedSecond = cache.uploadFile(remoteActionExecutionContext, contentDigest, source);

  assertThat(clientUploadCalls.get()).isEqualTo(1);
}
/**
 * A failed upload must not be deduplicated: after the first uploadFile fails with an
 * {@link IOException}, a second uploadFile for the same digest must reach the client and
 * succeed, after which the digest is no longer reported as missing.
 */
@Test
public void upload_failedUploads_doNotDeduplicate() throws Exception {
  // Fail only the first uploadBlobImpl call; later calls go to the real in-memory client.
  AtomicBoolean failRequest = new AtomicBoolean(true);
  RemoteCacheClient remoteCacheClient = spy(new InMemoryCacheClient());
  doAnswer(
          invocationOnMock -> {
            if (failRequest.getAndSet(false)) {
              return Futures.immediateFailedFuture(new IOException("Failed"));
            }
            return invocationOnMock.callRealMethod();
          })
      .when(remoteCacheClient)
      .uploadBlobImpl(any(), any(), any());
  CombinedCache combinedCache = newCombinedCache(remoteCacheClient);
  Digest digest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content");
  Path file = execRoot.getRelative("file");

  // Precondition: the blob is not in the cache yet.
  assertThat(
          getFromFuture(
              combinedCache.findMissingDigests(
                  remoteActionExecutionContext, ImmutableList.of(digest))))
      .containsExactly(digest);

  // First attempt hits the injected failure.
  assertThrows(
      IOException.class,
      () -> getFromFuture(combinedCache.uploadFile(remoteActionExecutionContext, digest, file)));

  // Second attempt must not reuse the failed result; it uploads for real.
  getFromFuture(combinedCache.uploadFile(remoteActionExecutionContext, digest, file));
  assertThat(
          getFromFuture(
              combinedCache.findMissingDigests(
                  remoteActionExecutionContext, ImmutableList.of(digest))))
      .isEmpty();
}
/**
 * When an input file has been deleted locally and the remote path checker reports it as
 * unavailable, ensureInputsPresent throws a {@link BulkTransferException} whose lost
 * artifacts map the file's digest to its exec path.
 */
@Test
public void ensureInputsPresent_missingInputs_exceptionHasLostInputs() throws Exception {
  RemoteCacheClient client = spy(new InMemoryCacheClient());
  RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(client));
  remoteActionExecutionContext = RemoteActionExecutionContext.create(metadata);
  // Every path except "foo" is reported as available remotely.
  remoteCache.setRemotePathChecker(
      (context, path) -> {
        boolean isFoo = path.relativeTo(execRoot).equals(PathFragment.create("foo"));
        return immediateFuture(!isFoo);
      });

  // Build the tree while the file exists, then delete it before ensureInputsPresent runs.
  Path fooPath = execRoot.getRelative("foo");
  FileSystemUtils.writeContentAsLatin1(fooPath, "bar");
  SortedMap<PathFragment, Path> treeInputs = new TreeMap<>();
  treeInputs.put(PathFragment.create("foo"), fooPath);
  var merkleTree = merkleTreeComputer.buildForFiles(treeInputs);
  fooPath.delete();

  var thrown =
      assertThrows(
          BulkTransferException.class,
          () ->
              remoteCache.ensureInputsPresent(
                  remoteActionExecutionContext,
                  merkleTree,
                  ImmutableMap.of(),
                  false,
                  new RemotePathResolver.DefaultRemotePathResolver(execRoot)));

  var lostByDigest = thrown.getLostArtifacts(ActionInputHelper::fromPath).byDigest();
  assertThat(lostByDigest)
      .containsExactly(
          DigestUtil.toString(digestUtil.computeAsUtf8("bar")),
          ActionInputHelper.fromPath("foo"));
}
/**
 * Two spawns have different input artifacts (outputs/foo and outputs/bar) with identical
 * content, hence the same digest. Both call ensureInputsPresent concurrently while the remote
 * path check for that content is still pending; the check then resolves to {@code false}. Each
 * caller must fail with a {@link BulkTransferException} whose lost artifacts name its own
 * artifact.
 */
@Test
public void ensureInputsPresent_sharedMissingDigest_exceptionsHaveOwnLostInputs()
throws Exception {
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));
// Counts findMissingDigests calls so the test can wait until two have been issued.
CountDownLatch findMissingDigestsCalls = new CountDownLatch(2);
doAnswer(
invocationOnMock -> {
findMissingDigestsCalls.countDown();
return invocationOnMock.callRealMethod();
})
.when(cacheProtocol)
.findMissingDigests(any(), any());
// The path checker returns this shared, initially pending future for outputs/foo and
// outputs/bar; every other path is reported as available.
SettableFuture<Boolean> missingInputAvailable = SettableFuture.create();
CountDownLatch remotePathChecked = new CountDownLatch(1);
remoteCache.setRemotePathChecker(
(context, path) -> {
PathFragment execPath = path.relativeTo(execRoot);
if (execPath.equals(PathFragment.create("outputs/foo"))
|| execPath.equals(PathFragment.create("outputs/bar"))) {
remotePathChecked.countDown();
return missingInputAvailable;
}
return immediateFuture(true);
});
// Two distinct artifacts registered with the same content, and therefore the same digest.
Artifact foo = ActionsTestUtil.createArtifact(artifactRoot, "foo");
Artifact bar = ActionsTestUtil.createArtifact(artifactRoot, "bar");
Digest digest = fakeFileCache.createScratchInput(foo, "same");
assertThat(fakeFileCache.createScratchInput(bar, "same")).isEqualTo(digest);
// Spawn, contexts and Merkle tree for foo.
Spawn fooSpawn = new SpawnBuilder().withInput(foo).build();
var fooContext =
new FakeSpawnExecutionContext(
fooSpawn,
fakeFileCache,
execRoot,
new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")),
ImmutableClassToInstanceMap.of(),
/* actionFileSystem= */ null);
var fooRemoteContext = RemoteActionExecutionContext.create(fooSpawn, fooContext, metadata);
var fooTree =
(MerkleTree.Uploadable)
merkleTreeComputer.buildForSpawn(
fooSpawn,
ImmutableSet.of(),
/* scrubber= */ null,
fooContext,
RemotePathResolver.createDefault(execRoot),
MerkleTreeComputer.BlobPolicy.KEEP_AND_REUPLOAD);
// Same setup for bar.
Spawn barSpawn = new SpawnBuilder().withInput(bar).build();
var barContext =
new FakeSpawnExecutionContext(
barSpawn,
fakeFileCache,
execRoot,
new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")),
ImmutableClassToInstanceMap.of(),
/* actionFileSystem= */ null);
var barRemoteContext = RemoteActionExecutionContext.create(barSpawn, barContext, metadata);
var barTree =
(MerkleTree.Uploadable)
merkleTreeComputer.buildForSpawn(
barSpawn,
ImmutableSet.of(),
/* scrubber= */ null,
barContext,
RemotePathResolver.createDefault(execRoot),
MerkleTreeComputer.BlobPolicy.KEEP_AND_REUPLOAD);
// Run foo's ensureInputsPresent on its own thread and record whatever it throws.
var fooFailure = new AtomicReference<Throwable>();
Thread fooThread =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(
fooRemoteContext,
fooTree,
ImmutableMap.of(),
/* force= */ false,
RemotePathResolver.createDefault(execRoot));
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
fooFailure.set(t);
}
});
fooThread.start();
// Start bar only after the path checker has been consulted for one of the shared paths.
assertThat(remotePathChecked.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();
var barFailure = new AtomicReference<Throwable>();
Thread barThread =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(
barRemoteContext,
barTree,
ImmutableMap.of(),
/* force= */ false,
RemotePathResolver.createDefault(execRoot));
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
barFailure.set(t);
}
});
barThread.start();
// Wait until findMissingDigests has been called twice, then resolve the pending path check
// as "not available".
assertThat(findMissingDigestsCalls.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.isTrue();
missingInputAvailable.set(false);
fooThread.join();
barThread.join();
// Each failure must attribute the shared digest to the caller's own artifact. The resolver
// passed to getLostArtifacts only knows that caller's artifact.
assertThat(fooFailure.get()).isInstanceOf(BulkTransferException.class);
assertThat(
((BulkTransferException) fooFailure.get())
.getLostArtifacts(execPath -> execPath.equals(foo.getExecPath()) ? foo : null)
.byDigest())
.containsExactly(DigestUtil.toString(digest), foo);
assertThat(barFailure.get()).isInstanceOf(BulkTransferException.class);
assertThat(
((BulkTransferException) barFailure.get())
.getLostArtifacts(execPath -> execPath.equals(bar.getExecPath()) ? bar : null)
.byDigest())
.containsExactly(DigestUtil.toString(digest), bar);
}
/**
 * A single caller of ensureInputsPresent is interrupted while its two uploads are still
 * pending. Afterwards the client must have no in-progress and no finished uploads, and every
 * future handed out by the stubbed uploadBlobImpl must be cancelled.
 */
@Test
public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUploadTasks()
throws Exception {
// arrange
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));
remoteActionExecutionContext = RemoteActionExecutionContext.create(metadata);
// Every stubbed upload returns a future that the test never completes; the futures are
// collected so their cancellation state can be checked later.
Deque<SettableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
CountDownLatch uploadBlobCalls = new CountDownLatch(2);
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlobImpl(any(), any(), (Blob) any());
// NOTE(review): this second stubbing targets uploadBlobImpl with the same answer as the one
// above and looks redundant — confirm whether a different method was intended.
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlobImpl(any(), any(), any());
// Merkle tree for the single input file "foo".
Path path = execRoot.getRelative("foo");
FileSystemUtils.writeContentAsLatin1(path, "bar");
SortedMap<PathFragment, Path> inputs = new TreeMap<>();
inputs.put(PathFragment.create("foo"), path);
var merkleTree = merkleTreeComputer.buildForFiles(inputs);
CountDownLatch ensureInputsPresentReturned = new CountDownLatch(1);
Thread thread =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(
remoteActionExecutionContext,
merkleTree,
ImmutableMap.of(),
false,
/* remotePathResolver= */ null);
} catch (IOException | InterruptedException ignored) {
// ignored
} finally {
ensureInputsPresentReturned.countDown();
}
});
// act
thread.start();
// Wait until both uploads have been started before interrupting the caller.
uploadBlobCalls.await();
assertThat(futures).hasSize(2);
assertThat(cacheProtocol.getInProgressUploads()).isNotEmpty();
thread.interrupt();
ensureInputsPresentReturned.await();
// assert
assertThat(cacheProtocol.getInProgressUploads()).isEmpty();
assertThat(cacheProtocol.getFinishedUploads()).isEmpty();
for (SettableFuture<Void> future : futures) {
assertThat(future.isCancelled()).isTrue();
}
}
/**
 * Two threads call ensureInputsPresent for the same Merkle tree. One is interrupted while the
 * stubbed findMissingDigests future is still pending. Once that future completes, the two
 * uploads must be started and not cancelled, and must be reported as finished after the test
 * completes them.
 */
@Test
public void
ensureInputsPresent_multipleConsumers_interruptedOneDuringFindMissingBlobs_keepAndFinishInProgressUploadTasks()
throws Exception {
// arrange
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = newRemoteExecutionCache(cacheProtocol);
remoteActionExecutionContext = RemoteActionExecutionContext.create(metadata);
// findMissingDigests returns a future the test controls, so a caller can be interrupted while
// the lookup is still pending.
SettableFuture<ImmutableSet<Digest>> findMissingDigestsFuture = SettableFuture.create();
CountDownLatch findMissingDigestsCalled = new CountDownLatch(1);
doAnswer(
invocationOnMock -> {
findMissingDigestsCalled.countDown();
return findMissingDigestsFuture;
})
.when(cacheProtocol)
.findMissingDigests(any(), any());
// Every stubbed upload returns a pending future that is collected for later inspection.
Deque<SettableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
CountDownLatch uploadBlobCalls = new CountDownLatch(2);
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlobImpl(any(), any(), (Blob) any());
// NOTE(review): this second stubbing targets uploadBlobImpl with the same answer as the one
// above and looks redundant — confirm whether a different method was intended.
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlobImpl(any(), any(), any());
// Merkle tree for the single input file "foo".
Path path = execRoot.getRelative("foo");
FileSystemUtils.writeContentAsLatin1(path, "bar");
SortedMap<PathFragment, Path> inputs = new TreeMap<>();
inputs.put(PathFragment.create("foo"), path);
var merkleTree = merkleTreeComputer.buildForFiles(inputs);
CountDownLatch ensureInputsPresentReturned = new CountDownLatch(2);
CountDownLatch ensureInterrupted = new CountDownLatch(1);
// Shared by both threads; signals ensureInterrupted when the call is interrupted.
Runnable work =
() -> {
try {
remoteCache.ensureInputsPresent(
remoteActionExecutionContext,
merkleTree,
ImmutableMap.of(),
false,
/* remotePathResolver= */ null);
} catch (IOException ignored) {
// ignored
} catch (InterruptedException e) {
ensureInterrupted.countDown();
} finally {
ensureInputsPresentReturned.countDown();
}
};
Thread thread1 = new Thread(work);
Thread thread2 = new Thread(work);
thread1.start();
thread2.start();
findMissingDigestsCalled.await();
// act
thread1.interrupt();
ensureInterrupted.await();
// Report every digest of the tree as missing so that uploads are started.
findMissingDigestsFuture.set(ImmutableSet.copyOf(merkleTree.allDigests()));
uploadBlobCalls.await();
assertThat(futures).hasSize(2);
// assert
assertThat(cacheProtocol.getInProgressUploads()).hasSize(2);
assertThat(cacheProtocol.getFinishedUploads()).isEmpty();
for (SettableFuture<Void> future : futures) {
assertThat(future.isCancelled()).isFalse();
}
// Complete the pending uploads so the remaining caller can return.
for (SettableFuture<Void> future : futures) {
future.set(null);
}
ensureInputsPresentReturned.await();
assertThat(cacheProtocol.getInProgressUploads()).isEmpty();
assertThat(cacheProtocol.getFinishedUploads()).hasSize(2);
}
/**
 * Two threads call ensureInputsPresent for overlapping Merkle trees ({foo, bar} and
 * {bar, qux}). The first thread is interrupted while all uploads are pending. The upload of
 * foo must be cancelled, while the uploads of bar and qux must stay in progress and finish
 * once the test completes them.
 */
@Test
public void
ensureInputsPresent_multipleConsumers_interruptedOneDuringUploadBlobs_keepInProgressUploadTasks()
throws Exception {
// arrange
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));
remoteActionExecutionContext = RemoteActionExecutionContext.create(metadata);
// Pending upload futures keyed by the digest passed as the stub's second argument.
Map<Digest, SettableFuture<Void>> uploadFutures = Maps.newConcurrentMap();
// 3 unique file digests + 2 unique directory blob digests = 5 uploads total.
CountDownLatch uploadCalls = new CountDownLatch(5);
doAnswer(
invocationOnMock -> {
Digest digest = invocationOnMock.getArgument(1, Digest.class);
SettableFuture<Void> future = SettableFuture.create();
uploadFutures.put(digest, future);
uploadCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlobImpl(any(), any(), (Blob) any());
// Three input files whose content equals their name.
Path foo = execRoot.getRelative("foo");
FileSystemUtils.writeContentAsLatin1(foo, "foo");
Path bar = execRoot.getRelative("bar");
FileSystemUtils.writeContentAsLatin1(bar, "bar");
Path qux = execRoot.getRelative("qux");
FileSystemUtils.writeContentAsLatin1(qux, "qux");
Digest fooDigest = digestUtil.computeAsUtf8("foo");
Digest barDigest = digestUtil.computeAsUtf8("bar");
Digest quxDigest = digestUtil.computeAsUtf8("qux");
// Tree 1 contains foo and bar; tree 2 contains bar and qux, so bar is shared.
SortedMap<PathFragment, Path> input1 = new TreeMap<>();
input1.put(PathFragment.create("foo"), foo);
input1.put(PathFragment.create("bar"), bar);
var merkleTree1 = merkleTreeComputer.buildForFiles(input1);
SortedMap<PathFragment, Path> input2 = new TreeMap<>();
input2.put(PathFragment.create("bar"), bar);
input2.put(PathFragment.create("qux"), qux);
var merkleTree2 = merkleTreeComputer.buildForFiles(input2);
CountDownLatch ensureInputsPresentReturned = new CountDownLatch(2);
CountDownLatch ensureInterrupted = new CountDownLatch(1);
// thread1 uploads tree 1 and signals ensureInterrupted when it is interrupted.
Thread thread1 =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(
remoteActionExecutionContext,
merkleTree1,
ImmutableMap.of(),
false,
/* remotePathResolver= */ null);
} catch (IOException ignored) {
// ignored
} catch (InterruptedException e) {
ensureInterrupted.countDown();
} finally {
ensureInputsPresentReturned.countDown();
}
});
// thread2 uploads tree 2 and is never interrupted by the test.
Thread thread2 =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(
remoteActionExecutionContext,
merkleTree2,
ImmutableMap.of(),
false,
/* remotePathResolver= */ null);
} catch (InterruptedException | IOException ignored) {
// ignored
} finally {
ensureInputsPresentReturned.countDown();
}
});
// act
thread1.start();
thread2.start();
// Wait until all five uploads have been started before interrupting thread1.
uploadCalls.await();
assertThat(uploadFutures).hasSize(5);
assertThat(cacheProtocol.getInProgressUploads()).hasSize(5);
thread1.interrupt();
ensureInterrupted.await();
// assert
assertThat(cacheProtocol.getInProgressUploads()).hasSize(3);
assertThat(cacheProtocol.getFinishedUploads()).isEmpty();
// foo is only in tree1, so interrupting thread1 cancels it; bar is shared and qux is only in
// tree2, so both are kept.
assertThat(uploadFutures.get(fooDigest).isCancelled()).isTrue();
assertThat(uploadFutures.get(barDigest).isCancelled()).isFalse();
assertThat(uploadFutures.get(quxDigest).isCancelled()).isFalse();
// Complete every pending future; set(null) on an already cancelled future has no effect.
for (SettableFuture<Void> future : uploadFutures.values()) {
future.set(null);
}
ensureInputsPresentReturned.await();
assertThat(cacheProtocol.getInProgressUploads()).isEmpty();
assertThat(cacheProtocol.getFinishedUploads()).hasSize(3);
}
/** An upload failure in the client surfaces from ensureInputsPresent as an IOException. */
@Test
public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception {
  RemoteCacheClient client = spy(new InMemoryCacheClient());
  remoteActionExecutionContext = RemoteActionExecutionContext.create(metadata);
  // Both stubbings make every upload fail immediately with the same message.
  doAnswer(invocation -> Futures.immediateFailedFuture(new IOException("upload failed")))
      .when(client)
      .uploadBlobImpl(any(), any(), (Blob) any());
  doAnswer(invocation -> Futures.immediateFailedFuture(new IOException("upload failed")))
      .when(client)
      .uploadBlobImpl(any(), any(), any());
  RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(client));

  // Merkle tree for the single input file "foo".
  Path fooPath = execRoot.getRelative("foo");
  FileSystemUtils.writeContentAsLatin1(fooPath, "bar");
  SortedMap<PathFragment, Path> treeInputs =
      ImmutableSortedMap.of(PathFragment.create("foo"), fooPath);
  var merkleTree = merkleTreeComputer.buildForFiles(treeInputs);

  IOException thrown =
      assertThrows(
          IOException.class,
          () ->
              remoteCache.ensureInputsPresent(
                  remoteActionExecutionContext,
                  merkleTree,
                  ImmutableMap.of(),
                  false,
                  /* remotePathResolver= */ null));

  assertThat(thrown).hasMessageThat().contains("upload failed");
}
/** shutdownNow() cancels an upload future that is still pending. */
@Test
public void shutdownNow_cancelInProgressUploads() throws Exception {
  RemoteCacheClient client = spy(new InMemoryCacheClient());
  // Return a future that never completes
  doAnswer(invocation -> SettableFuture.create())
      .when(client)
      .uploadBlobImpl(any(), any(), any());
  CombinedCache cache = newCombinedCache(client);
  Digest contentDigest =
      fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content");
  Path source = execRoot.getRelative("file");

  ListenableFuture<Void> pendingUpload =
      cache.uploadFile(remoteActionExecutionContext, contentDigest, source);
  assertThat(client.getInProgressUploads()).contains(contentDigest);

  cache.shutdownNow();

  assertThat(pendingUpload.isCancelled()).isTrue();
}
/**
 * With chunking enabled, two concurrent uploadFile calls for the same digest must share one
 * remote upload: findMissingDigests and spliceBlob are each invoked once, and the client
 * reports two subscribers for the digest while the splice is still pending.
 */
@Test
public void uploadFile_chunkedUpload_deduplicatesRemoteUpload() throws Exception {
  // Spy on a real GrpcCacheClient so that final methods on the RemoteCacheClient base class
  // (e.g. dedupUpload, uploadFile) execute their real implementations against a properly
  // initialized casUploadCache.
  GrpcCacheClient grpcCacheClient =
      spy(
          new GrpcCacheClient(
              mock(ReferenceCountedChannel.class),
              mock(CallCredentialsProvider.class),
              Options.getDefaults(RemoteOptions.class),
              mock(RemoteRetrier.class),
              digestUtil));
  doAnswer(unused -> chunkingCapabilities()).when(grpcCacheClient).getServerCapabilities();
  // Stubbed to return an empty set for every findMissingDigests call.
  doAnswer(unused -> immediateFuture(ImmutableSet.of()))
      .when(grpcCacheClient)
      .findMissingDigests(any(), any());
  // spliceBlob returns a future the test controls, keeping the first upload pending while the
  // second one is issued.
  CountDownLatch spliceStarted = new CountDownLatch(1);
  SettableFuture<Void> spliceFuture = SettableFuture.create();
  doAnswer(
          unused -> {
            spliceStarted.countDown();
            return spliceFuture;
          })
      .when(grpcCacheClient)
      .spliceBlob(any(), any(), any());
  CombinedCache combinedCache =
      new CombinedCache(
          grpcCacheClient,
          /* diskCacheClient= */ null,
          /* symlinkTemplate= */ null,
          digestUtil,
          /* chunkingEnabled= */ true);
  // 8 KiB of zeros, eight times the 1024-byte average chunk size advertised by
  // chunkingCapabilities().
  byte[] data = new byte[8192];
  Path file = execRoot.getRelative("chunked-output");
  try (var out = file.getOutputStream()) {
    out.write(data);
  }
  Digest digest = digestUtil.compute(data);
  try {
    ListenableFuture<Void> firstUpload =
        combinedCache.uploadFile(remoteActionExecutionContext, digest, file);
    // Use the shared test timeout rather than a fixed 1s so a slow machine cannot make the
    // test fail spuriously; a passing run is not slowed down.
    assertThat(spliceStarted.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();
    ListenableFuture<Void> secondUpload =
        combinedCache.uploadFile(remoteActionExecutionContext, digest, file);
    assertThat(grpcCacheClient.getUploadSubscriberCount(digest)).isEqualTo(2);
    // verify(...) without a mode checks for exactly one invocation.
    verify(grpcCacheClient).findMissingDigests(any(), any());
    verify(grpcCacheClient).spliceBlob(any(), any(), any());
    spliceFuture.set(null);
    getFromFuture(firstUpload);
    getFromFuture(secondUpload);
  } finally {
    combinedCache.release();
  }
}
/** Creates an {@link InMemoryCombinedCache} that uses this test's {@link DigestUtil}. */
private InMemoryCombinedCache newCombinedCache() {
  InMemoryCombinedCache cache = new InMemoryCombinedCache(digestUtil);
  return cache;
}
/**
 * Creates an {@link InMemoryCombinedCache} from the given CAS entries, digest helper and
 * optional symlink template.
 */
private InMemoryCombinedCache newCombinedCache(
    Map<Digest, byte[]> casEntries, DigestUtil digestUtil, @Nullable String symlinkTemplate) {
  InMemoryCombinedCache cache =
      new InMemoryCombinedCache(casEntries, digestUtil, symlinkTemplate);
  return cache;
}
/**
 * Creates a {@link CombinedCache} backed only by the given remote client: no disk cache, no
 * symlink template, chunking disabled.
 */
private CombinedCache newCombinedCache(RemoteCacheClient remoteCacheClient) {
  CombinedCache cache =
      new CombinedCache(
          remoteCacheClient,
          /* diskCacheClient= */ null,
          /* symlinkTemplate= */ null,
          digestUtil,
          /* chunkingEnabled= */ false);
  return cache;
}
/**
 * Creates a {@link RemoteExecutionCache} backed only by the given remote client: no disk
 * cache, no symlink template, chunking disabled.
 */
private RemoteExecutionCache newRemoteExecutionCache(RemoteCacheClient remoteCacheClient) {
  RemoteExecutionCache cache =
      new RemoteExecutionCache(
          remoteCacheClient,
          /* diskCacheClient= */ null,
          /* symlinkTemplate= */ null,
          digestUtil,
          /* chunkingEnabled= */ false);
  return cache;
}
/**
 * Returns server capabilities whose cache capabilities carry FastCDC 2020 parameters with an
 * average chunk size of 1024 bytes.
 */
private static ServerCapabilities chunkingCapabilities() {
  FastCdc2020Params chunkingParams =
      FastCdc2020Params.newBuilder().setAvgChunkSizeBytes(1024).build();
  CacheCapabilities cacheCapabilities =
      CacheCapabilities.newBuilder().setFastCdc2020Params(chunkingParams).build();
  return ServerCapabilities.newBuilder().setCacheCapabilities(cacheCapabilities).build();
}
}