blob: b75c897187211de21067a8499ae4b0e2972d1168 [file] [log] [blame]
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package com.google.devtools.build.lib.remote;
15
chiwang862fd5e2021-09-05 20:39:35 -070016import static com.google.common.base.Throwables.throwIfInstanceOf;
17import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
18import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
19import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
20import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
21import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
chiwang0b773602021-09-01 01:36:41 -070022
23import build.bazel.remote.execution.v2.Action;
24import build.bazel.remote.execution.v2.ActionResult;
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -070025import build.bazel.remote.execution.v2.CacheCapabilities;
chiwang0b773602021-09-01 01:36:41 -070026import build.bazel.remote.execution.v2.Command;
27import build.bazel.remote.execution.v2.Digest;
28import build.bazel.remote.execution.v2.Directory;
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -070029import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy;
chiwang0b773602021-09-01 01:36:41 -070030import build.bazel.remote.execution.v2.Tree;
31import com.google.common.annotations.VisibleForTesting;
32import com.google.common.base.Preconditions;
Chi Wang003e2d02021-09-21 22:52:08 -070033import com.google.common.collect.ImmutableList;
34import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
35import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
36import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
chiwang0b773602021-09-01 01:36:41 -070037import com.google.devtools.build.lib.actions.ExecException;
38import com.google.devtools.build.lib.actions.UserExecException;
Chi Wang003e2d02021-09-21 22:52:08 -070039import com.google.devtools.build.lib.events.ExtendedEventHandler;
chiwang0b773602021-09-01 01:36:41 -070040import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
41import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
42import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
43import com.google.devtools.build.lib.remote.common.RemotePathResolver;
44import com.google.devtools.build.lib.remote.options.RemoteOptions;
45import com.google.devtools.build.lib.remote.util.DigestUtil;
Chi Wang003e2d02021-09-21 22:52:08 -070046import com.google.devtools.build.lib.remote.util.RxUtils;
chiwang0b773602021-09-01 01:36:41 -070047import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
48import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
49import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
50import com.google.devtools.build.lib.util.io.FileOutErr;
51import com.google.devtools.build.lib.vfs.Dirent;
52import com.google.devtools.build.lib.vfs.FileStatus;
53import com.google.devtools.build.lib.vfs.Path;
54import com.google.devtools.build.lib.vfs.PathFragment;
55import com.google.devtools.build.lib.vfs.Symlinks;
56import com.google.protobuf.ByteString;
Tiago Quelhas93029d82022-04-20 03:47:04 -070057import com.google.protobuf.Timestamp;
chiwang862fd5e2021-09-05 20:39:35 -070058import io.reactivex.rxjava3.core.Completable;
59import io.reactivex.rxjava3.core.Flowable;
60import io.reactivex.rxjava3.core.Single;
chiwang0b773602021-09-01 01:36:41 -070061import java.io.IOException;
Tiago Quelhas93029d82022-04-20 03:47:04 -070062import java.time.Duration;
63import java.time.Instant;
chiwang0b773602021-09-01 01:36:41 -070064import java.util.ArrayList;
65import java.util.Collection;
66import java.util.Comparator;
67import java.util.HashMap;
68import java.util.List;
69import java.util.Map;
Tiago Quelhas93029d82022-04-20 03:47:04 -070070import java.util.Optional;
Chi Wang003e2d02021-09-21 22:52:08 -070071import java.util.stream.Collectors;
chiwang0b773602021-09-01 01:36:41 -070072import javax.annotation.Nullable;
73
chiwang862fd5e2021-09-05 20:39:35 -070074/** UploadManifest adds output metadata to a {@link ActionResult}. */
chiwang0b773602021-09-01 01:36:41 -070075public class UploadManifest {
76
77 private final DigestUtil digestUtil;
78 private final RemotePathResolver remotePathResolver;
79 private final ActionResult.Builder result;
Googler4f557e82022-09-22 04:03:36 -070080 private final boolean followSymlinks;
Tiago Quelhas5b46a482022-09-29 08:45:49 -070081 private final boolean allowDanglingSymlinks;
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -070082 private final boolean allowAbsoluteSymlinks;
chiwang0b773602021-09-01 01:36:41 -070083 private final Map<Digest, Path> digestToFile = new HashMap<>();
84 private final Map<Digest, ByteString> digestToBlobs = new HashMap<>();
85 @Nullable private ActionKey actionKey;
86 private Digest stderrDigest;
87 private Digest stdoutDigest;
88
89 public static UploadManifest create(
90 RemoteOptions remoteOptions,
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -070091 CacheCapabilities cacheCapabilities,
chiwang0b773602021-09-01 01:36:41 -070092 DigestUtil digestUtil,
93 RemotePathResolver remotePathResolver,
94 ActionKey actionKey,
95 Action action,
96 Command command,
97 Collection<Path> outputFiles,
98 FileOutErr outErr,
Tiago Quelhas93029d82022-04-20 03:47:04 -070099 int exitCode,
100 Optional<Instant> startTime,
101 Optional<Duration> wallTime)
chiwang0b773602021-09-01 01:36:41 -0700102 throws ExecException, IOException {
103 ActionResult.Builder result = ActionResult.newBuilder();
104 result.setExitCode(exitCode);
105
106 UploadManifest manifest =
107 new UploadManifest(
108 digestUtil,
109 remotePathResolver,
110 result,
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700111 /* followSymlinks= */ !remoteOptions.incompatibleRemoteSymlinks,
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -0700112 /* allowDanglingSymlinks= */ remoteOptions.incompatibleRemoteDanglingSymlinks,
113 /* allowAbsoluteSymlinks= */ cacheCapabilities
114 .getSymlinkAbsolutePathStrategy()
115 .equals(SymlinkAbsolutePathStrategy.Value.ALLOWED));
chiwang0b773602021-09-01 01:36:41 -0700116 manifest.addFiles(outputFiles);
117 manifest.setStdoutStderr(outErr);
118 manifest.addAction(actionKey, action, command);
119 if (manifest.getStderrDigest() != null) {
120 result.setStderrDigest(manifest.getStderrDigest());
121 }
122 if (manifest.getStdoutDigest() != null) {
123 result.setStdoutDigest(manifest.getStdoutDigest());
124 }
125
Tiago Quelhas93029d82022-04-20 03:47:04 -0700126 if (startTime.isPresent() && wallTime.isPresent()) {
127 result
128 .getExecutionMetadataBuilder()
129 .setWorkerStartTimestamp(instantToTimestamp(startTime.get()))
130 .setWorkerCompletedTimestamp(instantToTimestamp(startTime.get().plus(wallTime.get())));
131 }
132
chiwang0b773602021-09-01 01:36:41 -0700133 return manifest;
134 }
135
Tiago Quelhas93029d82022-04-20 03:47:04 -0700136 private static Timestamp instantToTimestamp(Instant instant) {
137 return Timestamp.newBuilder()
138 .setSeconds(instant.getEpochSecond())
139 .setNanos(instant.getNano())
140 .build();
141 }
142
/**
 * Creates an empty UploadManifest that writes output metadata into the given {@link
 * ActionResult} builder. The builder is populated as outputs are added, e.g. through {@link
 * #addFiles(Collection)}.
 *
 * @param digestUtil computes digests of the added outputs
 * @param remotePathResolver maps local paths to the output paths stored in the result
 * @param result builder receiving the output metadata
 * @param followSymlinks if {@code false}, symlinks with relative targets are recorded as symlinks
 *     instead of being resolved
 * @param allowDanglingSymlinks whether dangling top-level output symlinks are accepted
 * @param allowAbsoluteSymlinks whether accepted dangling symlinks may have absolute targets
 */
@VisibleForTesting
public UploadManifest(
    DigestUtil digestUtil,
    RemotePathResolver remotePathResolver,
    ActionResult.Builder result,
    boolean followSymlinks,
    boolean allowDanglingSymlinks,
    boolean allowAbsoluteSymlinks) {
  // Symlink policy.
  this.followSymlinks = followSymlinks;
  this.allowDanglingSymlinks = allowDanglingSymlinks;
  this.allowAbsoluteSymlinks = allowAbsoluteSymlinks;
  // Collaborators.
  this.result = result;
  this.remotePathResolver = remotePathResolver;
  this.digestUtil = digestUtil;
}
162
163 private void setStdoutStderr(FileOutErr outErr) throws IOException {
164 if (outErr.getErrorPath().exists()) {
165 stderrDigest = digestUtil.compute(outErr.getErrorPath());
166 digestToFile.put(stderrDigest, outErr.getErrorPath());
167 }
168 if (outErr.getOutputPath().exists()) {
169 stdoutDigest = digestUtil.compute(outErr.getOutputPath());
170 digestToFile.put(stdoutDigest, outErr.getOutputPath());
171 }
172 }
173
174 /**
175 * Add a collection of files or directories to the UploadManifest. Adding a directory has the
176 * effect of 1) uploading a {@link Tree} protobuf message from which the whole structure of the
177 * directory, including the descendants, can be reconstructed and 2) uploading all the
178 * non-directory descendant files.
179 */
180 @VisibleForTesting
chiwang862fd5e2021-09-05 20:39:35 -0700181 void addFiles(Collection<Path> files) throws ExecException, IOException {
chiwang0b773602021-09-01 01:36:41 -0700182 for (Path file : files) {
183 // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
184 // rely on the local spawn runner to stat the files, instead of statting here.
185 FileStatus stat = file.statIfFound(Symlinks.NOFOLLOW);
186 // TODO(#6547): handle the case where the parent directory of the output file is an
187 // output symlink.
188 if (stat == null) {
189 // We ignore requested results that have not been generated by the action.
190 continue;
191 }
192 if (stat.isDirectory()) {
193 addDirectory(file);
194 } else if (stat.isFile() && !stat.isSpecialFile()) {
195 Digest digest = digestUtil.compute(file, stat.getSize());
196 addFile(digest, file);
Googler4f557e82022-09-22 04:03:36 -0700197 } else if (stat.isSymbolicLink()) {
chiwang0b773602021-09-01 01:36:41 -0700198 PathFragment target = file.readSymbolicLink();
199 // Need to resolve the symbolic link to know what to add, file or directory.
200 FileStatus statFollow = file.statIfFound(Symlinks.FOLLOW);
201 if (statFollow == null) {
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700202 if (allowDanglingSymlinks) {
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -0700203 if (target.isAbsolute() && !allowAbsoluteSymlinks) {
204 throw new IOException(
205 String.format(
206 "Action output %s is an absolute symbolic link to %s, which is not allowed by"
207 + " the remote cache",
208 file, target));
209 }
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700210 // Report symlink to a file since we don't know any better.
chiwang0b773602021-09-01 01:36:41 -0700211 addFileSymbolicLink(file, target);
212 } else {
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700213 throw new IOException(
Tiago Quelhasf71bbcf2022-09-29 13:45:20 -0700214 String.format(
215 "Action output %s is a dangling symbolic link to %s. ", file, target));
chiwang0b773602021-09-01 01:36:41 -0700216 }
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700217 } else if (statFollow.isSpecialFile()) {
218 illegalOutput(file);
chiwang0b773602021-09-01 01:36:41 -0700219 } else {
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700220 Preconditions.checkState(
221 statFollow.isFile() || statFollow.isDirectory(), "Unknown stat type for %s", file);
222 if (!followSymlinks && !target.isAbsolute()) {
223 if (statFollow.isFile()) {
224 addFileSymbolicLink(file, target);
225 } else {
226 addDirectorySymbolicLink(file, target);
227 }
chiwang0b773602021-09-01 01:36:41 -0700228 } else {
Tiago Quelhas5b46a482022-09-29 08:45:49 -0700229 if (statFollow.isFile()) {
230 addFile(digestUtil.compute(file), file);
231 } else {
232 addDirectory(file);
233 }
chiwang0b773602021-09-01 01:36:41 -0700234 }
235 }
236 } else {
237 illegalOutput(file);
238 }
239 }
240 }
241
242 /**
243 * Adds an action and command protos to upload. They need to be uploaded as part of the action
244 * result.
245 */
246 private void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Command command) {
247 Preconditions.checkState(this.actionKey == null, "Already added an action");
248 this.actionKey = actionKey;
249 digestToBlobs.put(actionKey.getDigest(), action.toByteString());
250 digestToBlobs.put(action.getCommandDigest(), command.toByteString());
251 }
252
chiwang862fd5e2021-09-05 20:39:35 -0700253 /** Map of digests to file paths to upload. */
chiwang0b773602021-09-01 01:36:41 -0700254 public Map<Digest, Path> getDigestToFile() {
255 return digestToFile;
256 }
257
258 /**
259 * Map of digests to chunkers to upload. When the file is a regular, non-directory file it is
260 * transmitted through {@link #getDigestToFile()}. When it is a directory, it is transmitted as a
261 * {@link Tree} protobuf message through {@link #getDigestToBlobs()}.
262 */
263 public Map<Digest, ByteString> getDigestToBlobs() {
264 return digestToBlobs;
265 }
266
267 @Nullable
268 public Digest getStdoutDigest() {
269 return stdoutDigest;
270 }
271
272 @Nullable
273 public Digest getStderrDigest() {
274 return stderrDigest;
275 }
276
277 private void addFileSymbolicLink(Path file, PathFragment target) {
278 result
279 .addOutputFileSymlinksBuilder()
280 .setPath(remotePathResolver.localPathToOutputPath(file))
281 .setTarget(target.toString());
282 }
283
284 private void addDirectorySymbolicLink(Path file, PathFragment target) {
285 result
286 .addOutputDirectorySymlinksBuilder()
287 .setPath(remotePathResolver.localPathToOutputPath(file))
288 .setTarget(target.toString());
289 }
290
291 private void addFile(Digest digest, Path file) throws IOException {
292 result
293 .addOutputFilesBuilder()
294 .setPath(remotePathResolver.localPathToOutputPath(file))
295 .setDigest(digest)
Chi Wang11066c72021-09-17 00:30:45 -0700296 // The permission of output file is changed to 0555 after action execution
297 .setIsExecutable(true);
chiwang0b773602021-09-01 01:36:41 -0700298
299 digestToFile.put(digest, file);
300 }
301
302 private void addDirectory(Path dir) throws ExecException, IOException {
303 Tree.Builder tree = Tree.newBuilder();
304 Directory root = computeDirectory(dir, tree);
305 tree.setRoot(root);
306
307 ByteString data = tree.build().toByteString();
308 Digest digest = digestUtil.compute(data.toByteArray());
309
310 if (result != null) {
311 result
312 .addOutputDirectoriesBuilder()
313 .setPath(remotePathResolver.localPathToOutputPath(dir))
314 .setTreeDigest(digest);
315 }
316
317 digestToBlobs.put(digest, data);
318 }
319
320 private Directory computeDirectory(Path path, Tree.Builder tree)
321 throws ExecException, IOException {
322 Directory.Builder b = Directory.newBuilder();
323
324 List<Dirent> sortedDirent = new ArrayList<>(path.readdir(Symlinks.NOFOLLOW));
325 sortedDirent.sort(Comparator.comparing(Dirent::getName));
326
327 for (Dirent dirent : sortedDirent) {
328 String name = dirent.getName();
329 Path child = path.getRelative(name);
330 if (dirent.getType() == Dirent.Type.DIRECTORY) {
331 Directory dir = computeDirectory(child, tree);
332 b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir));
333 tree.addChildren(dir);
Googler4f557e82022-09-22 04:03:36 -0700334 } else if (dirent.getType() == Dirent.Type.SYMLINK) {
chiwang0b773602021-09-01 01:36:41 -0700335 PathFragment target = child.readSymbolicLink();
Googler4f557e82022-09-22 04:03:36 -0700336 if (!followSymlinks && !target.isAbsolute()) {
chiwang0b773602021-09-01 01:36:41 -0700337 // Whether it is dangling or not, we're passing it on.
338 b.addSymlinksBuilder().setName(name).setTarget(target.toString());
339 continue;
340 }
341 // Need to resolve the symbolic link now to know whether to upload a file or a directory.
342 FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW);
343 if (statFollow == null) {
344 throw new IOException(
chiwang862fd5e2021-09-05 20:39:35 -0700345 String.format("Action output %s is a dangling symbolic link to %s ", child, target));
chiwang0b773602021-09-01 01:36:41 -0700346 }
347 if (statFollow.isFile() && !statFollow.isSpecialFile()) {
348 Digest digest = digestUtil.compute(child);
chiwang862fd5e2021-09-05 20:39:35 -0700349 b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable());
chiwang0b773602021-09-01 01:36:41 -0700350 digestToFile.put(digest, child);
351 } else if (statFollow.isDirectory()) {
352 Directory dir = computeDirectory(child, tree);
353 b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir));
354 tree.addChildren(dir);
355 } else {
356 illegalOutput(child);
357 }
358 } else if (dirent.getType() == Dirent.Type.FILE) {
359 Digest digest = digestUtil.compute(child);
360 b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable());
361 digestToFile.put(digest, child);
362 } else {
363 illegalOutput(child);
364 }
365 }
366
367 return b.build();
368 }
369
Googler4f557e82022-09-22 04:03:36 -0700370 private void illegalOutput(Path path) throws ExecException {
chiwang0b773602021-09-01 01:36:41 -0700371 String message =
372 String.format(
Googler4f557e82022-09-22 04:03:36 -0700373 "Output %s is a special file. Only regular files, directories or symlinks may be "
374 + "uploaded to a remote cache.",
375 remotePathResolver.localPathToOutputPath(path));
chiwang0b773602021-09-01 01:36:41 -0700376
chiwang862fd5e2021-09-05 20:39:35 -0700377 FailureDetail failureDetail =
378 FailureDetail.newBuilder()
379 .setMessage(message)
380 .setRemoteExecution(RemoteExecution.newBuilder().setCode(Code.ILLEGAL_OUTPUT))
381 .build();
chiwang0b773602021-09-01 01:36:41 -0700382 throw new UserExecException(failureDetail);
383 }
384
chiwang4a12a2c2021-09-02 21:15:54 -0700385 @VisibleForTesting
386 ActionResult getActionResult() {
387 return result.build();
388 }
389
chiwang0b773602021-09-01 01:36:41 -0700390 /** Uploads outputs and action result (if exit code is 0) to remote cache. */
Chi Wang003e2d02021-09-21 22:52:08 -0700391 public ActionResult upload(
392 RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
Chi Wang5b545882022-03-14 06:34:32 -0700393 throws IOException, InterruptedException, ExecException {
chiwang862fd5e2021-09-05 20:39:35 -0700394 try {
Chi Wang003e2d02021-09-21 22:52:08 -0700395 return uploadAsync(context, remoteCache, reporter).blockingGet();
chiwang862fd5e2021-09-05 20:39:35 -0700396 } catch (RuntimeException e) {
Chi Wangad663a72021-12-14 06:02:04 -0800397 Throwable cause = e.getCause();
398 if (cause != null) {
399 throwIfInstanceOf(cause, InterruptedException.class);
400 throwIfInstanceOf(cause, IOException.class);
Chi Wang5b545882022-03-14 06:34:32 -0700401 throwIfInstanceOf(cause, ExecException.class);
Chi Wangad663a72021-12-14 06:02:04 -0800402 }
chiwang862fd5e2021-09-05 20:39:35 -0700403 throw e;
404 }
405 }
406
407 private Completable upload(
408 RemoteActionExecutionContext context, RemoteCache remoteCache, Digest digest) {
409 Path file = digestToFile.get(digest);
410 if (file != null) {
411 return toCompletable(() -> remoteCache.uploadFile(context, digest, file), directExecutor());
412 }
413
414 ByteString blob = digestToBlobs.get(digest);
415 if (blob == null) {
416 String message = "FindMissingBlobs call returned an unknown digest: " + digest;
417 return Completable.error(new IOException(message));
418 }
419
420 return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
421 }
422
Chi Wang003e2d02021-09-21 22:52:08 -0700423 private static void reportUploadStarted(
424 ExtendedEventHandler reporter,
425 @Nullable ActionExecutionMetadata action,
426 String prefix,
427 Iterable<Digest> digests) {
428 if (action != null) {
429 for (Digest digest : digests) {
430 reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
431 }
432 }
433 }
434
435 private static void reportUploadFinished(
436 ExtendedEventHandler reporter,
437 @Nullable ActionExecutionMetadata action,
438 String resourceIdPrefix,
439 Iterable<Digest> digests) {
440 if (action != null) {
441 for (Digest digest : digests) {
442 reporter.post(
443 ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
444 }
445 }
446 }
447
chiwang862fd5e2021-09-05 20:39:35 -0700448 /**
449 * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
450 * code is 0) to remote cache.
451 */
452 public Single<ActionResult> uploadAsync(
Chi Wang003e2d02021-09-21 22:52:08 -0700453 RemoteActionExecutionContext context,
454 RemoteCache remoteCache,
455 ExtendedEventHandler reporter) {
chiwang0b773602021-09-01 01:36:41 -0700456 Collection<Digest> digests = new ArrayList<>();
457 digests.addAll(digestToFile.keySet());
458 digests.addAll(digestToBlobs.keySet());
459
Chi Wang003e2d02021-09-21 22:52:08 -0700460 ActionExecutionMetadata action = context.getSpawnOwner();
461
462 String outputPrefix = "cas/";
463 Flowable<RxUtils.TransferResult> bulkTransfers =
464 toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
465 .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
466 .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
467 .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
468 .doOnSuccess(
469 missingDigests -> {
470 List<Digest> existedDigests =
471 digests.stream()
472 .filter(digest -> !missingDigests.contains(digest))
473 .collect(Collectors.toList());
474 reportUploadFinished(reporter, action, outputPrefix, existedDigests);
475 })
476 .flatMapPublisher(Flowable::fromIterable)
477 .flatMapSingle(
478 digest ->
479 toTransferResult(upload(context, remoteCache, digest))
480 .doFinally(
481 () ->
482 reportUploadFinished(
483 reporter, action, outputPrefix, ImmutableList.of(digest))));
484 Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);
chiwang0b773602021-09-01 01:36:41 -0700485
486 ActionResult actionResult = result.build();
chiwang862fd5e2021-09-05 20:39:35 -0700487 Completable uploadActionResult = Completable.complete();
chiwang0b773602021-09-01 01:36:41 -0700488 if (actionResult.getExitCode() == 0 && actionKey != null) {
Chi Wang003e2d02021-09-21 22:52:08 -0700489 String actionResultPrefix = "ac/";
chiwang862fd5e2021-09-05 20:39:35 -0700490 uploadActionResult =
491 toCompletable(
Chi Wang003e2d02021-09-21 22:52:08 -0700492 () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
493 directExecutor())
494 .doOnSubscribe(
495 d ->
496 reportUploadStarted(
497 reporter,
498 action,
499 actionResultPrefix,
500 ImmutableList.of(actionKey.getDigest())))
501 .doFinally(
502 () ->
503 reportUploadFinished(
504 reporter,
505 action,
506 actionResultPrefix,
507 ImmutableList.of(actionKey.getDigest())));
chiwang0b773602021-09-01 01:36:41 -0700508 }
509
chiwang862fd5e2021-09-05 20:39:35 -0700510 return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
chiwang0b773602021-09-01 01:36:41 -0700511 }
512}