Remote: Fix a confusing case where the background upload counter could increase after the build finished.

At the end of a build, the number of files waiting to be uploaded could still increase while other uploads were finishing. This PR fixes that.

Also, only emit the profile block `upload outputs` for blocking uploads.

Fixes https://github.com/bazelbuild/bazel/pull/13655#issuecomment-914418852.

Closes #13954.

PiperOrigin-RevId: 398161750
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
index d2e7ba7..5dbbb07 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -28,14 +28,20 @@
 import build.bazel.remote.execution.v2.Tree;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
+import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.common.RemotePathResolver;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.RxUtils;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
@@ -56,6 +62,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -341,10 +348,11 @@
   }
 
   /** Uploads outputs and action result (if exit code is 0) to remote cache. */
-  public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
+  public ActionResult upload(
+      RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
       throws IOException, InterruptedException {
     try {
-      return uploadAsync(context, remoteCache).blockingGet();
+      return uploadAsync(context, remoteCache, reporter).blockingGet();
     } catch (RuntimeException e) {
       throwIfInstanceOf(e.getCause(), InterruptedException.class);
       throwIfInstanceOf(e.getCause(), IOException.class);
@@ -368,29 +376,91 @@
     return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
   }
 
+  private static void reportUploadStarted( // Posts one ActionUploadStartedEvent per digest.
+      ExtendedEventHandler reporter, // receives the events via post()
+      @Nullable ActionExecutionMetadata action, // when null, no events are posted
+      String prefix, // prepended to the digest hash in each event
+      Iterable<Digest> digests) {
+    if (action != null) { // skip reporting entirely when there is no action
+      for (Digest digest : digests) {
+        reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
+      }
+    }
+  }
+
+  private static void reportUploadFinished( // Posts one ActionUploadFinishedEvent per digest.
+      ExtendedEventHandler reporter, // receives the events via post()
+      @Nullable ActionExecutionMetadata action, // when null, no events are posted
+      String resourceIdPrefix, // prepended to the digest hash in each event
+      Iterable<Digest> digests) {
+    if (action != null) { // skip reporting entirely when there is no action
+      for (Digest digest : digests) {
+        reporter.post(
+            ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
+      }
+    }
+  }
+
   /**
    * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
    * code is 0) to remote cache.
    */
   public Single<ActionResult> uploadAsync(
-      RemoteActionExecutionContext context, RemoteCache remoteCache) {
+      RemoteActionExecutionContext context,
+      RemoteCache remoteCache,
+      ExtendedEventHandler reporter) {
     Collection<Digest> digests = new ArrayList<>();
     digests.addAll(digestToFile.keySet());
     digests.addAll(digestToBlobs.keySet());
 
-    Completable uploadOutputs =
-        mergeBulkTransfer(
-            toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
-                .flatMapPublisher(Flowable::fromIterable)
-                .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
+    ActionExecutionMetadata action = context.getSpawnOwner();
+
+    String outputPrefix = "cas/";
+    Flowable<RxUtils.TransferResult> bulkTransfers =
+        toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
+            .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
+            .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
+            .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
+            .doOnSuccess(
+                missingDigests -> {
+                  List<Digest> existedDigests =
+                      digests.stream()
+                          .filter(digest -> !missingDigests.contains(digest))
+                          .collect(Collectors.toList());
+                  reportUploadFinished(reporter, action, outputPrefix, existedDigests);
+                })
+            .flatMapPublisher(Flowable::fromIterable)
+            .flatMapSingle(
+                digest ->
+                    toTransferResult(upload(context, remoteCache, digest))
+                        .doFinally(
+                            () ->
+                                reportUploadFinished(
+                                    reporter, action, outputPrefix, ImmutableList.of(digest))));
+    Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);
 
     ActionResult actionResult = result.build();
     Completable uploadActionResult = Completable.complete();
     if (actionResult.getExitCode() == 0 && actionKey != null) {
+      String actionResultPrefix = "ac/";
       uploadActionResult =
           toCompletable(
-              () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
-              directExecutor());
+                  () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
+                  directExecutor())
+              .doOnSubscribe(
+                  d ->
+                      reportUploadStarted(
+                          reporter,
+                          action,
+                          actionResultPrefix,
+                          ImmutableList.of(actionKey.getDigest())))
+              .doFinally(
+                  () ->
+                      reportUploadFinished(
+                          reporter,
+                          action,
+                          actionResultPrefix,
+                          ImmutableList.of(actionKey.getDigest())));
     }
 
     return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);