blob: 8f65021709a0964d5cbfbdc61242c896c2a52ab8 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcRemoteCache extends AbstractRemoteActionCache {
private final CallCredentials credentials;
private final Channel channel;
private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
private final ListeningScheduledExecutorService retryScheduler =
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@VisibleForTesting
public GrpcRemoteCache(
Channel channel,
CallCredentials credentials,
RemoteOptions options,
RemoteRetrier retrier,
DigestUtil digestUtil) {
super(options, digestUtil);
this.credentials = credentials;
this.channel = channel;
this.retrier = retrier;
uploader = new ByteStreamUploader(options.remoteInstanceName, channel, credentials,
options.remoteTimeout, retrier, retryScheduler);
}
private ContentAddressableStorageBlockingStub casBlockingStub() {
return ContentAddressableStorageGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
private ByteStreamBlockingStub bsBlockingStub() {
return ByteStreamGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
private ActionCacheBlockingStub acBlockingStub() {
return ActionCacheGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
@Override
public void close() {
retryScheduler.shutdownNow();
uploader.shutdown();
}
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
throws IOException, InterruptedException {
FindMissingBlobsRequest.Builder request =
FindMissingBlobsRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.addAllBlobDigests(digests);
if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
FindMissingBlobsResponse response =
retrier.execute(() -> casBlockingStub().findMissingBlobs(request.build()));
return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
/**
* Upload enough of the tree metadata and data into remote cache so that the entire tree can be
* reassembled remotely using the root digest.
*/
@Override
public void ensureInputsPresent(
TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
Digest commandDigest = digestUtil.compute(command);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<Digest> missingDigests =
getMissingDigests(
Iterables.concat(repository.getAllDigests(root), ImmutableList.of(commandDigest)));
List<Chunker> toUpload = new ArrayList<>();
// Only upload data that was missing from the cache.
ArrayList<ActionInput> missingActionInputs = new ArrayList<>();
ArrayList<Directory> missingTreeNodes = new ArrayList<>();
HashSet<Digest> missingTreeDigests = new HashSet<>(missingDigests);
missingTreeDigests.remove(commandDigest);
repository.getDataFromDigests(missingTreeDigests, missingActionInputs, missingTreeNodes);
if (missingDigests.contains(commandDigest)) {
toUpload.add(new Chunker(command.toByteArray(), digestUtil));
}
if (!missingTreeNodes.isEmpty()) {
for (Directory d : missingTreeNodes) {
toUpload.add(new Chunker(d.toByteArray(), digestUtil));
}
}
if (!missingActionInputs.isEmpty()) {
MetadataProvider inputFileCache = repository.getInputFileCache();
for (ActionInput actionInput : missingActionInputs) {
toUpload.add(new Chunker(actionInput, inputFileCache, execRoot, digestUtil));
}
}
uploader.uploadBlobs(toUpload);
}
/**
* This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not
* allow throwing such an exception. Any caller must make sure to catch the
* {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
* in the context of {@link RemoteRetrier#execute}, that's perfectly safe.
*/
private void readBlob(Digest digest, OutputStream stream)
throws IOException, StatusRuntimeException {
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digestUtil.toString(digest);
Iterator<ReadResponse> replies = bsBlockingStub()
.read(ReadRequest.newBuilder().setResourceName(resourceName).build());
while (replies.hasNext()) {
replies.next().getData().writeTo(stream);
}
}
@Override
protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
try {
retrier.execute(
() -> {
try (OutputStream stream = dest.getOutputStream()) {
readBlob(digest, stream);
}
return null;
});
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
throw new CacheNotFoundException(digest, digestUtil);
}
throw e;
}
}
@Override
protected byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
try {
return retrier.execute(
() -> {
ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
readBlob(digest, stream);
return stream.toByteArray();
});
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
throw new CacheNotFoundException(digest, digestUtil);
}
throw e;
}
}
@Override
public void upload(
ActionKey actionKey,
Path execRoot,
Collection<Path> files,
FileOutErr outErr,
boolean uploadAction)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
upload(execRoot, files, outErr, result);
if (!uploadAction) {
return;
}
try {
retrier.execute(
() ->
acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build()));
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.UNIMPLEMENTED)) {
// Silently return without upload.
return;
}
throw e;
}
}
void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
throws ExecException, IOException, InterruptedException {
UploadManifest manifest =
new UploadManifest(digestUtil, result, execRoot, options.allowSymlinkUpload);
manifest.addFiles(files);
List<Chunker> filesToUpload = new ArrayList<>();
Map<Digest, Path> digestToFile = manifest.getDigestToFile();
Map<Digest, Chunker> digestToChunkers = manifest.getDigestToChunkers();
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToChunkers.keySet());
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
for (Digest digest : digestsToUpload) {
Chunker chunker;
Path file = digestToFile.get(digest);
if (file != null) {
chunker = new Chunker(file);
} else {
chunker = digestToChunkers.get(digest);
if (chunker == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
}
filesToUpload.add(chunker);
}
if (!filesToUpload.isEmpty()) {
uploader.uploadBlobs(filesToUpload);
}
// TODO(olaola): inline small stdout/stderr here.
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
Digest stdout = uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
}
/**
* Put the file contents cache if it is not already in it. No-op if the file is already stored in
* cache. The given path must be a full absolute path.
*
* @return The key for fetching the file contents blob from cache.
*/
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
Digest digest = digestUtil.compute(file);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploader.uploadBlob(new Chunker(file));
}
return digest;
}
/**
* Put the file contents cache if it is not already in it. No-op if the file is already stored in
* cache. The given path must be a full absolute path.
*
* @return The key for fetching the file contents blob from cache.
*/
Digest uploadFileContents(ActionInput input, Path execRoot, MetadataProvider inputCache)
throws IOException, InterruptedException {
Digest digest = DigestUtil.getFromInputCache(input, inputCache);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploader.uploadBlob(new Chunker(input, inputCache, execRoot, digestUtil));
}
return digest;
}
Digest uploadBlob(byte[] blob) throws IOException, InterruptedException {
Digest digest = digestUtil.compute(blob);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
uploader.uploadBlob(new Chunker(blob, digestUtil));
}
return digest;
}
// Execution Cache API
@Override
public ActionResult getCachedActionResult(ActionKey actionKey)
throws IOException, InterruptedException {
try {
return retrier.execute(
() ->
acBlockingStub()
.getActionResult(
GetActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.build()));
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
// Return null to indicate that it was a cache miss.
return null;
}
throw e;
}
}
}