Implement --bes_upload_mode=fully_async.
Unlike NOWAIT_FOR_UPLOAD_COMPLETE, this mode does not block the subsequent build on server-side acknowledgement.
All file uploads should be complete by the time we unblock, so there should not be a risk of a racy read.
In case of server shutdown, we attempt to block on acknowledgement just as in NOWAIT_FOR_UPLOAD_COMPLETE.
RELNOTES: None
PiperOrigin-RevId: 244867998
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index da511ff..2d41ab4 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -28,6 +28,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -96,6 +97,15 @@
private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap =
ImmutableMap.of();
+ /**
+ * Holds the half-close futures for the upload of each transport. The completion of the half-close
+ * indicates that the client has sent all of the data to the server and is just waiting for
+ * acknowledgement. The client must still keep the data buffered locally in case acknowledgement
+ * fails.
+ */
+ private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMap =
+ ImmutableMap.of();
+
// TODO(lpino): Use Optional instead of @Nullable for the members below.
@Nullable private OutErr outErr;
@Nullable private ImmutableSet<BuildEventTransport> bepTransports;
@@ -147,6 +157,7 @@
.values()
.forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
closeFuturesMap = ImmutableMap.of();
+ halfCloseFuturesMap = ImmutableMap.of();
}
private void waitForPreviousInvocation() {
@@ -157,8 +168,12 @@
try {
// TODO(b/234994611): Find a way to print a meaningful message when waiting. The current
// infrastructure doesn't allow printing messages in the terminal in beforeCommand.
+ ImmutableMap<BuildEventTransport, ListenableFuture<Void>> futureMap =
+ besOptions.besUploadMode == BesUploadMode.FULLY_ASYNC
+ ? halfCloseFuturesMap
+ : closeFuturesMap;
Uninterruptibles.getUninterruptibly(
- Futures.allAsList(closeFuturesMap.values()),
+ Futures.allAsList(futureMap.values()),
getMaxWaitForPreviousInvocation().getSeconds(),
TimeUnit.SECONDS);
} catch (TimeoutException exception) {
@@ -278,6 +293,7 @@
try {
// TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
// passed. We should fix this by waiting at most the value set by bes_timeout.
+ googleLogger.atInfo().log("Closing pending build event transports");
Uninterruptibles.getUninterruptibly(
Futures.allAsList(streamer.getCloseFuturesMap().values()));
} catch (ExecutionException e) {
@@ -380,12 +396,14 @@
private void closeBepTransports() throws AbruptExitException {
closeFuturesMap = streamer.getCloseFuturesMap();
+ halfCloseFuturesMap = streamer.getHalfClosedMap();
switch (besOptions.besUploadMode) {
case WAIT_FOR_UPLOAD_COMPLETE:
waitForBuildEventTransportsToClose();
return;
case NOWAIT_FOR_UPLOAD_COMPLETE:
+ case FULLY_ASYNC:
// When running asynchronously notify the UI immediately since we won't wait for the
// uploads to close.
for (BuildEventTransport bepTransport : bepTransports) {
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
index ed22d50..7bf8231 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
@@ -139,8 +139,15 @@
/** Determines the mode that will be used to upload data to the Build Event Service. */
public enum BesUploadMode {
+ /** Block at the end of the build waiting for the upload to complete */
WAIT_FOR_UPLOAD_COMPLETE,
- NOWAIT_FOR_UPLOAD_COMPLETE
+ /** Block at the beginning of the next build waiting for upload completion */
+ NOWAIT_FOR_UPLOAD_COMPLETE,
+ /**
+ * Block at the beginning of the next build waiting for the client to finish uploading the data,
+ * but possibly not blocking on the server acknowledgement.
+ */
+ FULLY_ASYNC,
}
/** Converter for {@link BesUploadMode} */
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 30f4374..ca3b4e6 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -131,6 +131,8 @@
@GuardedBy("lock")
private SettableFuture<Void> closeFuture;
+ private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();
+
/**
* The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
* on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (which ever comes
@@ -459,6 +461,7 @@
lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
streamContext.sendOverStream(request);
streamContext.halfCloseStream();
+ halfCloseFuture.set(null);
}
break;
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
index 782d36a..7d15fab 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
@@ -46,15 +46,32 @@
/**
* Initiates a close. Callers may listen to the returned future to be notified when the close is
- * complete i.e. wait for all build events to be sent. The future may contain any information
- * about possible transport errors.
+ * complete i.e. wait for all build events to be sent and acknowledged. The future may contain any
+ * information about possible transport errors.
*
* <p>This method might be called multiple times without any effect after the first call.
*
- * <p>This method should not throw any exceptions.
+ * <p>This method should not throw any exceptions, but the returned Future might.
*/
ListenableFuture<Void> close();
+ /**
+ * Returns the status of half-close. Callers may listen to the return future to be notified when
+ * the half-close is complete
+ *
+ * <p>Half-close indicates that all client-side data is transmitted but still waiting on
+ * server-side acknowledgement. The client must buffer the information in case the server fails to
+ * acknowledge.
+ *
+ * <p>Implementations may choose to return the full close Future via {@link #close()} if there is
+ * no sensible half-close state.
+ *
+ * <p>This should be only called after {@link #close()}.
+ */
+ default ListenableFuture<Void> getHalfCloseFuture() {
+ return close();
+ }
+
@VisibleForTesting
@Nullable
BuildEventArtifactUploader getUploader();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 560dd18..93d7da1 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -43,6 +43,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -85,6 +86,7 @@
private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
private final BuildEventArtifactUploader uploader;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
@VisibleForTesting
final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
@@ -170,7 +172,9 @@
}
public ListenableFuture<Void> close() {
- if (closeFuture.isDone()) {
+ if (isClosed.getAndSet(true)) {
+ return closeFuture;
+ } else if (closeFuture.isDone()) {
return closeFuture;
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index ef4bdb9..742aa2b 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -120,6 +120,15 @@
ImmutableMap.of();
/**
+ * Holds the half-close futures for the upload of each transport. The completion of the half-close
+ * indicates that the client has sent all of the data to the server and is just waiting for
+ * acknowledgement. The client must still keep the data buffered locally in case acknowledgement
+ * fails.
+ */
+ private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMap =
+ ImmutableMap.of();
+
+ /**
* Provider for stdout and stderr output.
*/
public interface OutErrProvider {
@@ -341,10 +350,16 @@
ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> closeFuturesMapBuilder =
ImmutableMap.builder();
for (final BuildEventTransport transport : transports) {
- ListenableFuture<Void> closeFuture = transport.close();
- closeFuturesMapBuilder.put(transport, closeFuture);
+ closeFuturesMapBuilder.put(transport, transport.close());
}
closeFuturesMap = closeFuturesMapBuilder.build();
+
+ ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> halfCloseFuturesMapBuilder =
+ ImmutableMap.builder();
+ for (final BuildEventTransport transport : transports) {
+ halfCloseFuturesMapBuilder.put(transport, transport.getHalfCloseFuture());
+ }
+ halfCloseFuturesMap = halfCloseFuturesMapBuilder.build();
}
private void maybeReportArtifactSet(
@@ -685,6 +700,19 @@
return closeFuturesMap;
}
+ /**
+ * Returns the map from BEP transports to their corresponding half-close futures.
+ *
+ * <p>Half-close indicates that all client-side data is transmitted but still waiting on
+ * server-side acknowledgement. The client must buffer the information in case the server fails to
+ * acknowledge.
+ *
+ * <p>If this method is called before calling {@link #close()} then it will return an empty map.
+ */
+ public synchronized ImmutableMap<BuildEventTransport, ListenableFuture<Void>> getHalfClosedMap() {
+ return halfCloseFuturesMap;
+ }
+
/** A builder for {@link BuildEventStreamer}. */
public static class Builder {
private Set<BuildEventTransport> buildEventTransports;