Remote: Fixes a confusion that background upload counter could increase after build finished.

At the end of a build, the number of files waiting to be uploaded could increase as other ones finished. This PR fixes that.

Also, changes to only emit profile block `upload outputs` for blocking uploads.

Fixes https://github.com/bazelbuild/bazel/pull/13655#issuecomment-914418852.

Closes #13954.

PiperOrigin-RevId: 398161750
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index af68ddf..f9801df 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -30,11 +30,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
-import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
-import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
 import com.google.devtools.build.lib.concurrent.ThreadSafety;
-import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.exec.SpawnProgressEvent;
 import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
@@ -66,7 +62,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.annotation.Nullable;
 
 /**
  * A cache for storing artifacts (input and output) as well as the output of running an action.
@@ -85,7 +80,6 @@
   private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
   private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);
 
-  private final ExtendedEventHandler reporter;
   private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
   protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();
 
@@ -94,11 +88,9 @@
   protected final DigestUtil digestUtil;
 
   public RemoteCache(
-      ExtendedEventHandler reporter,
       RemoteCacheClient cacheProtocol,
       RemoteOptions options,
       DigestUtil digestUtil) {
-    this.reporter = reporter;
     this.cacheProtocol = cacheProtocol;
     this.options = options;
     this.digestUtil = digestUtil;
@@ -110,23 +102,6 @@
     return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
   }
 
-  private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) {
-    if (action == null) {
-      return;
-    }
-
-    reporter.post(ActionUploadStartedEvent.create(action, resourceId));
-  }
-
-  private void postUploadFinishedEvent(
-      @Nullable ActionExecutionMetadata action, String resourceId) {
-    if (action == null) {
-      return;
-    }
-
-    reporter.post(ActionUploadFinishedEvent.create(action, resourceId));
-  }
-
   /**
    * Returns a set of digests that the remote cache does not know about. The returned set is
    * guaranteed to be a subset of {@code digests}.
@@ -143,38 +118,14 @@
   public ListenableFuture<Void> uploadActionResult(
       RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
 
-    ActionExecutionMetadata action = context.getSpawnOwner();
-
     Completable upload =
-        Completable.using(
-            () -> {
-              String resourceId = "ac/" + actionKey.getDigest().getHash();
-              postUploadStartedEvent(action, resourceId);
-              return resourceId;
-            },
-            resourceId ->
-                RxFutures.toCompletable(
-                    () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
-                    directExecutor()),
-            resourceId -> postUploadFinishedEvent(action, resourceId));
+        RxFutures.toCompletable(
+            () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
+            directExecutor());
 
     return RxFutures.toListenableFuture(upload);
   }
 
-  private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) {
-    ActionExecutionMetadata action = context.getSpawnOwner();
-    return Completable.using(
-        () -> {
-          String resourceId = "cas/" + digest.getHash();
-          postUploadStartedEvent(action, resourceId);
-          return resourceId;
-        },
-        resourceId ->
-            RxFutures.toCompletable(
-                () -> cacheProtocol.uploadFile(context, digest, file), directExecutor()),
-        resourceId -> postUploadFinishedEvent(action, resourceId));
-  }
-
   /**
    * Upload a local file to the remote cache.
    *
@@ -191,26 +142,15 @@
       return COMPLETED_SUCCESS;
     }
 
-    Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file));
+    Completable upload =
+        casUploadCache.executeIfNot(
+            digest,
+            RxFutures.toCompletable(
+                () -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));
 
     return RxFutures.toListenableFuture(upload);
   }
 
-  private Completable doUploadBlob(
-      RemoteActionExecutionContext context, Digest digest, ByteString data) {
-    ActionExecutionMetadata action = context.getSpawnOwner();
-    return Completable.using(
-        () -> {
-          String resourceId = "cas/" + digest.getHash();
-          postUploadStartedEvent(action, resourceId);
-          return resourceId;
-        },
-        resourceId ->
-            RxFutures.toCompletable(
-                () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()),
-        resourceId -> postUploadFinishedEvent(action, resourceId));
-  }
-
   /**
    * Upload sequence of bytes to the remote cache.
    *
@@ -227,7 +167,11 @@
       return COMPLETED_SUCCESS;
     }
 
-    Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data));
+    Completable upload =
+        casUploadCache.executeIfNot(
+            digest,
+            RxFutures.toCompletable(
+                () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));
 
     return RxFutures.toListenableFuture(upload);
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
index 86ec811..3b59afb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -22,7 +22,6 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
@@ -43,11 +42,10 @@
 public class RemoteExecutionCache extends RemoteCache {
 
   public RemoteExecutionCache(
-      ExtendedEventHandler reporter,
       RemoteCacheClient protocolImpl,
       RemoteOptions options,
       DigestUtil digestUtil) {
-    super(reporter, protocolImpl, options, digestUtil);
+    super(protocolImpl, options, digestUtil);
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
index 1a4b123..7722aff 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
@@ -84,6 +84,7 @@
 import com.google.devtools.build.lib.events.Reporter;
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.profiler.Profiler;
+import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
 import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
@@ -1070,7 +1071,8 @@
         Single.using(
                 remoteCache::retain,
                 remoteCache ->
-                    manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
+                    manifest.uploadAsync(
+                        action.getRemoteActionExecutionContext(), remoteCache, reporter),
                 RemoteCache::release)
             .subscribeOn(scheduler)
             .subscribe(
@@ -1087,7 +1089,10 @@
                   }
                 });
       } else {
-        manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
+        try (SilentCloseable c =
+            Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
+          manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter);
+        }
       }
     } catch (IOException e) {
       reportUploadError(e);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index d2508bd..6ad97fb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -234,8 +234,7 @@
       handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
       return;
     }
-    RemoteCache remoteCache =
-        new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+    RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
     actionContextProvider =
         RemoteActionContextProvider.createForRemoteCaching(
             executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
@@ -573,7 +572,7 @@
       }
       execChannel.release();
       RemoteExecutionCache remoteCache =
-          new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+          new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
       actionContextProvider =
           RemoteActionContextProvider.createForRemoteExecution(
               executorService,
@@ -609,8 +608,7 @@
         }
       }
 
-      RemoteCache remoteCache =
-          new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
+      RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
       actionContextProvider =
           RemoteActionContextProvider.createForRemoteCaching(
               executorService, env, remoteCache, retryScheduler, digestUtil);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index d2219dc..3b8344c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -198,9 +198,7 @@
             }
           }
 
-          try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
-            remoteExecutionService.uploadOutputs(action, result);
-          }
+          remoteExecutionService.uploadOutputs(action, result);
         }
 
         @Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 1d2a08f..c8e6fec 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -575,9 +575,7 @@
       }
     }
 
-    try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
-      remoteExecutionService.uploadOutputs(action, result);
-    }
+    remoteExecutionService.uploadOutputs(action, result);
     return result;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
index d2e7ba7..5dbbb07 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -28,14 +28,20 @@
 import build.bazel.remote.execution.v2.Tree;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
+import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.common.RemotePathResolver;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.RxUtils;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
 import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
@@ -56,6 +62,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -341,10 +348,11 @@
   }
 
   /** Uploads outputs and action result (if exit code is 0) to remote cache. */
-  public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
+  public ActionResult upload(
+      RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
       throws IOException, InterruptedException {
     try {
-      return uploadAsync(context, remoteCache).blockingGet();
+      return uploadAsync(context, remoteCache, reporter).blockingGet();
     } catch (RuntimeException e) {
       throwIfInstanceOf(e.getCause(), InterruptedException.class);
       throwIfInstanceOf(e.getCause(), IOException.class);
@@ -368,29 +376,91 @@
     return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
   }
 
+  private static void reportUploadStarted(
+      ExtendedEventHandler reporter,
+      @Nullable ActionExecutionMetadata action,
+      String prefix,
+      Iterable<Digest> digests) {
+    if (action != null) {
+      for (Digest digest : digests) {
+        reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
+      }
+    }
+  }
+
+  private static void reportUploadFinished(
+      ExtendedEventHandler reporter,
+      @Nullable ActionExecutionMetadata action,
+      String resourceIdPrefix,
+      Iterable<Digest> digests) {
+    if (action != null) {
+      for (Digest digest : digests) {
+        reporter.post(
+            ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
+      }
+    }
+  }
+
   /**
    * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
    * code is 0) to remote cache.
    */
   public Single<ActionResult> uploadAsync(
-      RemoteActionExecutionContext context, RemoteCache remoteCache) {
+      RemoteActionExecutionContext context,
+      RemoteCache remoteCache,
+      ExtendedEventHandler reporter) {
     Collection<Digest> digests = new ArrayList<>();
     digests.addAll(digestToFile.keySet());
     digests.addAll(digestToBlobs.keySet());
 
-    Completable uploadOutputs =
-        mergeBulkTransfer(
-            toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
-                .flatMapPublisher(Flowable::fromIterable)
-                .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
+    ActionExecutionMetadata action = context.getSpawnOwner();
+
+    String outputPrefix = "cas/";
+    Flowable<RxUtils.TransferResult> bulkTransfers =
+        toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
+            .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
+            .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
+            .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
+            .doOnSuccess(
+                missingDigests -> {
+                  List<Digest> existedDigests =
+                      digests.stream()
+                          .filter(digest -> !missingDigests.contains(digest))
+                          .collect(Collectors.toList());
+                  reportUploadFinished(reporter, action, outputPrefix, existedDigests);
+                })
+            .flatMapPublisher(Flowable::fromIterable)
+            .flatMapSingle(
+                digest ->
+                    toTransferResult(upload(context, remoteCache, digest))
+                        .doFinally(
+                            () ->
+                                reportUploadFinished(
+                                    reporter, action, outputPrefix, ImmutableList.of(digest))));
+    Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);
 
     ActionResult actionResult = result.build();
     Completable uploadActionResult = Completable.complete();
     if (actionResult.getExitCode() == 0 && actionKey != null) {
+      String actionResultPrefix = "ac/";
       uploadActionResult =
           toCompletable(
-              () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
-              directExecutor());
+                  () -> remoteCache.uploadActionResult(context, actionKey, actionResult),
+                  directExecutor())
+              .doOnSubscribe(
+                  d ->
+                      reportUploadStarted(
+                          reporter,
+                          action,
+                          actionResultPrefix,
+                          ImmutableList.of(actionKey.getDigest())))
+              .doFinally(
+                  () ->
+                      reportUploadFinished(
+                          reporter,
+                          action,
+                          actionResultPrefix,
+                          ImmutableList.of(actionKey.getDigest())));
     }
 
     return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index c4212d0..a409604 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -414,7 +414,7 @@
                     invocationOnMock.getArgument(0), invocationOnMock.getArgument(1)))
         .when(cacheClient)
         .findMissingDigests(any(), any());
-    RemoteCache remoteCache = new RemoteCache(reporter, cacheClient, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, DIGEST_UTIL);
 
     return new ByteStreamBuildEventArtifactUploader(
         MoreExecutors.directExecutor(),
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index 1ec656f..cbb22d7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -51,7 +51,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -61,7 +60,7 @@
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
 import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
 import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.events.Reporter;
+import com.google.devtools.build.lib.events.NullEventHandler;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
 import com.google.devtools.build.lib.remote.Retrier.Backoff;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
@@ -132,7 +131,6 @@
   private Path execRoot;
   private FileOutErr outErr;
   private FakeActionInputFileCache fakeFileCache;
-  private final Reporter reporter = new Reporter(new EventBus());
   private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
   private final String fakeServerName = "fake server for " + getClass();
   private Server fakeServer;
@@ -271,7 +269,7 @@
   public void testVirtualActionInputSupport() throws Exception {
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     RemoteExecutionCache client =
-        new RemoteExecutionCache(reporter, newClient(options), options, DIGEST_UTIL);
+        new RemoteExecutionCache(newClient(options), options, DIGEST_UTIL);
     PathFragment execPath = PathFragment.create("my/exec/path");
     VirtualActionInput virtualActionInput =
         ActionsTestUtil.createVirtualActionInput(execPath, "hello");
@@ -381,7 +379,7 @@
     // arrange
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents");
     Digest barDigest = DIGEST_UTIL.computeAsUtf8("bar-contents");
@@ -404,7 +402,7 @@
   public void testUploadDirectory() throws Exception {
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
@@ -468,7 +466,7 @@
   public void testUploadDirectoryEmpty() throws Exception {
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest barDigest =
         fakeFileCache.createScratchInputDirectory(
@@ -507,7 +505,7 @@
   public void testUploadDirectoryNested() throws Exception {
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest wobbleDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz");
@@ -583,7 +581,7 @@
             outputs,
             outErr,
             0);
-    return uploadManifest.upload(context, remoteCache);
+    return uploadManifest.upload(context, remoteCache, NullEventHandler.INSTANCE);
   }
 
   private ActionResult uploadDirectory(RemoteCache remoteCache, List<Path> outputs)
@@ -658,7 +656,7 @@
     serviceRegistry.addService(ServerInterceptors.intercept(actionCache, interceptor));
 
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
     remoteCache.downloadActionResult(
         context,
         DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key")),
@@ -669,7 +667,7 @@
   public void testUpload() throws Exception {
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
@@ -745,7 +743,7 @@
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     remoteOptions.maxOutboundMessageSize = 80; // Enough for one digest, but not two.
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
@@ -810,7 +808,7 @@
   public void testUploadCacheMissesWithRetries() throws Exception {
     RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
     GrpcCacheClient client = newClient(remoteOptions);
-    RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL);
+    RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL);
 
     final Digest fooDigest =
         fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
diff --git a/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java b/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java
index 9dcaf36..2c33218 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java
@@ -16,7 +16,6 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import build.bazel.remote.execution.v2.Digest;
-import com.google.devtools.build.lib.events.Reporter;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -30,15 +29,14 @@
 class InMemoryRemoteCache extends RemoteExecutionCache {
 
   InMemoryRemoteCache(
-      Reporter reporter,
       Map<Digest, byte[]> casEntries,
       RemoteOptions options,
       DigestUtil digestUtil) {
-    super(reporter, new InMemoryCacheClient(casEntries), options, digestUtil);
+    super(new InMemoryCacheClient(casEntries), options, digestUtil);
   }
 
-  InMemoryRemoteCache(Reporter reporter, RemoteOptions options, DigestUtil digestUtil) {
-    super(reporter, new InMemoryCacheClient(), options, digestUtil);
+  InMemoryRemoteCache(RemoteOptions options, DigestUtil digestUtil) {
+    super(new InMemoryCacheClient(), options, digestUtil);
   }
 
   Digest addContents(RemoteActionExecutionContext context, String txt)
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
index 1dc2494..9223028 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
 import com.google.common.hash.HashCode;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -37,7 +36,6 @@
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
 import com.google.devtools.build.lib.clock.JavaClock;
-import com.google.devtools.build.lib.events.Reporter;
 import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -67,7 +65,6 @@
 
   private static final DigestHashFunction HASH_FUNCTION = DigestHashFunction.SHA256;
 
-  private final Reporter reporter = new Reporter(new EventBus());
   private Path execRoot;
   private ArtifactRoot artifactRoot;
   private RemoteOptions options;
@@ -395,7 +392,6 @@
     for (Map.Entry<Digest, ByteString> entry : cacheEntries.entrySet()) {
       cacheEntriesByteArray.put(entry.getKey(), entry.getValue().toByteArray());
     }
-    return new RemoteCache(
-        reporter, new InMemoryCacheClient(cacheEntriesByteArray), options, digestUtil);
+    return new RemoteCache(new InMemoryCacheClient(cacheEntriesByteArray), options, digestUtil);
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java
index cec16dc..2983c85 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java
@@ -16,19 +16,15 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 
-import build.bazel.remote.execution.v2.Action;
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
 import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.ActionInputHelper;
-import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
-import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
 import com.google.devtools.build.lib.actions.ArtifactRoot;
 import com.google.devtools.build.lib.actions.ArtifactRoot.RootType;
 import com.google.devtools.build.lib.actions.ResourceSet;
@@ -37,11 +33,8 @@
 import com.google.devtools.build.lib.clock.JavaClock;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
 import com.google.devtools.build.lib.collect.nestedset.Order;
-import com.google.devtools.build.lib.events.Reporter;
-import com.google.devtools.build.lib.events.StoredEventHandler;
 import com.google.devtools.build.lib.exec.util.FakeOwner;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
-import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -71,8 +64,6 @@
 @RunWith(JUnit4.class)
 public class RemoteCacheTest {
 
-  private final Reporter reporter = new Reporter(new EventBus());
-  private final StoredEventHandler eventHandler = new StoredEventHandler();
   private RemoteActionExecutionContext context;
   private FileSystem fs;
   private Path execRoot;
@@ -84,8 +75,6 @@
 
   @Before
   public void setUp() throws Exception {
-    reporter.addHandler(eventHandler);
-
     MockitoAnnotations.initMocks(this);
     RequestMetadata metadata =
         TracingMetadataUtils.buildMetadata("none", "none", "action-id", null);
@@ -170,7 +159,7 @@
     Path file = fs.getPath("/execroot/symlink-to-file");
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
     options.remoteDownloadSymlinkTemplate = "/home/alice/cas/{hash}-{size_bytes}";
-    RemoteCache remoteCache = new InMemoryRemoteCache(reporter, cas, options, digestUtil);
+    RemoteCache remoteCache = new InMemoryRemoteCache(cas, options, digestUtil);
 
     // act
     getFromFuture(remoteCache.downloadFile(context, file, helloDigest));
@@ -199,53 +188,8 @@
         .containsExactly(emptyDigest);
   }
 
-  @Test
-  public void uploadActionResult_firesUploadEvents() throws Exception {
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    ActionKey actionKey = new ActionKey(digestUtil.compute(Action.getDefaultInstance()));
-    ActionResult actionResult = ActionResult.getDefaultInstance();
-
-    getFromFuture(remoteCache.uploadActionResult(context, actionKey, actionResult));
-
-    String resourceId = "ac/" + actionKey.getDigest().getHash();
-    assertThat(eventHandler.getPosts())
-        .containsExactly(
-            ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId),
-            ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId));
-  }
-
-  @Test
-  public void uploadBlob_firesUploadEvents() throws Exception {
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    ByteString content = ByteString.copyFromUtf8("content");
-    Digest digest = digestUtil.compute(content.toByteArray());
-
-    getFromFuture(remoteCache.uploadBlob(context, digest, content));
-
-    String resourceId = "cas/" + digest.getHash();
-    assertThat(eventHandler.getPosts())
-        .containsExactly(
-            ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId),
-            ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId));
-  }
-
-  @Test
-  public void uploadFile_firesUploadEvents() throws Exception {
-    InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest digest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content");
-    Path file = execRoot.getRelative("file");
-
-    getFromFuture(remoteCache.uploadFile(context, digest, file));
-
-    String resourceId = "cas/" + digest.getHash();
-    assertThat(eventHandler.getPosts())
-        .containsExactly(
-            ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId),
-            ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId));
-  }
-
   private InMemoryRemoteCache newRemoteCache() {
     RemoteOptions options = Options.getDefaults(RemoteOptions.class);
-    return new InMemoryRemoteCache(reporter, options, digestUtil);
+    return new InMemoryRemoteCache(options, digestUtil);
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
index 89eb7e7..03d8fc7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java
@@ -52,6 +52,8 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.ActionInputHelper;
+import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
+import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
 import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
@@ -154,7 +156,7 @@
     checkNotNull(stderr.getParentDirectory()).createDirectoryAndParents();
     outErr = new FileOutErr(stdout, stderr);
 
-    cache = spy(new InMemoryRemoteCache(reporter, remoteOptions, digestUtil));
+    cache = spy(new InMemoryRemoteCache(remoteOptions, digestUtil));
     executor = mock(RemoteExecutionClient.class);
 
     RequestMetadata metadata =
@@ -1315,6 +1317,34 @@
   }
 
   @Test
+  public void uploadOutputs_firesUploadEvents() throws Exception {
+    Digest digest =
+        fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/file"), "content");
+    Path file = execRoot.getRelative("outputs/file");
+    Artifact outputFile = ActionsTestUtil.createArtifact(artifactRoot, file);
+    RemoteExecutionService service = newRemoteExecutionService();
+    Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputFile));
+    FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn);
+    RemoteAction action = service.buildRemoteAction(spawn, context);
+    SpawnResult spawnResult =
+        new SpawnResult.Builder()
+            .setExitCode(0)
+            .setStatus(SpawnResult.Status.SUCCESS)
+            .setRunnerName("test")
+            .build();
+
+    service.uploadOutputs(action, spawnResult);
+
+    assertThat(eventHandler.getPosts())
+        .containsAtLeast(
+            ActionUploadStartedEvent.create(spawn.getResourceOwner(), "cas/" + digest.getHash()),
+            ActionUploadFinishedEvent.create(spawn.getResourceOwner(), "cas/" + digest.getHash()),
+            ActionUploadStartedEvent.create(spawn.getResourceOwner(), "ac/" + action.getActionId()),
+            ActionUploadFinishedEvent.create(
+                spawn.getResourceOwner(), "ac/" + action.getActionId()));
+  }
+
+  @Test
   public void uploadInputsIfNotPresent_deduplicateFindMissingBlobCalls() throws Exception {
     int taskCount = 100;
     ExecutorService executorService = Executors.newFixedThreadPool(taskCount);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
index 5f5bfef..5529421 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
@@ -299,7 +299,7 @@
             DIGEST_UTIL,
             uploader);
     RemoteExecutionCache remoteCache =
-        new RemoteExecutionCache(reporter, cacheProtocol, remoteOptions, DIGEST_UTIL);
+        new RemoteExecutionCache(cacheProtocol, remoteOptions, DIGEST_UTIL);
     RemoteExecutionService remoteExecutionService =
         new RemoteExecutionService(
             directExecutor(),
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index 6d01174..f034870 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -35,6 +35,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.actions.ExecException;
+import com.google.devtools.build.lib.events.NullEventHandler;
 import com.google.devtools.build.lib.remote.ExecutionStatusException;
 import com.google.devtools.build.lib.remote.UploadManifest;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
@@ -356,7 +357,7 @@
                 outputs,
                 outErr,
                 exitCode);
-        result = manifest.upload(context, cache);
+        result = manifest.upload(context, cache, NullEventHandler.INSTANCE);
       } catch (ExecException e) {
         if (errStatus == null) {
           errStatus =
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
index fc81d43..cbf1586 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
@@ -19,8 +19,6 @@
 import build.bazel.remote.execution.v2.Directory;
 import build.bazel.remote.execution.v2.DirectoryNode;
 import build.bazel.remote.execution.v2.FileNode;
-import com.google.devtools.build.lib.events.Event;
-import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.RemoteCache;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
@@ -34,17 +32,6 @@
 
   public OnDiskBlobStoreCache(RemoteOptions options, Path cacheDir, DigestUtil digestUtil) {
     super(
-        new ExtendedEventHandler() {
-          @Override
-          public void post(Postable obj) {
-            // do nothing
-          }
-
-          @Override
-          public void handle(Event event) {
-            // do nothing
-          }
-        },
         new DiskCacheClient(cacheDir, /* verifyDownloads= */ true, digestUtil),
         options,
         digestUtil);