Implement --bes_upload_mode=fully_async.

Unlike NOWAIT_FOR_UPLOAD_COMPLETE, this mode does not block the subsequent build on server-side acknowledgement.

All file uploads should be complete by the time we unblock, so there should not be a risk of a racy read.

In case of server shutdown, we attempt to block on acknowledgement just as in NOWAIT_FOR_UPLOAD_COMPLETE.

RELNOTES: None
PiperOrigin-RevId: 244867998
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index da511ff..2d41ab4 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -28,6 +28,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
 import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
 import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -96,6 +97,15 @@
   private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap =
       ImmutableMap.of();
 
+  /**
+   * Holds the half-close futures for the upload of each transport. The completion of the half-close
+   * indicates that the client has sent all of the data to the server and is just waiting for
+   * acknowledgement. The client must still keep the data buffered locally in case acknowledgement
+   * fails.
+   */
+  private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMap =
+      ImmutableMap.of();
+
   // TODO(lpino): Use Optional instead of @Nullable for the members below.
   @Nullable private OutErr outErr;
   @Nullable private ImmutableSet<BuildEventTransport> bepTransports;
@@ -147,6 +157,7 @@
         .values()
         .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
     closeFuturesMap = ImmutableMap.of();
+    halfCloseFuturesMap = ImmutableMap.of();
   }
 
   private void waitForPreviousInvocation() {
@@ -157,8 +168,12 @@
     try {
       // TODO(b/234994611): Find a way to print a meaningful message when waiting. The current
       // infrastructure doesn't allow printing messages in the terminal in beforeCommand.
+      ImmutableMap<BuildEventTransport, ListenableFuture<Void>> futureMap =
+          besOptions.besUploadMode == BesUploadMode.FULLY_ASYNC
+              ? halfCloseFuturesMap
+              : closeFuturesMap;
       Uninterruptibles.getUninterruptibly(
-          Futures.allAsList(closeFuturesMap.values()),
+          Futures.allAsList(futureMap.values()),
           getMaxWaitForPreviousInvocation().getSeconds(),
           TimeUnit.SECONDS);
     } catch (TimeoutException exception) {
@@ -278,6 +293,7 @@
     try {
       // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
       //  passed. We should fix this by waiting at most the value set by bes_timeout.
+      googleLogger.atInfo().log("Closing pending build event transports");
       Uninterruptibles.getUninterruptibly(
           Futures.allAsList(streamer.getCloseFuturesMap().values()));
     } catch (ExecutionException e) {
@@ -380,12 +396,14 @@
 
   private void closeBepTransports() throws AbruptExitException {
     closeFuturesMap = streamer.getCloseFuturesMap();
+    halfCloseFuturesMap = streamer.getHalfClosedMap();
     switch (besOptions.besUploadMode) {
       case WAIT_FOR_UPLOAD_COMPLETE:
         waitForBuildEventTransportsToClose();
         return;
 
       case NOWAIT_FOR_UPLOAD_COMPLETE:
+      case FULLY_ASYNC:
         // When running asynchronously notify the UI immediately since we won't wait for the
         // uploads to close.
         for (BuildEventTransport bepTransport : bepTransports) {
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
index ed22d50..7bf8231 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
@@ -139,8 +139,15 @@
 
   /** Determines the mode that will be used to upload data to the Build Event Service. */
   public enum BesUploadMode {
+    /** Block at the end of the build waiting for the upload to complete */
     WAIT_FOR_UPLOAD_COMPLETE,
-    NOWAIT_FOR_UPLOAD_COMPLETE
+    /** Block at the beginning of the next build waiting for upload completion */
+    NOWAIT_FOR_UPLOAD_COMPLETE,
+    /**
+     * Block at the beginning of the next build waiting for the client to finish uploading the data,
+     * but possibly not blocking on the server acknowledgement.
+     */
+    FULLY_ASYNC,
   }
 
   /** Converter for {@link BesUploadMode} */
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 30f4374..ca3b4e6 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -131,6 +131,8 @@
   @GuardedBy("lock")
   private SettableFuture<Void> closeFuture;
 
+  private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();
+
   /**
    * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
    * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (which ever comes
@@ -459,6 +461,7 @@
                       lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
               streamContext.sendOverStream(request);
               streamContext.halfCloseStream();
+              halfCloseFuture.set(null);
             }
             break;
 
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
index 782d36a..7d15fab 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
@@ -46,15 +46,32 @@
 
   /**
    * Initiates a close. Callers may listen to the returned future to be notified when the close is
-   * complete i.e. wait for all build events to be sent. The future may contain any information
-   * about possible transport errors.
+   * complete i.e. wait for all build events to be sent and acknowledged. The future may contain any
+   * information about possible transport errors.
    *
    * <p>This method might be called multiple times without any effect after the first call.
    *
-   * <p>This method should not throw any exceptions.
+   * <p>This method should not throw any exceptions, but the returned Future might.
    */
   ListenableFuture<Void> close();
 
+  /**
+   * Returns the status of half-close. Callers may listen to the return future to be notified when
+   * the half-close is complete
+   *
+   * <p>Half-close indicates that all client-side data is transmitted but still waiting on
+   * server-side acknowledgement. The client must buffer the information in case the server fails to
+   * acknowledge.
+   *
+   * <p>Implementations may choose to return the full close Future via {@link #close()} if there is
+   * no sensible half-close state.
+   *
+   * <p>This should be only called after {@link #close()}.
+   */
+  default ListenableFuture<Void> getHalfCloseFuture() {
+    return close();
+  }
+
   @VisibleForTesting
   @Nullable
   BuildEventArtifactUploader getUploader();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 560dd18..93d7da1 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.concurrent.ThreadSafe;
@@ -85,6 +86,7 @@
     private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
 
     private final BuildEventArtifactUploader uploader;
+    private final AtomicBoolean isClosed = new AtomicBoolean();
 
     @VisibleForTesting
     final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
@@ -170,7 +172,9 @@
     }
 
     public ListenableFuture<Void> close() {
-      if (closeFuture.isDone()) {
+      if (isClosed.getAndSet(true)) {
+        return closeFuture;
+      } else if (closeFuture.isDone()) {
         return closeFuture;
       }
 
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index ef4bdb9..742aa2b 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -120,6 +120,15 @@
       ImmutableMap.of();
 
   /**
+   * Holds the half-close futures for the upload of each transport. The completion of the half-close
+   * indicates that the client has sent all of the data to the server and is just waiting for
+   * acknowledgement. The client must still keep the data buffered locally in case acknowledgement
+   * fails.
+   */
+  private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMap =
+      ImmutableMap.of();
+
+  /**
    * Provider for stdout and stderr output.
    */
   public interface OutErrProvider {
@@ -341,10 +350,16 @@
     ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> closeFuturesMapBuilder =
         ImmutableMap.builder();
     for (final BuildEventTransport transport : transports) {
-      ListenableFuture<Void> closeFuture = transport.close();
-      closeFuturesMapBuilder.put(transport, closeFuture);
+      closeFuturesMapBuilder.put(transport, transport.close());
     }
     closeFuturesMap = closeFuturesMapBuilder.build();
+
+    ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMapBuilder =
+        ImmutableMap.builder();
+    for (final BuildEventTransport transport : transports) {
+      halfCloseFuturesMapBuilder.put(transport, transport.getHalfCloseFuture());
+    }
+    halfCloseFuturesMap = halfCloseFuturesMapBuilder.build();
   }
 
   private void maybeReportArtifactSet(
@@ -685,6 +700,19 @@
     return closeFuturesMap;
   }
 
+  /**
+   * Returns the map from BEP transports to their corresponding half-close futures.
+   *
+   * <p>Half-close indicates that all client-side data is transmitted but still waiting on
+   * server-side acknowledgement. The client must buffer the information in case the server fails to
+   * acknowledge.
+   *
+   * <p>If this method is called before calling {@link #close()} then it will return an empty map.
+   */
+  public synchronized ImmutableMap<BuildEventTransport, ListenableFuture<Void>> getHalfClosedMap() {
+    return halfCloseFuturesMap;
+  }
+
   /** A builder for {@link BuildEventStreamer}. */
   public static class Builder {
     private Set<BuildEventTransport> buildEventTransports;