Add a build-event streamer class

The BuildEventStreamer will listen for BuildEvents and stream them
to the provided transports. It also ensures events are properly chained:
for unsolicited events, it will add progress events to chain them and
at the end of a build it closes all announced but not produced events
as aborted.

--
Change-Id: I623b582657573fe1288821c96f084e0ab0bca4d4
Reviewed-on: https://bazel-review.googlesource.com/#/c/6275
MOS_MIGRATED_REVID=134787541
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index e38a6a4..45aa1c3 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -980,6 +980,7 @@
     ),
     deps = [
         ":build-base",
+        ":buildeventstream",
         ":clock",
         ":collect",
         ":concurrent",
@@ -997,6 +998,7 @@
         ":windows",
         "//src/main/java/com/google/devtools/build/docgen:docgen_javalib",
         "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
         "//src/main/java/com/google/devtools/build/lib/cmdline",
         "//src/main/java/com/google/devtools/build/lib/query2",
         "//src/main/java/com/google/devtools/build/lib/query2:query-engine",
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
new file mode 100644
index 0000000..6329d41
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -0,0 +1,151 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.runtime;
+
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.devtools.build.lib.analysis.NoBuildEvent;
+import com.google.devtools.build.lib.buildeventstream.AbortedEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventId;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.ProgressEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildInterruptedEvent;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/** Listen for {@link BuildEvent} and stream them to the provided {@link BuildEventTransport}. */
+public class BuildEventStreamer implements EventHandler {
+  private final Collection<BuildEventTransport> transports;
+  private Set<BuildEventId> announcedEvents;
+  private Set<BuildEventId> postedEvents;
+  private int progressCount;
+  private AbortReason abortReason = AbortReason.UNKNOWN;
+  private static final Logger LOG = Logger.getLogger(BuildEventStreamer.class.getName());
+
+  public BuildEventStreamer(Collection<BuildEventTransport> transports) {
+    this.transports = transports;
+    this.announcedEvents = null;
+    this.postedEvents = null;
+    this.progressCount = 0;
+  }
+
+  /**
+   * Post a new event to all transports; simultaneously keep track of the events we announce to
+   * still come.
+   *
+   * <p>Moreover, link unannounced events to the progress stream; we only expect failure events to
+   * come before their parents.
+   */
+  private void post(BuildEvent event) {
+    BuildEvent linkEvent = null;
+    BuildEventId id = event.getEventId();
+
+    synchronized (this) {
+      if (announcedEvents == null) {
+        announcedEvents = new HashSet<>();
+        postedEvents = new HashSet<>();
+        if (!event.getChildrenEvents().contains(ProgressEvent.INITIAL_PROGRESS_UPDATE)) {
+          linkEvent = ProgressEvent.progressChainIn(progressCount, event.getEventId());
+          progressCount++;
+          announcedEvents.addAll(linkEvent.getChildrenEvents());
+          postedEvents.add(linkEvent.getEventId());
+        }
+      } else {
+        if (!announcedEvents.contains(id)) {
+          linkEvent = ProgressEvent.progressChainIn(progressCount, id);
+          progressCount++;
+          announcedEvents.addAll(linkEvent.getChildrenEvents());
+          postedEvents.add(linkEvent.getEventId());
+        }
+        postedEvents.add(id);
+      }
+      announcedEvents.addAll(event.getChildrenEvents());
+    }
+
+    for (BuildEventTransport transport : transports) {
+      try {
+        if (linkEvent != null) {
+          transport.sendBuildEvent(linkEvent);
+        }
+        transport.sendBuildEvent(event);
+      } catch (IOException e) {
+        // TODO(aehlig): signal that the build ought to be aborted
+        LOG.severe("Failed to write to build event transport: " + e);
+      }
+    }
+  }
+
+  /**
+   * Clear all events that are still pending; events not naturally closed by the expected event
+   * normally only occur if the build is aborted.
+   */
+  private void clearPendingEvents() {
+    if (announcedEvents != null) {
+      // create a copy of the identifiers to clear, as the post method
+      // will change the set of already announced events.
+      Set<BuildEventId> ids;
+      synchronized (this) {
+        ids = Sets.difference(announcedEvents, postedEvents);
+      }
+      for (BuildEventId id : ids) {
+        post(new AbortedEvent(id, abortReason, ""));
+      }
+    }
+  }
+
+  private void close() {
+    for (BuildEventTransport transport : transports) {
+      try {
+        transport.close();
+      } catch (IOException e) {
+        // TODO(aehlig): signal that the build ought to be aborted
+        LOG.warning("Failure while closing build event transport: " + e);
+      }
+    }
+  }
+
+  @Override
+  public void handle(Event event) {}
+
+  @Subscribe
+  public void noBuild(NoBuildEvent event) {
+    close();
+  }
+
+  @Subscribe
+  public void buildInterrupted(BuildInterruptedEvent event) {
+    abortReason = AbortReason.USER_INTERRUPTED;
+  };
+
+  @Subscribe
+  public void buildComplete(BuildCompleteEvent event) {
+    post(ProgressEvent.finalProgressUpdate(progressCount));
+    clearPendingEvents();
+    close();
+  }
+
+  @Subscribe
+  public void buildEvent(BuildEvent event) {
+    post(event);
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 3f126df..aebe6e1 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -992,6 +992,7 @@
         ":testutil",
         "//src/main/java/com/google/devtools/build/lib:bazel-rules",
         "//src/main/java/com/google/devtools/build/lib:build-base",
+        "//src/main/java/com/google/devtools/build/lib:buildeventstream",
         "//src/main/java/com/google/devtools/build/lib:clock",
         "//src/main/java/com/google/devtools/build/lib:flags",
         "//src/main/java/com/google/devtools/build/lib:io",
@@ -1000,6 +1001,7 @@
         "//src/main/java/com/google/devtools/build/lib:util",
         "//src/main/java/com/google/devtools/build/lib:vfs",
         "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
         "//src/main/java/com/google/devtools/common/options",
         "//src/main/protobuf:invocation_policy_java_proto",
         "//src/main/protobuf:test_status_java_proto",
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
new file mode 100644
index 0000000..1203c9d
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -0,0 +1,183 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.runtime;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventId;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.GenericBuildEvent;
+import com.google.devtools.build.lib.buildeventstream.ProgressEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests {@link BuildEventStreamer}. */
+@RunWith(JUnit4.class)
+public class BuildEventStreamerTest {
+
+  private static class RecordingBuildEventTransport implements BuildEventTransport {
+    private final List<BuildEvent> events;
+
+    RecordingBuildEventTransport() {
+      events = new ArrayList<>();
+    }
+
+    @Override
+    public void sendBuildEvent(BuildEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void close() {}
+
+    List<BuildEvent> getEvents() {
+      return events;
+    }
+  }
+
+  private static BuildEventId testId(String opaque) {
+    return BuildEventId.unknownBuildEventId(opaque);
+  }
+
+  @Test
+  public void testSimpleStream() {
+    // Verify that a well-formed event is passed through and that completion of the
+    // build clears the pending progress-update event.
+
+    RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+    BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+    BuildEvent startEvent =
+        new GenericBuildEvent(
+            testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+
+    streamer.buildEvent(startEvent);
+
+    List<BuildEvent> afterFirstEvent = transport.getEvents();
+    assertThat(afterFirstEvent).hasSize(1);
+    assertEquals(startEvent.getEventId(), afterFirstEvent.get(0).getEventId());
+
+    streamer.buildComplete(new BuildCompleteEvent(null));
+
+    List<BuildEvent> finalStream = transport.getEvents();
+    assertThat(finalStream).hasSize(2);
+    assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, finalStream.get(1).getEventId());
+  }
+
+  @Test
+  public void testChaining() {
+    // Verify that unannounced events are linked in with progress update events, assuming
+    // a correctly formed initial event.
+
+    RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+    BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+    BuildEvent startEvent =
+        new GenericBuildEvent(
+            testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+    BuildEvent unexpectedEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+
+    streamer.buildEvent(startEvent);
+    streamer.buildEvent(unexpectedEvent);
+
+    List<BuildEvent> eventsSeen = transport.getEvents();
+    assertThat(eventsSeen).hasSize(3);
+    assertEquals(startEvent.getEventId(), eventsSeen.get(0).getEventId());
+    assertEquals(unexpectedEvent.getEventId(), eventsSeen.get(2).getEventId());
+    BuildEvent linkEvent = eventsSeen.get(1);
+    assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, linkEvent.getEventId());
+    assertTrue(
+        "Unexpected events should be linked",
+        linkEvent.getChildrenEvents().contains(unexpectedEvent.getEventId()));
+  }
+
+  @Test
+  public void testBadInitialEvent() {
+    // Verify that, if the initial event does not announce the initial progress update event,
+    // the initial progress event is used instead to chain that event; in this way, new
+    // progress updates can always be chained in.
+
+    RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+    BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+    BuildEvent unexpectedStartEvent =
+        new GenericBuildEvent(testId("unexpected start"), ImmutableSet.of());
+
+    streamer.buildEvent(unexpectedStartEvent);
+
+    List<BuildEvent> eventsSeen = transport.getEvents();
+    assertThat(eventsSeen).hasSize(2);
+    assertEquals(unexpectedStartEvent.getEventId(), eventsSeen.get(1).getEventId());
+    BuildEvent initial = eventsSeen.get(0);
+    assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, initial.getEventId());
+    assertTrue(
+        "Event should be linked",
+        initial.getChildrenEvents().contains(unexpectedStartEvent.getEventId()));
+
+    // The initial event should also announce a new progress event; we test this
+    // by streaming another unannounced event.
+
+    BuildEvent unexpectedEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+
+    streamer.buildEvent(unexpectedEvent);
+    List<BuildEvent> allEventsSeen = transport.getEvents();
+    assertThat(allEventsSeen).hasSize(4);
+    assertEquals(unexpectedEvent.getEventId(), allEventsSeen.get(3).getEventId());
+    BuildEvent secondLinkEvent = allEventsSeen.get(2);
+    assertTrue(
+        "Progress should have been announced",
+        initial.getChildrenEvents().contains(secondLinkEvent.getEventId()));
+    assertTrue(
+        "Second event should be linked",
+        secondLinkEvent.getChildrenEvents().contains(unexpectedEvent.getEventId()));
+  }
+
+  @Test
+  public void testReferPastEvent() {
+    // Verify that, if an event is refers to a previously done event, that duplicated
+    // late-referenced event is not expected again.
+    RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+    BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+    BuildEvent startEvent =
+        new GenericBuildEvent(
+            testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+    BuildEvent earlyEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+    BuildEvent lateReference =
+        new GenericBuildEvent(testId("late reference"), ImmutableSet.of(earlyEvent.getEventId()));
+
+    streamer.buildEvent(startEvent);
+    streamer.buildEvent(earlyEvent);
+    streamer.buildEvent(lateReference);
+    streamer.buildComplete(new BuildCompleteEvent(null));
+
+    List<BuildEvent> eventsSeen = transport.getEvents();
+    int earlyEventCount = 0;
+    for (BuildEvent event : eventsSeen) {
+      if (event.getEventId().equals(earlyEvent.getEventId())) {
+        earlyEventCount++;
+      }
+    }
+    // The early event should be reported precisely once.
+    assertEquals(1, earlyEventCount);
+  }
+}