// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote;

import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
import com.google.devtools.remoteexecution.v1test.Command;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
import com.google.devtools.remoteexecution.v1test.OutputDirectory;
import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcActionCache implements RemoteActionCache {
private final RemoteOptions options;
private final ChannelOptions channelOptions;
private final Channel channel;
@VisibleForTesting
public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
this.options = options;
this.channelOptions = channelOptions;
this.channel = channel;
}
@Override
public void close() {}
// All gRPC stubs are reused.
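  // Note that withDeadlineAfter computes an absolute deadline at stub construction time, so
  // each memoized stub shares a single deadline across all calls made through it.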
private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub =
Suppliers.memoize(
new Supplier<ContentAddressableStorageBlockingStub>() {
@Override
public ContentAddressableStorageBlockingStub get() {
return ContentAddressableStorageGrpc.newBlockingStub(channel)
.withCallCredentials(channelOptions.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
});
private final Supplier<ByteStreamBlockingStub> bsBlockingStub =
Suppliers.memoize(
new Supplier<ByteStreamBlockingStub>() {
@Override
public ByteStreamBlockingStub get() {
return ByteStreamGrpc.newBlockingStub(channel)
.withCallCredentials(channelOptions.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
});
private final Supplier<ByteStreamStub> bsStub =
Suppliers.memoize(
new Supplier<ByteStreamStub>() {
@Override
public ByteStreamStub get() {
return ByteStreamGrpc.newStub(channel)
.withCallCredentials(channelOptions.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
});
private final Supplier<ActionCacheBlockingStub> acBlockingStub =
Suppliers.memoize(
new Supplier<ActionCacheBlockingStub>() {
@Override
public ActionCacheBlockingStub get() {
return ActionCacheGrpc.newBlockingStub(channel)
.withCallCredentials(channelOptions.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
});
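
  /** Returns whether a remote cache backend is configured in the given options. */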
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
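
  /**
   * Queries the remote CAS for the subset of the given digests that it does not yet store. An
   * empty query is answered locally, avoiding a needless round trip.
   */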
private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) {
FindMissingBlobsRequest.Builder request =
FindMissingBlobsRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.addAllBlobDigests(digests);
if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build());
return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}

  /**
   * Upload enough of the tree metadata and data into the remote cache so that the entire tree
   * can be reassembled remotely using the root digest.
   */
@Override
public void ensureInputsPresent(
TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
ArrayList<ActionInput> actionInputs = new ArrayList<>();
ArrayList<Directory> treeNodes = new ArrayList<>();
repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
if (!treeNodes.isEmpty()) {
// TODO(olaola): split this into multiple requests if total size is > 10MB.
BatchUpdateBlobsRequest.Builder treeBlobRequest =
BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
for (Directory d : treeNodes) {
final byte[] data = d.toByteArray();
treeBlobRequest
.addRequestsBuilder()
.setContentDigest(Digests.computeDigest(data))
.setData(ByteString.copyFrom(data));
}
BatchUpdateBlobsResponse response =
casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
// TODO(olaola): handle retries on transient errors.
for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
throw StatusProto.toStatusRuntimeException(r.getStatus());
}
}
}
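    // The Command message is uploaded as an ordinary blob; the remote side references it by
    // its digest.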
uploadBlob(command.toByteArray());
if (!actionInputs.isEmpty()) {
uploadChunks(
actionInputs.size(),
new Chunker.Builder()
.addAllInputs(actionInputs, repository.getInputFileCache(), execRoot)
.onlyUseDigests(missingDigests)
.build());
}
}

  /**
   * Download the entire tree of data rooted at the given digest and write it to the given
   * location.
   */
@SuppressWarnings("unused")
private void downloadTree(Digest rootDigest, Path rootLocation) {
throw new UnsupportedOperationException();
}

  /**
   * Download all results of a remotely executed action to the local file system.
   *
   * <p>TODO(olaola): this will need to be amended to include the {@link
   * com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
   */
@Override
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws IOException, CacheNotFoundException {
for (OutputFile file : result.getOutputFilesList()) {
Path path = execRoot.getRelative(file.getPath());
FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
Digest digest = file.getDigest();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
FileSystemUtils.writeContent(path, new byte[0]);
} else {
try (OutputStream stream = path.getOutputStream()) {
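          // Small outputs may arrive inline in the ActionResult; only fall back to streaming
          // the blob from the ByteStream service when no inline content is present.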
if (!file.getContent().isEmpty()) {
file.getContent().writeTo(stream);
} else {
Iterator<ReadResponse> replies = readBlob(digest);
while (replies.hasNext()) {
replies.next().getData().writeTo(stream);
}
}
}
}
path.setExecutable(file.getIsExecutable());
}
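    // Output directories are not yet supported: downloadTree currently throws
    // UnsupportedOperationException.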
for (OutputDirectory directory : result.getOutputDirectoriesList()) {
downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
}
// TODO(ulfjack): use same code as above also for stdout / stderr if applicable.
downloadOutErr(result, outErr);
}
private void downloadOutErr(ActionResult result, FileOutErr outErr)
throws IOException, CacheNotFoundException {
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} else if (result.hasStdoutDigest()) {
byte[] stdoutBytes = downloadBlob(result.getStdoutDigest());
outErr.getOutputStream().write(stdoutBytes);
outErr.getOutputStream().flush();
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} else if (result.hasStderrDigest()) {
byte[] stderrBytes = downloadBlob(result.getStderrDigest());
outErr.getErrorStream().write(stderrBytes);
outErr.getErrorStream().flush();
}
}
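
  /**
   * Opens a ByteStream read of the given blob. The resource name follows the remote execution
   * API convention "{instance_name}/blobs/{hash}/{size}". Note that with a blocking stub,
   * errors from the server may surface only while iterating over the returned replies rather
   * than at call time, so a missing blob is not guaranteed to be caught here.
   */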
private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException {
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
try {
return bsBlockingStub
.get()
.read(ReadRequest.newBuilder().setResourceName(resourceName).build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
throw new CacheNotFoundException(digest);
}
throw e;
}
}
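
  /**
   * Uploads all given output files and the action's stdout/stderr to the CAS, then records the
   * assembled ActionResult in the action cache under the given action key.
   */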
@Override
public void upload(
ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr)
throws IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
upload(execRoot, files, outErr, result);
try {
acBlockingStub
.get()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build());
} catch (StatusRuntimeException e) {
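      // Not all remote caches implement UpdateActionResult; if the server does not, the action
      // result is simply not stored, while the uploaded file blobs remain cached.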
if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
throw e;
}
}
}
void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
throws IOException, InterruptedException {
ArrayList<Digest> digests = new ArrayList<>();
Chunker.Builder b = new Chunker.Builder();
for (Path file : files) {
if (!file.exists()) {
// We ignore requested results that have not been generated by the action.
continue;
}
if (file.isDirectory()) {
// TODO(olaola): to implement this for a directory, will need to create or pass a
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
digests.add(Digests.computeDigest(file));
b.addInput(file);
}
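    // Upload only the blobs that the remote CAS reports as missing.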
ImmutableSet<Digest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
int index = 0;
for (Path file : files) {
// Add to protobuf.
// TODO(olaola): inline small results here.
result
.addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
.setDigest(digests.get(index++))
.setIsExecutable(file.isExecutable());
}
// TODO(ulfjack): Use the Chunker also for stdout / stderr.
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
Digest stdout = uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
}

  /**
   * Put the file contents into the cache if they are not already present. No-op if the file is
   * already stored in the cache. The given path must be a full absolute path.
   *
   * @return The key for fetching the file contents blob from the cache.
   */
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
Digest digest = Digests.computeDigest(file);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploadChunks(1, Chunker.from(file));
}
return digest;
}

  /**
   * Put the contents of the given {@link ActionInput} into the cache if they are not already
   * present, resolving the input against the given execution root. No-op if the file is
   * already stored in the cache.
   *
   * @return The key for fetching the file contents blob from the cache.
   */
Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
Digest digest = Digests.getDigestFromInputCache(input, inputCache);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploadChunks(1, Chunker.from(input, inputCache, execRoot));
}
return digest;
}
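
  /**
   * Streams the chunks produced by the given {@link Chunker} to the ByteStream service,
   * expecting {@code numItems} distinct blobs, and blocks until all uploads complete, fail, or
   * time out.
   */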
private void uploadChunks(int numItems, Chunker chunker)
throws InterruptedException, IOException {
final CountDownLatch finishLatch = new CountDownLatch(numItems);
final AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
StreamObserver<WriteRequest> requestObserver = null;
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
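    // Each blob becomes one ByteStream Write call: the chunk at offset 0 opens a new stream
    // named "uploads/{uuid}/blobs/{hash}/{size}", subsequent chunks append at their offsets,
    // and the chunk that completes the blob sets finish_write.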
while (chunker.hasNext()) {
Chunker.Chunk chunk = chunker.next();
final Digest digest = chunk.getDigest();
long offset = chunk.getOffset();
WriteRequest.Builder request = WriteRequest.newBuilder();
if (offset == 0) { // Beginning of new upload.
numItems--;
request.setResourceName(
resourceName
+ "uploads/"
+ UUID.randomUUID()
+ "/blobs/"
+ digest.getHash()
+ "/"
+ digest.getSizeBytes());
        // The writes are asynchronous; each blob's responses, errors, and completion are
        // handled by its own observer and tracked through the latch.
requestObserver =
bsStub
.get()
.write(
new StreamObserver<WriteResponse>() {
private long bytesLeft = digest.getSizeBytes();
@Override
public void onNext(WriteResponse reply) {
bytesLeft -= reply.getCommittedSize();
}
@Override
public void onError(Throwable t) {
exception.compareAndSet(
null, new StatusRuntimeException(Status.fromThrowable(t)));
finishLatch.countDown();
}
@Override
public void onCompleted() {
if (bytesLeft != 0) {
exception.compareAndSet(
null, new RuntimeException("Server did not commit all data."));
}
finishLatch.countDown();
}
});
}
byte[] data = chunk.getData();
boolean finishWrite = offset + data.length == digest.getSizeBytes();
request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite);
requestObserver.onNext(request.build());
if (finishWrite) {
requestObserver.onCompleted();
}
      if (finishLatch.getCount() <= numItems) {
        // The current write already terminated with an error before we finished sending;
        // skip the remaining chunks of this blob and move on to the next input.
        if (!finishWrite) {
          chunker.advanceInput();
        }
      }
}
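    // Wait for the outstanding writes to finish or for the deadline to pass, then surface the
    // first error recorded by any response observer.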
finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
if (exception.get() != null) {
throw exception.get(); // Re-throw the first encountered exception.
}
}
Digest uploadBlob(byte[] blob) throws InterruptedException {
Digest digest = Digests.computeDigest(blob);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
uploadChunks(1, Chunker.from(blob));
}
return digest;
    } catch (IOException e) {
      // Chunker.from(byte[]) reads from memory only, so this should never happen; preserve the
      // cause just in case.
      throw new RuntimeException(e);
    }
}
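
  /**
   * Downloads a blob from the CAS into memory, checking that exactly the advertised number of
   * bytes is received.
   */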
byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
Iterator<ReadResponse> replies = readBlob(digest);
byte[] result = new byte[(int) digest.getSizeBytes()];
int offset = 0;
while (replies.hasNext()) {
ByteString data = replies.next().getData();
data.copyTo(result, offset);
offset += data.size();
}
Preconditions.checkState(digest.getSizeBytes() == offset);
return result;
}
// Execution Cache API
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
try {
return acBlockingStub
.get()
.getActionResult(
GetActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
return null;
}
throw e;
}
}
}