Delete BuildEventServiceAbruptExitCallback and handle BEP upload exceptions in the BES module.

Since the closing of all the BEP transports now happens in the BES module we don't need to use the BuildEventServiceAbruptExitCallback, instead we make use of SettableFuture.setException to signal a problem with the upload. This makes the code (a lot) cleaner by removing one layer of indirection for the error handling of the BEP uploads.

As a consequence of this change, all BES failures are now reported in afterCommand (or beforeCommand for the async case) which means that, for instance, if the connection with BES times out we'll wait until the build is finished to report the time out, instead of doing it mid-build. Exiting early was a leftover of the original design of the BES code but given the amount of complexity it adds and the little benefit we've decided to drop it.

PiperOrigin-RevId: 244064861
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 671f032..f65c9a5 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -32,7 +32,6 @@
 import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
@@ -109,17 +108,6 @@
 
   protected BESOptionsT besOptions;
 
-  /** Callback used by the transports to report errors and possible exit abruptly. */
-  protected BuildEventServiceAbruptExitCallback getAbruptExitCallback(
-      ModuleEnvironment moduleEnvironment) {
-    return (e) -> {
-      // Request exiting early for the first abrupt exception we find.
-      if (this.pendingAbruptExitException.compareAndSet(null, e)) {
-        moduleEnvironment.exit(pendingAbruptExitException.get());
-      }
-    };
-  }
-
   protected void reportCommandLineError(EventHandler commandLineReporter, Exception exception) {
     // Don't hide unchecked exceptions as part of the error reporting.
     Throwables.throwIfUnchecked(exception);
@@ -159,19 +147,6 @@
   }
 
   private void waitForPreviousInvocation() {
-    AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
-    if (pendingException != null) {
-      cmdLineReporter.handle(
-          Event.warn(
-              String.format(
-                  "Previous invocation failed to finish Build Event Protocol upload with "
-                      + "the following exception: '%s'. "
-                      + "Ignoring the failure and starting a new invocation...",
-                  pendingException.getMessage())));
-      cancelPendingUploads();
-      return;
-    }
-
     if (closeFuturesMap.isEmpty()) {
       return;
     }
@@ -194,8 +169,8 @@
     } catch (ExecutionException exception) {
       String msg =
           String.format(
-              "Previous Build Event Protocol upload failed with "
-                  + "the following exception: '%s'. "
+              "Previous invocation failed to finish Build Event Protocol upload "
+                  + "with the following exception: '%s'. "
                   + "Ignoring the failure and starting a new invocation...",
               exception.getMessage());
       cmdLineReporter.handle(Event.warn(msg));
@@ -233,6 +208,10 @@
                     .select(bepOptions.buildEventUploadStrategy)
                     .create(cmdEnv));
 
+    // We need to wait for the previous invocation before we check the whitelist of commands to
+    // allow completing previous runs using BES, for example:
+    //   bazel build (..run with async BES..)
+    //   bazel info <-- Doesn't run with BES unless we wait before cheking the whitelist.
     waitForPreviousInvocation();
 
     if (!whitelistedCommands(besOptions).contains(cmdEnv.getCommandName())) {
@@ -349,7 +328,7 @@
                 + "s."));
   }
 
-  private void waitForBuildEventTransportsToClose() {
+  private void waitForBuildEventTransportsToClose() throws AbruptExitException {
     final ScheduledExecutorService executor =
         Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("bes-notify-ui-%d").build());
@@ -382,23 +361,17 @@
         Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
       }
     } catch (ExecutionException e) {
-      this.pendingAbruptExitException.compareAndSet(
-          null,
-          new AbruptExitException(
-              String.format(
-                  "Failed to close a build event transport with exception '%s'", e.getMessage()),
-              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-              e));
+      Throwables.throwIfInstanceOf(e.getCause(), AbruptExitException.class);
+      throw new RuntimeException(
+          String.format(
+              "Unexpected Exception '%s' when closing BEP transports, this is a bug.",
+              e.getCause().getMessage()));
     } finally {
       cancelPendingUploads();
-
       if (waitMessageFuture != null) {
         waitMessageFuture.cancel(/* mayInterruptIfRunning= */ true);
       }
-
-      if (executor != null) {
-        executor.shutdown();
-      }
+      executor.shutdown();
     }
   }
 
@@ -407,10 +380,6 @@
     switch (besOptions.besUploadMode) {
       case WAIT_FOR_UPLOAD_COMPLETE:
         waitForBuildEventTransportsToClose();
-        AbruptExitException e = pendingAbruptExitException.getAndSet(null);
-        if (e != null) {
-          throw e;
-        }
         return;
 
       case NOWAIT_FOR_UPLOAD_COMPLETE:
@@ -529,7 +498,6 @@
         .artifactGroupNamer(artifactGroupNamer)
         .bepOptions(bepOptions)
         .clock(cmdEnv.getRuntime().getClock())
-        .abruptExitCallback(getAbruptExitCallback(cmdEnv.getBlazeModuleEnvironment()))
         .eventBus(cmdEnv.getEventBus())
         .build();
   }
@@ -555,7 +523,6 @@
                 bepTextOutputStream,
                 bepOptions,
                 localFileUploader,
-                getAbruptExitCallback(cmdEnv.getBlazeModuleEnvironment()),
                 artifactGroupNamer));
       } catch (IOException exception) {
         // TODO(b/125216340): Consider making this a warning instead of an error once the
@@ -586,7 +553,6 @@
                 bepBinaryOutputStream,
                 bepOptions,
                 localFileUploader,
-                getAbruptExitCallback(cmdEnv.getBlazeModuleEnvironment()),
                 artifactGroupNamer));
       } catch (IOException exception) {
         // TODO(b/125216340): Consider making this a warning instead of an error once the
@@ -616,7 +582,6 @@
                 bepJsonOutputStream,
                 bepOptions,
                 localFileUploader,
-                getAbruptExitCallback(cmdEnv.getBlazeModuleEnvironment()),
                 artifactGroupNamer));
       } catch (IOException exception) {
         // TODO(b/125216340): Consider making this a warning instead of an error once the
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 7707872d..682cdf3 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -19,14 +19,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.devtools.build.lib.clock.Clock;
 import com.google.devtools.build.lib.util.JavaSleeper;
@@ -44,7 +41,6 @@
       BuildEventProtocolOptions bepOptions,
       BuildEventServiceProtoUtil besProtoUtil,
       Clock clock,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       boolean publishLifecycleEvents,
       ArtifactGroupNamer artifactGroupNamer,
       EventBus eventBus,
@@ -57,7 +53,6 @@
             .bepOptions(bepOptions)
             .besProtoUtil(besProtoUtil)
             .clock(clock)
-            .abruptExitCallback(abruptExitCallback)
             .publishLifecycleEvents(publishLifecycleEvents)
             .sleeper(sleeper)
             .artifactGroupNamer(artifactGroupNamer)
@@ -68,21 +63,7 @@
 
   @Override
   public ListenableFuture<Void> close() {
-    // This future completes once the upload has finished. As
-    // per API contract it is expected to never fail.
-    SettableFuture<Void> closeFuture = SettableFuture.create();
-    ListenableFuture<Void> uploaderCloseFuture = besUploader.close();
-    uploaderCloseFuture.addListener(
-        () -> {
-          // Make sure to cancel any pending uploads if the closing is cancelled.
-          if (uploaderCloseFuture.isCancelled()) {
-            besUploader.closeOnCancel();
-          }
-          closeFuture.set(null);
-        },
-        MoreExecutors.directExecutor());
-
-    return closeFuture;
+    return besUploader.close();
   }
 
   @Override
@@ -100,11 +81,6 @@
     besUploader.enqueueEvent(event);
   }
 
-  @VisibleForTesting
-  public BuildEventServiceUploader getBesUploader() {
-    return besUploader;
-  }
-
   /** A builder for {@link BuildEventServiceTransport}. */
   public static class Builder {
     private BuildEventServiceClient besClient;
@@ -115,7 +91,6 @@
     private ArtifactGroupNamer artifactGroupNamer;
     private BuildEventServiceProtoUtil besProtoUtil;
     private EventBus eventBus;
-    private BuildEventServiceAbruptExitCallback abruptExitCallback;
     private @Nullable Sleeper sleeper;
 
     public Builder besClient(BuildEventServiceClient value) {
@@ -158,11 +133,6 @@
       return this;
     }
 
-    public Builder abruptExitCallback(BuildEventServiceAbruptExitCallback value) {
-      this.abruptExitCallback = value;
-      return this;
-    }
-
     @VisibleForTesting
     public Builder sleeper(Sleeper value) {
       this.sleeper = value;
@@ -177,7 +147,6 @@
           checkNotNull(bepOptions),
           checkNotNull(besProtoUtil),
           checkNotNull(clock),
-          checkNotNull(abruptExitCallback),
           besOptions.besLifecycleEvents,
           checkNotNull(artifactGroupNamer),
           checkNotNull(eventBus),
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 9a541ae..30f4374 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -45,7 +45,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
@@ -101,7 +100,6 @@
   private final BuildEventProtocolOptions buildEventProtocolOptions;
   private final boolean publishLifecycleEvents;
   private final Duration closeTimeout;
-  private final BuildEventServiceAbruptExitCallback abruptExitCallback;
   private final Sleeper sleeper;
   private final Clock clock;
   private final ArtifactGroupNamer namer;
@@ -156,7 +154,6 @@
       BuildEventProtocolOptions buildEventProtocolOptions,
       boolean publishLifecycleEvents,
       Duration closeTimeout,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       Sleeper sleeper,
       Clock clock,
       ArtifactGroupNamer namer,
@@ -167,7 +164,6 @@
     this.buildEventProtocolOptions = buildEventProtocolOptions;
     this.publishLifecycleEvents = publishLifecycleEvents;
     this.closeTimeout = closeTimeout;
-    this.abruptExitCallback = abruptExitCallback;
     this.sleeper = sleeper;
     this.clock = clock;
     this.namer = namer;
@@ -234,6 +230,17 @@
       if (!closeTimeout.isZero()) {
         startCloseTimer(closeFuture, closeTimeout);
       }
+
+      final SettableFuture<Void> finalCloseFuture = closeFuture;
+      closeFuture.addListener(
+          () -> {
+            // Make sure to cancel any pending uploads if the closing is cancelled.
+            if (finalCloseFuture.isCancelled()) {
+              closeOnCancel();
+            }
+          },
+          MoreExecutors.directExecutor());
+
       return closeFuture;
     }
   }
@@ -264,10 +271,19 @@
     }
   }
 
+  private void initCloseFutureOnFailure(Throwable cause) {
+    synchronized (lock) {
+      if (closeFuture == null) {
+        closeFuture = SettableFuture.create();
+      }
+      closeFuture.setException(cause);
+    }
+  }
+
   private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
     checkState(!exitCode.equals(ExitCode.SUCCESS));
-    logger.info(message);
-    abruptExitCallback.accept(new AbruptExitException(message, exitCode, cause));
+    logger.severe(message);
+    initCloseFutureOnFailure(new AbruptExitException(message, exitCode, cause));
   }
 
   @Override
@@ -298,47 +314,33 @@
         closeFuture.set(null);
       }
     } catch (InterruptedException e) {
-      try {
-        logger.info("Aborting the BES upload due to having received an interrupt");
-        synchronized (lock) {
-          Preconditions.checkState(
-              interruptCausedByTimeout || interruptCausedByCancel,
-              "Unexpected interrupt on BES uploader thread");
-          if (interruptCausedByTimeout) {
-            logAndExitAbruptly(
-                "The Build Event Protocol upload timed out",
-                ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-                e);
-          }
+      logger.info("Aborting the BES upload due to having received an interrupt");
+      synchronized (lock) {
+        Preconditions.checkState(
+            interruptCausedByTimeout || interruptCausedByCancel,
+            "Unexpected interrupt on BES uploader thread");
+        if (interruptCausedByTimeout) {
+          logAndExitAbruptly(
+              "The Build Event Protocol upload timed out",
+              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+              e);
         }
-      } finally {
-        // TODO(buchgr): Due to b/113035235 exitFunc needs to be called before the close future
-        // completes.
-        failCloseFuture(e);
       }
     } catch (StatusException e) {
-      try {
-        logAndExitAbruptly(
-            "The Build Event Protocol upload failed: " + besClient.userReadableError(e),
-            shouldRetryStatus(e.getStatus())
-                ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
-                : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-            e);
-      } finally {
-        failCloseFuture(e);
-      }
+      logAndExitAbruptly(
+          "The Build Event Protocol upload failed: " + besClient.userReadableError(e),
+          shouldRetryStatus(e.getStatus())
+              ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
+              : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+          e);
     } catch (LocalFileUploadException e) {
       Throwables.throwIfUnchecked(e.getCause());
-      try {
-        logAndExitAbruptly(
-            "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(),
-            ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-            e.getCause());
-      } finally {
-        failCloseFuture(e.getCause());
-      }
+      logAndExitAbruptly(
+          "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(),
+          ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+          e.getCause());
     } catch (Throwable e) {
-      failCloseFuture(e);
+      initCloseFutureOnFailure(e);
       logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
       throw e;
     } finally {
@@ -656,15 +658,6 @@
     closeTimer.start();
   }
 
-  private void failCloseFuture(Throwable cause) {
-    synchronized (lock) {
-      if (closeFuture == null) {
-        closeFuture = SettableFuture.create();
-      }
-      closeFuture.setException(cause);
-    }
-  }
-
   private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
       throws LocalFileUploadException, InterruptedException {
     try {
@@ -732,22 +725,6 @@
     return (long) (DELAY_MILLIS * Math.pow(1.6, attempt));
   }
 
-  /**
-   * This method is only used in tests. Once TODO(b/113035235) is fixed the close future will also
-   * carry error messages.
-   */
-  @VisibleForTesting // productionVisibility = Visibility.PRIVATE
-  public void throwUploaderError() throws Throwable {
-    synchronized (lock) {
-      checkState(closeFuture != null && closeFuture.isDone());
-      try {
-        closeFuture.get();
-      } catch (ExecutionException e) {
-        throw e.getCause();
-      }
-    }
-  }
-
   /** Thrown when encountered problems while uploading build event artifacts. */
   private class LocalFileUploadException extends Exception {
     LocalFileUploadException(Throwable cause) {
@@ -762,7 +739,6 @@
     private BuildEventProtocolOptions bepOptions;
     private boolean publishLifecycleEvents;
     private Duration closeTimeout;
-    private BuildEventServiceAbruptExitCallback abruptExitCallback;
     private Sleeper sleeper;
     private Clock clock;
     private ArtifactGroupNamer artifactGroupNamer;
@@ -803,11 +779,6 @@
       return this;
     }
 
-    Builder abruptExitCallback(BuildEventServiceAbruptExitCallback value) {
-      this.abruptExitCallback = value;
-      return this;
-    }
-
     Builder sleeper(Sleeper value) {
       this.sleeper = value;
       return this;
@@ -831,7 +802,6 @@
           checkNotNull(bepOptions),
           publishLifecycleEvents,
           checkNotNull(closeTimeout),
-          checkNotNull(abruptExitCallback),
           checkNotNull(sleeper),
           checkNotNull(clock),
           checkNotNull(artifactGroupNamer),
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventServiceAbruptExitCallback.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventServiceAbruptExitCallback.java
deleted file mode 100644
index 50c7977..0000000
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventServiceAbruptExitCallback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2019 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.buildeventstream;
-
-import com.google.devtools.build.lib.util.AbruptExitException;
-import java.util.function.Consumer;
-
-/**
- * A callback function that the Build Event Service transports can use to notify of an {@link
- * AbruptExitException} to the main thread which may exit the build abruptly.
- */
-// TODO(lpino): Delete this callback once all the transports can depend directly on
-//  {@link ModuleEnvironment}.
-public interface BuildEventServiceAbruptExitCallback extends Consumer<AbruptExitException> {
-  /** Executes the callback using the provided {@link AbruptExitException}. */
-  @Override
-  void accept(AbruptExitException e);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
index 021c55b..c3a83c0 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
@@ -18,7 +18,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.protobuf.CodedOutputStream;
@@ -35,9 +34,8 @@
       BufferedOutputStream outputStream,
       BuildEventProtocolOptions options,
       BuildEventArtifactUploader uploader,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       ArtifactGroupNamer namer) {
-    super(outputStream, options, uploader, abruptExitCallback, namer);
+    super(outputStream, options, uploader, namer);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 82da7ff..560dd18 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -28,7 +28,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
@@ -44,7 +43,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.concurrent.ThreadSafe;
@@ -67,12 +65,10 @@
       BufferedOutputStream outputStream,
       BuildEventProtocolOptions options,
       BuildEventArtifactUploader uploader,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       ArtifactGroupNamer namer) {
     this.uploader = uploader;
     this.options = options;
-    this.writer =
-        new SequentialWriter(outputStream, this::serializeEvent, abruptExitCallback, uploader);
+    this.writer = new SequentialWriter(outputStream, this::serializeEvent, uploader);
     this.namer = namer;
   }
 
@@ -87,8 +83,6 @@
     @VisibleForTesting OutputStream out;
     @VisibleForTesting static final Duration FLUSH_INTERVAL = Duration.ofMillis(250);
     private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
-    /** A callback function to notify the main thread about errors in the writer */
-    private final Consumer<AbruptExitException> abruptExitCallback;
 
     private final BuildEventArtifactUploader uploader;
 
@@ -101,17 +95,14 @@
     SequentialWriter(
         BufferedOutputStream outputStream,
         Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
-        BuildEventServiceAbruptExitCallback abruptExitCallback,
         BuildEventArtifactUploader uploader) {
       checkNotNull(outputStream);
       checkNotNull(serializeFunc);
       checkNotNull(uploader);
-      checkNotNull(abruptExitCallback);
 
       this.out = outputStream;
       this.writerThread = new Thread(this, "bep-local-writer");
       this.serializeFunc = serializeFunc;
-      this.abruptExitCallback = abruptExitCallback;
       this.uploader = uploader;
       writerThread.start();
     }
@@ -156,7 +147,7 @@
     }
 
     private void exitFailure(Throwable e) {
-      abruptExitCallback.accept(
+      closeFuture.setException(
           new AbruptExitException(
               "Unable to write all BEP events to file due to " + e.getMessage(),
               ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
index e0fb735..b725fe9 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
@@ -18,7 +18,6 @@
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -34,9 +33,8 @@
       BufferedOutputStream outputStream,
       BuildEventProtocolOptions options,
       BuildEventArtifactUploader uploader,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       ArtifactGroupNamer namer) {
-    super(outputStream, options, uploader, abruptExitCallback, namer);
+    super(outputStream, options, uploader, namer);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
index 1cb3faa..c21986c 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
@@ -18,7 +18,6 @@
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.protobuf.TextFormat;
@@ -35,9 +34,8 @@
       BufferedOutputStream outputStream,
       BuildEventProtocolOptions options,
       BuildEventArtifactUploader uploader,
-      BuildEventServiceAbruptExitCallback abruptExitCallback,
       ArtifactGroupNamer namer) {
-    super(outputStream, options, uploader, abruptExitCallback, namer);
+    super(outputStream, options, uploader, namer);
   }
 
   @Override
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD
index e39b9d3..fb08d52 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD
@@ -23,6 +23,7 @@
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/common/options",
         "//src/test/java/com/google/devtools/build/lib:packages_testutil",
+        "//src/test/java/com/google/devtools/build/lib:testutil",
         "//third_party:guava",
         "//third_party:junit4",
         "//third_party:mockito",
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
index 082a0ac..bde6dda 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -32,7 +33,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventId;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildStarted;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Progress;
@@ -50,6 +50,7 @@
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
@@ -70,7 +71,6 @@
 public class BinaryFormatFileTransportTest {
   private final BuildEventProtocolOptions defaultOpts =
       Options.getDefaults(BuildEventProtocolOptions.class);
-  private static final BuildEventServiceAbruptExitCallback NO_OP_EXIT_CALLBACK = (e) -> {};
 
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
@@ -104,7 +104,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
     transport.sendBuildEvent(buildEvent);
 
@@ -151,10 +150,14 @@
     BufferedOutputStream outputStream =
         new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath())));
     BinaryFormatFileTransport transport =
-        new BinaryFormatFileTransport(
-            outputStream, defaultOpts, uploader, NO_OP_EXIT_CALLBACK, artifactGroupNamer);
+        new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer);
     transport.sendBuildEvent(event1);
-    transport.close().get();
+
+    ExecutionException expected =
+        assertThrows(ExecutionException.class, () -> transport.close().get());
+    assertThat(expected)
+        .hasMessageThat()
+        .containsMatch("Unable to write all BEP events to file due to Task was cancelled");
 
     assertThat(transport.writer.pendingWrites).isEmpty();
     try (InputStream in = new FileInputStream(output)) {
@@ -180,7 +183,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
 
     // Close the stream.
@@ -214,7 +216,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
 
     transport.sendBuildEvent(buildEvent);
@@ -262,8 +263,7 @@
     BufferedOutputStream outputStream =
         new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath())));
     BinaryFormatFileTransport transport =
-        new BinaryFormatFileTransport(
-            outputStream, defaultOpts, uploader, NO_OP_EXIT_CALLBACK, artifactGroupNamer);
+        new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer);
     transport.sendBuildEvent(event1);
     transport.sendBuildEvent(event2);
     transport.close().get();
@@ -305,8 +305,7 @@
     BufferedOutputStream outputStream =
         new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath())));
     BinaryFormatFileTransport transport =
-        new BinaryFormatFileTransport(
-            outputStream, defaultOpts, uploader, NO_OP_EXIT_CALLBACK, artifactGroupNamer);
+        new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer);
     transport.sendBuildEvent(event1);
     transport.close().get();
 
@@ -343,8 +342,7 @@
     BufferedOutputStream outputStream =
         new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath())));
     BinaryFormatFileTransport transport =
-        new BinaryFormatFileTransport(
-            outputStream, defaultOpts, uploader, NO_OP_EXIT_CALLBACK, artifactGroupNamer);
+        new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer);
     transport.sendBuildEvent(event);
     ListenableFuture<Void> closeFuture = transport.close();
 
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
index 1bb0405..f469f9a 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
@@ -21,7 +21,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildStarted;
 import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader;
@@ -54,7 +53,6 @@
 public class JsonFormatFileTransportTest {
   private final BuildEventProtocolOptions defaultOpts =
       Options.getDefaults(BuildEventProtocolOptions.class);
-  private static final BuildEventServiceAbruptExitCallback NO_OP_EXIT_CALLBACK = (e) -> {};
 
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
@@ -89,7 +87,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
     transport.sendBuildEvent(buildEvent);
 
@@ -164,7 +161,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
     WrappedOutputStream out = new WrappedOutputStream(transport.writer.out);
     transport.writer.out = out;
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java
index 966505b..1e68690 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java
@@ -23,7 +23,6 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
-import com.google.devtools.build.lib.buildeventstream.BuildEventServiceAbruptExitCallback;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildStarted;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Progress;
@@ -54,7 +53,6 @@
 public class TextFormatFileTransportTest {
   private final BuildEventProtocolOptions defaultOpts =
       Options.getDefaults(BuildEventProtocolOptions.class);
-  private static final BuildEventServiceAbruptExitCallback NO_OP_EXIT_CALLBACK = (e) -> {};
 
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
@@ -90,7 +88,6 @@
             outputStream,
             defaultOpts,
             new LocalFilesArtifactUploader(),
-            NO_OP_EXIT_CALLBACK,
             artifactGroupNamer);
     transport.sendBuildEvent(buildEvent);