Move BEP upload timeout management to the BES module.

Timeout management for the BEP transports is currently very ad-hoc. For the FileTransports we don't have a timeout at all and for the BES transport we create a separate future that cancels the upload in case bes_timeout is reached.

With this CL we unify the timeout management by wrapping each of the close Futures returned by the streamer with Futures.withTimeout. The timeout we use is the one specified by BuildEventTransport#getTimeout. Notice that this CL preserves the old behavior by setting the timeout of the BES transport to bes_timeout and to Duration.ZERO (i.e. no timeout) for FileTransport.

Another thing to notice is that when a future created by Futures.withTimeout times out, the TimeoutException is wrapped in an ExecutionException, so we need to add some special-casing in the BES module to detect that case and print the right message in the terminal (instead of a cryptic TimeoutException stack trace).

PiperOrigin-RevId: 245325738
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 2d41ab4..145848c 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.buildeventservice;
 
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -25,6 +26,7 @@
 import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
@@ -71,7 +73,6 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
@@ -86,9 +87,6 @@
   private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
   private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();
 
-  private final AtomicReference<AbruptExitException> pendingAbruptExitException =
-      new AtomicReference<>();
-
   private BuildEventProtocolOptions bepOptions;
   private AuthAndTLSOptions authTlsOptions;
   private BuildEventStreamOptions besStreamOptions;
@@ -160,6 +158,10 @@
     halfCloseFuturesMap = ImmutableMap.of();
   }
 
+  private static boolean isTimeoutException(ExecutionException e) {
+    return e.getCause() instanceof TimeoutException;
+  }
+
   private void waitForPreviousInvocation() {
     if (closeFuturesMap.isEmpty()) {
       return;
@@ -184,15 +186,19 @@
               getMaxWaitForPreviousInvocation().getSeconds());
       cmdLineReporter.handle(Event.warn(msg));
       googleLogger.atWarning().withCause(exception).log(msg);
-    } catch (ExecutionException exception) {
+    } catch (ExecutionException e) {
+      // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
+      // times out.
+      String previousExceptionMsg =
+          isTimeoutException(e) ? "The Build Event Protocol upload timed out" : e.getMessage();
       String msg =
           String.format(
               "Previous invocation failed to finish Build Event Protocol upload "
                   + "with the following exception: '%s'. "
                   + "Ignoring the failure and starting a new invocation...",
-              exception.getMessage());
+              previousExceptionMsg);
       cmdLineReporter.handle(Event.warn(msg));
-      googleLogger.atWarning().withCause(exception).log(msg);
+      googleLogger.atWarning().withCause(e).log(msg);
     } finally {
       cancelPendingUploads();
     }
@@ -289,13 +295,12 @@
 
   private void forceShutdownBuildEventStreamer() {
     streamer.close(AbortReason.INTERNAL);
-
+    closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
     try {
       // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
       //  passed. We should fix this by waiting at most the value set by bes_timeout.
       googleLogger.atInfo().log("Closing pending build event transports");
-      Uninterruptibles.getUninterruptibly(
-          Futures.allAsList(streamer.getCloseFuturesMap().values()));
+      Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
     } catch (ExecutionException e) {
       googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
       LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
@@ -314,12 +319,6 @@
 
   @Override
   public void blazeShutdown() {
-    AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
-    if (pendingException != null) {
-      cancelPendingUploads();
-      return;
-    }
-
     if (closeFuturesMap.isEmpty()) {
       return;
     }
@@ -380,6 +379,15 @@
         Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
       }
     } catch (ExecutionException e) {
+      // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
+      // times out.
+      if (isTimeoutException(e)) {
+        throw new AbruptExitException(
+            "The Build Event Protocol upload timed out",
+            ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+            e);
+      }
+
       Throwables.throwIfInstanceOf(e.getCause(), AbruptExitException.class);
       throw new RuntimeException(
           String.format(
@@ -394,9 +402,42 @@
     }
   }
 
+  private static ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
+      constructCloseFuturesMapWithTimeouts(
+          ImmutableMap<BuildEventTransport, ListenableFuture<Void>> bepTransportToCloseFuturesMap) {
+    ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> builder =
+        ImmutableMap.builder();
+
+    bepTransportToCloseFuturesMap.forEach(
+        (bepTransport, closeFuture) -> {
+          final ListenableFuture<Void> closeFutureWithTimeout;
+          if (bepTransport.getTimeout().isZero() || bepTransport.getTimeout().isNegative()) {
+            closeFutureWithTimeout = closeFuture;
+          } else {
+            final ScheduledExecutorService timeoutExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder()
+                        .setNameFormat("bes-close-" + bepTransport.name() + "-%d")
+                        .build());
+
+            closeFutureWithTimeout =
+                Futures.withTimeout(
+                    closeFuture,
+                    bepTransport.getTimeout().toMillis(),
+                    TimeUnit.MILLISECONDS,
+                    timeoutExecutor);
+            closeFutureWithTimeout.addListener(
+                () -> timeoutExecutor.shutdown(), MoreExecutors.directExecutor());
+          }
+          builder.put(bepTransport, closeFutureWithTimeout);
+        });
+
+    return builder.build();
+  }
+
   private void closeBepTransports() throws AbruptExitException {
-    closeFuturesMap = streamer.getCloseFuturesMap();
-    halfCloseFuturesMap = streamer.getHalfClosedMap();
+    closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
+    halfCloseFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap());
     switch (besOptions.besUploadMode) {
       case WAIT_FOR_UPLOAD_COMPLETE:
         waitForBuildEventTransportsToClose();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 682cdf3..88797dd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -34,6 +34,7 @@
 /** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */
 public class BuildEventServiceTransport implements BuildEventTransport {
   private final BuildEventServiceUploader besUploader;
+  private final Duration besTimeout;
 
   private BuildEventServiceTransport(
       BuildEventServiceClient besClient,
@@ -46,6 +47,7 @@
       EventBus eventBus,
       Duration closeTimeout,
       Sleeper sleeper) {
+    this.besTimeout = closeTimeout;
     this.besUploader =
         new BuildEventServiceUploader.Builder()
             .besClient(besClient)
@@ -57,7 +59,6 @@
             .sleeper(sleeper)
             .artifactGroupNamer(artifactGroupNamer)
             .eventBus(eventBus)
-            .closeTimeout(closeTimeout)
             .build();
   }
 
@@ -81,6 +82,11 @@
     besUploader.enqueueEvent(event);
   }
 
+  @Override
+  public Duration getTimeout() {
+    return besTimeout;
+  }
+
   /** A builder for {@link BuildEventServiceTransport}. */
   public static class Builder {
     private BuildEventServiceClient besClient;
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 145c227..53eab93 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -15,7 +15,6 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
 import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
 import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
 import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
@@ -51,7 +50,6 @@
 import com.google.devtools.build.lib.clock.Clock;
 import com.google.devtools.build.lib.util.AbruptExitException;
 import com.google.devtools.build.lib.util.ExitCode;
-import com.google.devtools.build.lib.util.LoggingUtil;
 import com.google.devtools.build.lib.util.Sleeper;
 import com.google.devtools.build.v1.BuildStatus.Result;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
@@ -62,14 +60,11 @@
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.StatusException;
-import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -100,7 +95,6 @@
   private final BuildEventServiceProtoUtil besProtoUtil;
   private final BuildEventProtocolOptions buildEventProtocolOptions;
   private final boolean publishLifecycleEvents;
-  private final Duration closeTimeout;
   private final Sleeper sleeper;
   private final Clock clock;
   private final ArtifactGroupNamer namer;
@@ -137,9 +131,6 @@
   private Thread uploadThread;
 
   @GuardedBy("lock")
-  private boolean interruptCausedByTimeout;
-
-  @GuardedBy("lock")
   private boolean interruptCausedByCancel;
 
   private StreamContext streamContext;
@@ -150,7 +141,6 @@
       BuildEventServiceProtoUtil besProtoUtil,
       BuildEventProtocolOptions buildEventProtocolOptions,
       boolean publishLifecycleEvents,
-      Duration closeTimeout,
       Sleeper sleeper,
       Clock clock,
       ArtifactGroupNamer namer,
@@ -160,7 +150,6 @@
     this.besProtoUtil = besProtoUtil;
     this.buildEventProtocolOptions = buildEventProtocolOptions;
     this.publishLifecycleEvents = publishLifecycleEvents;
-    this.closeTimeout = closeTimeout;
     this.sleeper = sleeper;
     this.clock = clock;
     this.namer = namer;
@@ -215,10 +204,6 @@
     // Enqueue the last event which will terminate the upload.
     eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
 
-    if (!closeTimeout.isZero()) {
-      startCloseTimer(closeFuture, closeTimeout);
-    }
-
     final SettableFuture<Void> finalCloseFuture = closeFuture;
     closeFuture.addListener(
         () -> {
@@ -232,13 +217,6 @@
     return closeFuture;
   }
 
-  private void closeOnTimeout() {
-    synchronized (lock) {
-      interruptCausedByTimeout = true;
-      closeNow();
-    }
-  }
-
   private void closeOnCancel() {
     synchronized (lock) {
       interruptCausedByCancel = true;
@@ -288,14 +266,7 @@
       logger.info("Aborting the BES upload due to having received an interrupt");
       synchronized (lock) {
         Preconditions.checkState(
-            interruptCausedByTimeout || interruptCausedByCancel,
-            "Unexpected interrupt on BES uploader thread");
-        if (interruptCausedByTimeout) {
-          logAndExitAbruptly(
-              "The Build Event Protocol upload timed out",
-              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-              e);
-        }
+            interruptCausedByCancel, "Unexpected interrupt on BES uploader thread");
       }
     } catch (StatusException e) {
       logAndExitAbruptly(
@@ -605,32 +576,6 @@
     }
   }
 
-  private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) {
-    Thread closeTimer =
-        new Thread(
-            () -> {
-              // Call closeOnTimeout() if the future does not complete within closeTimeout
-              try {
-                getUninterruptibly(closeFuture, closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
-              } catch (TimeoutException e) {
-                closeOnTimeout();
-              } catch (ExecutionException e) {
-                if (e.getCause() instanceof TimeoutException) {
-                  // This is likely due to an internal timeout doing the local file uploading.
-                  closeOnTimeout();
-                } else {
-                  // This code only cares about calling closeOnTimeout() if the closeFuture does
-                  // not complete within closeTimeout.
-                  String failureMsg = "BES close failure";
-                  logger.severe(failureMsg);
-                  LoggingUtil.logToRemote(Level.SEVERE, failureMsg, e);
-                }
-              }
-            },
-            "bes-uploader-close-timer");
-    closeTimer.start();
-  }
-
   private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
       throws LocalFileUploadException, InterruptedException {
     try {
@@ -711,7 +656,6 @@
     private BuildEventServiceProtoUtil besProtoUtil;
     private BuildEventProtocolOptions bepOptions;
     private boolean publishLifecycleEvents;
-    private Duration closeTimeout;
     private Sleeper sleeper;
     private Clock clock;
     private ArtifactGroupNamer artifactGroupNamer;
@@ -742,11 +686,6 @@
       return this;
     }
 
-    Builder closeTimeout(Duration value) {
-      this.closeTimeout = value;
-      return this;
-    }
-
     Builder clock(Clock value) {
       this.clock = value;
       return this;
@@ -774,7 +713,6 @@
           checkNotNull(besProtoUtil),
           checkNotNull(bepOptions),
           publishLifecycleEvents,
-          checkNotNull(closeTimeout),
           checkNotNull(sleeper),
           checkNotNull(clock),
           checkNotNull(artifactGroupNamer),
@@ -782,4 +720,3 @@
     }
   }
 }
-
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
index 7d15fab..15cbb6a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
@@ -15,6 +15,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.time.Duration;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -72,6 +73,14 @@
     return close();
   }
 
+  /**
+   * Returns how long a caller should wait for the transport to finish uploading events and closing
+   * gracefully. Returning {@link Duration#ZERO} means that there's no timeout.
+   */
+  default Duration getTimeout() {
+    return Duration.ZERO;
+  }
+
   @VisibleForTesting
   @Nullable
   BuildEventArtifactUploader getUploader();