Remote: Only waits for background tasks from remote execution.

We added the block waiting behaviour after each command in remote module to wait for background uploads when introducing async upload. However, not all background uploads should be waited, e.g. uploads from BES module but with flag `--bes_upload_mode=fully_async`.

This PR updates remote module so that only uploads initiated by remote module are waited after the command. This also enable us to implement something like `--remote_upload_mode=fully_async` in the future.

Fixes #14620.

Closes #14634.

PiperOrigin-RevId: 424296966
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
index 0dfcedb..cf1f893 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
@@ -142,6 +142,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
@@ -164,6 +165,7 @@
   @Nullable private final Path captureCorruptedOutputsDir;
   private final Cache<Object, MerkleTree> merkleTreeCache;
   private final Set<String> reportedErrors = new HashSet<>();
+  private final Phaser backgroundTaskPhaser = new Phaser(1);
 
   private final Scheduler scheduler;
 
@@ -1162,13 +1164,18 @@
             .subscribe(
                 new SingleObserver<ActionResult>() {
                   @Override
-                  public void onSubscribe(@NonNull Disposable d) {}
+                  public void onSubscribe(@NonNull Disposable d) {
+                    backgroundTaskPhaser.register();
+                  }
 
                   @Override
-                  public void onSuccess(@NonNull ActionResult actionResult) {}
+                  public void onSuccess(@NonNull ActionResult actionResult) {
+                    backgroundTaskPhaser.arriveAndDeregister();
+                  }
 
                   @Override
                   public void onError(@NonNull Throwable e) {
+                    backgroundTaskPhaser.arriveAndDeregister();
                     reportUploadError(e);
                   }
                 });
@@ -1302,7 +1309,7 @@
       remoteCache.release();
 
       try {
-        remoteCache.awaitTermination();
+        backgroundTaskPhaser.awaitAdvanceInterruptibly(backgroundTaskPhaser.arrive());
       } catch (InterruptedException e) {
         buildInterrupted.set(true);
         remoteCache.shutdownNow();