| // Copyright 2021 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote; |
| |
| import static com.google.common.base.Throwables.throwIfInstanceOf; |
| import static com.google.common.util.concurrent.MoreExecutors.directExecutor; |
| import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; |
| import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle; |
| import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; |
| import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; |
| |
| import build.bazel.remote.execution.v2.Action; |
| import build.bazel.remote.execution.v2.ActionResult; |
| import build.bazel.remote.execution.v2.Command; |
| import build.bazel.remote.execution.v2.Digest; |
| import build.bazel.remote.execution.v2.Directory; |
| import build.bazel.remote.execution.v2.Tree; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.devtools.build.lib.actions.ActionExecutionMetadata; |
| import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent; |
| import com.google.devtools.build.lib.actions.ActionUploadStartedEvent; |
| import com.google.devtools.build.lib.actions.ExecException; |
| import com.google.devtools.build.lib.actions.UserExecException; |
| import com.google.devtools.build.lib.events.ExtendedEventHandler; |
| import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; |
| import com.google.devtools.build.lib.remote.common.RemoteCacheClient; |
| import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; |
| import com.google.devtools.build.lib.remote.common.RemotePathResolver; |
| import com.google.devtools.build.lib.remote.options.RemoteOptions; |
| import com.google.devtools.build.lib.remote.util.DigestUtil; |
| import com.google.devtools.build.lib.remote.util.RxUtils; |
| import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; |
| import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution; |
| import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code; |
| import com.google.devtools.build.lib.util.io.FileOutErr; |
| import com.google.devtools.build.lib.vfs.Dirent; |
| import com.google.devtools.build.lib.vfs.FileStatus; |
| import com.google.devtools.build.lib.vfs.Path; |
| import com.google.devtools.build.lib.vfs.PathFragment; |
| import com.google.devtools.build.lib.vfs.Symlinks; |
| import com.google.protobuf.ByteString; |
| import io.reactivex.rxjava3.core.Completable; |
| import io.reactivex.rxjava3.core.Flowable; |
| import io.reactivex.rxjava3.core.Single; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| |
| /** UploadManifest adds output metadata to a {@link ActionResult}. */ |
| public class UploadManifest { |
| |
| private final DigestUtil digestUtil; |
| private final RemotePathResolver remotePathResolver; |
| private final ActionResult.Builder result; |
| private final boolean allowSymlinks; |
| private final boolean uploadSymlinks; |
| private final Map<Digest, Path> digestToFile = new HashMap<>(); |
| private final Map<Digest, ByteString> digestToBlobs = new HashMap<>(); |
| @Nullable private ActionKey actionKey; |
| private Digest stderrDigest; |
| private Digest stdoutDigest; |
| |
| public static UploadManifest create( |
| RemoteOptions remoteOptions, |
| DigestUtil digestUtil, |
| RemotePathResolver remotePathResolver, |
| ActionKey actionKey, |
| Action action, |
| Command command, |
| Collection<Path> outputFiles, |
| FileOutErr outErr, |
| int exitCode) |
| throws ExecException, IOException { |
| ActionResult.Builder result = ActionResult.newBuilder(); |
| result.setExitCode(exitCode); |
| |
| UploadManifest manifest = |
| new UploadManifest( |
| digestUtil, |
| remotePathResolver, |
| result, |
| remoteOptions.incompatibleRemoteSymlinks, |
| remoteOptions.allowSymlinkUpload); |
| manifest.addFiles(outputFiles); |
| manifest.setStdoutStderr(outErr); |
| manifest.addAction(actionKey, action, command); |
| if (manifest.getStderrDigest() != null) { |
| result.setStderrDigest(manifest.getStderrDigest()); |
| } |
| if (manifest.getStdoutDigest() != null) { |
| result.setStdoutDigest(manifest.getStdoutDigest()); |
| } |
| |
| return manifest; |
| } |
| |
| /** |
| * Create an UploadManifest from an ActionResult builder and an exec root. The ActionResult |
| * builder is populated through a call to {@link #addFile(Digest, Path)}. |
| */ |
| @VisibleForTesting |
| public UploadManifest( |
| DigestUtil digestUtil, |
| RemotePathResolver remotePathResolver, |
| ActionResult.Builder result, |
| boolean uploadSymlinks, |
| boolean allowSymlinks) { |
| this.digestUtil = digestUtil; |
| this.remotePathResolver = remotePathResolver; |
| this.result = result; |
| this.uploadSymlinks = uploadSymlinks; |
| this.allowSymlinks = allowSymlinks; |
| } |
| |
| private void setStdoutStderr(FileOutErr outErr) throws IOException { |
| if (outErr.getErrorPath().exists()) { |
| stderrDigest = digestUtil.compute(outErr.getErrorPath()); |
| digestToFile.put(stderrDigest, outErr.getErrorPath()); |
| } |
| if (outErr.getOutputPath().exists()) { |
| stdoutDigest = digestUtil.compute(outErr.getOutputPath()); |
| digestToFile.put(stdoutDigest, outErr.getOutputPath()); |
| } |
| } |
| |
| /** |
| * Add a collection of files or directories to the UploadManifest. Adding a directory has the |
| * effect of 1) uploading a {@link Tree} protobuf message from which the whole structure of the |
| * directory, including the descendants, can be reconstructed and 2) uploading all the |
| * non-directory descendant files. |
| */ |
| @VisibleForTesting |
| void addFiles(Collection<Path> files) throws ExecException, IOException { |
| for (Path file : files) { |
| // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and |
| // rely on the local spawn runner to stat the files, instead of statting here. |
| FileStatus stat = file.statIfFound(Symlinks.NOFOLLOW); |
| // TODO(#6547): handle the case where the parent directory of the output file is an |
| // output symlink. |
| if (stat == null) { |
| // We ignore requested results that have not been generated by the action. |
| continue; |
| } |
| if (stat.isDirectory()) { |
| addDirectory(file); |
| } else if (stat.isFile() && !stat.isSpecialFile()) { |
| Digest digest = digestUtil.compute(file, stat.getSize()); |
| addFile(digest, file); |
| } else if (stat.isSymbolicLink() && allowSymlinks) { |
| PathFragment target = file.readSymbolicLink(); |
| // Need to resolve the symbolic link to know what to add, file or directory. |
| FileStatus statFollow = file.statIfFound(Symlinks.FOLLOW); |
| if (statFollow == null) { |
| throw new IOException( |
| String.format("Action output %s is a dangling symbolic link to %s ", file, target)); |
| } |
| if (statFollow.isSpecialFile()) { |
| illegalOutput(file); |
| } |
| Preconditions.checkState( |
| statFollow.isFile() || statFollow.isDirectory(), "Unknown stat type for %s", file); |
| if (uploadSymlinks && !target.isAbsolute()) { |
| if (statFollow.isFile()) { |
| addFileSymbolicLink(file, target); |
| } else { |
| addDirectorySymbolicLink(file, target); |
| } |
| } else { |
| if (statFollow.isFile()) { |
| addFile(digestUtil.compute(file), file); |
| } else { |
| addDirectory(file); |
| } |
| } |
| } else { |
| illegalOutput(file); |
| } |
| } |
| } |
| |
| /** |
| * Adds an action and command protos to upload. They need to be uploaded as part of the action |
| * result. |
| */ |
| private void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Command command) { |
| Preconditions.checkState(this.actionKey == null, "Already added an action"); |
| this.actionKey = actionKey; |
| digestToBlobs.put(actionKey.getDigest(), action.toByteString()); |
| digestToBlobs.put(action.getCommandDigest(), command.toByteString()); |
| } |
| |
| /** Map of digests to file paths to upload. */ |
| public Map<Digest, Path> getDigestToFile() { |
| return digestToFile; |
| } |
| |
| /** |
| * Map of digests to chunkers to upload. When the file is a regular, non-directory file it is |
| * transmitted through {@link #getDigestToFile()}. When it is a directory, it is transmitted as a |
| * {@link Tree} protobuf message through {@link #getDigestToBlobs()}. |
| */ |
| public Map<Digest, ByteString> getDigestToBlobs() { |
| return digestToBlobs; |
| } |
| |
| @Nullable |
| public Digest getStdoutDigest() { |
| return stdoutDigest; |
| } |
| |
| @Nullable |
| public Digest getStderrDigest() { |
| return stderrDigest; |
| } |
| |
| private void addFileSymbolicLink(Path file, PathFragment target) { |
| result |
| .addOutputFileSymlinksBuilder() |
| .setPath(remotePathResolver.localPathToOutputPath(file)) |
| .setTarget(target.toString()); |
| } |
| |
| private void addDirectorySymbolicLink(Path file, PathFragment target) { |
| result |
| .addOutputDirectorySymlinksBuilder() |
| .setPath(remotePathResolver.localPathToOutputPath(file)) |
| .setTarget(target.toString()); |
| } |
| |
| private void addFile(Digest digest, Path file) throws IOException { |
| result |
| .addOutputFilesBuilder() |
| .setPath(remotePathResolver.localPathToOutputPath(file)) |
| .setDigest(digest) |
| // The permission of output file is changed to 0555 after action execution |
| .setIsExecutable(true); |
| |
| digestToFile.put(digest, file); |
| } |
| |
| private void addDirectory(Path dir) throws ExecException, IOException { |
| Tree.Builder tree = Tree.newBuilder(); |
| Directory root = computeDirectory(dir, tree); |
| tree.setRoot(root); |
| |
| ByteString data = tree.build().toByteString(); |
| Digest digest = digestUtil.compute(data.toByteArray()); |
| |
| if (result != null) { |
| result |
| .addOutputDirectoriesBuilder() |
| .setPath(remotePathResolver.localPathToOutputPath(dir)) |
| .setTreeDigest(digest); |
| } |
| |
| digestToBlobs.put(digest, data); |
| } |
| |
| private Directory computeDirectory(Path path, Tree.Builder tree) |
| throws ExecException, IOException { |
| Directory.Builder b = Directory.newBuilder(); |
| |
| List<Dirent> sortedDirent = new ArrayList<>(path.readdir(Symlinks.NOFOLLOW)); |
| sortedDirent.sort(Comparator.comparing(Dirent::getName)); |
| |
| for (Dirent dirent : sortedDirent) { |
| String name = dirent.getName(); |
| Path child = path.getRelative(name); |
| if (dirent.getType() == Dirent.Type.DIRECTORY) { |
| Directory dir = computeDirectory(child, tree); |
| b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir)); |
| tree.addChildren(dir); |
| } else if (dirent.getType() == Dirent.Type.SYMLINK && allowSymlinks) { |
| PathFragment target = child.readSymbolicLink(); |
| if (uploadSymlinks && !target.isAbsolute()) { |
| // Whether it is dangling or not, we're passing it on. |
| b.addSymlinksBuilder().setName(name).setTarget(target.toString()); |
| continue; |
| } |
| // Need to resolve the symbolic link now to know whether to upload a file or a directory. |
| FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW); |
| if (statFollow == null) { |
| throw new IOException( |
| String.format("Action output %s is a dangling symbolic link to %s ", child, target)); |
| } |
| if (statFollow.isFile() && !statFollow.isSpecialFile()) { |
| Digest digest = digestUtil.compute(child); |
| b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable()); |
| digestToFile.put(digest, child); |
| } else if (statFollow.isDirectory()) { |
| Directory dir = computeDirectory(child, tree); |
| b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir)); |
| tree.addChildren(dir); |
| } else { |
| illegalOutput(child); |
| } |
| } else if (dirent.getType() == Dirent.Type.FILE) { |
| Digest digest = digestUtil.compute(child); |
| b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable()); |
| digestToFile.put(digest, child); |
| } else { |
| illegalOutput(child); |
| } |
| } |
| |
| return b.build(); |
| } |
| |
| private void illegalOutput(Path what) throws ExecException { |
| String kind = what.isSymbolicLink() ? "symbolic link" : "special file"; |
| String message = |
| String.format( |
| "Output %s is a %s. Only regular files and directories may be " |
| + "uploaded to a remote cache. " |
| + "Change the file type or use --remote_allow_symlink_upload.", |
| remotePathResolver.localPathToOutputPath(what), kind); |
| |
| FailureDetail failureDetail = |
| FailureDetail.newBuilder() |
| .setMessage(message) |
| .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT)) |
| .build(); |
| throw new UserExecException(failureDetail); |
| } |
| |
| @VisibleForTesting |
| ActionResult getActionResult() { |
| return result.build(); |
| } |
| |
| /** Uploads outputs and action result (if exit code is 0) to remote cache. */ |
| public ActionResult upload( |
| RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter) |
| throws IOException, InterruptedException, ExecException { |
| try { |
| return uploadAsync(context, remoteCache, reporter).blockingGet(); |
| } catch (RuntimeException e) { |
| Throwable cause = e.getCause(); |
| if (cause != null) { |
| throwIfInstanceOf(cause, InterruptedException.class); |
| throwIfInstanceOf(cause, IOException.class); |
| throwIfInstanceOf(cause, ExecException.class); |
| } |
| throw e; |
| } |
| } |
| |
| private Completable upload( |
| RemoteActionExecutionContext context, RemoteCache remoteCache, Digest digest) { |
| Path file = digestToFile.get(digest); |
| if (file != null) { |
| return toCompletable(() -> remoteCache.uploadFile(context, digest, file), directExecutor()); |
| } |
| |
| ByteString blob = digestToBlobs.get(digest); |
| if (blob == null) { |
| String message = "FindMissingBlobs call returned an unknown digest: " + digest; |
| return Completable.error(new IOException(message)); |
| } |
| |
| return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor()); |
| } |
| |
| private static void reportUploadStarted( |
| ExtendedEventHandler reporter, |
| @Nullable ActionExecutionMetadata action, |
| String prefix, |
| Iterable<Digest> digests) { |
| if (action != null) { |
| for (Digest digest : digests) { |
| reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash())); |
| } |
| } |
| } |
| |
| private static void reportUploadFinished( |
| ExtendedEventHandler reporter, |
| @Nullable ActionExecutionMetadata action, |
| String resourceIdPrefix, |
| Iterable<Digest> digests) { |
| if (action != null) { |
| for (Digest digest : digests) { |
| reporter.post( |
| ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash())); |
| } |
| } |
| } |
| |
| /** |
| * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit |
| * code is 0) to remote cache. |
| */ |
| public Single<ActionResult> uploadAsync( |
| RemoteActionExecutionContext context, |
| RemoteCache remoteCache, |
| ExtendedEventHandler reporter) { |
| Collection<Digest> digests = new ArrayList<>(); |
| digests.addAll(digestToFile.keySet()); |
| digests.addAll(digestToBlobs.keySet()); |
| |
| ActionExecutionMetadata action = context.getSpawnOwner(); |
| |
| String outputPrefix = "cas/"; |
| Flowable<RxUtils.TransferResult> bulkTransfers = |
| toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor()) |
| .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests)) |
| .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests)) |
| .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests)) |
| .doOnSuccess( |
| missingDigests -> { |
| List<Digest> existedDigests = |
| digests.stream() |
| .filter(digest -> !missingDigests.contains(digest)) |
| .collect(Collectors.toList()); |
| reportUploadFinished(reporter, action, outputPrefix, existedDigests); |
| }) |
| .flatMapPublisher(Flowable::fromIterable) |
| .flatMapSingle( |
| digest -> |
| toTransferResult(upload(context, remoteCache, digest)) |
| .doFinally( |
| () -> |
| reportUploadFinished( |
| reporter, action, outputPrefix, ImmutableList.of(digest)))); |
| Completable uploadOutputs = mergeBulkTransfer(bulkTransfers); |
| |
| ActionResult actionResult = result.build(); |
| Completable uploadActionResult = Completable.complete(); |
| if (actionResult.getExitCode() == 0 && actionKey != null) { |
| String actionResultPrefix = "ac/"; |
| uploadActionResult = |
| toCompletable( |
| () -> remoteCache.uploadActionResult(context, actionKey, actionResult), |
| directExecutor()) |
| .doOnSubscribe( |
| d -> |
| reportUploadStarted( |
| reporter, |
| action, |
| actionResultPrefix, |
| ImmutableList.of(actionKey.getDigest()))) |
| .doFinally( |
| () -> |
| reportUploadFinished( |
| reporter, |
| action, |
| actionResultPrefix, |
| ImmutableList.of(actionKey.getDigest()))); |
| } |
| |
| return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult); |
| } |
| } |