Move BEP upload timeout management to the BES module.
Timeout management for the BEP transports is currently very ad-hoc. For the FileTransports we don't have a timeout at all and for the BES transport we create a separate future that cancels the upload in case bes_timeout is reached.
With this CL we unify the timeout management by wrapping the close Futures returned by the streamer with a Futures.withTimeout each. The timeout we use is the one specified by BuildEventTransport#getTimeout. Notice that this CL keeps the preserves the old behavior by setting the timeout of BES transport to bes_timeout and Duration.ZERO (e.g. no timeout) for FileTransport.
Another thing to notice when Futures.withTimeout times out it wraps the TimeoutException with an ExecutionException so we need to add some special-casing in the BES module to detect that case an print the right message in the terminal (instead of a cryptic TimeoutException stack trace).
PiperOrigin-RevId: 245325738
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 2d41ab4..145848c 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -25,6 +26,7 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
@@ -71,7 +73,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -86,9 +87,6 @@
private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();
- private final AtomicReference<AbruptExitException> pendingAbruptExitException =
- new AtomicReference<>();
-
private BuildEventProtocolOptions bepOptions;
private AuthAndTLSOptions authTlsOptions;
private BuildEventStreamOptions besStreamOptions;
@@ -160,6 +158,10 @@
halfCloseFuturesMap = ImmutableMap.of();
}
+ private static boolean isTimeoutException(ExecutionException e) {
+ return e.getCause() instanceof TimeoutException;
+ }
+
private void waitForPreviousInvocation() {
if (closeFuturesMap.isEmpty()) {
return;
@@ -184,15 +186,19 @@
getMaxWaitForPreviousInvocation().getSeconds());
cmdLineReporter.handle(Event.warn(msg));
googleLogger.atWarning().withCause(exception).log(msg);
- } catch (ExecutionException exception) {
+ } catch (ExecutionException e) {
+ // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
+ // times out.
+ String previousExceptionMsg =
+ isTimeoutException(e) ? "The Build Event Protocol upload timed out" : e.getMessage();
String msg =
String.format(
"Previous invocation failed to finish Build Event Protocol upload "
+ "with the following exception: '%s'. "
+ "Ignoring the failure and starting a new invocation...",
- exception.getMessage());
+ previousExceptionMsg);
cmdLineReporter.handle(Event.warn(msg));
- googleLogger.atWarning().withCause(exception).log(msg);
+ googleLogger.atWarning().withCause(e).log(msg);
} finally {
cancelPendingUploads();
}
@@ -289,13 +295,12 @@
private void forceShutdownBuildEventStreamer() {
streamer.close(AbortReason.INTERNAL);
-
+ closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
try {
// TODO(b/130148250): Uninterruptibles.getUninterruptibly waits forever if no timeout is
// passed. We should fix this by waiting at most the value set by bes_timeout.
googleLogger.atInfo().log("Closing pending build event transports");
- Uninterruptibles.getUninterruptibly(
- Futures.allAsList(streamer.getCloseFuturesMap().values()));
+ Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
} catch (ExecutionException e) {
googleLogger.atSevere().withCause(e).log("Failed to close a build event transport");
LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e);
@@ -314,12 +319,6 @@
@Override
public void blazeShutdown() {
- AbruptExitException pendingException = pendingAbruptExitException.getAndSet(null);
- if (pendingException != null) {
- cancelPendingUploads();
- return;
- }
-
if (closeFuturesMap.isEmpty()) {
return;
}
@@ -380,6 +379,15 @@
Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesMap.values()));
}
} catch (ExecutionException e) {
+ // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
+ // times out.
+ if (isTimeoutException(e)) {
+ throw new AbruptExitException(
+ "The Build Event Protocol upload timed out",
+ ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
+ e);
+ }
+
Throwables.throwIfInstanceOf(e.getCause(), AbruptExitException.class);
throw new RuntimeException(
String.format(
@@ -394,9 +402,42 @@
}
}
+ private static ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
+ constructCloseFuturesMapWithTimeouts(
+ ImmutableMap<BuildEventTransport, ListenableFuture<Void>> bepTransportToCloseFuturesMap) {
+ ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> builder =
+ ImmutableMap.builder();
+
+ bepTransportToCloseFuturesMap.forEach(
+ (bepTransport, closeFuture) -> {
+ final ListenableFuture<Void> closeFutureWithTimeout;
+ if (bepTransport.getTimeout().isZero() || bepTransport.getTimeout().isNegative()) {
+ closeFutureWithTimeout = closeFuture;
+ } else {
+ final ScheduledExecutorService timeoutExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("bes-close-" + bepTransport.name() + "-%d")
+ .build());
+
+ closeFutureWithTimeout =
+ Futures.withTimeout(
+ closeFuture,
+ bepTransport.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS,
+ timeoutExecutor);
+ closeFutureWithTimeout.addListener(
+ () -> timeoutExecutor.shutdown(), MoreExecutors.directExecutor());
+ }
+ builder.put(bepTransport, closeFutureWithTimeout);
+ });
+
+ return builder.build();
+ }
+
private void closeBepTransports() throws AbruptExitException {
- closeFuturesMap = streamer.getCloseFuturesMap();
- halfCloseFuturesMap = streamer.getHalfClosedMap();
+ closeFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
+ halfCloseFuturesMap = constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap());
switch (besOptions.besUploadMode) {
case WAIT_FOR_UPLOAD_COMPLETE:
waitForBuildEventTransportsToClose();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 682cdf3..88797dd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -34,6 +34,7 @@
/** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */
public class BuildEventServiceTransport implements BuildEventTransport {
private final BuildEventServiceUploader besUploader;
+ private final Duration besTimeout;
private BuildEventServiceTransport(
BuildEventServiceClient besClient,
@@ -46,6 +47,7 @@
EventBus eventBus,
Duration closeTimeout,
Sleeper sleeper) {
+ this.besTimeout = closeTimeout;
this.besUploader =
new BuildEventServiceUploader.Builder()
.besClient(besClient)
@@ -57,7 +59,6 @@
.sleeper(sleeper)
.artifactGroupNamer(artifactGroupNamer)
.eventBus(eventBus)
- .closeTimeout(closeTimeout)
.build();
}
@@ -81,6 +82,11 @@
besUploader.enqueueEvent(event);
}
+ @Override
+ public Duration getTimeout() {
+ return besTimeout;
+ }
+
/** A builder for {@link BuildEventServiceTransport}. */
public static class Builder {
private BuildEventServiceClient besClient;
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 145c227..53eab93 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -15,7 +15,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
@@ -51,7 +50,6 @@
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
-import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
@@ -62,14 +60,11 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
-import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -100,7 +95,6 @@
private final BuildEventServiceProtoUtil besProtoUtil;
private final BuildEventProtocolOptions buildEventProtocolOptions;
private final boolean publishLifecycleEvents;
- private final Duration closeTimeout;
private final Sleeper sleeper;
private final Clock clock;
private final ArtifactGroupNamer namer;
@@ -137,9 +131,6 @@
private Thread uploadThread;
@GuardedBy("lock")
- private boolean interruptCausedByTimeout;
-
- @GuardedBy("lock")
private boolean interruptCausedByCancel;
private StreamContext streamContext;
@@ -150,7 +141,6 @@
BuildEventServiceProtoUtil besProtoUtil,
BuildEventProtocolOptions buildEventProtocolOptions,
boolean publishLifecycleEvents,
- Duration closeTimeout,
Sleeper sleeper,
Clock clock,
ArtifactGroupNamer namer,
@@ -160,7 +150,6 @@
this.besProtoUtil = besProtoUtil;
this.buildEventProtocolOptions = buildEventProtocolOptions;
this.publishLifecycleEvents = publishLifecycleEvents;
- this.closeTimeout = closeTimeout;
this.sleeper = sleeper;
this.clock = clock;
this.namer = namer;
@@ -215,10 +204,6 @@
// Enqueue the last event which will terminate the upload.
eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
- if (!closeTimeout.isZero()) {
- startCloseTimer(closeFuture, closeTimeout);
- }
-
final SettableFuture<Void> finalCloseFuture = closeFuture;
closeFuture.addListener(
() -> {
@@ -232,13 +217,6 @@
return closeFuture;
}
- private void closeOnTimeout() {
- synchronized (lock) {
- interruptCausedByTimeout = true;
- closeNow();
- }
- }
-
private void closeOnCancel() {
synchronized (lock) {
interruptCausedByCancel = true;
@@ -288,14 +266,7 @@
logger.info("Aborting the BES upload due to having received an interrupt");
synchronized (lock) {
Preconditions.checkState(
- interruptCausedByTimeout || interruptCausedByCancel,
- "Unexpected interrupt on BES uploader thread");
- if (interruptCausedByTimeout) {
- logAndExitAbruptly(
- "The Build Event Protocol upload timed out",
- ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
- e);
- }
+ interruptCausedByCancel, "Unexpected interrupt on BES uploader thread");
}
} catch (StatusException e) {
logAndExitAbruptly(
@@ -605,32 +576,6 @@
}
}
- private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) {
- Thread closeTimer =
- new Thread(
- () -> {
- // Call closeOnTimeout() if the future does not complete within closeTimeout
- try {
- getUninterruptibly(closeFuture, closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- closeOnTimeout();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof TimeoutException) {
- // This is likely due to an internal timeout doing the local file uploading.
- closeOnTimeout();
- } else {
- // This code only cares about calling closeOnTimeout() if the closeFuture does
- // not complete within closeTimeout.
- String failureMsg = "BES close failure";
- logger.severe(failureMsg);
- LoggingUtil.logToRemote(Level.SEVERE, failureMsg, e);
- }
- }
- },
- "bes-uploader-close-timer");
- closeTimer.start();
- }
-
private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
throws LocalFileUploadException, InterruptedException {
try {
@@ -711,7 +656,6 @@
private BuildEventServiceProtoUtil besProtoUtil;
private BuildEventProtocolOptions bepOptions;
private boolean publishLifecycleEvents;
- private Duration closeTimeout;
private Sleeper sleeper;
private Clock clock;
private ArtifactGroupNamer artifactGroupNamer;
@@ -742,11 +686,6 @@
return this;
}
- Builder closeTimeout(Duration value) {
- this.closeTimeout = value;
- return this;
- }
-
Builder clock(Clock value) {
this.clock = value;
return this;
@@ -774,7 +713,6 @@
checkNotNull(besProtoUtil),
checkNotNull(bepOptions),
publishLifecycleEvents,
- checkNotNull(closeTimeout),
checkNotNull(sleeper),
checkNotNull(clock),
checkNotNull(artifactGroupNamer),
@@ -782,4 +720,3 @@
}
}
}
-
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
index 7d15fab..15cbb6a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
@@ -15,6 +15,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
+import java.time.Duration;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -72,6 +73,14 @@
return close();
}
+ /**
+ * Returns how long a caller should wait for the transport to finish uploading events and closing
+ * gracefully. Setting the timeout to {@link Duration#ZERO} means that there's no timeout.
+ */
+ default Duration getTimeout() {
+ return Duration.ZERO;
+ }
+
@VisibleForTesting
@Nullable
BuildEventArtifactUploader getUploader();