Fix terrible ForkJoinQuiescingExecutor latent bug exposed by new usage of ForkJoinPool: only "adapt" a runnable to run in an existing FJP if that existing FJP is the same as the executor's: don't enqueue them to run in whatever random FJP is trying to enqueue this runnable. Big props to michajlo@ for quickly diagnosing this and suggesting the fix.
PiperOrigin-RevId: 391881393
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
index 593662c..1f1f610 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -46,18 +46,6 @@
/**
* Sets the {@link ForkJoinPool} that will be used by the to-be-built
- * {@link ForkJoinQuiescingExecutor}. The given {@link ForkJoinPool} will _not_ be shut down on
- * completion of the {@link ForkJoinQuiescingExecutor}.
- */
- public Builder withoutOwnershipOf(ForkJoinPool forkJoinPool) {
- Preconditions.checkState(this.forkJoinPool == null);
- this.forkJoinPool = forkJoinPool;
- this.owned = false;
- return this;
- }
-
- /**
- * Sets the {@link ForkJoinPool} that will be used by the to-be-built
* {@link ForkJoinQuiescingExecutor}. The given {@link ForkJoinPool} will be shut down on
* completion of the {@link ForkJoinQuiescingExecutor}.
*/
@@ -94,8 +82,8 @@
@Override
protected void executeWrappedRunnable(WrappedRunnable runnable, ExecutorService executorService) {
- if (ForkJoinTask.inForkJoinPool()) {
- @SuppressWarnings("unused")
+ if (ForkJoinTask.getPool() == executorService) {
+ @SuppressWarnings("unused")
Future<?> possiblyIgnoredError = ForkJoinTask.adapt(runnable).fork();
} else {
super.executeWrappedRunnable(runnable, executorService);
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 405efce..a84da19 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -71,7 +71,9 @@
// Default thread count is equal to the number of cores to exploit
// that level of hardware parallelism, since invalidation should be CPU-bound.
// We may consider increasing this in the future.
- private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+ @VisibleForTesting
+ static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+
private static final int EXPECTED_PENDING_SET_SIZE = DEFAULT_THREAD_COUNT * 8;
private static final int EXPECTED_VISITED_SET_SIZE = 1024;
@@ -261,8 +263,8 @@
visit(
Iterables.transform(
pendingList.subList(
- index * listSize / DEFAULT_THREAD_COUNT,
- Math.min(listSize, (index + 1) * listSize / DEFAULT_THREAD_COUNT)),
+ (index * listSize) / DEFAULT_THREAD_COUNT,
+ ((index + 1) * listSize) / DEFAULT_THREAD_COUNT),
Pair::getFirst),
InvalidationType.DELETED));
}
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 5d5e63c..9d2d260 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -39,6 +39,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
@@ -455,6 +456,34 @@
}
@Test
+ public void allNodesProcessed() throws Exception {
+ graph = new InMemoryGraphImpl();
+ ImmutableList.Builder<SkyKey> keysToDelete =
+ ImmutableList.builderWithExpectedSize(InvalidatingNodeVisitor.DEFAULT_THREAD_COUNT - 1);
+ for (int i = 0; i < InvalidatingNodeVisitor.DEFAULT_THREAD_COUNT - 1; i++) {
+ keysToDelete.add(GraphTester.nonHermeticKey("key" + i));
+ }
+ invalidate(graph, progressReceiver, keysToDelete.build().toArray(new SkyKey[0]));
+ assertThat(state.isEmpty()).isTrue();
+ }
+
+ @Test
+ public void deletingInsideForkJoinPoolWorks() throws Exception {
+ graph = new InMemoryGraphImpl();
+ ForkJoinPool outerPool = new ForkJoinPool(1);
+ outerPool
+ .submit(
+ () -> {
+ try {
+ invalidate(graph, progressReceiver, GraphTester.nonHermeticKey("a"));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ })
+ .get();
+ }
+
+ @Test
public void interruptThreadInReceiver() throws Exception {
Random random = new Random(TestUtils.getRandomSeed());
int graphSize = 1000;