Automated rollback of commit 2bc2efd8bd0af15bc727523a56fdcefe87e2f41d.
*** Reason for rollback ***
b/119861715
*** Original change description ***
bep: close the bep transports in afterCommand
Before this change we'd block an undefined SkyFrame thread when
waiting for the BEP/BES upload to finish. This would also not
allow us to reliably report errors. This change moves the waiting
to afterCommand() where one is allowed to block and can reliably
report errors by throwing an exception.
RELNOTES: None
PiperOrigin-RevId: 222549755
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
index 596b47a..325788f 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BUILD
@@ -22,7 +22,6 @@
"//src/main/java/com/google/devtools/build/lib/buildeventstream",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/transports",
- "//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index fb3832b..e59405e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -20,11 +20,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.BuildEventLogger;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.ExitFunction;
@@ -33,14 +30,10 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
-import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.transports.BuildEventStreamOptions;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
-import com.google.devtools.build.lib.events.EventKind;
-import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.BuildEventStreamer;
import com.google.devtools.build.lib.runtime.BuildEventTransportFactory;
@@ -53,16 +46,7 @@
import com.google.devtools.common.options.OptionsParsingException;
import com.google.devtools.common.options.OptionsParsingResult;
import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -79,8 +63,6 @@
private Set<BuildEventTransport> transports = ImmutableSet.of();
- private Reporter reporter;
-
/** Whether an error in the Build Event Service upload causes the build to fail. */
protected boolean errorsShouldFailTheBuild() {
return true;
@@ -106,7 +88,6 @@
@Override
public void beforeCommand(CommandEnvironment commandEnvironment) {
- this.reporter = commandEnvironment.getReporter();
// Reset to null in case afterCommand was not called.
this.outErr = null;
if (!whitelistedCommands().contains(commandEnvironment.getCommandName())) {
@@ -147,88 +128,9 @@
}
@Override
- public void afterCommand() throws AbruptExitException {
- ScheduledExecutorService executor = null;
- ScheduledFuture<?> uploadProgressUpdaterFuture = null;
- boolean interrupted = false;
- try {
- if (transports.isEmpty()) {
- // Nothing to close.
- return;
- }
- List<ListenableFuture<Void>> closeFutures = new ArrayList<>(transports.size());
- for (final BuildEventTransport transport : transports) {
- // Trigger a graceful close for each transport.
- ListenableFuture<Void> closeFuture = transport.close();
- closeFuture.addListener(
- () -> reporter.post(new BuildEventTransportClosedEvent(transport)),
- MoreExecutors.directExecutor());
- closeFutures.add(closeFuture);
- }
-
- executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "bep-ui-updater"));
- // Send periodic events to the UI to show that the upload is still in progress.
- uploadProgressUpdaterFuture = sendUploadProgressEvents(executor);
-
- Throwable lastError = null;
- for (ListenableFuture<Void> closeFuture : closeFutures) {
- try {
- closeFuture.get();
- } catch (ExecutionException e) {
- logger.log(Level.WARNING, "Failed to close BEP transport", e.getCause());
- Throwables.throwIfInstanceOf(e.getCause(), RuntimeException.class);
- if (lastError != null) {
- reporter.handle(Event.error(lastError.getMessage()));
- }
- lastError = e.getCause();
- }
- }
-
- if (lastError != null) {
- throw new AbruptExitException(lastError.getMessage(), ExitCode.PUBLISH_ERROR);
- }
- } catch (InterruptedException e) {
- interrupted = true;
- // Close all transports immediately and don't wait for BEP upload.
- for (final BuildEventTransport transport : transports) {
- transport.closeNow();
- }
- } finally {
- if (uploadProgressUpdaterFuture != null) {
- uploadProgressUpdaterFuture.cancel(false);
- }
- if (executor != null) {
- ExecutorUtil.interruptibleShutdown(executor);
- }
-
- this.outErr = null;
- // Set to null after executor shutdown in order to
- // avoid a race condition that might lead to a NPE
- this.reporter = null;
- this.transports = ImmutableSet.of();
-
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private ScheduledFuture<?> sendUploadProgressEvents(ScheduledExecutorService executor) {
- final long startNanos = System.nanoTime();
- return executor.scheduleAtFixedRate(
- () -> {
- long deltaNanos = System.nanoTime() - startNanos;
- long deltaSeconds = Duration.ofNanos(deltaNanos).getSeconds();
- Event waitEvt =
- Event.of(
- EventKind.PROGRESS,
- null,
- "Waiting for Build Event Protocol upload: " + deltaSeconds + "s");
- reporter.handle(waitEvt);
- },
- 0,
- 1,
- TimeUnit.SECONDS);
+ public void afterCommand() {
+ this.outErr = null;
+ this.transports = ImmutableSet.of();
}
/** Returns {@code null} if no stream could be created. */
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index b9a099a..bf6f1c6 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -16,15 +16,19 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.devtools.build.lib.events.Event.of;
+import static com.google.devtools.build.lib.events.EventKind.PROGRESS;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.actions.ActionExecutedEvent;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactPathResolver;
@@ -70,6 +74,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -109,6 +118,8 @@
// True, if we already closed the stream.
private boolean closed;
+ private static final Logger logger = Logger.getLogger(BuildEventStreamer.class.getName());
+
/**
* Provider for stdout and stderr output.
*/
@@ -195,9 +206,6 @@
* come before their parents.
*/
private void post(BuildEvent event) {
- Preconditions.checkState(!isClosed(),
- String.format("Received event of type '%s' after close. This is a bug.",
- event.getClass().getName()));
BuildEvent linkEvent = null;
BuildEventId id = event.getEventId();
List<BuildEvent> flushEvents = null;
@@ -333,17 +341,70 @@
}
}
- @VisibleForTesting
- boolean isClosed() {
+ private ScheduledFuture<?> bepUploadWaitEvent(ScheduledExecutorService executor) {
+ final long startNanos = System.nanoTime();
+ return executor.scheduleAtFixedRate(
+ () -> {
+ long deltaNanos = System.nanoTime() - startNanos;
+ long deltaSeconds = TimeUnit.NANOSECONDS.toSeconds(deltaNanos);
+ Event waitEvt =
+ of(PROGRESS, null, "Waiting for Build Event Protocol upload: " + deltaSeconds + "s");
+ if (reporter != null) {
+ reporter.handle(waitEvt);
+ }
+ },
+ 0,
+ 1,
+ TimeUnit.SECONDS);
+ }
+
+ public boolean isClosed() {
return closed;
}
- /**
- * Close marks the streamer as closed but does not release any resources. We only set
- * {@code closed} to {@code true} so that we can call {@link #isClosed()} in tests.
- */
private void close() {
- closed = true;
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
+
+ ScheduledExecutorService executor = null;
+ try {
+ executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("build-event-streamer-%d").build());
+ List<ListenableFuture<Void>> closeFutures = new ArrayList<>(transports.size());
+ for (final BuildEventTransport transport : transports) {
+ ListenableFuture<Void> closeFuture = transport.close();
+ closeFuture.addListener(
+ () -> {
+ if (reporter != null) {
+ reporter.post(new BuildEventTransportClosedEvent(transport));
+ }
+ },
+ executor);
+ closeFutures.add(closeFuture);
+ }
+
+ try {
+ if (closeFutures.isEmpty()) {
+ // Don't spam events if there is nothing to close.
+ return;
+ }
+
+ ScheduledFuture<?> f = bepUploadWaitEvent(executor);
+ // Wait for all transports to close.
+ Futures.allAsList(closeFutures).get();
+ f.cancel(true);
+ } catch (Exception e) {
+ logger.severe("Failed to close a build event transport: " + e);
+ }
+ } finally {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
}
private void maybeReportArtifactSet(
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
index 9d054f7..7260a1e 100644
--- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -48,6 +48,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventWithConfiguration;
import com.google.devtools.build.lib.buildeventstream.BuildEventWithOrderConstraint;
import com.google.devtools.build.lib.buildeventstream.GenericBuildEvent;
@@ -70,6 +71,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -279,6 +282,11 @@
void transportsAnnounced(AnnounceBuildEventTransportsEvent evt) {
transportSet = Collections.synchronizedSet(new HashSet<>(evt.transports()));
}
+
+ @Subscribe
+ void transportClosed(BuildEventTransportClosedEvent evt) {
+ transportSet.remove(evt.transport());
+ }
}
@Before
@@ -327,6 +335,10 @@
assertThat(transport.getEventProtos().get(0).getLastMessage()).isFalse();
assertThat(transport.getEventProtos().get(1).getLastMessage()).isFalse();
assertThat(transport.getEventProtos().get(2).getLastMessage()).isTrue();
+
+ while (!handler.transportSet.isEmpty()) {
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+ }
}
@Test