blob: 522241cd4b0382e8df6b7751bb5880bacc1fca0e [file] [log] [blame]
// Copyright 2015 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.testutil.Scratch;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystem.HashFunction;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
import com.google.devtools.remoteexecution.v1test.Action;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.devtools.remoteexecution.v1test.DirectoryNode;
import com.google.devtools.remoteexecution.v1test.FileNode;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
import com.google.devtools.remoteexecution.v1test.Tree;
import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** Tests for {@link GrpcRemoteCache}. */
@RunWith(JUnit4.class)
public class GrpcRemoteCacheTest {
private static final DigestUtil DIGEST_UTIL = new DigestUtil(HashFunction.SHA256);
private FileSystem fs;
private Path execRoot;
private FileOutErr outErr;
private FakeActionInputFileCache fakeFileCache;
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private final String fakeServerName = "fake server for " + getClass();
private Server fakeServer;
private Context withEmptyMetadata;
private Context prevContext;
@Before
public final void setUp() throws Exception {
// Use a mutable service registry for later registering the service impl for each test case.
fakeServer =
InProcessServerBuilder.forName(fakeServerName)
.fallbackHandlerRegistry(serviceRegistry)
.directExecutor()
.build()
.start();
Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
fs = new InMemoryFileSystem(new JavaClock(), HashFunction.SHA256);
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
withEmptyMetadata =
TracingMetadataUtils.contextWithMetadata(
"none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
prevContext = withEmptyMetadata.attach();
}
@After
public void tearDown() throws Exception {
withEmptyMetadata.detach(prevContext);
fakeServer.shutdownNow();
fakeServer.awaitTermination();
}
private static class CallCredentialsInterceptor implements ClientInterceptor {
private final CallCredentials credentials;
public CallCredentialsInterceptor(CallCredentials credentials) {
this.credentials = credentials;
}
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions, Channel next) {
assertThat(callOptions.getCredentials()).isEqualTo(credentials);
// Remove the call credentials to allow testing with dummy ones.
return next.newCall(method, callOptions.withCallCredentials(null));
}
}
private GrpcRemoteCache newClient() throws IOException {
AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class);
authTlsOptions.useGoogleDefaultCredentials = true;
authTlsOptions.googleCredentials = "/exec/root/creds.json";
authTlsOptions.googleAuthScopes = ImmutableList.of("dummy.scope");
GenericJson json = new GenericJson();
json.put("type", "authorized_user");
json.put("client_id", "some_client");
json.put("client_secret", "foo");
json.put("refresh_token", "bar");
Scratch scratch = new Scratch();
scratch.file(authTlsOptions.googleCredentials, new JacksonFactory().toString(json));
CallCredentials creds =
GoogleAuthUtils.newCallCredentials(
scratch.resolve(authTlsOptions.googleCredentials).getInputStream(),
authTlsOptions.googleAuthScopes);
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
return new GrpcRemoteCache(
ClientInterceptors.intercept(
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
ImmutableList.of(new CallCredentialsInterceptor(creds))),
creds,
remoteOptions,
retrier,
DIGEST_UTIL);
}
@Test
public void testDownloadEmptyBlob() throws Exception {
GrpcRemoteCache client = newClient();
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
// Will not call the mock Bytestream interface at all.
assertThat(client.downloadBlob(emptyDigest)).isEmpty();
}
@Test
public void testDownloadBlobSingleChunk() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abcdefg")).build());
responseObserver.onCompleted();
}
});
assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
public void testDownloadBlobMultipleChunks() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build());
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build());
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("g")).build());
responseObserver.onCompleted();
}
});
assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
public void testDownloadAllResults() throws Exception {
GrpcRemoteCache client = newClient();
Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
Digest barDigest = DIGEST_UTIL.computeAsUtf8("bar-contents");
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
serviceRegistry.addService(
new FakeImmutableCacheByteStreamImpl(fooDigest, "foo-contents", barDigest, "bar-contents"));
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest);
result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true);
client.download(result.build(), execRoot, null);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar"))).isEqualTo(barDigest);
assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue();
}
@Test
public void testDownloadDirectory() throws Exception {
GrpcRemoteCache client = newClient();
Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
Tree barTreeMessage =
Tree.newBuilder()
.setRoot(
Directory.newBuilder()
.addFiles(
FileNode.newBuilder()
.setName("qux")
.setDigest(quxDigest)
.setIsExecutable(true)))
.build();
Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
serviceRegistry.addService(
new FakeImmutableCacheByteStreamImpl(
ImmutableMap.of(
fooDigest, "foo-contents",
barTreeDigest, barTreeMessage.toByteString(),
quxDigest, "qux-contents")));
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
client.download(result.build(), execRoot, null);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/qux"))).isEqualTo(quxDigest);
assertThat(execRoot.getRelative("a/bar/qux").isExecutable()).isTrue();
}
@Test
public void testDownloadDirectoryEmpty() throws Exception {
GrpcRemoteCache client = newClient();
Tree barTreeMessage = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
serviceRegistry.addService(
new FakeImmutableCacheByteStreamImpl(
ImmutableMap.of(barTreeDigest, barTreeMessage.toByteString())));
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
client.download(result.build(), execRoot, null);
assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue();
}
@Test
public void testDownloadDirectoryNested() throws Exception {
GrpcRemoteCache client = newClient();
Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
Digest quxDigest = DIGEST_UTIL.computeAsUtf8("qux-contents");
Directory wobbleDirMessage =
Directory.newBuilder()
.addFiles(FileNode.newBuilder().setName("qux").setDigest(quxDigest))
.build();
Digest wobbleDirDigest = DIGEST_UTIL.compute(wobbleDirMessage);
Tree barTreeMessage =
Tree.newBuilder()
.setRoot(
Directory.newBuilder()
.addFiles(
FileNode.newBuilder()
.setName("qux")
.setDigest(quxDigest)
.setIsExecutable(true))
.addDirectories(
DirectoryNode.newBuilder().setName("wobble").setDigest(wobbleDirDigest)))
.addChildren(wobbleDirMessage)
.build();
Digest barTreeDigest = DIGEST_UTIL.compute(barTreeMessage);
serviceRegistry.addService(
new FakeImmutableCacheByteStreamImpl(
ImmutableMap.of(
fooDigest, "foo-contents",
barTreeDigest, barTreeMessage.toByteString(),
quxDigest, "qux-contents")));
ActionResult.Builder result = ActionResult.newBuilder();
result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest);
client.download(result.build(), execRoot, null);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/wobble/qux"))).isEqualTo(quxDigest);
assertThat(execRoot.getRelative("a/bar/wobble/qux").isExecutable()).isFalse();
}
@Test
public void testUploadBlobCacheHitWithRetries() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
private int numErrors = 4;
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
if (numErrors-- <= 0) {
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
} else {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
}
}
});
assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
@Test
public void testUploadBlobSingleChunk() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
responseObserver.onNext(
FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
responseObserver.onCompleted();
}
});
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(
final StreamObserver<WriteResponse> responseObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest request) {
assertThat(request.getResourceName()).contains(digest.getHash());
assertThat(request.getFinishWrite()).isTrue();
assertThat(request.getData().toStringUtf8()).isEqualTo("abcdefg");
}
@Override
public void onCompleted() {
responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(7).build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
fail("An error occurred: " + t);
}
};
}
});
assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> {
private final StreamObserver<WriteResponse> responseObserver;
private final String contents;
private Chunker chunker;
public TestChunkedRequestObserver(
StreamObserver<WriteResponse> responseObserver, String contents, int chunkSizeBytes) {
this.responseObserver = responseObserver;
this.contents = contents;
try {
chunker = new Chunker(contents.getBytes(UTF_8), chunkSizeBytes, DIGEST_UTIL);
} catch (IOException e) {
fail("An error occurred:" + e);
}
}
@Override
public void onNext(WriteRequest request) {
assertThat(chunker.hasNext()).isTrue();
try {
Chunker.Chunk chunk = chunker.next();
Digest digest = chunk.getDigest();
long offset = chunk.getOffset();
ByteString data = chunk.getData();
if (offset == 0) {
assertThat(request.getResourceName()).contains(digest.getHash());
} else {
assertThat(request.getResourceName()).isEmpty();
}
assertThat(request.getFinishWrite())
.isEqualTo(offset + data.size() == digest.getSizeBytes());
assertThat(request.getData()).isEqualTo(data);
} catch (IOException e) {
fail("An error occurred:" + e);
}
}
@Override
public void onCompleted() {
assertThat(chunker.hasNext()).isFalse();
responseObserver.onNext(
WriteResponse.newBuilder().setCommittedSize(contents.length()).build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
fail("An error occurred: " + t);
}
}
private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswer(
final String contents, final int chunkSize) {
return new Answer<StreamObserver<WriteRequest>>() {
@Override
@SuppressWarnings("unchecked")
public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
return new TestChunkedRequestObserver(
(StreamObserver<WriteResponse>) invocation.getArguments()[0], contents, chunkSize);
}
};
}
@Test
public void testUploadBlobMultipleChunks() throws Exception {
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdef");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
responseObserver.onNext(
FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
responseObserver.onCompleted();
}
});
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
serviceRegistry.addService(mockByteStreamImpl);
for (int chunkSize = 1; chunkSize <= 6; ++chunkSize) {
GrpcRemoteCache client = newClient();
Chunker.setDefaultChunkSizeForTesting(chunkSize);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
.thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize));
assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest);
}
Mockito.verify(mockByteStreamImpl, Mockito.times(6))
.write(Mockito.<StreamObserver<WriteResponse>>anyObject());
}
@Test
public void testUploadDirectory() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest fooDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
final Digest quxDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
final Digest barDigest =
fakeFileCache.createScratchInputDirectory(
ActionInputHelper.fromPath("bar"),
Tree.newBuilder()
.setRoot(
Directory.newBuilder()
.addFiles(
FileNode.newBuilder()
.setIsExecutable(true)
.setName("qux")
.setDigest(quxDigest)
.build())
.build())
.build());
final Path fooFile = execRoot.getRelative("a/foo");
final Path quxFile = execRoot.getRelative("bar/qux");
quxFile.setExecutable(true);
final Path barDir = execRoot.getRelative("bar");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
assertThat(request.getBlobDigestsList())
.containsExactly(fooDigest, quxDigest, barDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
ActionResult.Builder result = ActionResult.newBuilder();
client.upload(execRoot, ImmutableList.<Path>of(fooFile, barDir), outErr, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
}
@Test
public void testUploadDirectoryEmpty() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest barDigest =
fakeFileCache.createScratchInputDirectory(
ActionInputHelper.fromPath("bar"),
Tree.newBuilder().setRoot(Directory.newBuilder().build()).build());
final Path barDir = execRoot.getRelative("bar");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
assertThat(request.getBlobDigestsList()).containsExactly(barDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
ActionResult.Builder result = ActionResult.newBuilder();
client.upload(execRoot, ImmutableList.<Path>of(barDir), outErr, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
}
@Test
public void testUploadDirectoryNested() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest wobbleDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
final Digest quxDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/qux"), "abc");
final Directory testDirMessage =
Directory.newBuilder()
.addFiles(FileNode.newBuilder().setName("wobble").setDigest(wobbleDigest).build())
.build();
final Digest testDigest = DIGEST_UTIL.compute(testDirMessage);
final Tree barTree =
Tree.newBuilder()
.setRoot(
Directory.newBuilder()
.addFiles(
FileNode.newBuilder()
.setIsExecutable(true)
.setName("qux")
.setDigest(quxDigest))
.addDirectories(
DirectoryNode.newBuilder().setName("test").setDigest(testDigest)))
.addChildren(testDirMessage)
.build();
final Digest barDigest =
fakeFileCache.createScratchInputDirectory(ActionInputHelper.fromPath("bar"), barTree);
final Path quxFile = execRoot.getRelative("bar/qux");
quxFile.setExecutable(true);
final Path barDir = execRoot.getRelative("bar");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
assertThat(request.getBlobDigestsList())
.containsExactly(quxDigest, barDigest, wobbleDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
ActionResult.Builder result = ActionResult.newBuilder();
client.upload(execRoot, ImmutableList.<Path>of(barDir), outErr, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputDirectoriesBuilder().setPath("bar").setTreeDigest(barDigest);
assertThat(result.build()).isEqualTo(expectedResult.build());
}
@Test
public void testUploadCacheHits() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest fooDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
final Digest barDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
final Path fooFile = execRoot.getRelative("a/foo");
final Path barFile = execRoot.getRelative("bar");
barFile.setExecutable(true);
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
assertThat(request.getBlobDigestsList()).containsExactly(fooDigest, barDigest);
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
ActionResult.Builder result = ActionResult.newBuilder();
client.upload(execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, result);
ActionResult.Builder expectedResult = ActionResult.newBuilder();
expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
expectedResult
.addOutputFilesBuilder()
.setPath("bar")
.setDigest(barDigest)
.setIsExecutable(true);
assertThat(result.build()).isEqualTo(expectedResult.build());
}
@Test
public void testUploadUploadsOnlyOutputs() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest fooDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
final Digest barDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
// This checks we will try to upload the actual outputs.
assertThat(request.getBlobDigestsList()).containsExactly(fooDigest, barDigest);
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});
serviceRegistry.addService(
new ActionCacheImplBase() {
@Override
public void updateActionResult(
UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
fail("Update action result was expected to not be called.");
}
});
ActionKey emptyKey = DIGEST_UTIL.computeActionKey(Action.getDefaultInstance());
Path fooFile = execRoot.getRelative("a/foo");
Path barFile = execRoot.getRelative("bar");
client.upload(emptyKey, execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, false);
}
@Test
public void testUploadCacheMissesWithRetries() throws Exception {
final GrpcRemoteCache client = newClient();
final Digest fooDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
final Digest barDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
final Digest bazDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("baz"), "z");
final Path fooFile = execRoot.getRelative("a/foo");
final Path barFile = execRoot.getRelative("bar");
final Path bazFile = execRoot.getRelative("baz");
ActionKey actionKey = DIGEST_UTIL.asActionKey(fooDigest); // Could be any key.
barFile.setExecutable(true);
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
private int numErrors = 4;
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
if (numErrors-- <= 0) {
// Everything is missing.
responseObserver.onNext(
FindMissingBlobsResponse.newBuilder()
.addMissingBlobDigests(fooDigest)
.addMissingBlobDigests(barDigest)
.addMissingBlobDigests(bazDigest)
.build());
responseObserver.onCompleted();
} else {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
}
}
});
ActionResult.Builder rb = ActionResult.newBuilder();
rb.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
rb.addOutputFilesBuilder().setPath("bar").setDigest(barDigest).setIsExecutable(true);
rb.addOutputFilesBuilder().setPath("baz").setDigest(bazDigest);
ActionResult result = rb.build();
serviceRegistry.addService(
new ActionCacheImplBase() {
private int numErrors = 4;
@Override
public void updateActionResult(
UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
assertThat(request)
.isEqualTo(
UpdateActionResultRequest.newBuilder()
.setActionDigest(fooDigest)
.setActionResult(result)
.build());
if (numErrors-- <= 0) {
responseObserver.onNext(result);
responseObserver.onCompleted();
} else {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
}
}
});
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
serviceRegistry.addService(mockByteStreamImpl);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
.thenAnswer(
new Answer<StreamObserver<WriteRequest>>() {
private int numErrors = 4;
@Override
@SuppressWarnings("unchecked")
public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
StreamObserver<WriteResponse> responseObserver =
(StreamObserver<WriteResponse>) invocation.getArguments()[0];
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest request) {
numErrors--;
if (numErrors >= 0) {
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
return;
}
assertThat(request.getFinishWrite()).isTrue();
String resourceName = request.getResourceName();
String dataStr = request.getData().toStringUtf8();
int size = 0;
if (resourceName.contains(fooDigest.getHash())) {
assertThat(dataStr).isEqualTo("xyz");
size = 3;
} else if (resourceName.contains(barDigest.getHash())) {
assertThat(dataStr).isEqualTo("x");
size = 1;
} else if (resourceName.contains(bazDigest.getHash())) {
assertThat(dataStr).isEqualTo("z");
size = 1;
} else {
fail("Unexpected resource name in upload: " + resourceName);
}
responseObserver.onNext(
WriteResponse.newBuilder().setCommittedSize(size).build());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
fail("An error occurred: " + t);
}
};
}
});
client.upload(
actionKey, execRoot, ImmutableList.<Path>of(fooFile, barFile, bazFile), outErr, true);
// 4 times for the errors, 3 times for the successful uploads.
Mockito.verify(mockByteStreamImpl, Mockito.times(7))
.write(Mockito.<StreamObserver<WriteResponse>>anyObject());
}
@Test
public void testGetCachedActionResultWithRetries() throws Exception {
final GrpcRemoteCache client = newClient();
ActionKey actionKey = DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key"));
serviceRegistry.addService(
new ActionCacheImplBase() {
private int numErrors = 4;
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
responseObserver.onError(
(numErrors-- <= 0 ? Status.NOT_FOUND : Status.UNAVAILABLE).asRuntimeException());
}
});
assertThat(client.getCachedActionResult(actionKey)).isNull();
}
}