Implement asynchronous Build Event Service (BES) upload.
We introduce a flag to control the flow of invocations w.r.t. BES upload. This new flag is called --bes_upload_mode and it currently has two possible values:
- WAIT_FOR_UPLOAD_COMPLETE (default, present in before this commit): In this mode Bazel will wait for the communication with the BES to finish.
- NOWAIT_FOR_UPLOAD_COMPLETE (new mode): In this mode Bazel *won't* wait for the BES upload to finish, instead it will finish the invocation as soon as it can. The upload continues in the background and if a new invocation starts Bazel will wait five seconds for the previous upload to finish before (abruptly) cancelling it.
PiperOrigin-RevId: 243591454
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
index 4049fa5..e037768 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
@@ -39,6 +39,7 @@
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:auto_value",
+ "//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
"//third_party/grpc:grpc-jar",
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index b0529d2..671f032 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -69,6 +70,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -82,6 +84,9 @@
extends BlazeModule {
private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
+ private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();
+ // TODO(lpino): Consider making the wait value configurable.
+ private static final Duration MAX_WAIT_FOR_PREVIOUS_INVOCATION = Duration.ofSeconds(5);
private final AtomicReference<AbruptExitException> pendingAbruptExitException =
new AtomicReference<>();
@@ -90,14 +95,17 @@
private AuthAndTLSOptions authTlsOptions;
private BuildEventStreamOptions besStreamOptions;
private boolean useExperimentalUi;
+ /** Holds the close futures for the upload of each transport */
+ private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap =
+ ImmutableMap.of();
// TODO(lpino): Use Optional instead of @Nullable for the members below.
- private @Nullable OutErr outErr;
- private @Nullable ImmutableSet<BuildEventTransport> bepTransports;
- private @Nullable String buildRequestId;
- private @Nullable String invocationId;
- private @Nullable Reporter cmdLineReporter;
- private @Nullable BuildEventStreamer streamer;
+ @Nullable private OutErr outErr;
+ @Nullable private ImmutableSet<BuildEventTransport> bepTransports;
+ @Nullable private String buildRequestId;
+ @Nullable private String invocationId;
+ @Nullable private Reporter cmdLineReporter;
+ @Nullable private BuildEventStreamer streamer;
protected BESOptionsT besOptions;
@@ -128,7 +136,7 @@
// Don't hide unchecked exceptions as part of the error reporting.
Throwables.throwIfUnchecked(exception);
- logger.log(Level.SEVERE, msg, exception);
+ googleLogger.atSevere().withCause(exception).log(msg);
AbruptExitException abruptException = new AbruptExitException(msg, exitCode, exception);
reportCommandLineError(commandLineReporter, exception);
moduleEnvironment.exit(abruptException);
@@ -143,15 +151,65 @@
BuildEventProtocolOptions.class);
}
+ private void cancelPendingUploads() {
+ closeFuturesMap
+ .values()
+ .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
+ closeFuturesMap = ImmutableMap.of();
+ }
+
+ private void waitForPreviousInvocation() {
+ AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
+ if (pendingException != null) {
+ cmdLineReporter.handle(
+ Event.warn(
+ String.format(
+ "Previous invocation failed to finish Build Event Protocol upload with "
+ + "the following exception: '%s'. "
+ + "Ignoring the failure and starting a new invocation...",
+ pendingException.getMessage())));
+ cancelPendingUploads();
+ return;
+ }
+
+ if (closeFuturesMap.isEmpty()) {
+ return;
+ }
+
+ try {
+ // TODO(b/234994611): Find a way to print a meaningful message when waiting. The current
+ // infrastructure doesn't allow printing messages in the terminal in beforeCommand.
+ Uninterruptibles.getUninterruptibly(
+ Futures.allAsList(closeFuturesMap.values()),
+ MAX_WAIT_FOR_PREVIOUS_INVOCATION.getSeconds(),
+ TimeUnit.SECONDS);
+ } catch (TimeoutException exception) {
+ String msg =
+ String.format(
+ "Pending Build Event Protocol upload took more than %ds to finish. "
+ + "Cancelling and starting a new invocation...",
+ MAX_WAIT_FOR_PREVIOUS_INVOCATION.getSeconds());
+ cmdLineReporter.handle(Event.warn(msg));
+ googleLogger.atWarning().withCause(exception).log(msg);
+ } catch (ExecutionException exception) {
+ String msg =
+ String.format(
+ "Previous Build Event Protocol upload failed with "
+ + "the following exception: '%s'. "
+ + "Ignoring the failure and starting a new invocation...",
+ exception.getMessage());
+ cmdLineReporter.handle(Event.warn(msg));
+ googleLogger.atWarning().withCause(exception).log(msg);
+ } finally {
+ cancelPendingUploads();
+ }
+ }
+
@Override
public void beforeCommand(CommandEnvironment cmdEnv) {
this.invocationId = cmdEnv.getCommandId().toString();
this.buildRequestId = cmdEnv.getBuildRequestId();
this.cmdLineReporter = cmdEnv.getReporter();
- // Reset to null in case afterCommand was not called.
- // TODO(lpino): Remove this statement once {@link BlazeModule#afterCommmand()} is guaranteed
- // to be executed for every invocation.
- this.outErr = null;
OptionsParsingResult parsingResult = cmdEnv.getOptions();
this.besOptions = Preconditions.checkNotNull(parsingResult.getOptions(optionsClass()));
@@ -175,6 +233,8 @@
.select(bepOptions.buildEventUploadStrategy)
.create(cmdEnv));
+ waitForPreviousInvocation();
+
if (!whitelistedCommands(besOptions).contains(cmdEnv.getCommandName())) {
// Exit early if the running command isn't supported.
return;
@@ -230,17 +290,52 @@
return outErr;
}
+ private void forceShutdownBuildEventStreamer() {
+ streamer.close(AbortReason.INTERNAL);
+
+ try {
+ // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
+ // passed. We should fix this by waiting at most the value set by bes_timeout.
+ Uninterruptibles.getUninterruptibly(
+ Futures.allAsList(streamer.getCloseFuturesMap().values()));
+ } catch (ExecutionException e) {
+ googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
+ LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
+ } finally {
+ cancelPendingUploads();
+ }
+ }
+
@Override
public void blazeShutdownOnCrash() {
if (streamer != null) {
- logger.warning("Attempting to close BES streamer on crash");
- streamer.close(AbortReason.INTERNAL);
+ googleLogger.atWarning().log("Attempting to close BES streamer on crash");
+ forceShutdownBuildEventStreamer();
+ }
+ }
- try {
- waitForBuildEventTransportsToClose(streamer.getCloseFuturesMap());
- } catch (AbruptExitException e) {
- LoggingUtil.logToRemote(Level.WARNING, "Failure while waiting for BES close", e);
- }
+ @Override
+ public void blazeShutdown() {
+ AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
+ if (pendingException != null) {
+ cancelPendingUploads();
+ return;
+ }
+
+ if (closeFuturesMap.isEmpty()) {
+ return;
+ }
+
+ try {
+ Uninterruptibles.getUninterruptibly(
+ Futures.allAsList(closeFuturesMap.values()),
+ MAX_WAIT_FOR_PREVIOUS_INVOCATION.getSeconds(),
+ TimeUnit.SECONDS);
+ } catch (TimeoutException | ExecutionException exception) {
+ googleLogger.atWarning().withCause(exception).log(
+ "Encountered Exception when closing BEP transports in Blaze's shutting down sequence");
+ } finally {
+ cancelPendingUploads();
}
}
@@ -254,9 +349,7 @@
+ "s."));
}
- private void waitForBuildEventTransportsToClose(
- ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesMap)
- throws AbruptExitException {
+ private void waitForBuildEventTransportsToClose() {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("bes-notify-ui-%d").build());
@@ -283,16 +376,22 @@
TimeUnit.SECONDS);
}
- // Wait synchronously for all the futures to finish.
try (AutoProfiler p = AutoProfiler.logged("waiting for BES close", logger)) {
+ // TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
+ // passed. We should fix this by waiting at most the value set by bes_timeout.
Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
}
- } catch (ExecutionException exception) {
- throw new AbruptExitException(
- "Failed to close a build event transport",
- ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
- exception);
+ } catch (ExecutionException e) {
+ this.pendingAbruptExitException.compareAndSet(
+ null,
+ new AbruptExitException(
+ String.format(
+ "Failed to close a build event transport with exception '%s'", e.getMessage()),
+ ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+ e));
} finally {
+ cancelPendingUploads();
+
if (waitMessageFuture != null) {
waitMessageFuture.cancel(/* mayInterruptIfRunning= */ true);
}
@@ -303,19 +402,41 @@
}
}
+ private void closeBepTransports() throws AbruptExitException {
+ closeFuturesMap = streamer.getCloseFuturesMap();
+ switch (besOptions.besUploadMode) {
+ case WAIT_FOR_UPLOAD_COMPLETE:
+ waitForBuildEventTransportsToClose();
+ AbruptExitException e = pendingAbruptExitException.getAndSet(null);
+ if (e != null) {
+ throw e;
+ }
+ return;
+
+ case NOWAIT_FOR_UPLOAD_COMPLETE:
+ // When running asynchronously notify the UI immediately since we won't wait for the
+ // uploads to close.
+ for (BuildEventTransport bepTransport : bepTransports) {
+ cmdLineReporter.post(new BuildEventTransportClosedEvent(bepTransport));
+ }
+ return;
+ }
+ throw new IllegalStateException("Unknown BesUploadMode found: " + besOptions.besUploadMode);
+ }
+
@Override
public void afterCommand() throws AbruptExitException {
if (streamer != null) {
if (!streamer.isClosed()) {
// This should not occur, but close with an internal error if a {@link BuildEventStreamer}
// bug manifests as an unclosed streamer.
- logger.warning("Attempting to close BES streamer after command");
+ googleLogger.atWarning().log("Attempting to close BES streamer after command");
String msg = "BES was not properly closed";
LoggingUtil.logToRemote(Level.WARNING, msg, new IllegalStateException(msg));
- streamer.close(AbortReason.INTERNAL);
+ forceShutdownBuildEventStreamer();
}
- waitForBuildEventTransportsToClose(streamer.getCloseFuturesMap());
+ closeBepTransports();
if (!Strings.isNullOrEmpty(besOptions.besBackend)) {
constructAndMaybeReportInvocationIdUrl();
@@ -324,11 +445,6 @@
}
}
- AbruptExitException e = pendingAbruptExitException.getAndSet(null);
- if (e != null) {
- throw e;
- }
-
if (!besStreamOptions.keepBackendConnections) {
clearBesClient();
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
index 2c23ff1..eda45ce 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
@@ -15,6 +15,7 @@
package com.google.devtools.build.lib.buildeventservice;
import com.google.devtools.common.options.Converters;
+import com.google.devtools.common.options.EnumConverter;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionDocumentationCategory;
import com.google.devtools.common.options.OptionEffectTag;
@@ -124,4 +125,28 @@
+ " backend. Bazel will output the URL appended by the invocation id to the"
+ " terminal.")
public String besResultsUrl;
+
+ @Option(
+ name = "bes_upload_mode",
+ defaultValue = "WAIT_FOR_UPLOAD_COMPLETE",
+ converter = BesUploadModeConverter.class,
+ documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
+ effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT},
+ help =
+ "Specifies whether the Build Event Service upload should block the build completion "
+ + "or should end the invocation immediately and finish the upload in the background.")
+ public BesUploadMode besUploadMode;
+
+ /** Determines the mode that will be used to upload data to the Build Event Service. */
+ public enum BesUploadMode {
+ WAIT_FOR_UPLOAD_COMPLETE,
+ NOWAIT_FOR_UPLOAD_COMPLETE
+ }
+
+ /** Converter for {@link BesUploadMode} */
+ public static class BesUploadModeConverter extends EnumConverter<BesUploadMode> {
+ public BesUploadModeConverter() {
+ super(BesUploadMode.class, "Mode for uploading to the Build Event Service");
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index da71445..7707872d 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -72,7 +72,16 @@
// per API contract it is expected to never fail.
SettableFuture<Void> closeFuture = SettableFuture.create();
ListenableFuture<Void> uploaderCloseFuture = besUploader.close();
- uploaderCloseFuture.addListener(() -> closeFuture.set(null), MoreExecutors.directExecutor());
+ uploaderCloseFuture.addListener(
+ () -> {
+ // Make sure to cancel any pending uploads if the closing is cancelled.
+ if (uploaderCloseFuture.isCancelled()) {
+ besUploader.closeOnCancel();
+ }
+ closeFuture.set(null);
+ },
+ MoreExecutors.directExecutor());
+
return closeFuture;
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 2535091..c69ff9a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -148,6 +148,9 @@
@GuardedBy("lock")
private boolean interruptCausedByTimeout;
+ @GuardedBy("lock")
+ private boolean interruptCausedByCancel;
+
private StreamContext streamContext;
private BuildEventServiceUploader(
@@ -239,22 +242,34 @@
}
}
- /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
private void closeOnTimeout() {
synchronized (lock) {
+ interruptCausedByTimeout = true;
+ closeNow();
+ }
+ }
+
+ void closeOnCancel() {
+ synchronized (lock) {
+ interruptCausedByCancel = true;
+ closeNow();
+ }
+ }
+
+ /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
+ private void closeNow() {
+ synchronized (lock) {
if (uploadThread != null) {
if (uploadThread.isInterrupted()) {
return;
}
-
- interruptCausedByTimeout = true;
uploadThread.interrupt();
}
}
}
private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
- checkState(exitCode != ExitCode.SUCCESS);
+ checkState(!exitCode.equals(ExitCode.SUCCESS));
logger.info(message);
abruptExitCallback.accept(new AbruptExitException(message, exitCode, cause));
}
@@ -291,11 +306,14 @@
logger.info("Aborting the BES upload due to having received an interrupt");
synchronized (lock) {
Preconditions.checkState(
- interruptCausedByTimeout, "Unexpected interrupt on BES uploader thread");
- logAndExitAbruptly(
- "The Build Event Protocol upload timed out",
- ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
- e);
+ interruptCausedByTimeout || interruptCausedByCancel,
+ "Unexpected interrupt on BES uploader thread");
+ if (interruptCausedByTimeout) {
+ logAndExitAbruptly(
+ "The Build Event Protocol upload timed out",
+ ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+ e);
+ }
}
} finally {
// TODO(buchgr): Due to b/113035235 exitFunc needs to be called before the close future
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
index 4da6c78..5ac4448 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java
@@ -116,29 +116,4 @@
+ "of the set as well as the file and uri lengths, which may in turn depend on the "
+ "hash function.")
public int maxNamedSetEntries;
-
- // TODO(ruperts): Remove these public getter methods for consistency with other options classes?
- public String getBuildEventTextFile() {
- return buildEventTextFile;
- }
-
- public String getBuildEventBinaryFile() {
- return buildEventBinaryFile;
- }
-
- public String getBuildEventJsonFile() {
- return buildEventJsonFile;
- }
-
- public boolean getBuildEventTextFilePathConversion() {
- return buildEventTextFilePathConversion;
- }
-
- public boolean getBuildEventBinaryFilePathConversion() {
- return buildEventBinaryFilePathConversion;
- }
-
- public boolean getBuildEventJsonFilePathConversion() {
- return buildEventJsonFilePathConversion;
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 55444cb..87f837a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -187,6 +187,16 @@
if (closeFuture.isDone()) {
return closeFuture;
}
+
+ // Close abruptly if the closing future is cancelled.
+ closeFuture.addListener(
+ () -> {
+ if (closeFuture.isCancelled()) {
+ closeNow();
+ }
+ },
+ MoreExecutors.directExecutor());
+
try {
pendingWrites.put(CLOSE);
} catch (InterruptedException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index f592396..6a56407 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -78,7 +78,7 @@
@ThreadSafe
public class BuildEventStreamer {
private final Collection<BuildEventTransport> transports;
- private final BuildEventStreamOptions options;
+ private final BuildEventStreamOptions besOptions;
@GuardedBy("this")
private Set<BuildEventId> announcedEvents;
@@ -144,7 +144,7 @@
BuildEventStreamOptions options,
CountingArtifactGroupNamer artifactGroupNamer) {
this.transports = transports;
- this.options = options;
+ this.besOptions = options;
this.announcedEvents = null;
this.progressCount = 0;
this.artifactGroupNamer = artifactGroupNamer;
@@ -355,11 +355,11 @@
}
// We only split if the max number of entries is at least 2 (it must be at least a binary tree).
// The method throws for smaller values.
- if (options.maxNamedSetEntries >= 2) {
+ if (besOptions.maxNamedSetEntries >= 2) {
// We only split the event after naming it to avoid splitting the same node multiple times.
// Note that the artifactGroupNames keeps references to the individual pieces, so this can
// double the memory consumption of large nested sets.
- view = view.splitIfExceedsMaximumSize(options.maxNamedSetEntries);
+ view = view.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries);
}
for (NestedSetView<Artifact> transitive : view.transitives()) {
maybeReportArtifactSet(pathResolver, transitive);
@@ -645,7 +645,7 @@
/** Returns whether an {@link ActionExecutedEvent} should be published. */
private boolean shouldPublishActionExecutedEvent(ActionExecutedEvent event) {
- if (options.publishAllActions) {
+ if (besOptions.publishAllActions) {
return true;
}
if (event.getException() != null) {
@@ -713,3 +713,4 @@
}
}
}
+