Fix issue with large stdout/stderr payloads being sent over BEP atomically. While there is buffering/flushing logic built into the SynchronizedOutputStream, it did not split large atomic writes.
Now the SynchronizedOutputStream splits large payloads into chunks, which requires the BuildEventStreamer to handle multiple chunks gracefully.
To maintain ordering, we place messages with stdout first, then messages with stderr. For backwards compatibility with unit tests and to minimize the number of messages sent, the final stdout message may include the start of the stderr payload.
RELNOTES: None
PiperOrigin-RevId: 232956532
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
index 40f1606..06545df 100644
--- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -62,6 +62,7 @@
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSetView;
import com.google.devtools.build.lib.testutil.FoundationTestCase;
+import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Root;
@@ -454,7 +455,7 @@
}
@Test
- public void testReodering() {
+ public void testReordering() {
// Verify that an event requiring to be posted after another one is indeed.
RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
@@ -606,8 +607,8 @@
Mockito.mock(BuildEventStreamer.OutErrProvider.class);
String stdoutMsg = "Some text that was written to stdout.";
String stderrMsg = "The UI text that bazel wrote to stderr.";
- when(outErr.getOut()).thenReturn(stdoutMsg);
- when(outErr.getErr()).thenReturn(stderrMsg);
+ when(outErr.getOut()).thenReturn(ImmutableList.of(stdoutMsg));
+ when(outErr.getErr()).thenReturn(ImmutableList.of(stderrMsg));
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"),
@@ -650,8 +651,8 @@
Mockito.mock(BuildEventStreamer.OutErrProvider.class);
String stdoutMsg = "Some text that was written to stdout.";
String stderrMsg = "The UI text that bazel wrote to stderr.";
- when(outErr.getOut()).thenReturn(stdoutMsg);
- when(outErr.getErr()).thenReturn(stderrMsg);
+ when(outErr.getOut()).thenReturn(ImmutableList.of(stdoutMsg));
+ when(outErr.getErr()).thenReturn(ImmutableList.of(stderrMsg));
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"),
@@ -678,6 +679,49 @@
verify(outErr, times(1)).getErr();
}
+ private static <T> ImmutableList<ImmutableList<Pair<T, T>>> consumeToLists(
+ Iterable<T> left, Iterable<T> right) {
+ ImmutableList.Builder<Pair<T, T>> consumerBuilder = ImmutableList.builder();
+ ImmutableList.Builder<Pair<T, T>> lastConsumerBuilder = ImmutableList.builder();
+ // Record the pairs handed to each of consumeAsPairs' two callbacks in separate builders.
+ BuildEventStreamer.consumeAsPairs(
+ left,
+ right,
+ (t1, t2) -> consumerBuilder.add(Pair.of(t1, t2)),
+ (t1, t2) -> lastConsumerBuilder.add(Pair.of(t1, t2)));
+ // Element 0: pairs seen by the first callback; element 1: pairs seen by the second callback.
+ return ImmutableList.of(consumerBuilder.build(), lastConsumerBuilder.build());
+ }
+
+ @Test
+ public void testConsumeAsPairs() {
+ assertThat(consumeToLists(ImmutableList.of(1, 2, 3), ImmutableList.of(4, 5, 6)))
+ .containsExactly(
+ ImmutableList.of(Pair.of(1, null), Pair.of(2, null), Pair.of(3, 4), Pair.of(null, 5)),
+ ImmutableList.of(Pair.of(null, 6)))
+ .inOrder();
+ // Both inputs empty: only the second (last) callback receives a pair, namely (null, null).
+ assertThat(consumeToLists(ImmutableList.of(), ImmutableList.of()))
+ .containsExactly(ImmutableList.of(), ImmutableList.of(Pair.of(null, null)))
+ .inOrder();
+ // One element per side: both are combined into a single pair given to the last callback.
+ assertThat(consumeToLists(ImmutableList.of(1), ImmutableList.of(2)))
+ .containsExactly(ImmutableList.of(), ImmutableList.of(Pair.of(1, 2)))
+ .inOrder();
+ // Right side longer: the surplus right element arrives alone, as the last pair.
+ assertThat(consumeToLists(ImmutableList.of(1), ImmutableList.of(2, 3)))
+ .containsExactly(ImmutableList.of(Pair.of(1, 2)), ImmutableList.of(Pair.of(null, 3)))
+ .inOrder();
+ // Right side empty: left elements are paired with null; the final one is the last pair.
+ assertThat(consumeToLists(ImmutableList.of(1, 2), ImmutableList.of()))
+ .containsExactly(ImmutableList.of(Pair.of(1, null)), ImmutableList.of(Pair.of(2, null)))
+ .inOrder();
+ // Left side empty: the lone right element is the last pair, with null on the left.
+ assertThat(consumeToLists(ImmutableList.of(), ImmutableList.of(1)))
+ .containsExactly(ImmutableList.of(), ImmutableList.of(Pair.of(null, 1)))
+ .inOrder();
+ }
+
@Test
public void testReportedConfigurations() throws Exception {
// Verify that configuration events are posted, but only once.
@@ -740,8 +784,12 @@
String firstStderrMsg = "The UI text that bazel wrote to stderr.";
String secondStdoutMsg = "More text that was written to stdout, still before the start event.";
String secondStderrMsg = "More text written to stderr, still before the start event.";
- when(outErr.getOut()).thenReturn(firstStdoutMsg).thenReturn(secondStdoutMsg);
- when(outErr.getErr()).thenReturn(firstStderrMsg).thenReturn(secondStderrMsg);
+ when(outErr.getOut())
+ .thenReturn(ImmutableList.of(firstStdoutMsg))
+ .thenReturn(ImmutableList.of(secondStdoutMsg));
+ when(outErr.getErr())
+ .thenReturn(ImmutableList.of(firstStderrMsg))
+ .thenReturn(ImmutableList.of(secondStderrMsg));
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"),
@@ -772,6 +820,55 @@
}
@Test
+ public void testChunkedFlush() throws Exception {
+ // Verify that a flush() for which the OutErrProvider returns several chunks per stream is handled.
+ RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
+ BuildEventStreamer streamer =
+ new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
+ BuildEventStreamer.OutErrProvider outErr =
+ Mockito.mock(BuildEventStreamer.OutErrProvider.class);
+ String firstStdoutMsg = "Some text that was written to stdout.";
+ String firstStderrMsg = "The UI text that bazel wrote to stderr.";
+ String secondStdoutMsg = "More text that was written to stdout, still before the start event.";
+ String secondStderrMsg = "More text written to stderr, still before the start event.";
+ when(outErr.getOut()).thenReturn(ImmutableList.of(firstStdoutMsg, secondStdoutMsg));
+ when(outErr.getErr()).thenReturn(ImmutableList.of(firstStderrMsg, secondStderrMsg));
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"),
+ ImmutableSet.<BuildEventId>of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
+
+ streamer.registerOutErrProvider(outErr);
+ streamer.buildEvent(startEvent);
+ streamer.flush();
+
+ assertThat(streamer.isClosed()).isFalse();
+ List<BuildEvent> eventsSeen = transport.getEvents();
+ assertThat(eventsSeen).hasSize(4);
+ assertThat(eventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId());
+
+ // Expect to find 3 progress messages: (firstStdout, ""), (secondStdout, firstStderr),
+ // ("", secondStderr). Assuming UIs display stdout first, this maintains ordering.
+ BuildEvent progressEvent = eventsSeen.get(1);
+ assertThat(progressEvent.getEventId()).isEqualTo(ProgressEvent.INITIAL_PROGRESS_UPDATE);
+ BuildEventStreamProtos.BuildEvent progressEventProto = transport.getEventProtos().get(1);
+ assertThat(progressEventProto.getProgress().getStdout()).isEqualTo(firstStdoutMsg);
+ assertThat(progressEventProto.getProgress().getStderr()).isEmpty();
+ // Second message: the last stdout chunk shares a message with the first stderr chunk.
+ BuildEventStreamProtos.BuildEvent secondProgressEventProto = transport.getEventProtos().get(2);
+ assertThat(secondProgressEventProto.getProgress().getStdout()).isEqualTo(secondStdoutMsg);
+ assertThat(secondProgressEventProto.getProgress().getStderr()).isEqualTo(firstStderrMsg);
+ // Third message: carries only the remaining stderr chunk.
+ BuildEventStreamProtos.BuildEvent thirdProgressEventProto = transport.getEventProtos().get(3);
+ assertThat(thirdProgressEventProto.getProgress().getStdout()).isEmpty();
+ assertThat(thirdProgressEventProto.getProgress().getStderr()).isEqualTo(secondStderrMsg);
+
+ // The OutErrProvider should be queried only once per flush().
+ verify(outErr, times(1)).getOut();
+ verify(outErr, times(1)).getErr();
+ }
+
+ @Test
public void testNoopFlush() throws Exception {
// Verify that the streamer ignores a flush, if neither stream produces any output.
RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
@@ -781,8 +878,8 @@
Mockito.mock(BuildEventStreamer.OutErrProvider.class);
String stdoutMsg = "Some text that was written to stdout.";
String stderrMsg = "The UI text that bazel wrote to stderr.";
- when(outErr.getOut()).thenReturn(stdoutMsg).thenReturn("");
- when(outErr.getErr()).thenReturn(stderrMsg).thenReturn("");
+ when(outErr.getOut()).thenReturn(ImmutableList.of(stdoutMsg)).thenReturn(ImmutableList.of());
+ when(outErr.getErr()).thenReturn(ImmutableList.of(stderrMsg)).thenReturn(ImmutableList.of());
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"),
@@ -816,8 +913,8 @@
Mockito.mock(BuildEventStreamer.OutErrProvider.class);
String stdoutMsg = "Some text that was written to stdout.";
String stderrMsg = "The UI text that bazel wrote to stderr.";
- when(outErr.getOut()).thenReturn(stdoutMsg);
- when(outErr.getErr()).thenReturn(stderrMsg);
+ when(outErr.getOut()).thenReturn(ImmutableList.of(stdoutMsg));
+ when(outErr.getErr()).thenReturn(ImmutableList.of(stderrMsg));
BuildEvent unexpectedStartEvent =
new GenericBuildEvent(testId("unexpected start"), ImmutableSet.<BuildEventId>of());