blob: 85a06fe2925d9f651f0a23e6f34cca3c98c83969 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.EnvironmentalExecException;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.devtools.remoteexecution.v1test.DirectoryNode;
import com.google.devtools.remoteexecution.v1test.FileNode;
import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
/**
* A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files
* and action output.
*
* <p>The thread safety is guaranteed by the underlying map.
*
* <p>Note that this class is used from src/tools/remote_worker.
*/
@ThreadSafe
public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024;
private final SimpleBlobStore blobStore;
public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) {
this.blobStore = blobStore;
}
@Override
public void ensureInputsPresent(
TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
uploadBlob(command.toByteArray());
for (Directory directory : repository.treeToDirectories(root)) {
uploadBlob(directory.toByteArray());
}
// TODO(ulfjack): Only upload files that aren't in the CAS yet?
for (TreeNode leaf : repository.leaves(root)) {
uploadFileContents(leaf.getActionInput(), execRoot, repository.getInputFileCache());
}
}
public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, InterruptedException {
Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
for (FileNode file : directory.getFilesList()) {
downloadFileContents(
file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable());
}
for (DirectoryNode child : directory.getDirectoriesList()) {
downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
}
}
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
Digest digest = Digests.computeDigest(file);
try (InputStream in = file.getInputStream()) {
return uploadStream(digest, in);
}
}
private Digest uploadFileContents(
ActionInput input, Path execRoot, MetadataProvider inputCache)
throws IOException, InterruptedException {
if (input instanceof VirtualActionInput) {
byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray();
return uploadBlob(blob, Digests.computeDigest(blob));
}
try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) {
return uploadStream(Digests.getDigestFromInputCache(input, inputCache), in);
}
}
@Override
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
try {
for (OutputFile file : result.getOutputFilesList()) {
if (!file.getContent().isEmpty()) {
createFile(
file.getContent().toByteArray(),
execRoot.getRelative(file.getPath()),
file.getIsExecutable());
} else {
downloadFileContents(
file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable());
}
}
if (!result.getOutputDirectoriesList().isEmpty()) {
throw new UnsupportedOperationException();
}
downloadOutErr(result, outErr);
} catch (IOException downloadException) {
try {
// Delete any (partially) downloaded output files, since any subsequent local execution
// of this action may expect none of the output files to exist.
for (OutputFile file : result.getOutputFilesList()) {
execRoot.getRelative(file.getPath()).delete();
}
outErr.getOutputPath().delete();
outErr.getErrorPath().delete();
} catch (IOException e) {
// If deleting of output files failed, we abort the build with a decent error message as
// any subsequent local execution failure would likely be incomprehensible.
// We don't propagate the downloadException, as this is a recoverable error and the cause
// of the build failure is really that we couldn't delete output files.
throw new EnvironmentalExecException("Failed to delete output files after incomplete "
+ "download. Cannot continue with local execution.", e, true);
}
throw downloadException;
}
}
private void downloadOutErr(ActionResult result, FileOutErr outErr)
throws IOException, InterruptedException {
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} else if (result.hasStdoutDigest()) {
downloadFileContents(result.getStdoutDigest(), outErr.getOutputPath(), /*executable=*/false);
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} else if (result.hasStderrDigest()) {
downloadFileContents(result.getStderrDigest(), outErr.getErrorPath(), /*executable=*/false);
}
}
@Override
public void upload(
ActionKey actionKey,
Path execRoot,
Collection<Path> files,
FileOutErr outErr,
boolean uploadAction)
throws IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
upload(result, execRoot, files);
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
Digest stdout = uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
if (uploadAction) {
blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray());
}
}
public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files)
throws IOException, InterruptedException {
for (Path file : files) {
// TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
// rely on the local spawn runner to stat the files, instead of statting here.
if (!file.exists()) {
continue;
}
if (file.isDirectory()) {
// TODO(olaola): to implement this for a directory, will need to create or pass a
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
// TODO(olaola): inline small file contents here.
// First put the file content to cache.
Digest digest = uploadFileContents(file);
// Add to protobuf.
result
.addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
.setDigest(digest)
.setIsExecutable(file.isExecutable());
}
}
public void uploadOutErr(ActionResult.Builder result, byte[] stdout, byte[] stderr)
throws IOException, InterruptedException {
if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) {
result.setStdoutRaw(ByteString.copyFrom(stdout));
} else if (stdout.length > 0) {
result.setStdoutDigest(uploadBlob(stdout));
}
if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) {
result.setStderrRaw(ByteString.copyFrom(stderr));
} else if (stderr.length > 0) {
result.setStderrDigest(uploadBlob(stderr));
}
}
private void downloadFileContents(Digest digest, Path dest, boolean executable)
throws IOException, InterruptedException {
FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
try (OutputStream out = dest.getOutputStream()) {
downloadBlob(digest, out);
}
dest.setExecutable(executable);
}
private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
try (OutputStream stream = dest.getOutputStream()) {
stream.write(contents);
}
dest.setExecutable(executable);
}
public Digest uploadBlob(byte[] blob) throws IOException, InterruptedException {
return uploadBlob(blob, Digests.computeDigest(blob));
}
private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException {
return uploadStream(digest, new ByteArrayInputStream(blob));
}
public Digest uploadStream(Digest digest, InputStream in)
throws IOException, InterruptedException {
blobStore.put(digest.getHash(), digest.getSizeBytes(), in);
return digest;
}
private void downloadBlob(Digest digest, OutputStream out)
throws IOException, InterruptedException {
if (digest.getSizeBytes() == 0) {
return;
}
boolean success = blobStore.get(digest.getHash(), out);
if (!success) {
throw new CacheNotFoundException(digest);
}
}
public byte[] downloadBlob(Digest digest)
throws IOException, InterruptedException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
downloadBlob(digest, out);
return out.toByteArray();
}
public boolean containsKey(Digest digest) throws IOException, InterruptedException {
return blobStore.containsKey(digest.getHash());
}
@Override
public ActionResult getCachedActionResult(ActionKey actionKey)
throws IOException, InterruptedException {
try {
byte[] data = downloadActionResult(actionKey.getDigest());
return ActionResult.parseFrom(data);
} catch (InvalidProtocolBufferException | CacheNotFoundException e) {
return null;
}
}
private byte[] downloadActionResult(Digest digest) throws IOException, InterruptedException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
// This unconditionally downloads the whole blob into memory!
ByteArrayOutputStream out = new ByteArrayOutputStream();
boolean success = blobStore.getActionResult(digest.getHash(), out);
if (!success) {
throw new CacheNotFoundException(digest);
}
return out.toByteArray();
}
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
blobStore.putActionResult(actionKey.getDigest().getHash(), result.toByteArray());
}
@Override
public void close() {
blobStore.close();
}
}