Implement asynchronous Build Event Service (BES) upload.

We introduce a flag to control the flow of invocations w.r.t. BES upload. This new flag is called --bes_upload_mode and it currently has two possible values:
- WAIT_FOR_UPLOAD_COMPLETE (default; this is the behavior that existed before this commit): In this mode Bazel will wait for the communication with the BES to finish.
- NOWAIT_FOR_UPLOAD_COMPLETE (new mode): In this mode Bazel *won't* wait for the BES upload to finish, instead it will finish the invocation as soon as it can. The upload continues in the background and if a new invocation starts Bazel will wait five seconds for the previous upload to finish before (abruptly) cancelling it.

PiperOrigin-RevId: 243591454
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
index 4049fa5..e037768 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
@@ -39,6 +39,7 @@
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/common/options",
         "//third_party:auto_value",
+        "//third_party:flogger",
         "//third_party:guava",
         "//third_party:jsr305",
         "//third_party/grpc:grpc-jar",
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index b0529d2..671f032 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -69,6 +70,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -82,6 +84,9 @@
     extends BlazeModule {
 
   private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
+  private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();
+  // TODO(lpino): Consider making the wait value configurable.
+  private static final Duration MAX_WAIT_FOR_PREVIOUS_INVOCATION = Duration.ofSeconds(5);
 
   private final AtomicReference<AbruptExitException> pendingAbruptExitException =
       new AtomicReference<>();
@@ -90,14 +95,17 @@
   private AuthAndTLSOptions authTlsOptions;
   private BuildEventStreamOptions besStreamOptions;
   private boolean useExperimentalUi;
+  /** Holds the close futures for the upload of each transport. */
+  private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap =
+      ImmutableMap.of();
 
   // TODO(lpino): Use Optional instead of @Nullable for the members below.
-  private @Nullable OutErr outErr;
-  private @Nullable ImmutableSet<BuildEventTransport> bepTransports;
-  private @Nullable String buildRequestId;
-  private @Nullable String invocationId;
-  private @Nullable Reporter cmdLineReporter;
-  private @Nullable BuildEventStreamer streamer;
+  @Nullable private OutErr outErr;
+  @Nullable private ImmutableSet<BuildEventTransport> bepTransports;
+  @Nullable private String buildRequestId;
+  @Nullable private String invocationId;
+  @Nullable private Reporter cmdLineReporter;
+  @Nullable private BuildEventStreamer streamer;
 
   protected BESOptionsT besOptions;
 
@@ -128,7 +136,7 @@
     // Don't hide unchecked exceptions as part of the error reporting.
     Throwables.throwIfUnchecked(exception);
 
-    logger.log(Level.SEVERE, msg, exception);
+    googleLogger.atSevere().withCause(exception).log(msg);
     AbruptExitException abruptException = new AbruptExitException(msg, exitCode, exception);
     reportCommandLineError(commandLineReporter, exception);
     moduleEnvironment.exit(abruptException);
@@ -143,15 +151,65 @@
         BuildEventProtocolOptions.class);
   }
 
+  /** Cancels every close future tracked in {@code closeFuturesMap} and resets it to empty. */
+  private void cancelPendingUploads() {
+    // Pass mayInterruptIfRunning=true so closes that are already running are asked to stop too.
+    closeFuturesMap.values().forEach(f -> f.cancel(/* mayInterruptIfRunning= */ true));
+    closeFuturesMap = ImmutableMap.of();
+  }
+
+  /** Waits a bounded time for the previous invocation's uploads, then cancels what is left. */
+  private void waitForPreviousInvocation() {
+    AbruptExitException previousFailure = pendingAbruptExitException.getAndSet(null);
+    if (previousFailure != null) {
+      String warning =
+          String.format(
+              "Previous invocation failed to finish Build Event Protocol upload with "
+                  + "the following exception: '%s'. "
+                  + "Ignoring the failure and starting a new invocation...",
+              previousFailure.getMessage());
+      cmdLineReporter.handle(Event.warn(warning));
+      cancelPendingUploads();
+      return;
+    }
+    if (closeFuturesMap.isEmpty()) {
+      return;
+    }
+
+    long maxWaitSeconds = MAX_WAIT_FOR_PREVIOUS_INVOCATION.getSeconds();
+    try {
+      // TODO(b/234994611): Find a way to print a meaningful message when waiting. The current
+      // infrastructure doesn't allow printing messages in the terminal in beforeCommand.
+      Uninterruptibles.getUninterruptibly(
+          Futures.allAsList(closeFuturesMap.values()), maxWaitSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException timeout) {
+      String msg =
+          String.format(
+              "Pending Build Event Protocol upload took more than %ds to finish. "
+                  + "Cancelling and starting a new invocation...",
+              maxWaitSeconds);
+      cmdLineReporter.handle(Event.warn(msg));
+      googleLogger.atWarning().withCause(timeout).log(msg);
+    } catch (ExecutionException uploadFailure) {
+      String msg =
+          String.format(
+              "Previous Build Event Protocol upload failed with "
+                  + "the following exception: '%s'. "
+                  + "Ignoring the failure and starting a new invocation...",
+              uploadFailure.getMessage());
+      cmdLineReporter.handle(Event.warn(msg));
+      googleLogger.atWarning().withCause(uploadFailure).log(msg);
+    } finally {
+      // Runs on success, timeout and failure alike: tracked futures are cancelled and cleared.
+      cancelPendingUploads();
+    }
+  }
+
   @Override
   public void beforeCommand(CommandEnvironment cmdEnv) {
     this.invocationId = cmdEnv.getCommandId().toString();
     this.buildRequestId = cmdEnv.getBuildRequestId();
     this.cmdLineReporter = cmdEnv.getReporter();
-    // Reset to null in case afterCommand was not called.
-    // TODO(lpino): Remove this statement once {@link BlazeModule#afterCommmand()} is guaranteed
-    // to be executed for every invocation.
-    this.outErr = null;
 
     OptionsParsingResult parsingResult = cmdEnv.getOptions();
     this.besOptions = Preconditions.checkNotNull(parsingResult.getOptions(optionsClass()));
@@ -175,6 +233,8 @@
                     .select(bepOptions.buildEventUploadStrategy)
                     .create(cmdEnv));
 
+    waitForPreviousInvocation();
+
     if (!whitelistedCommands(besOptions).contains(cmdEnv.getCommandName())) {
       // Exit early if the running command isn't supported.
       return;
@@ -230,17 +290,52 @@
     return outErr;
   }
 
+  private void forceShutdownBuildEventStreamer() {
+    streamer.close(AbortReason.INTERNAL);
+    // Track the streamer's close futures so the finally block cancels them, not a stale map.
+    closeFuturesMap = streamer.getCloseFuturesMap();
+    try {
+      // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
+      //  passed. We should fix this by waiting at most the value set by bes_timeout.
+      Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
+    } catch (ExecutionException e) {
+      googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
+      LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
+    } finally {
+      cancelPendingUploads();
+    }
+  }
+
   @Override
   public void blazeShutdownOnCrash() {
     if (streamer != null) {
-      logger.warning("Attempting to close BES streamer on crash");
-      streamer.close(AbortReason.INTERNAL);
+      googleLogger.atWarning().log("Attempting to close BES streamer on crash");
+      forceShutdownBuildEventStreamer();
+    }
+  }
 
-      try {
-        waitForBuildEventTransportsToClose(streamer.getCloseFuturesMap());
-      } catch (AbruptExitException e) {
-        LoggingUtil.logToRemote(Level.WARNING, "Failure while waiting for BES close", e);
-      }
+  @Override
+  public void blazeShutdown() {
+    AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
+    if (pendingException != null) {
+      cancelPendingUploads();
+      return;
+    }
+
+    if (closeFuturesMap.isEmpty()) {
+      return;
+    }
+
+    try {
+      Uninterruptibles.getUninterruptibly(
+          Futures.allAsList(closeFuturesMap.values()),
+          MAX_WAIT_FOR_PREVIOUS_INVOCATION.getSeconds(),
+          TimeUnit.SECONDS);
+    } catch (TimeoutException | ExecutionException exception) {
+      googleLogger.atWarning().withCause(exception).log(
+          "Encountered Exception when closing BEP transports in Blaze's shutting down sequence");
+    } finally {
+      cancelPendingUploads();
     }
   }
 
@@ -254,9 +349,7 @@
                 + "s."));
   }
 
-  private void waitForBuildEventTransportsToClose(
-      ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap)
-      throws AbruptExitException {
+  private void waitForBuildEventTransportsToClose() {
     final ScheduledExecutorService executor =
         Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("bes-notify-ui-%d").build());
@@ -283,16 +376,22 @@
                 TimeUnit.SECONDS);
       }
 
-      // Wait synchronously for all the futures to finish.
       try (AutoProfiler p = AutoProfiler.logged("waiting for BES close", logger)) {
+        // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
+        //  passed. We should fix this by waiting at most the value set by bes_timeout.
         Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
       }
-    } catch (ExecutionException exception) {
-      throw new AbruptExitException(
-          "Failed to close a build event transport",
-          ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-          exception);
+    } catch (ExecutionException e) {
+      this.pendingAbruptExitException.compareAndSet(
+          null,
+          new AbruptExitException(
+              String.format(
+                  "Failed to close a build event transport with exception '%s'", e.getMessage()),
+              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+              e));
     } finally {
+      cancelPendingUploads();
+
       if (waitMessageFuture != null) {
         waitMessageFuture.cancel(/* mayInterruptIfRunning= */ true);
       }
@@ -303,19 +402,41 @@
     }
   }
 
+  /** Adopts the streamer's close futures and then waits, or not, per {@code besUploadMode}. */
+  private void closeBepTransports() throws AbruptExitException {
+    closeFuturesMap = streamer.getCloseFuturesMap();
+    switch (besOptions.besUploadMode) {
+      case WAIT_FOR_UPLOAD_COMPLETE:
+        waitForBuildEventTransportsToClose();
+        // Close failures are recorded in pendingAbruptExitException rather than thrown; rethrow.
+        AbruptExitException uploadFailure = pendingAbruptExitException.getAndSet(null);
+        if (uploadFailure != null) {
+          throw uploadFailure;
+        }
+        return;
+
+      case NOWAIT_FOR_UPLOAD_COMPLETE:
+        // When running asynchronously notify the UI immediately since we won't wait for the
+        // uploads to close.
+        bepTransports.forEach(t -> cmdLineReporter.post(new BuildEventTransportClosedEvent(t)));
+        return;
+    }
+    throw new IllegalStateException("Unknown BesUploadMode found: " + besOptions.besUploadMode);
+  }
+
   @Override
   public void afterCommand() throws AbruptExitException {
     if (streamer != null) {
       if (!streamer.isClosed()) {
         // This should not occur, but close with an internal error if a {@link BuildEventStreamer}
         // bug manifests as an unclosed streamer.
-        logger.warning("Attempting to close BES streamer after command");
+        googleLogger.atWarning().log("Attempting to close BES streamer after command");
         String msg = "BES was not properly closed";
         LoggingUtil.logToRemote(Level.WARNING, msg, new IllegalStateException(msg));
-        streamer.close(AbortReason.INTERNAL);
+        forceShutdownBuildEventStreamer();
       }
 
-      waitForBuildEventTransportsToClose(streamer.getCloseFuturesMap());
+      closeBepTransports();
 
       if (!Strings.isNullOrEmpty(besOptions.besBackend)) {
         constructAndMaybeReportInvocationIdUrl();
@@ -324,11 +445,6 @@
       }
     }
 
-    AbruptExitException e = pendingAbruptExitException.getAndSet(null);
-    if (e != null) {
-      throw e;
-    }
-
     if (!besStreamOptions.keepBackendConnections) {
       clearBesClient();
     }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
index 2c23ff1..eda45ce 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
@@ -15,6 +15,7 @@
 package com.google.devtools.build.lib.buildeventservice;
 
 import com.google.devtools.common.options.Converters;
+import com.google.devtools.common.options.EnumConverter;
 import com.google.devtools.common.options.Option;
 import com.google.devtools.common.options.OptionDocumentationCategory;
 import com.google.devtools.common.options.OptionEffectTag;
@@ -124,4 +125,28 @@
               + " backend. Bazel will output the URL appended by the invocation id to the"
               + " terminal.")
   public String besResultsUrl;
+
+  @Option(
+      name = "bes_upload_mode",
+      defaultValue = "WAIT_FOR_UPLOAD_COMPLETE",
+      converter = BesUploadModeConverter.class,
+      documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
+      effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT},
+      help =
+          "Specifies whether the Build Event Service upload should block the build completion "
+              + "or should end the invocation immediately and finish the upload in the background.")
+  public BesUploadMode besUploadMode;
+
+  /** Determines the mode that will be used to upload data to the Build Event Service. */
+  public enum BesUploadMode {
+    WAIT_FOR_UPLOAD_COMPLETE,
+    NOWAIT_FOR_UPLOAD_COMPLETE
+  }
+
+  /** Converter for {@link BesUploadMode}. */
+  public static class BesUploadModeConverter extends EnumConverter<BesUploadMode> {
+    public BesUploadModeConverter() {
+      super(BesUploadMode.class, "Mode for uploading to the Build Event Service");
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index da71445..7707872d 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -72,7 +72,16 @@
     // per API contract it is expected to never fail.
     SettableFuture<Void> closeFuture = SettableFuture.create();
     ListenableFuture<Void> uploaderCloseFuture = besUploader.close();
-    uploaderCloseFuture.addListener(() -> closeFuture.set(null), MoreExecutors.directExecutor());
+    uploaderCloseFuture.addListener(
+        () -> {
+          // Make sure to cancel any pending uploads if the closing is cancelled.
+          if (uploaderCloseFuture.isCancelled()) {
+            besUploader.closeOnCancel();
+          }
+          closeFuture.set(null);
+        },
+        MoreExecutors.directExecutor());
+
     return closeFuture;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 2535091..c69ff9a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -148,6 +148,9 @@
   @GuardedBy("lock")
   private boolean interruptCausedByTimeout;
 
+  @GuardedBy("lock")
+  private boolean interruptCausedByCancel;
+
   private StreamContext streamContext;
 
   private BuildEventServiceUploader(
@@ -239,22 +242,34 @@
     }
   }
 
-  /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
   private void closeOnTimeout() {
     synchronized (lock) {
+      interruptCausedByTimeout = true;
+      closeNow();
+    }
+  }
+
+  void closeOnCancel() {
+    synchronized (lock) {
+      interruptCausedByCancel = true;
+      closeNow();
+    }
+  }
+
+  /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
+  private void closeNow() {
+    synchronized (lock) {
       if (uploadThread != null) {
         if (uploadThread.isInterrupted()) {
           return;
         }
-
-        interruptCausedByTimeout = true;
         uploadThread.interrupt();
       }
     }
   }
 
   private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
-    checkState(exitCode != ExitCode.SUCCESS);
+    checkState(!exitCode.equals(ExitCode.SUCCESS));
     logger.info(message);
     abruptExitCallback.accept(new AbruptExitException(message, exitCode, cause));
   }
@@ -291,11 +306,14 @@
         logger.info("Aborting the BES upload due to having received an interrupt");
         synchronized (lock) {
           Preconditions.checkState(
-              interruptCausedByTimeout, "Unexpected interrupt on BES uploader thread");
-          logAndExitAbruptly(
-              "The Build Event Protocol upload timed out",
-              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-              e);
+              interruptCausedByTimeout || interruptCausedByCancel,
+              "Unexpected interrupt on BES uploader thread");
+          if (interruptCausedByTimeout) {
+            logAndExitAbruptly(
+                "The Build Event Protocol upload timed out",
+                ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+                e);
+          }
         }
       } finally {
         // TODO(buchgr): Due to b/113035235 exitFunc needs to be called before the close future
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
index 4da6c78..5ac4448 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
@@ -116,29 +116,4 @@
               + "of the set as well as the file and uri lengths, which may in turn depend on the "
               + "hash function.")
   public int maxNamedSetEntries;
-
-  // TODO(ruperts): Remove these public getter methods for consistency with other options classes?
-  public String getBuildEventTextFile() {
-    return buildEventTextFile;
-  }
-
-  public String getBuildEventBinaryFile() {
-    return buildEventBinaryFile;
-  }
-
-  public String getBuildEventJsonFile() {
-    return buildEventJsonFile;
-  }
-
-  public boolean getBuildEventTextFilePathConversion() {
-    return buildEventTextFilePathConversion;
-  }
-
-  public boolean getBuildEventBinaryFilePathConversion() {
-    return buildEventBinaryFilePathConversion;
-  }
-
-  public boolean getBuildEventJsonFilePathConversion() {
-    return buildEventJsonFilePathConversion;
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 55444cb..87f837a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -187,6 +187,16 @@
       if (closeFuture.isDone()) {
         return closeFuture;
       }
+
+      // Close abruptly if the closing future is cancelled.
+      closeFuture.addListener(
+          () -> {
+            if (closeFuture.isCancelled()) {
+              closeNow();
+            }
+          },
+          MoreExecutors.directExecutor());
+
       try {
         pendingWrites.put(CLOSE);
       } catch (InterruptedException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index f592396..6a56407 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -78,7 +78,7 @@
 @ThreadSafe
 public class BuildEventStreamer {
   private final Collection<BuildEventTransport> transports;
-  private final BuildEventStreamOptions options;
+  private final BuildEventStreamOptions besOptions;
 
   @GuardedBy("this")
   private Set<BuildEventId> announcedEvents;
@@ -144,7 +144,7 @@
       BuildEventStreamOptions options,
       CountingArtifactGroupNamer artifactGroupNamer) {
     this.transports = transports;
-    this.options = options;
+    this.besOptions = options;
     this.announcedEvents = null;
     this.progressCount = 0;
     this.artifactGroupNamer = artifactGroupNamer;
@@ -355,11 +355,11 @@
     }
     // We only split if the max number of entries is at least 2 (it must be at least a binary tree).
     // The method throws for smaller values.
-    if (options.maxNamedSetEntries >= 2) {
+    if (besOptions.maxNamedSetEntries >= 2) {
       // We only split the event after naming it to avoid splitting the same node multiple times.
       // Note that the artifactGroupNames keeps references to the individual pieces, so this can
       // double the memory consumption of large nested sets.
-      view = view.splitIfExceedsMaximumSize(options.maxNamedSetEntries);
+      view = view.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries);
     }
     for (NestedSetView<Artifact> transitive : view.transitives()) {
       maybeReportArtifactSet(pathResolver, transitive);
@@ -645,7 +645,7 @@
 
   /** Returns whether an {@link ActionExecutedEvent} should be published. */
   private boolean shouldPublishActionExecutedEvent(ActionExecutedEvent event) {
-    if (options.publishAllActions) {
+    if (besOptions.publishAllActions) {
       return true;
     }
     if (event.getException() != null) {
@@ -713,3 +713,4 @@
     }
   }
 }
+