Fix StackOverflowException in PriorityWorkerPool.

Enqueues directly into the ForkJoinPool instead of executing inline when the
queue is full. Inline execution may recursively enqueue more tasks, eventually
leading to stack overflow.

Deletes TieredPriorityExecutor.tryDoQueuedWork. This has never been shown to
improve performance.

PiperOrigin-RevId: 552579621
Change-Id: I2f5c6154b26b785ebc31aba44283abeeec6a6a99
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java b/src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java
index 6808fbf..59678bf 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/PriorityWorkerPool.java
@@ -21,9 +21,7 @@
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_CPU_HEAVY_TASK;
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_TASK;
 import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.IDLE;
-import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.QUIESCENT;
 import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -131,15 +129,18 @@
     this.pool = newForkJoinPool();
     this.errorClassifier = errorClassifier;
 
-    long baseAddress = createPaddedBaseAddress(4);
+    long baseAddress = createPaddedBaseAddress(5);
     cleaner.register(this, new AddressFreer(baseAddress));
     this.countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
+
     this.queue =
         new TaskFifo(
             /* sizeAddress= */ getAlignedAddress(baseAddress, /* offset= */ 1),
             /* appendIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 2),
             /* takeIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 3));
 
+    this.activeWorkerCountAddress = getAlignedAddress(baseAddress, /* offset= */ 4);
+
     resetExecutionCounters();
   }
 
@@ -157,29 +158,34 @@
       if (task.isCpuHeavy()) {
         cpuHeavyQueue.add(task);
         if (acquireThreadAndCpuPermitElseReleaseCpuHeavyTask()) {
+          UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
           pool.execute(RUN_CPU_HEAVY_TASK);
         }
         return;
       }
     }
 
-    while (!queue.tryAppend(rawTask)) {
-      // If the queue is full, this thread donates some work to reduce the queue.
-      if (!tryAcquireTask()) {
-        // This should be very hard to reach if the queue is full except under cancellation. It's
-        // possible to perform the cancellation check in advance, but we can save doing the check in
-        // most cases by deferring it to this branch.
-        if (isCancelled()) {
-          return;
-        }
-        logger.atWarning().atMostEvery(5, SECONDS).log(
-            "Queue is full but no tasks could be acquired: %s", this);
-        continue;
+    if (!queue.tryAppend(rawTask)) {
+      if (!isCancelled()) {
+        // The task queue is full (and the pool is not cancelled). Enqueues the task directly in the
+        // ForkJoinPool. This should be rare in practice.
+        UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
+        pool.execute(
+            () -> {
+              try {
+                rawTask.run();
+              } catch (Throwable uncaught) {
+                handleUncaughtError(uncaught);
+              } finally {
+                workerBecomingIdle();
+              }
+            });
       }
-      dequeueTaskAndRun();
+      return;
     }
 
     if (acquireThreadElseReleaseTask()) {
+      UNSAFE.getAndAddInt(null, activeWorkerCountAddress, 1);
       pool.execute(RUN_TASK);
     }
   }
@@ -328,12 +334,9 @@
    *
    * <p>After completing a task, the worker checks if there are any available tasks that it may
    * execute, subject to CPU permit constraints. On finding and reserving an appropriate task, the
-   * worker returns its next planned activity, {@link #IDLE} or {@link #QUIESCENT} if it finds
-   * nothing to do.
+   * worker returns its next planned activity, {@link #IDLE} if it finds nothing to do.
    */
   enum NextWorkerActivity {
-    /** The worker will stop and is the last worker working. */
-    QUIESCENT,
     /** The worker will stop. */
     IDLE,
     /** The worker will perform a non-CPU heavy task. */
@@ -386,12 +389,8 @@
     private void runLoop(NextWorkerActivity nextActivity) {
       while (true) {
         switch (nextActivity) {
-          case QUIESCENT:
-            synchronized (quiescenceMonitor) {
-              quiescenceMonitor.notifyAll();
-            }
-            return;
           case IDLE:
+            workerBecomingIdle();
             return;
           case DO_TASK:
             dequeueTaskAndRun();
@@ -404,14 +403,6 @@
         }
       }
     }
-
-    boolean tryDoQueuedWork() {
-      if (!tryAcquireTask()) {
-        return false;
-      }
-      dequeueTaskAndRun();
-      return true;
-    }
   }
 
   private void dequeueTaskAndRun() {
@@ -431,6 +422,14 @@
     }
   }
 
+  private void workerBecomingIdle() {
+    if (UNSAFE.getAndAddInt(null, activeWorkerCountAddress, -1) == 1) {
+      synchronized (quiescenceMonitor) {
+        quiescenceMonitor.notifyAll();
+      }
+    }
+  }
+
   // The constants below apply to the 64-bit execution counters value.
 
   private static final long CANCEL_BIT = 0x8000_0000_0000_0000L;
@@ -479,10 +478,10 @@
    */
   private final long countersAddress;
 
+  private final long activeWorkerCountAddress;
+
   boolean isQuiescent() {
-    long snapshot = getExecutionCounters();
-    int threadsSnapshot = (int) ((snapshot & THREADS_MASK) >> THREADS_BIT_OFFSET);
-    return threadsSnapshot == poolSize;
+    return UNSAFE.getInt(null, activeWorkerCountAddress) == 0;
   }
 
   boolean isCancelled() {
@@ -505,6 +504,7 @@
         countersAddress,
         (((long) poolSize) << THREADS_BIT_OFFSET)
             | (((long) cpuPermits) << CPU_PERMITS_BIT_OFFSET));
+    UNSAFE.putInt(null, activeWorkerCountAddress, 0);
   }
 
   private boolean acquireThreadElseReleaseTask() {
@@ -532,17 +532,6 @@
     } while (true);
   }
 
-  private boolean tryAcquireTask() {
-    long snapshot;
-    do {
-      snapshot = getExecutionCounters();
-      if ((snapshot & TASKS_MASK) == 0 || snapshot < 0) {
-        return false;
-      }
-    } while (!tryUpdateExecutionCounters(snapshot, snapshot - ONE_TASK));
-    return true;
-  }
-
   /**
    * Worker threads determine their next action after completing a task using this method.
    *
@@ -564,7 +553,7 @@
       } else {
         long target = snapshot + ONE_THREAD;
         if (UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target)) {
-          return quiescentOrIdle(target);
+          return IDLE;
         }
       }
       snapshot = UNSAFE.getLong(null, countersAddress);
@@ -574,8 +563,8 @@
   /**
    * Worker threads call this to determine their next action after completing a CPU heavy task.
    *
-   * <p>This releases a CPU permit when returning {@link NextWorkerActivity#QUIESCENT}, {@link
-   * NextWorkerActivity#IDLE} or {@link NextWorkerActivity#DO_TASK}.
+   * <p>This releases a CPU permit when returning {@link NextWorkerActivity#IDLE} or {@link
+   * NextWorkerActivity#DO_TASK}.
    */
   private NextWorkerActivity getActivityFollowingCpuHeavyTask() {
     long snapshot = UNSAFE.getLongVolatile(null, countersAddress);
@@ -593,18 +582,13 @@
       } else {
         long target = snapshot + CPU_HEAVY_RESOURCES;
         if (UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target)) {
-          return quiescentOrIdle(target);
+          return IDLE;
         }
       }
       snapshot = UNSAFE.getLong(null, countersAddress);
     } while (true);
   }
 
-  private NextWorkerActivity quiescentOrIdle(long snapshot) {
-    int snapshotThreads = (int) ((snapshot & THREADS_MASK) >> THREADS_BIT_OFFSET);
-    return snapshotThreads == poolSize ? QUIESCENT : IDLE;
-  }
-
   // Throughout this class, the following wrappers are used where possible, but they are often not
   // inlined by the JVM even though they show up on profiles, so they are inlined explicitly in
   // numerous cases.
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutor.java
index 1d1912a..3aa135b 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutor.java
@@ -16,7 +16,6 @@
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
-import static java.lang.Thread.currentThread;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -41,11 +40,6 @@
  *
  * <p>The queue for non-CPUHeavy tasks has a fixed capacity. When full, callers of execute assist
  * with enqueued work.
- *
- * <p>Threads may voluntarily assist with queued work by calling {@link
- * TieredPriorityExecutor#tryDoQueuedWork} when a thread is about to block. If tasks are available,
- * the current thread may perform a task inside {@link TieredPriorityExecutor#tryDoQueuedWork}
- * before it returns. This may add latency to the donating thread but can reduce overhead.
  */
 public final class TieredPriorityExecutor implements QuiescingExecutor {
   /** A common cleaner shared by all executors. */
@@ -164,23 +158,6 @@
   }
 
   /**
-   * Attempts to donate work on the current thread.
-   *
-   * <p>Calling this method may be useful if the current thread is about to block. Subject to
-   * scheduling constraints, attempts to poll work from the queue and execute it on the current
-   * thread.
-   *
-   * @return true if work was donated, false otherwise.
-   */
-  public static boolean tryDoQueuedWork() {
-    var thread = currentThread();
-    if (!(thread instanceof PriorityWorkerPool.WorkerThread)) {
-      return false;
-    }
-    return ((PriorityWorkerPool.WorkerThread) thread).tryDoQueuedWork();
-  }
-
-  /**
    * The parallelism target of the underlying thread pool.
    *
    * <p>Public to allow clients to examine this executor's configuration and suitability for reuse.
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutorTest.java
index bac6320..0809fe5 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/TieredPriorityExecutorTest.java
@@ -36,7 +36,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -494,94 +493,6 @@
   }
 
   @Test
-  public void workDonation_processesAllTasks() throws InterruptedException {
-    var holdAllThreads = new CountDownLatch(1);
-    for (int i = 0; i < POOL_SIZE - 1; ++i) {
-      executor.execute(() -> awaitUninterruptibly(holdAllThreads));
-    }
-
-    var donor = new AtomicReference<Thread>();
-    var donorSet = new CountDownLatch(1);
-    var gate = new CountDownLatch(1);
-    var donationDone = new CountDownLatch(1);
-    executor.execute(
-        () -> {
-          donor.set(Thread.currentThread());
-          donorSet.countDown();
-          awaitUninterruptibly(gate);
-          for (int i = 0; i < 100; ++i) {
-            assertThat(TieredPriorityExecutor.tryDoQueuedWork()).isTrue();
-          }
-          donationDone.countDown();
-          awaitUninterruptibly(holdAllThreads);
-        });
-
-    var receiver = new AtomicInteger();
-    for (int i = 0; i < 100; ++i) {
-      executor.execute(
-          () -> {
-            // This task is running from the donor's thread.
-            assertThat(Thread.currentThread()).isEqualTo(donor.get());
-            receiver.getAndIncrement();
-          });
-    }
-
-    donorSet.await();
-    gate.countDown();
-    donationDone.await();
-    assertThat(receiver.get()).isEqualTo(100);
-    holdAllThreads.countDown();
-    executor.awaitQuiescence(/* interruptWorkers= */ true);
-  }
-
-  @Test
-  public void workDonation_handlesErrorsInDonatedWork() throws InterruptedException {
-    var interruptedCount = new AtomicInteger();
-    var holdAllThreads = new CountDownLatch(1);
-    for (int i = 0; i < POOL_SIZE - 1; ++i) {
-      executor.execute(
-          () -> {
-            try {
-              while (!holdAllThreads.await(INTERRUPT_POLL_MS, MILLISECONDS)) {}
-            } catch (InterruptedException e) {
-              interruptedCount.getAndIncrement();
-            }
-          });
-    }
-
-    var donor = new AtomicReference<Thread>();
-    var donorSet = new CountDownLatch(1);
-    var gate = new CountDownLatch(1);
-    executor.execute(
-        () -> {
-          donor.set(Thread.currentThread());
-          donorSet.countDown();
-          awaitUninterruptibly(gate);
-          assertThat(TieredPriorityExecutor.tryDoQueuedWork()).isTrue();
-          try {
-            while (!holdAllThreads.await(INTERRUPT_POLL_MS, MILLISECONDS)) {}
-          } catch (InterruptedException e) {
-            interruptedCount.getAndIncrement();
-          }
-        });
-
-    executor.execute(
-        () -> {
-          assertThat(Thread.currentThread()).isEqualTo(donor.get());
-          throw new AssertionError("critical error");
-        });
-
-    donorSet.await();
-    gate.countDown();
-
-    var error =
-        assertThrows(
-            AssertionError.class, () -> executor.awaitQuiescence(/* interruptWorkers= */ true));
-    assertThat(error).hasMessageThat().contains("critical error");
-    assertThat(interruptedCount.get()).isEqualTo(POOL_SIZE);
-  }
-
-  @Test
   public void settableFuture_respondsToInterrupt() throws InterruptedException {
     var interruptedCount = new AtomicInteger();
     var allStarted = new CountDownLatch(POOL_SIZE);
@@ -682,31 +593,22 @@
     // Waits for holders to start, otherwise they might race against the filling of the queue below.
     allHoldersStarted.await();
 
-    // Fills up the queue.
-    var executed = new ArrayList<Integer>();
+    // Over-fills the queue.
+    var executed = Sets.<Integer>newConcurrentHashSet();
     var expected = new ArrayList<Integer>();
-    for (int i = 0; i < PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
+    for (int i = 0; i < 2 * PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
       expected.add(i);
 
       final int index = i;
       executor.execute(() -> executed.add(index));
     }
 
-    // Adds tasks that would overflow the queue. Since overflows consume tasks from the queue, this
-    // causes all the tasks above to be executed.
-    var donorValues = Sets.<Integer>newConcurrentHashSet();
-    for (int i = 0; i < PriorityWorkerPool.TASKS_MAX_VALUE; ++i) {
-      final int index = i;
-      executor.execute(() -> donorValues.add(index));
-    }
-
-    assertThat(executed).isEqualTo(expected);
-    assertThat(donorValues).isEmpty();
+    assertThat(executed).isEmpty();
 
     holdAllThreads.countDown();
     executor.awaitQuiescence(/* interruptWorkers= */ true);
 
-    assertThat(donorValues).containsExactlyElementsIn(expected);
+    assertThat(executed).containsExactlyElementsIn(expected);
   }
 
   @Test