Remote: Fix a confusing behavior where the background upload counter could increase after the build finished.
At the end of a build, the number of files waiting to be uploaded could increase as other uploads finished. This PR fixes that.
Also, change to emit the profile block `upload outputs` only for blocking uploads.
Fixes https://github.com/bazelbuild/bazel/pull/13655#issuecomment-914418852.
Closes #13954.
PiperOrigin-RevId: 398161750
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index af68ddf..f9801df 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -30,11 +30,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
-import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
-import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
-import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
@@ -66,7 +62,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
/**
* A cache for storing artifacts (input and output) as well as the output of running an action.
@@ -85,7 +80,6 @@
private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);
- private final ExtendedEventHandler reporter;
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();
@@ -94,11 +88,9 @@
protected final DigestUtil digestUtil;
public RemoteCache(
- ExtendedEventHandler reporter,
RemoteCacheClient cacheProtocol,
RemoteOptions options,
DigestUtil digestUtil) {
- this.reporter = reporter;
this.cacheProtocol = cacheProtocol;
this.options = options;
this.digestUtil = digestUtil;
@@ -110,23 +102,6 @@
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
}
- private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) {
- if (action == null) {
- return;
- }
-
- reporter.post(ActionUploadStartedEvent.create(action, resourceId));
- }
-
- private void postUploadFinishedEvent(
- @Nullable ActionExecutionMetadata action, String resourceId) {
- if (action == null) {
- return;
- }
-
- reporter.post(ActionUploadFinishedEvent.create(action, resourceId));
- }
-
/**
* Returns a set of digests that the remote cache does not know about. The returned set is
* guaranteed to be a subset of {@code digests}.
@@ -143,38 +118,14 @@
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
- ActionExecutionMetadata action = context.getSpawnOwner();
-
Completable upload =
- Completable.using(
- () -> {
- String resourceId = "ac/" + actionKey.getDigest().getHash();
- postUploadStartedEvent(action, resourceId);
- return resourceId;
- },
- resourceId ->
- RxFutures.toCompletable(
- () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
- directExecutor()),
- resourceId -> postUploadFinishedEvent(action, resourceId));
+ RxFutures.toCompletable(
+ () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
+ directExecutor());
return RxFutures.toListenableFuture(upload);
}
- private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) {
- ActionExecutionMetadata action = context.getSpawnOwner();
- return Completable.using(
- () -> {
- String resourceId = "cas/" + digest.getHash();
- postUploadStartedEvent(action, resourceId);
- return resourceId;
- },
- resourceId ->
- RxFutures.toCompletable(
- () -> cacheProtocol.uploadFile(context, digest, file), directExecutor()),
- resourceId -> postUploadFinishedEvent(action, resourceId));
- }
-
/**
* Upload a local file to the remote cache.
*
@@ -191,26 +142,15 @@
return COMPLETED_SUCCESS;
}
- Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file));
+ Completable upload =
+ casUploadCache.executeIfNot(
+ digest,
+ RxFutures.toCompletable(
+ () -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));
return RxFutures.toListenableFuture(upload);
}
- private Completable doUploadBlob(
- RemoteActionExecutionContext context, Digest digest, ByteString data) {
- ActionExecutionMetadata action = context.getSpawnOwner();
- return Completable.using(
- () -> {
- String resourceId = "cas/" + digest.getHash();
- postUploadStartedEvent(action, resourceId);
- return resourceId;
- },
- resourceId ->
- RxFutures.toCompletable(
- () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()),
- resourceId -> postUploadFinishedEvent(action, resourceId));
- }
-
/**
* Upload sequence of bytes to the remote cache.
*
@@ -227,7 +167,11 @@
return COMPLETED_SUCCESS;
}
- Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data));
+ Completable upload =
+ casUploadCache.executeIfNot(
+ digest,
+ RxFutures.toCompletable(
+ () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));
return RxFutures.toListenableFuture(upload);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
index 86ec811..3b59afb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -22,7 +22,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
@@ -43,11 +42,10 @@
public class RemoteExecutionCache extends RemoteCache {
public RemoteExecutionCache(
- ExtendedEventHandler reporter,
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
- super(reporter, protocolImpl, options, digestUtil);
+ super(protocolImpl, options, digestUtil);
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
index 1a4b123..7722aff 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
@@ -84,6 +84,7 @@
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
+import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
@@ -1070,7 +1071,8 @@
Single.using(
remoteCache::retain,
remoteCache ->
- manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
+ manifest.uploadAsync(
+ action.getRemoteActionExecutionContext(), remoteCache, reporter),
RemoteCache::release)
.subscribeOn(scheduler)
.subscribe(
@@ -1087,7 +1089,10 @@
}
});
} else {
- manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
+ try (SilentCloseable c =
+ Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
+ manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter);
+ }
}
} catch (IOException e) {
reportUploadError(e);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index d2508bd..6ad97fb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -234,8 +234,7 @@
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
- RemoteCache remoteCache =
- new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+ RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
@@ -573,7 +572,7 @@
}
execChannel.release();
RemoteExecutionCache remoteCache =
- new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+ new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteExecution(
executorService,
@@ -609,8 +608,7 @@
}
}
- RemoteCache remoteCache =
- new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+ RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, retryScheduler, digestUtil);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index d2219dc..3b8344c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -198,9 +198,7 @@
}
}
- try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
- remoteExecutionService.uploadOutputs(action, result);
- }
+ remoteExecutionService.uploadOutputs(action, result);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 1d2a08f..c8e6fec 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -575,9 +575,7 @@
}
}
- try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
- remoteExecutionService.uploadOutputs(action, result);
- }
+ remoteExecutionService.uploadOutputs(action, result);
return result;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
index d2e7ba7..5dbbb07 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -28,14 +28,20 @@
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
+import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
@@ -56,6 +62,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
/** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -341,10 +348,11 @@
}
/** Uploads outputs and action result (if exit code is 0) to remote cache. */
- public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
+ public ActionResult upload(
+ RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
throws IOException, InterruptedException {
try {
- return uploadAsync(context, remoteCache).blockingGet();
+ return uploadAsync(context, remoteCache, reporter).blockingGet();
} catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), InterruptedException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
@@ -368,29 +376,91 @@
return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
}
+ private static void reportUploadStarted(
+ ExtendedEventHandler reporter,
+ @Nullable ActionExecutionMetadata action,
+ String prefix,
+ Iterable<Digest> digests) {
+ if (action != null) {
+ for (Digest digest : digests) {
+ reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
+ }
+ }
+ }
+
+ private static void reportUploadFinished(
+ ExtendedEventHandler reporter,
+ @Nullable ActionExecutionMetadata action,
+ String resourceIdPrefix,
+ Iterable<Digest> digests) {
+ if (action != null) {
+ for (Digest digest : digests) {
+ reporter.post(
+ ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
+ }
+ }
+ }
+
/**
* Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
* code is 0) to remote cache.
*/
public Single<ActionResult> uploadAsync(
- RemoteActionExecutionContext context, RemoteCache remoteCache) {
+ RemoteActionExecutionContext context,
+ RemoteCache remoteCache,
+ ExtendedEventHandler reporter) {
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());
- Completable uploadOutputs =
- mergeBulkTransfer(
- toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
- .flatMapPublisher(Flowable::fromIterable)
- .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
+ ActionExecutionMetadata action = context.getSpawnOwner();
+
+ String outputPrefix = "cas/";
+ Flowable<RxUtils.TransferResult> bulkTransfers =
+ toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
+ .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
+ .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
+ .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
+ .doOnSuccess(
+ missingDigests -> {
+ List<Digest> existedDigests =
+ digests.stream()
+ .filter(digest -> !missingDigests.contains(digest))
+ .collect(Collectors.toList());
+ reportUploadFinished(reporter, action, outputPrefix, existedDigests);
+ })
+ .flatMapPublisher(Flowable::fromIterable)
+ .flatMapSingle(
+ digest ->
+ toTransferResult(upload(context, remoteCache, digest))
+ .doFinally(
+ () ->
+ reportUploadFinished(
+ reporter, action, outputPrefix, ImmutableList.of(digest))));
+ Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);
ActionResult actionResult = result.build();
Completable uploadActionResult = Completable.complete();
if (actionResult.getExitCode() == 0 && actionKey != null) {
+ String actionResultPrefix = "ac/";
uploadActionResult =
toCompletable(
- () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
- directExecutor());
+ () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
+ directExecutor())
+ .doOnSubscribe(
+ d ->
+ reportUploadStarted(
+ reporter,
+ action,
+ actionResultPrefix,
+ ImmutableList.of(actionKey.getDigest())))
+ .doFinally(
+ () ->
+ reportUploadFinished(
+ reporter,
+ action,
+ actionResultPrefix,
+ ImmutableList.of(actionKey.getDigest())));
}
return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);