blob: 0563c6848b6578b74be4783a11c72ae8a0864b7c [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public final class GrpcActionCache implements RemoteActionCache {
/** Channel over which to send gRPC CAS queries. */
private final ManagedChannel channel;
// TODO(olaola): proper profiling to determine the best values for these.
private final int grpcTimeoutSeconds;
private final int maxBatchInputs;
private final int maxChunkSizeBytes;
private final int maxBatchSizeBytes;
private static final int MAX_MEMORY_KBYTES = 512 * 1024;
/** Reads from multiple sequential inputs and chunks the data into BlobChunks. */
static interface BlobChunkIterator {
boolean hasNext();
BlobChunk next() throws IOException; // IOException can be a result of file read.
}
final class BlobChunkInlineIterator implements BlobChunkIterator {
private final Iterator<byte[]> blobIterator;
private final Set<ContentDigest> digests;
private int offset;
private ContentDigest digest;
private byte[] currentBlob;
public BlobChunkInlineIterator(Set<ContentDigest> digests, Iterator<byte[]> blobIterator) {
this.digests = digests;
this.blobIterator = blobIterator;
advanceInput();
}
public BlobChunkInlineIterator(byte[] blob) {
blobIterator = null;
offset = 0;
currentBlob = blob;
digest = ContentDigests.computeDigest(currentBlob);
digests = null;
}
private void advanceInput() {
offset = 0;
do {
if (blobIterator != null && blobIterator.hasNext()) {
currentBlob = blobIterator.next();
digest = ContentDigests.computeDigest(currentBlob);
} else {
currentBlob = null;
digest = null;
}
} while (digest != null && !digests.contains(digest));
}
@Override
public boolean hasNext() {
return currentBlob != null;
}
@Override
public BlobChunk next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
BlobChunk.Builder chunk = BlobChunk.newBuilder();
if (offset == 0) {
chunk.setDigest(digest);
} else {
chunk.setOffset(offset);
}
int size = Math.min(currentBlob.length - offset, maxChunkSizeBytes);
if (size > 0) {
chunk.setData(ByteString.copyFrom(currentBlob, offset, size));
offset += size;
}
if (offset >= currentBlob.length) {
advanceInput();
}
return chunk.build();
}
}
final class BlobChunkFileIterator implements BlobChunkIterator {
private final Iterator<Path> fileIterator;
private InputStream currentStream;
private final Set<ContentDigest> digests;
private ContentDigest digest;
private long bytesLeft;
public BlobChunkFileIterator(Set<ContentDigest> digests, Iterator<Path> fileIterator)
throws IOException {
this.digests = digests;
this.fileIterator = fileIterator;
advanceInput();
}
public BlobChunkFileIterator(Path file) throws IOException {
fileIterator = Iterators.singletonIterator(file);
digests = ImmutableSet.of(ContentDigests.computeDigest(file));
advanceInput();
}
private void advanceInput() throws IOException {
do {
if (fileIterator != null && fileIterator.hasNext()) {
Path file = fileIterator.next();
digest = ContentDigests.computeDigest(file);
currentStream = file.getInputStream();
bytesLeft = digest.getSizeBytes();
} else {
digest = null;
currentStream = null;
bytesLeft = 0;
}
} while (digest != null && !digests.contains(digest));
}
@Override
public boolean hasNext() {
return currentStream != null;
}
@Override
public BlobChunk next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
BlobChunk.Builder chunk = BlobChunk.newBuilder();
long offset = digest.getSizeBytes() - bytesLeft;
if (offset == 0) {
chunk.setDigest(digest);
} else {
chunk.setOffset(offset);
}
if (bytesLeft > 0) {
byte[] blob = new byte[(int) Math.min(bytesLeft, (long) maxChunkSizeBytes)];
currentStream.read(blob);
chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
}
if (bytesLeft == 0) {
currentStream.close();
advanceInput();
}
return chunk.build();
}
}
@VisibleForTesting
public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
this.channel = channel;
maxBatchInputs = options.grpcMaxBatchInputs;
maxChunkSizeBytes = options.grpcMaxChunkSizeBytes;
maxBatchSizeBytes = options.grpcMaxBatchSizeBytes;
grpcTimeoutSeconds = options.grpcTimeoutSeconds;
}
public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException {
this(RemoteUtils.createChannel(options.remoteCache), options);
}
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
private CasServiceBlockingStub getBlockingStub() {
return CasServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
}
private CasServiceStub getStub() {
return CasServiceGrpc.newStub(channel).withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
}
private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests);
if (request.getDigestCount() == 0) {
return ImmutableSet.of();
}
CasStatus status = getBlockingStub().lookup(request.build()).getStatus();
if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) {
// TODO(olaola): here and below, add basic retry logic on transient errors!
throw new RuntimeException(status.getErrorDetail());
}
return ImmutableSet.copyOf(status.getMissingDigestList());
}
/**
* Upload enough of the tree metadata and data into remote cache so that the entire tree can be
* reassembled remotely using the root digest.
*/
@Override
public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<ContentDigest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
ArrayList<ActionInput> actionInputs = new ArrayList<>();
ArrayList<FileNode> treeNodes = new ArrayList<>();
repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
if (!treeNodes.isEmpty()) {
CasUploadTreeMetadataRequest.Builder metaRequest =
CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes);
CasUploadTreeMetadataReply reply = getBlockingStub().uploadTreeMetadata(metaRequest.build());
if (!reply.getStatus().getSucceeded()) {
throw new RuntimeException(reply.getStatus().getErrorDetail());
}
}
if (!actionInputs.isEmpty()) {
ArrayList<Path> paths = new ArrayList<>();
for (ActionInput actionInput : actionInputs) {
paths.add(execRoot.getRelative(actionInput.getExecPathString()));
}
uploadChunks(paths.size(), new BlobChunkFileIterator(missingDigests, paths.iterator()));
}
}
/**
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
@Override
public void downloadTree(ContentDigest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
throw new UnsupportedOperationException();
}
private void handleDownloadStatus(CasStatus status) throws CacheNotFoundException {
if (!status.getSucceeded()) {
if (status.getError() == CasStatus.ErrorCode.MISSING_DIGEST) {
throw new CacheNotFoundException(status.getMissingDigest(0));
}
// TODO(olaola): deal with other statuses better.
throw new RuntimeException(status.getErrorDetail());
}
}
/**
* Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
* include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
*/
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
// Send all the file requests in a single synchronous batch.
// TODO(olaola): profile to maybe replace with separate concurrent requests.
CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
ArrayList<Output> fileOutputs = new ArrayList<>();
for (Output output : result.getOutputList()) {
Path path = execRoot.getRelative(output.getPath());
if (output.getContentCase() == ContentCase.FILE_METADATA) {
ContentDigest digest = output.getFileMetadata().getDigest();
if (digest.getSizeBytes() > 0) {
request.addDigest(digest);
fileOutputs.add(output);
} else {
// Handle empty file locally.
FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
FileSystemUtils.writeContent(path, new byte[0]);
}
} else {
downloadTree(output.getDigest(), path);
}
}
Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build());
for (Output output : fileOutputs) {
createFileFromStream(
execRoot.getRelative(output.getPath()), output.getFileMetadata(), replies);
}
}
private void createFileFromStream(
Path path, FileMetadata fileMetadata, Iterator<CasDownloadReply> replies)
throws IOException, CacheNotFoundException {
Preconditions.checkArgument(replies.hasNext());
CasDownloadReply reply = replies.next();
if (reply.hasStatus()) {
handleDownloadStatus(reply.getStatus());
}
BlobChunk chunk = reply.getData();
ContentDigest digest = chunk.getDigest();
Preconditions.checkArgument(digest.equals(fileMetadata.getDigest()));
FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
try (OutputStream stream = path.getOutputStream()) {
ByteString data = chunk.getData();
data.writeTo(stream);
long bytesLeft = digest.getSizeBytes() - data.size();
while (bytesLeft > 0) {
Preconditions.checkArgument(replies.hasNext());
reply = replies.next();
if (reply.hasStatus()) {
handleDownloadStatus(reply.getStatus());
}
chunk = reply.getData();
data = chunk.getData();
Preconditions.checkArgument(!chunk.hasDigest());
Preconditions.checkArgument(chunk.getOffset() == digest.getSizeBytes() - bytesLeft);
data.writeTo(stream);
bytesLeft -= data.size();
}
path.setExecutable(fileMetadata.getExecutable());
}
}
private byte[] getBlobFromStream(ContentDigest blobDigest, Iterator<CasDownloadReply> replies)
throws CacheNotFoundException {
Preconditions.checkArgument(replies.hasNext());
CasDownloadReply reply = replies.next();
if (reply.hasStatus()) {
handleDownloadStatus(reply.getStatus());
}
BlobChunk chunk = reply.getData();
ContentDigest digest = chunk.getDigest();
Preconditions.checkArgument(digest.equals(blobDigest));
// This is not enough, but better than nothing.
Preconditions.checkArgument(digest.getSizeBytes() / 1000.0 < MAX_MEMORY_KBYTES);
byte[] result = new byte[(int) digest.getSizeBytes()];
ByteString data = chunk.getData();
data.copyTo(result, 0);
int offset = data.size();
while (offset < result.length) {
Preconditions.checkArgument(replies.hasNext());
reply = replies.next();
if (reply.hasStatus()) {
handleDownloadStatus(reply.getStatus());
}
chunk = reply.getData();
Preconditions.checkArgument(!chunk.hasDigest());
Preconditions.checkArgument(chunk.getOffset() == offset);
data = chunk.getData();
data.copyTo(result, offset);
offset += data.size();
}
return result;
}
/** Upload all results of a locally executed action to the cache. */
@Override
public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException {
ArrayList<ContentDigest> digests = new ArrayList<>();
for (Path file : files) {
digests.add(ContentDigests.computeDigest(file));
}
ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
uploadChunks(missing.size(), new BlobChunkFileIterator(missing, files.iterator()));
}
int index = 0;
for (Path file : files) {
if (file.isDirectory()) {
// TODO(olaola): to implement this for a directory, will need to create or pass a
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
// Add to protobuf.
result
.addOutputBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
.getFileMetadataBuilder()
.setDigest(digests.get(index++))
.setExecutable(file.isExecutable());
}
}
/**
* Put the file contents cache if it is not already in it. No-op if the file is already stored in
* cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to
* patch through an overload that uses an ActionInputFile cache to compute the digests!
*
* @return The key for fetching the file contents blob from cache.
*/
@Override
public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
ContentDigest digest = ContentDigests.computeDigest(file);
ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploadChunks(1, new BlobChunkFileIterator(file));
}
return digest;
}
/**
* Download a blob keyed by the given digest and write it to the specified path. Set the
* executable parameter to the specified value.
*/
@Override
public void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
// Send all the file requests in a single synchronous batch.
// TODO(olaola): profile to maybe replace with separate concurrent requests.
CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder().addDigest(digest);
Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build());
FileMetadata fileMetadata =
FileMetadata.newBuilder().setDigest(digest).setExecutable(executable).build();
createFileFromStream(dest, fileMetadata, replies);
}
static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> {
private final CountDownLatch finishLatch;
private final AtomicReference<RuntimeException> exception;
public UploadBlobReplyStreamObserver(
CountDownLatch finishLatch, AtomicReference<RuntimeException> exception) {
this.finishLatch = finishLatch;
this.exception = exception;
}
@Override
public void onNext(CasUploadBlobReply reply) {
if (!reply.getStatus().getSucceeded()) {
// TODO(olaola): add basic retry logic on transient errors!
this.exception.compareAndSet(
null, new RuntimeException(reply.getStatus().getErrorDetail()));
}
}
@Override
public void onError(Throwable t) {
this.exception.compareAndSet(null, new StatusRuntimeException(Status.fromThrowable(t)));
finishLatch.countDown();
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
}
private void uploadChunks(int numItems, BlobChunkIterator blobs)
throws InterruptedException, IOException {
CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches.
AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
UploadBlobReplyStreamObserver responseObserver = null;
StreamObserver<CasUploadBlobRequest> requestObserver = null;
int currentBatchBytes = 0;
int batchedInputs = 0;
int batches = 0;
try {
while (blobs.hasNext()) {
BlobChunk chunk = blobs.next();
if (chunk.hasDigest()) {
// Determine whether to start next batch.
if (batchedInputs % maxBatchInputs == 0
|| chunk.getDigest().getSizeBytes() + currentBatchBytes > maxBatchSizeBytes) {
// The batches execute simultaneously.
if (requestObserver != null) {
batchedInputs = 0;
currentBatchBytes = 0;
requestObserver.onCompleted();
}
batches++;
responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
requestObserver = getStub().uploadBlob(responseObserver);
}
batchedInputs++;
}
currentBatchBytes += chunk.getData().size();
requestObserver.onNext(CasUploadBlobRequest.newBuilder().setData(chunk).build());
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
throw new RuntimeException(
"gRPC terminated prematurely: "
+ (exception.get() != null ? exception.get() : "unknown cause"));
}
}
} catch (RuntimeException e) {
// Cancel RPC
if (requestObserver != null) {
requestObserver.onError(e);
}
throw e;
}
if (requestObserver != null) {
requestObserver.onCompleted(); // Finish last batch.
}
while (batches++ < numItems) {
finishLatch.countDown(); // Non-sent batches.
}
finishLatch.await(grpcTimeoutSeconds, TimeUnit.SECONDS);
if (exception.get() != null) {
throw exception.get(); // Re-throw the first encountered exception.
}
}
@Override
public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
throws InterruptedException {
ArrayList<ContentDigest> digests = new ArrayList<>();
for (byte[] blob : blobs) {
digests.add(ContentDigests.computeDigest(blob));
}
ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
try {
if (!missing.isEmpty()) {
uploadChunks(missing.size(), new BlobChunkInlineIterator(missing, blobs.iterator()));
}
return ImmutableList.copyOf(digests);
} catch (IOException e) {
// This will never happen.
throw new RuntimeException(e);
}
}
@Override
public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
ContentDigest digest = ContentDigests.computeDigest(blob);
ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
uploadChunks(1, new BlobChunkInlineIterator(blob));
}
return digest;
} catch (IOException e) {
// This will never happen.
throw new RuntimeException();
}
}
@Override
public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
return downloadBlobs(ImmutableList.of(digest)).get(0);
}
@Override
public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
throws CacheNotFoundException {
// Send all the file requests in a single synchronous batch.
// TODO(olaola): profile to maybe replace with separate concurrent requests.
CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
for (ContentDigest digest : digests) {
if (digest.getSizeBytes() > 0) {
request.addDigest(digest); // We handle empty blobs locally.
}
}
Iterator<CasDownloadReply> replies = null;
if (request.getDigestCount() > 0) {
replies = getBlockingStub().downloadBlob(request.build());
}
ArrayList<byte[]> result = new ArrayList<>();
for (ContentDigest digest : digests) {
result.add(digest.getSizeBytes() == 0 ? new byte[0] : getBlobFromStream(digest, replies));
}
return ImmutableList.copyOf(result);
}
// Execution Cache API
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
ExecutionCacheServiceBlockingStub stub =
ExecutionCacheServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheRequest request =
ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
ExecutionCacheReply reply = stub.getCachedResult(request);
ExecutionCacheStatus status = reply.getStatus();
if (!status.getSucceeded()
&& status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) {
throw new RuntimeException(status.getErrorDetail());
}
return reply.hasResult() ? reply.getResult() : null;
}
/** Sets the given result as result of the given Action. */
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
ExecutionCacheServiceBlockingStub stub =
ExecutionCacheServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheSetRequest request =
ExecutionCacheSetRequest.newBuilder()
.setActionDigest(actionKey.getDigest())
.setResult(result)
.build();
ExecutionCacheSetReply reply = stub.setCachedResult(request);
ExecutionCacheStatus status = reply.getStatus();
if (!status.getSucceeded()
&& status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) {
throw new RuntimeException(status.getErrorDetail());
}
}
}