When BES mode is FULLY_ASYNC, let BEP transports finish closing in background.

TESTED=built dev blaze, ran some builds in a loop with
--bes_upload_mode=FULLY_ASYNC. unit testing will require followup.

RELNOTES: None.
PiperOrigin-RevId: 254577111
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 8bcfb37..35af45f 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -33,7 +33,6 @@
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.analysis.test.TestConfiguration.TestOptions;
 import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
-import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
 import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
 import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -173,12 +172,19 @@
         BuildEventProtocolOptions.class);
   }
 
-  private void cancelPendingUploads() {
+  // Resets the maps tracking the state of closing/half-closing BES transports.
+  private void resetPendingUploads() {
+    closeFuturesWithTimeoutsMap = ImmutableMap.of();
+    halfCloseFuturesWithTimeoutsMap = ImmutableMap.of();
+  }
+
+  // Cancels and interrupts any in-flight threads closing BES transports, then resets the maps
+  // tracking in-flight close operations.
+  private void cancelAndResetPendingUploads() {
     closeFuturesWithTimeoutsMap
         .values()
         .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
-    closeFuturesWithTimeoutsMap = ImmutableMap.of();
-    halfCloseFuturesWithTimeoutsMap = ImmutableMap.of();
+    resetPendingUploads();
   }
 
   private static boolean isTimeoutException(ExecutionException e) {
@@ -198,20 +204,30 @@
                   "The Build Event Protocol encountered a connectivity problem: %s. Cancelling"
                       + " previous background uploads",
                   status)));
-      cancelPendingUploads();
+      cancelAndResetPendingUploads();
       return;
     }
 
+    ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap = null;
+    boolean cancelCloseFutures = true;
+    switch (besOptions.besUploadMode) {
+      case FULLY_ASYNC:
+        waitingFutureMap = halfCloseFuturesWithTimeoutsMap;
+        cancelCloseFutures = false;
+        break;
+      case WAIT_FOR_UPLOAD_COMPLETE:
+      case NOWAIT_FOR_UPLOAD_COMPLETE:
+        waitingFutureMap = closeFuturesWithTimeoutsMap;
+        cancelCloseFutures = true;
+        break;
+    }
+
     Stopwatch stopwatch = Stopwatch.createStarted();
     try {
       // TODO(b/234994611): It would be better to report before we wait, but the current
       //  infrastructure does not support that. At least we can report it afterwards.
-      ImmutableMap<BuildEventTransport, ListenableFuture<Void>> futureMap =
-          besOptions.besUploadMode == BesUploadMode.FULLY_ASYNC
-              ? halfCloseFuturesWithTimeoutsMap
-              : closeFuturesWithTimeoutsMap;
       Uninterruptibles.getUninterruptibly(
-          Futures.allAsList(futureMap.values()),
+          Futures.allAsList(waitingFutureMap.values()),
           getMaxWaitForPreviousInvocation().getSeconds(),
           TimeUnit.SECONDS);
       long waitedMillis = stopwatch.elapsed().toMillis();
@@ -233,6 +249,7 @@
               waitedMillis / 1000, waitedMillis % 1000);
       reporter.handle(Event.warn(msg));
       googleLogger.atWarning().withCause(exception).log(msg);
+      cancelCloseFutures = true;
     } catch (ExecutionException e) {
       String msg;
       // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
@@ -252,8 +269,13 @@
       }
       reporter.handle(Event.warn(msg));
       googleLogger.atWarning().withCause(e).log(msg);
+      cancelCloseFutures = true;
     } finally {
-      cancelPendingUploads();
+      if (cancelCloseFutures) {
+        cancelAndResetPendingUploads();
+      } else {
+        resetPendingUploads();
+      }
     }
   }
 
@@ -375,7 +397,7 @@
       googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
       LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
     } finally {
-      cancelPendingUploads();
+      cancelAndResetPendingUploads();
     }
   }
 
@@ -402,7 +424,7 @@
       googleLogger.atWarning().withCause(exception).log(
           "Encountered Exception when closing BEP transports in Blaze's shutting down sequence");
     } finally {
-      cancelPendingUploads();
+      cancelAndResetPendingUploads();
     }
   }
 
@@ -463,7 +485,7 @@
               "Unexpected Exception '%s' when closing BEP transports, this is a bug.",
               e.getCause().getMessage()));
     } finally {
-      cancelPendingUploads();
+      cancelAndResetPendingUploads();
       if (waitMessageFuture != null) {
         waitMessageFuture.cancel(/* mayInterruptIfRunning= */ true);
       }