Do not use additional scheduling threads during parallel evaluation to prevent thread starvation

This change gets rid of the additional thread needed for task scheduling during
BFS visitation, which eliminates the possibility of thread starvation while a
single thread pool is used for multiple concurrent evaluations.

--
PiperOrigin-RevId: 148911346
MOS_MIGRATED_REVID=148911346
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 45d1c56..9bbb2af 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -108,9 +108,10 @@
    * Flag used to record when all threads were killed by failed action execution. Only ever
    * transitions from {@code false} to {@code true}.
    *
-   * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}.
+   * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized
+   * on {@link #zeroRemainingTasks}.
    */
-  private boolean jobsMustBeStopped = false;
+  private volatile boolean jobsMustBeStopped = false;
 
   /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
   private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
@@ -142,7 +143,7 @@
 
   private final ErrorClassifier errorClassifier;
 
-  private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
+  private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
 
   /**
    * Create the {@link AbstractQueueVisitor}.
@@ -385,11 +386,6 @@
     }
   }
 
-  @Override
-  public long getRemainingTasksCount() {
-    return remainingTasks.get();
-  }
-
   /**
    * Subclasses may override this to make dynamic decisions about whether to run tasks
    * asynchronously versus in-thread.
@@ -415,10 +411,10 @@
     ErrorClassification errorClassification = errorClassifier.classify(e);
     switch (errorClassification) {
         case AS_CRITICAL_AS_POSSIBLE:
-        case CRITICAL_AND_LOG:
-          critical = true;
-          LOG.log(Level.WARNING, "Found critical error in queue visitor", e);
-          break;
+      case CRITICAL_AND_LOG:
+        critical = true;
+        logger.log(Level.WARNING, "Found critical error in queue visitor", e);
+        break;
         case CRITICAL:
           critical = true;
           break;
@@ -524,7 +520,7 @@
   }
 
   /** Set an internal flag to show that an interrupt was detected. */
-  private void setInterrupted() {
+  protected final void setInterrupted() {
     threadInterrupted = true;
   }
 
@@ -568,17 +564,27 @@
    * Get number of jobs remaining. Note that this can increase in value if running tasks submit
    * further jobs.
    */
-  @VisibleForTesting
   protected final long getTaskCount() {
     return remainingTasks.get();
   }
 
   /**
-   * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and
-   * waits for it to terminate.  Throws (the same) unchecked exception if any
-   * worker thread failed unexpectedly.
+   * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
+   * will be rejected if this is true.
+   *
+   * <p>This function returns the CURRENT state of whether jobs should be stopped. If the value is
+   * false right now, it may be changed to true by another thread later.
    */
-  private void awaitTermination(boolean interruptWorkers) throws InterruptedException {
+  protected final boolean mustJobsBeStopped() {
+    return jobsMustBeStopped;
+  }
+
+  /**
+   * Waits for the task queue to drain. Then if {@code ownExecutorService} is true, shuts down the
+   * {@link ExecutorService} and waits for it to terminate. Throws (the same) unchecked exception if
+   * any worker thread failed unexpectedly.
+   */
+  protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException {
     Throwables.propagateIfPossible(catastrophe);
     try {
       synchronized (zeroRemainingTasks) {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 78bc93d..3424bb4 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -52,9 +52,6 @@
    */
   void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
 
-  /** Return the number of tasks which are not completed (running or waiting to be executed). */
-  long getRemainingTasksCount();
-
   /** Get latch that is released if a task throws an exception. Used only in tests. */
   @VisibleForTesting
   CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 30a27f8..95486c6 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -373,7 +373,7 @@
     private final ThreadSafeUniquifier<T> uniquifier;
     private final Callback<Target> callback;
 
-    private final QuiescingExecutor executor;
+    private final BFSVisitingTaskExecutor executor;
 
     /** A queue to store pending visits. */
     private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
@@ -439,13 +439,8 @@
       this.uniquifier = uniquifier;
       this.callback = callback;
       this.executor =
-          new AbstractQueueVisitor(
-              /*concurrent=*/ true,
-              /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR,
-              // Leave the thread pool active for other current and future callers.
-              /*shutdownOnCompletion=*/ false,
-              /*failFastOnException=*/ true,
-              /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
+          new BFSVisitingTaskExecutor(
+              FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
     }
 
     /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
@@ -466,16 +461,7 @@
     void visitAndWaitForCompletion(Iterable<SkyKey> keys)
         throws QueryException, InterruptedException {
       processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
-      // We add the scheduler to the pool, allowing it (as well as any submitted tasks later)
-      // to be failed fast if any QueryException or InterruptedException is received.
-      executor.execute(new Scheduler());
-      try {
-        executor.awaitQuiescence(true);
-      } catch (RuntimeQueryException e) {
-        throw (QueryException) e.getCause();
-      } catch (RuntimeInterruptedException e) {
-        throw (InterruptedException) e.getCause();
-      }
+      executor.bfsVisitAndWaitForCompletion();
     }
 
     /**
@@ -502,49 +488,6 @@
       return builder.build();
     }
 
-    private class Scheduler implements Runnable {
-      @Override
-      public void run() {
-        // The scheduler keeps running until both the following two conditions are met.
-        //
-        // 1. There is no pending visit in the queue.
-        // 2. There is no pending task (other than itself) in the pool.
-        if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) {
-          return;
-        }
-
-        // To achieve maximum efficiency, queue is drained in either of the following 2 conditions:
-        //
-        // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
-        // 2. The process queue size is large.
-        if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS
-            || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
-          drainProcessingQueue();
-        }
-
-        try {
-          // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds.
-          Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
-        } catch (InterruptedException e) {
-          throw new RuntimeInterruptedException(e);
-        }
-
-        executor.execute(new Scheduler());
-      }
-
-      private void drainProcessingQueue() {
-        Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
-        processingQueue.drainTo(pendingKeysToVisit);
-        if (pendingKeysToVisit.isEmpty()) {
-          return;
-        }
-
-        for (Task task : getVisitTasks(pendingKeysToVisit)) {
-          executor.execute(task);
-        }
-      }
-    }
-
     abstract static class Task implements Runnable {
 
       @Override
@@ -598,6 +541,75 @@
         processResultantTargets(keysToUseForResult, callback);
       }
     }
+
+    /**
+     * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
+     * scheduler for parallel BFS visitations.
+     */
+    private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
+      private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
+        super(
+            /*concurrent=*/ true,
+            /*executorService=*/ executor,
+            // Leave the thread pool active for other current and future callers.
+            /*shutdownOnCompletion=*/ false,
+            /*failFastOnException=*/ true,
+            /*errorClassifier=*/ errorClassifier);
+      }
+
+      private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException {
+        // The scheduler keeps running until either of the following two conditions are met.
+        //
+        // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
+        //    fast.
+        // 2. There is no pending visit in the queue and no pending task running.
+        while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
+          // To achieve maximum efficiency, queue is drained in either of the following two
+          // conditions:
+          //
+          // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+          // 2. The process queue size is large.
+          if (getTaskCount() < MIN_PENDING_TASKS
+              || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+
+            Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+            processingQueue.drainTo(pendingKeysToVisit);
+            for (Task task : getVisitTasks(pendingKeysToVisit)) {
+              execute(task);
+            }
+          }
+
+          try {
+            Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+          } catch (InterruptedException e) {
+            // If the main thread waiting for completion of the visitation is interrupted, we should
+            // gracefully terminate all running and pending tasks before exit. If QueryException
+            // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny
+            // propagates the QueryException instead of InterruptedException.
+            setInterrupted();
+            awaitTerminationAndPropagateErrorsIfAny();
+            throw e;
+          }
+        }
+
+        // We reach here either because the visitation is complete, or because an error prevents us
+        // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
+        // gracefully exit if the visitation is complete, or propagate the exception if error
+        // occurred.
+        awaitTerminationAndPropagateErrorsIfAny();
+      }
+
+      private void awaitTerminationAndPropagateErrorsIfAny()
+          throws QueryException, InterruptedException {
+        try {
+          awaitTermination(/*interruptWorkers=*/ true);
+        } catch (RuntimeQueryException e) {
+          throw (QueryException) e.getCause();
+        } catch (RuntimeInterruptedException e) {
+          throw (InterruptedException) e.getCause();
+        }
+      }
+    }
   }
 
   private static class RuntimeQueryException extends RuntimeException {