Add a build-event streamer class
The BuildEventStreamer will listen for BuildEvents and stream them
to the provided transports. It also ensures events are properly chained:
for unsolicited events, it will add progress events to chain them and
at the end of a build it closes all announced but not produced events
as aborted.
--
Change-Id: I623b582657573fe1288821c96f084e0ab0bca4d4
Reviewed-on: https://bazel-review.googlesource.com/#/c/6275
MOS_MIGRATED_REVID=134787541
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index e38a6a4..45aa1c3 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -980,6 +980,7 @@
),
deps = [
":build-base",
+ ":buildeventstream",
":clock",
":collect",
":concurrent",
@@ -997,6 +998,7 @@
":windows",
"//src/main/java/com/google/devtools/build/docgen:docgen_javalib",
"//src/main/java/com/google/devtools/build/lib/actions",
+ "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/cmdline",
"//src/main/java/com/google/devtools/build/lib/query2",
"//src/main/java/com/google/devtools/build/lib/query2:query-engine",
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
new file mode 100644
index 0000000..6329d41
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -0,0 +1,151 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.runtime;
+
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.devtools.build.lib.analysis.NoBuildEvent;
+import com.google.devtools.build.lib.buildeventstream.AbortedEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventId;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.ProgressEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildInterruptedEvent;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/** Listen for {@link BuildEvent} and stream them to the provided {@link BuildEventTransport}. */
+public class BuildEventStreamer implements EventHandler {
+ private final Collection<BuildEventTransport> transports;
+ private Set<BuildEventId> announcedEvents;
+ private Set<BuildEventId> postedEvents;
+ private int progressCount;
+ private AbortReason abortReason = AbortReason.UNKNOWN;
+ private static final Logger LOG = Logger.getLogger(BuildEventStreamer.class.getName());
+
+ public BuildEventStreamer(Collection<BuildEventTransport> transports) {
+ this.transports = transports;
+ this.announcedEvents = null;
+ this.postedEvents = null;
+ this.progressCount = 0;
+ }
+
+ /**
+ * Post a new event to all transports; simultaneously keep track of the events we announce to
+ * still come.
+ *
+ * <p>Moreover, link unannounced events to the progress stream; we only expect failure events to
+ * come before their parents.
+ */
+ private void post(BuildEvent event) {
+ BuildEvent linkEvent = null;
+ BuildEventId id = event.getEventId();
+
+ synchronized (this) {
+ if (announcedEvents == null) {
+ announcedEvents = new HashSet<>();
+ postedEvents = new HashSet<>();
+ if (!event.getChildrenEvents().contains(ProgressEvent.INITIAL_PROGRESS_UPDATE)) {
+ linkEvent = ProgressEvent.progressChainIn(progressCount, event.getEventId());
+ progressCount++;
+ announcedEvents.addAll(linkEvent.getChildrenEvents());
+ postedEvents.add(linkEvent.getEventId());
+ }
+ } else {
+ if (!announcedEvents.contains(id)) {
+ linkEvent = ProgressEvent.progressChainIn(progressCount, id);
+ progressCount++;
+ announcedEvents.addAll(linkEvent.getChildrenEvents());
+ postedEvents.add(linkEvent.getEventId());
+ }
+ postedEvents.add(id);
+ }
+ announcedEvents.addAll(event.getChildrenEvents());
+ }
+
+ for (BuildEventTransport transport : transports) {
+ try {
+ if (linkEvent != null) {
+ transport.sendBuildEvent(linkEvent);
+ }
+ transport.sendBuildEvent(event);
+ } catch (IOException e) {
+ // TODO(aehlig): signal that the build ought to be aborted
+ LOG.severe("Failed to write to build event transport: " + e);
+ }
+ }
+ }
+
+ /**
+ * Clear all events that are still pending; events not naturally closed by the expected event
+ * normally only occur if the build is aborted.
+ */
+ private void clearPendingEvents() {
+ if (announcedEvents != null) {
+ // create a copy of the identifiers to clear, as the post method
+ // will change the set of already announced events.
+ Set<BuildEventId> ids;
+ synchronized (this) {
+ ids = Sets.difference(announcedEvents, postedEvents);
+ }
+ for (BuildEventId id : ids) {
+ post(new AbortedEvent(id, abortReason, ""));
+ }
+ }
+ }
+
+ private void close() {
+ for (BuildEventTransport transport : transports) {
+ try {
+ transport.close();
+ } catch (IOException e) {
+ // TODO(aehlig): signal that the build ought to be aborted
+ LOG.warning("Failure while closing build event transport: " + e);
+ }
+ }
+ }
+
+ @Override
+ public void handle(Event event) {}
+
+ @Subscribe
+ public void noBuild(NoBuildEvent event) {
+ close();
+ }
+
+ @Subscribe
+ public void buildInterrupted(BuildInterruptedEvent event) {
+ abortReason = AbortReason.USER_INTERRUPTED;
+ };
+
+ @Subscribe
+ public void buildComplete(BuildCompleteEvent event) {
+ post(ProgressEvent.finalProgressUpdate(progressCount));
+ clearPendingEvents();
+ close();
+ }
+
+ @Subscribe
+ public void buildEvent(BuildEvent event) {
+ post(event);
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 3f126df..aebe6e1 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -992,6 +992,7 @@
":testutil",
"//src/main/java/com/google/devtools/build/lib:bazel-rules",
"//src/main/java/com/google/devtools/build/lib:build-base",
+ "//src/main/java/com/google/devtools/build/lib:buildeventstream",
"//src/main/java/com/google/devtools/build/lib:clock",
"//src/main/java/com/google/devtools/build/lib:flags",
"//src/main/java/com/google/devtools/build/lib:io",
@@ -1000,6 +1001,7 @@
"//src/main/java/com/google/devtools/build/lib:util",
"//src/main/java/com/google/devtools/build/lib:vfs",
"//src/main/java/com/google/devtools/build/lib/actions",
+ "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/common/options",
"//src/main/protobuf:invocation_policy_java_proto",
"//src/main/protobuf:test_status_java_proto",
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
new file mode 100644
index 0000000..1203c9d
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -0,0 +1,183 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.runtime;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventId;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.GenericBuildEvent;
+import com.google.devtools.build.lib.buildeventstream.ProgressEvent;
+import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests {@link BuildEventStreamer}. */
+@RunWith(JUnit4.class)
+public class BuildEventStreamerTest {
+
+ private static class RecordingBuildEventTransport implements BuildEventTransport {
+ private final List<BuildEvent> events;
+
+ RecordingBuildEventTransport() {
+ events = new ArrayList<>();
+ }
+
+ @Override
+ public void sendBuildEvent(BuildEvent event) {
+ events.add(event);
+ }
+
+ @Override
+ public void close() {}
+
+ List<BuildEvent> getEvents() {
+ return events;
+ }
+ }
+
+ private static BuildEventId testId(String opaque) {
+ return BuildEventId.unknownBuildEventId(opaque);
+ }
+
+ @Test
+ public void testSimpleStream() {
+ // Verify that a well-formed event is passed through and that completion of the
+ // build clears the pending progress-update event.
+
+ RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+ BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+
+ streamer.buildEvent(startEvent);
+
+ List<BuildEvent> afterFirstEvent = transport.getEvents();
+ assertThat(afterFirstEvent).hasSize(1);
+ assertEquals(startEvent.getEventId(), afterFirstEvent.get(0).getEventId());
+
+ streamer.buildComplete(new BuildCompleteEvent(null));
+
+ List<BuildEvent> finalStream = transport.getEvents();
+ assertThat(finalStream).hasSize(2);
+ assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, finalStream.get(1).getEventId());
+ }
+
+ @Test
+ public void testChaining() {
+ // Verify that unannounced events are linked in with progress update events, assuming
+ // a correctly formed initial event.
+
+ RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+ BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+ BuildEvent unexpectedEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+
+ streamer.buildEvent(startEvent);
+ streamer.buildEvent(unexpectedEvent);
+
+ List<BuildEvent> eventsSeen = transport.getEvents();
+ assertThat(eventsSeen).hasSize(3);
+ assertEquals(startEvent.getEventId(), eventsSeen.get(0).getEventId());
+ assertEquals(unexpectedEvent.getEventId(), eventsSeen.get(2).getEventId());
+ BuildEvent linkEvent = eventsSeen.get(1);
+ assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, linkEvent.getEventId());
+ assertTrue(
+ "Unexpected events should be linked",
+ linkEvent.getChildrenEvents().contains(unexpectedEvent.getEventId()));
+ }
+
+ @Test
+ public void testBadInitialEvent() {
+ // Verify that, if the initial event does not announce the initial progress update event,
+ // the initial progress event is used instead to chain that event; in this way, new
+ // progress updates can always be chained in.
+
+ RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+ BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+ BuildEvent unexpectedStartEvent =
+ new GenericBuildEvent(testId("unexpected start"), ImmutableSet.of());
+
+ streamer.buildEvent(unexpectedStartEvent);
+
+ List<BuildEvent> eventsSeen = transport.getEvents();
+ assertThat(eventsSeen).hasSize(2);
+ assertEquals(unexpectedStartEvent.getEventId(), eventsSeen.get(1).getEventId());
+ BuildEvent initial = eventsSeen.get(0);
+ assertEquals(ProgressEvent.INITIAL_PROGRESS_UPDATE, initial.getEventId());
+ assertTrue(
+ "Event should be linked",
+ initial.getChildrenEvents().contains(unexpectedStartEvent.getEventId()));
+
+ // The initial event should also announce a new progress event; we test this
+ // by streaming another unannounced event.
+
+ BuildEvent unexpectedEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+
+ streamer.buildEvent(unexpectedEvent);
+ List<BuildEvent> allEventsSeen = transport.getEvents();
+ assertThat(allEventsSeen).hasSize(4);
+ assertEquals(unexpectedEvent.getEventId(), allEventsSeen.get(3).getEventId());
+ BuildEvent secondLinkEvent = allEventsSeen.get(2);
+ assertTrue(
+ "Progress should have been announced",
+ initial.getChildrenEvents().contains(secondLinkEvent.getEventId()));
+ assertTrue(
+ "Second event should be linked",
+ secondLinkEvent.getChildrenEvents().contains(unexpectedEvent.getEventId()));
+ }
+
+ @Test
+ public void testReferPastEvent() {
+ // Verify that, if an event is refers to a previously done event, that duplicated
+ // late-referenced event is not expected again.
+ RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+ BuildEventStreamer streamer = new BuildEventStreamer(ImmutableSet.of(transport));
+
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+ BuildEvent earlyEvent = new GenericBuildEvent(testId("unexpected"), ImmutableSet.of());
+ BuildEvent lateReference =
+ new GenericBuildEvent(testId("late reference"), ImmutableSet.of(earlyEvent.getEventId()));
+
+ streamer.buildEvent(startEvent);
+ streamer.buildEvent(earlyEvent);
+ streamer.buildEvent(lateReference);
+ streamer.buildComplete(new BuildCompleteEvent(null));
+
+ List<BuildEvent> eventsSeen = transport.getEvents();
+ int earlyEventCount = 0;
+ for (BuildEvent event : eventsSeen) {
+ if (event.getEventId().equals(earlyEvent.getEventId())) {
+ earlyEventCount++;
+ }
+ }
+ // The early event should be reported precisely once.
+ assertEquals(1, earlyEventCount);
+ }
+}