Avoid nullness in BuildEventServiceUploader#closeFuture.

The BES uploader is currently relying on the nullness of closeFuture for its business logic. With this CL we clean this up by making the BES uploader use concrete values for closeFuture.

PiperOrigin-RevId: 245212323
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index ca3b4e6..145c227 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -70,6 +70,7 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.logging.Level;
@@ -104,6 +105,7 @@
   private final Clock clock;
   private final ArtifactGroupNamer namer;
   private final EventBus eventBus;
+  private final AtomicBoolean startedClose = new AtomicBoolean(false);
 
   /**
    * The event queue contains two types of events: - Build events, sorted by sequence number, that
@@ -123,14 +125,7 @@
   @GuardedBy("lock")
   private Result buildStatus = UNKNOWN_STATUS;
 
-  /**
-   * Initialized only after the first call to {@link #close()} or if the upload fails before that.
-   * The {@code null} state is used throughout the code to make multiple calls to {@link #close()}
-   * idempotent.
-   */
-  @GuardedBy("lock")
-  private SettableFuture<Void> closeFuture;
-
+  private final SettableFuture<Void> closeFuture = SettableFuture.create();
   private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();
 
   /**
@@ -183,31 +178,25 @@
     ListenableFuture<PathConverter> localFileUploadFuture =
         localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
 
-    synchronized (lock) {
-      if (closeFuture != null) {
-        // Close has been called and thus we silently ignore any further events and cancel
-        // any pending file uploads
-        closeFuture.addListener(
-            () -> {
-              if (!localFileUploadFuture.isDone()) {
-                localFileUploadFuture.cancel(true);
-              }
-            },
-            MoreExecutors.directExecutor());
-        return;
+    if (startedClose.get()) {
+      if (!localFileUploadFuture.isDone()) {
+        localFileUploadFuture.cancel(true);
       }
-      // BuildCompletingEvent marks the end of the build in the BEP event stream.
-      if (event instanceof BuildCompletingEvent) {
+      return;
+    }
+    // BuildCompletingEvent marks the end of the build in the BEP event stream.
+    if (event instanceof BuildCompletingEvent) {
+      synchronized (lock) {
         this.buildStatus = extractBuildStatus((BuildCompletingEvent) event);
       }
-      ensureUploadThreadStarted();
-      eventQueue.addLast(
-          new SendRegularBuildEventCommand(
-              event,
-              localFileUploadFuture,
-              nextSeqNum.getAndIncrement(),
-              Timestamps.fromMillis(clock.currentTimeMillis())));
     }
+    ensureUploadThreadStarted();
+    eventQueue.addLast(
+        new SendRegularBuildEventCommand(
+            event,
+            localFileUploadFuture,
+            nextSeqNum.getAndIncrement(),
+            Timestamps.fromMillis(clock.currentTimeMillis())));
   }
 
   /**
@@ -217,34 +206,30 @@
    * <p>The returned future completes when the upload completes. It's guaranteed to never fail.
    */
   public ListenableFuture<Void> close() {
-    synchronized (lock) {
-      if (closeFuture != null) {
-        return closeFuture;
-      }
-      ensureUploadThreadStarted();
-
-      closeFuture = SettableFuture.create();
-
-      // Enqueue the last event which will terminate the upload.
-      eventQueue.addLast(
-          new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
-
-      if (!closeTimeout.isZero()) {
-        startCloseTimer(closeFuture, closeTimeout);
-      }
-
-      final SettableFuture<Void> finalCloseFuture = closeFuture;
-      closeFuture.addListener(
-          () -> {
-            // Make sure to cancel any pending uploads if the closing is cancelled.
-            if (finalCloseFuture.isCancelled()) {
-              closeOnCancel();
-            }
-          },
-          MoreExecutors.directExecutor());
-
+    if (startedClose.getAndSet(true)) {
       return closeFuture;
     }
+
+    ensureUploadThreadStarted();
+
+    // Enqueue the last event which will terminate the upload.
+    eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
+
+    if (!closeTimeout.isZero()) {
+      startCloseTimer(closeFuture, closeTimeout);
+    }
+
+    final SettableFuture<Void> finalCloseFuture = closeFuture;
+    closeFuture.addListener(
+        () -> {
+          // Make sure to cancel any pending uploads if the closing is cancelled.
+          if (finalCloseFuture.isCancelled()) {
+            closeOnCancel();
+          }
+        },
+        MoreExecutors.directExecutor());
+
+    return closeFuture;
   }
 
   private void closeOnTimeout() {
@@ -254,7 +239,7 @@
     }
   }
 
-  void closeOnCancel() {
+  private void closeOnCancel() {
     synchronized (lock) {
       interruptCausedByCancel = true;
       closeNow();
@@ -273,19 +258,10 @@
     }
   }
 
-  private void initCloseFutureOnFailure(Throwable cause) {
-    synchronized (lock) {
-      if (closeFuture == null) {
-        closeFuture = SettableFuture.create();
-      }
-      closeFuture.setException(cause);
-    }
-  }
-
   private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
     checkState(!exitCode.equals(ExitCode.SUCCESS));
     logger.severe(message);
-    initCloseFutureOnFailure(new AbruptExitException(message, exitCode, cause));
+    closeFuture.setException(new AbruptExitException(message, exitCode, cause));
   }
 
   @Override
@@ -308,13 +284,6 @@
           publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
         }
       }
-      synchronized (lock) {
-        // Invariant: closeFuture is not null.
-        // publishBuildEvents() only terminates successfully after SendLastBuildEventCommand
-        // has been sent successfully and that event is only added to the eventQueue during a
-        // call to close() which initializes the closeFuture.
-        closeFuture.set(null);
-      }
     } catch (InterruptedException e) {
       logger.info("Aborting the BES upload due to having received an interrupt");
       synchronized (lock) {
@@ -342,11 +311,12 @@
           ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
           e.getCause());
     } catch (Throwable e) {
-      initCloseFutureOnFailure(e);
+      closeFuture.setException(e);
       logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
       throw e;
     } finally {
       localFileUploader.shutdown();
+      closeFuture.set(null);
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 93d7da1..fd1679f 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -149,17 +149,15 @@
     }
 
     private void exitFailure(Throwable e) {
+      String msg =
+          String.format("Unable to write all BEP events to file due to '%s'", e.getMessage());
       closeFuture.setException(
-          new AbruptExitException(
-              "Unable to write all BEP events to file due to " + e.getMessage(),
-              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
-              e));
+          new AbruptExitException(msg, ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, e));
       pendingWrites.clear();
-      logger.log(
-          Level.SEVERE, "Unable to write all BEP events to file due to " + e.getMessage(), e);
+      logger.log(Level.SEVERE, msg, e);
     }
 
-    void closeNow() {
+    private void closeNow() {
       if (closeFuture.isDone()) {
         return;
       }
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
index bde6dda..49dbf16 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
@@ -157,7 +157,7 @@
         assertThrows(ExecutionException.class, () -> transport.close().get());
     assertThat(expected)
         .hasMessageThat()
-        .containsMatch("Unable to write all BEP events to file due to Task was cancelled");
+        .contains("Unable to write all BEP events to file due to 'Task was cancelled.'");
 
     assertThat(transport.writer.pendingWrites).isEmpty();
     try (InputStream in = new FileInputStream(output)) {