BES thread-safety: Reduce lock coarseness for build events while fixing some unlocked access to the pendingEvents field.
Interestingly, the crash occurs when we hit the final build event, which involves no parallelism. It is the race conditions operating on the ArrayMultimap earlier in the build which corrupts this data structure, leading to an inconsistency (an empty data structure that thinks it is non-empty).
This change yields ~5% wall-time improvement on the included benchmark.
PiperOrigin-RevId: 241790445
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
index 78490ee..baac274 100644
--- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java
@@ -73,10 +73,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
+import org.apache.commons.lang.time.StopWatch;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -93,7 +98,7 @@
@Before
public void setUp() {
artifactGroupNamer = new CountingArtifactGroupNamer();
- transport = new RecordingBuildEventTransport(artifactGroupNamer);
+ transport = new RecordingBuildEventTransport(artifactGroupNamer, true);
streamer =
new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), artifactGroupNamer);
}
@@ -120,7 +125,7 @@
private final List<BuildEventStreamProtos.BuildEvent> eventsAsProtos = new ArrayList<>();
private ArtifactGroupNamer artifactGroupNamer;
- RecordingBuildEventTransport(ArtifactGroupNamer namer) {
+ RecordingBuildEventTransport(ArtifactGroupNamer namer, boolean recordEvents) {
this.artifactGroupNamer = namer;
}
@@ -130,7 +135,7 @@
}
@Override
- public void sendBuildEvent(BuildEvent event) {
+ public synchronized void sendBuildEvent(BuildEvent event) {
events.add(event);
eventsAsProtos.add(
event.asStreamProto(
@@ -491,6 +496,124 @@
assertThat(allEventsSeen.get(3).getEventId()).isEqualTo(failedTarget.getEventId());
}
+ private static BuildEvent indexOrderedBuildEvent(int index, int afterIndex) {
+ return new GenericOrderEvent(
+ testId("Concurrent-" + index),
+ ImmutableList.of(),
+ afterIndex == -1
+ ? ImmutableList.of()
+ : ImmutableList.of(testId("Concurrent-" + afterIndex)));
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ // Verify that we can blast the BuildEventStreamer with many build events in parallel without
+ // violating internal consistency. The thread-safety under test is primarily sensitive to the
+ // pendingEvents field constructed when there are ordering constraints, so we make sure to
+ // include such ordering constraints in this test.
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"),
+ ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE, BuildEventId.buildFinished()));
+ streamer.buildEvent(startEvent);
+
+ int numThreads = 12;
+ int numEventsPerThread = 10_000;
+ int totalEvents = numThreads * numEventsPerThread;
+ AtomicInteger idIndex = new AtomicInteger();
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(
+ numThreads,
+ numThreads,
+ /* keepAliveTime= */ 0,
+ TimeUnit.SECONDS,
+ /* workQueue= */ new LinkedBlockingQueue<>());
+
+ for (int i = 0; i < numThreads; i++) {
+ pool.execute(
+ () -> {
+ for (int j = 0; j < numEventsPerThread; j++) {
+ int index = idIndex.getAndIncrement();
+ // Arrange for half of the events to have an ordering constraint on the subsequent
+ // event. The ordering graph must avoid cycles.
+ int afterIndex = (index % 2 == 0) ? (index + 1) % totalEvents : -1;
+ streamer.buildEvent(indexOrderedBuildEvent(index, afterIndex));
+ }
+ });
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(1, TimeUnit.DAYS);
+
+ BuildEventId lateId = testId("late event");
+ streamer.buildEvent(new BuildCompleteEvent(new BuildResult(0), ImmutableList.of(lateId)));
+ assertThat(streamer.isClosed()).isFalse();
+ streamer.buildEvent(new GenericBuildEvent(lateId, ImmutableSet.of()));
+ assertThat(streamer.isClosed()).isTrue();
+
+ List<BuildEvent> eventsSeen = transport.getEvents();
+ assertThat(eventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId());
+ assertThat(eventsSeen).hasSize(4 + totalEvents * 2);
+ }
+
+ // Re-enable this "test" for ad-hoc benchmarking of many concurrent build events.
+ @Ignore
+ public void concurrencyBenchmark() throws Exception {
+ long time = 0;
+ for (int iteration = 0; iteration < 3; iteration++) {
+ StopWatch watch = new StopWatch();
+ watch.start();
+
+ transport = new RecordingBuildEventTransport(artifactGroupNamer, /*recordEvents=*/ false);
+ streamer =
+ new BuildEventStreamer(
+ ImmutableSet.<BuildEventTransport>of(transport), artifactGroupNamer);
+ BuildEvent startEvent =
+ new GenericBuildEvent(
+ testId("Initial"),
+ ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE, BuildEventId.buildFinished()));
+ streamer.buildEvent(startEvent);
+
+ int numThreads = 12;
+ int numEventsPerThread = 100_000;
+ int totalEvents = numThreads * numEventsPerThread;
+ AtomicInteger idIndex = new AtomicInteger();
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(
+ numThreads, numThreads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+
+ for (int i = 0; i < numThreads; i++) {
+ pool.execute(
+ () -> {
+ for (int j = 0; j < numEventsPerThread; j++) {
+ int index = idIndex.getAndIncrement();
+ // Arrange for half of the events to have an ordering constraint on the subsequent
+ // event. The ordering graph must avoid cycles.
+ int afterIndex = (index % 2 == 0) ? (index + 1) % totalEvents : -1;
+ streamer.buildEvent(indexOrderedBuildEvent(index, afterIndex));
+ }
+ });
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(1, TimeUnit.DAYS);
+ watch.stop();
+
+ time += watch.getTime();
+
+ BuildEventId lateId = testId("late event");
+ streamer.buildEvent(new BuildCompleteEvent(new BuildResult(0), ImmutableList.of(lateId)));
+ assertThat(streamer.isClosed()).isFalse();
+ streamer.buildEvent(new GenericBuildEvent(lateId, ImmutableSet.of()));
+ assertThat(streamer.isClosed()).isTrue();
+ }
+
+ System.err.println();
+ System.err.println("=============================================================");
+ System.err.println("Concurrent performance of BEP build event processing: " + time + "ms");
+ System.err.println("=============================================================");
+ }
+
@Test
public void testMissingPrerequisites() {
// Verify that an event where the prerequisite is never coming till the end of