Automated g4 rollback of commit d9fea57268ff01c001fbcbdc2bd057c86c362e6f.

*** Reason for rollback ***

Rollforward with a fix for test flakiness. BEP transport-closed
events are delivered via their own thread pool and thus might
not have been delivered yet when the test finishes its checks.
BuildEventStreamerTest#testSimpleStream now polls until the event
has been delivered. I ran the test with --runs_per_test=1000
several times and saw no further failures.

*** Original change description ***

Automated g4 rollback of commit 3d596d63f883fff56001ed7b2e5cf51dba45f082.

*** Reason for rollback ***

Made BuildEventStreamerTest#testSimpleStream 3% flaky based on --runs_per_test=1000.
RELNOTES: None
PiperOrigin-RevId: 154170833
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
index 3e7fe5d..a360598 100644
--- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -15,13 +15,17 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.EventReportingArtifacts;
 import com.google.devtools.build.lib.actions.Root;
+import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventConverters;
@@ -29,6 +33,7 @@
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventWithOrderConstraint;
 import com.google.devtools.build.lib.buildeventstream.GenericBuildEvent;
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
@@ -43,11 +48,17 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
 
 /** Tests {@link BuildEventStreamer}. */
 @RunWith(JUnit4.class)
@@ -58,6 +69,11 @@
     private final List<BuildEventStreamProtos.BuildEvent> eventsAsProtos = new ArrayList<>();
 
     @Override
+    public String name() {
+      return this.getClass().getSimpleName();
+    }
+
+    @Override
     public void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
       events.add(event);
       eventsAsProtos.add(
@@ -81,7 +97,7 @@
     }
 
     @Override
-    public Future<Void> close() {
+    public ListenableFuture<Void> close() {
       return Futures.immediateFuture(null);
     }
 
@@ -182,14 +198,38 @@
     return BuildEventId.unknownBuildEventId(opaque);
   }
 
-  @Test
+  private static class EventBusHandler {
+
+    Set<BuildEventTransport> transportSet;
+
+    @Subscribe
+    void transportsAnnounced(AnnounceBuildEventTransportsEvent evt) {
+      transportSet = Collections.synchronizedSet(new HashSet<>(evt.transports()));
+    }
+
+    @Subscribe
+    void transportClosed(BuildEventTransportClosedEvent evt) {
+      transportSet.remove(evt.transport());
+    }
+  }
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test(timeout = 5000)
   public void testSimpleStream() {
     // Verify that a well-formed event is passed through and that completion of the
     // build clears the pending progress-update event.
 
+    EventBusHandler handler = new EventBusHandler();
+    eventBus.register(handler);
+    assertNull(handler.transportSet);
+
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEvent startEvent =
         new GenericBuildEvent(
@@ -201,6 +241,7 @@
     List<BuildEvent> afterFirstEvent = transport.getEvents();
     assertThat(afterFirstEvent).hasSize(1);
     assertEquals(startEvent.getEventId(), afterFirstEvent.get(0).getEventId());
+    assertEquals(1, handler.transportSet.size());
 
     streamer.buildEvent(new BuildCompleteEvent(new BuildResult(0)));
 
@@ -208,6 +249,10 @@
     assertThat(finalStream).hasSize(3);
     assertEquals(BuildEventId.buildFinished(), finalStream.get(1).getEventId());
     assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, finalStream.get(2).getEventId());
+
+    while (!handler.transportSet.isEmpty()) {
+      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+    }
   }
 
   @Test
@@ -217,7 +262,7 @@
 
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEvent startEvent =
         new GenericBuildEvent(
@@ -247,7 +292,7 @@
 
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEvent unexpectedStartEvent =
         new GenericBuildEvent(testId("unexpected start"), ImmutableSet.<BuildEventId>of());
@@ -288,7 +333,7 @@
     // late-referenced event is not expected again.
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEvent startEvent =
         new GenericBuildEvent(
@@ -322,7 +367,7 @@
 
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEventId expectedId = testId("the target");
     BuildEvent startEvent =
@@ -354,7 +399,7 @@
 
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEventId expectedId = testId("the target");
     BuildEvent startEvent =
@@ -385,7 +430,7 @@
     // Verify that we can handle an first event waiting for another event.
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEventId initialId = testId("Initial");
     BuildEventId waitId = testId("Waiting for initial event");
@@ -415,7 +460,7 @@
     // Verify that reported artifacts are correctly unfolded into the stream
     RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
     BuildEventStreamer streamer =
-        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport));
+        new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
 
     BuildEvent startEvent =
         new GenericBuildEvent(