Fix terrible ForkJoinQuiescingExecutor latent bug exposed by new usage of ForkJoinPool: only "adapt" a runnable to run in an existing FJP if that existing FJP is the same as the executor's: don't enqueue them to run in whatever random FJP is trying to enqueue this runnable. Big props to michajlo@ for quickly diagnosing this and suggesting the fix.

PiperOrigin-RevId: 391881393
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
index 593662c..1f1f610 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -46,18 +46,6 @@
 
     /**
      * Sets the {@link ForkJoinPool} that will be used by the to-be-built
-     * {@link ForkJoinQuiescingExecutor}. The given {@link ForkJoinPool} will _not_ be shut down on
-     * completion of the {@link ForkJoinQuiescingExecutor}.
-     */
-    public Builder withoutOwnershipOf(ForkJoinPool forkJoinPool) {
-      Preconditions.checkState(this.forkJoinPool == null);
-      this.forkJoinPool = forkJoinPool;
-      this.owned = false;
-      return this;
-    }
-
-    /**
-     * Sets the {@link ForkJoinPool} that will be used by the to-be-built
      * {@link ForkJoinQuiescingExecutor}. The given {@link ForkJoinPool} will be shut down on
      * completion of the {@link ForkJoinQuiescingExecutor}.
      */
@@ -94,8 +82,8 @@
 
   @Override
   protected void executeWrappedRunnable(WrappedRunnable runnable, ExecutorService executorService) {
-    if (ForkJoinTask.inForkJoinPool()) {
-      @SuppressWarnings("unused") 
+    if (ForkJoinTask.getPool() == executorService) {
+      @SuppressWarnings("unused")
       Future<?> possiblyIgnoredError = ForkJoinTask.adapt(runnable).fork();
     } else {
       super.executeWrappedRunnable(runnable, executorService);
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 405efce..a84da19 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -71,7 +71,9 @@
   // Default thread count is equal to the number of cores to exploit
   // that level of hardware parallelism, since invalidation should be CPU-bound.
   // We may consider increasing this in the future.
-  private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+  @VisibleForTesting
+  static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+
   private static final int EXPECTED_PENDING_SET_SIZE = DEFAULT_THREAD_COUNT * 8;
   private static final int EXPECTED_VISITED_SET_SIZE = 1024;
 
@@ -261,8 +263,8 @@
                   visit(
                       Iterables.transform(
                           pendingList.subList(
-                              index * listSize / DEFAULT_THREAD_COUNT,
-                              Math.min(listSize, (index + 1) * listSize / DEFAULT_THREAD_COUNT)),
+                              (index * listSize) / DEFAULT_THREAD_COUNT,
+                              ((index + 1) * listSize) / DEFAULT_THREAD_COUNT),
                           Pair::getFirst),
                       InvalidationType.DELETED));
         }
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 5d5e63c..9d2d260 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -39,6 +39,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
@@ -455,6 +456,34 @@
   }
 
   @Test
+  public void allNodesProcessed() throws Exception {
+    graph = new InMemoryGraphImpl();
+    ImmutableList.Builder<SkyKey> keysToDelete =
+        ImmutableList.builderWithExpectedSize(InvalidatingNodeVisitor.DEFAULT_THREAD_COUNT - 1);
+    for (int i = 0; i < InvalidatingNodeVisitor.DEFAULT_THREAD_COUNT - 1; i++) {
+      keysToDelete.add(GraphTester.nonHermeticKey("key" + i));
+    }
+    invalidate(graph, progressReceiver, keysToDelete.build().toArray(new SkyKey[0]));
+    assertThat(state.isEmpty()).isTrue();
+  }
+
+  @Test
+  public void deletingInsideForkJoinPoolWorks() throws Exception {
+    graph = new InMemoryGraphImpl();
+    ForkJoinPool outerPool = new ForkJoinPool(1);
+    outerPool
+        .submit(
+            () -> {
+              try {
+                invalidate(graph, progressReceiver, GraphTester.nonHermeticKey("a"));
+              } catch (InterruptedException e) {
+                throw new IllegalStateException(e);
+              }
+            })
+        .get();
+  }
+
+  @Test
   public void interruptThreadInReceiver() throws Exception {
     Random random = new Random(TestUtils.getRandomSeed());
     int graphSize = 1000;