Update ParallelSkyQueryUtils to use QuiescingExecutor instead of ForkJoinPool
for concurrent visitations.

During BFS visitation of rdeps and rbuildfiles, it uses a centralized pool
(backed by a LinkedBlockingQueue) to store all pending visits, and a
periodically running scheduler to schedule tasks for each pending visit.

--
MOS_MIGRATED_REVID=140398162
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 01cdca1..45d1c56 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -385,6 +385,11 @@
     }
   }
 
+  @Override
+  public long getRemainingTasksCount() {
+    return remainingTasks.get();
+  }
+
   /**
    * Subclasses may override this to make dynamic decisions about whether to run tasks
    * asynchronously versus in-thread.
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
index f409bc2..93fc3c9 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
@@ -22,11 +22,11 @@
 import java.util.concurrent.TimeUnit;
 
 /** A {@link BlockingQueue} with LIFO (last-in-first-out) ordering. */
-class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+public class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> {
   // We just restrict to only using the *First methods on the deque, turning it into a stack.
   private final BlockingDeque<E> deque;
 
-  BlockingStack() {
+  public BlockingStack() {
     this.deque = new LinkedBlockingDeque<>();
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 65718c6..78bc93d 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -14,7 +14,6 @@
 package com.google.devtools.build.lib.concurrent;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -53,6 +52,9 @@
    */
   void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
 
+  /** Return the number of tasks which are not completed (running or waiting to be executed). */
+  long getRemainingTasksCount();
+
   /** Get latch that is released if a task throws an exception. Used only in tests. */
   @VisibleForTesting
   CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 5f7447b..086ecfc 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -22,11 +23,15 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.Callback;
@@ -45,10 +50,10 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
@@ -60,6 +65,10 @@
  */
 // TODO(bazel-team): Be more deliberate about bounding memory usage here.
 class ParallelSkyQueryUtils {
+
+  /** The maximum number of keys to visit at once. */
+  @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000;
+
   private ParallelSkyQueryUtils() {
   }
 
@@ -72,14 +81,13 @@
       QueryExpression expression,
       VariableContext<Target> context,
       ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool,
       MultisetSemaphore<PackageIdentifier> packageSemaphore)
           throws QueryException, InterruptedException {
     env.eval(
         expression,
         context,
         new SkyKeyBFSVisitorCallback(
-            new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore)));
+            new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
   }
 
   /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -87,28 +95,24 @@
       SkyQueryEnvironment env,
       Collection<PathFragment> fileIdentifiers,
       ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool,
       MultisetSemaphore<PackageIdentifier> packageSemaphore)
           throws QueryException, InterruptedException {
     ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
     RBuildFilesVisitor visitor =
-        new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore);
+        new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore);
     visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
   }
 
   /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
   private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
-    private final SkyQueryEnvironment env;
     private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
     private RBuildFilesVisitor(
         SkyQueryEnvironment env,
-        ForkJoinPool forkJoinPool,
         ThreadSafeUniquifier<SkyKey> uniquifier,
         Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
-      super(forkJoinPool, uniquifier, callback);
-      this.env = env;
+      super(env, uniquifier, callback);
       this.packageSemaphore = packageSemaphore;
     }
 
@@ -171,17 +175,14 @@
    */
   private static class AllRdepsUnboundedVisitor
       extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
-    private final SkyQueryEnvironment env;
     private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
     private AllRdepsUnboundedVisitor(
         SkyQueryEnvironment env,
-        ForkJoinPool forkJoinPool,
         ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
         ThreadSafeCallback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
-      super(forkJoinPool, uniquifier, callback);
-      this.env = env;
+      super(env, uniquifier, callback);
       this.packageSemaphore = packageSemaphore;
     }
 
@@ -194,7 +195,6 @@
      */
     private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
       private final SkyQueryEnvironment env;
-      private final ForkJoinPool forkJoinPool;
       private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
       private final ThreadSafeCallback<Target> callback;
       private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -202,10 +202,8 @@
       private Factory(
         SkyQueryEnvironment env,
         ThreadSafeCallback<Target> callback,
-        ForkJoinPool forkJoinPool,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
         this.env = env;
-        this.forkJoinPool = forkJoinPool;
         this.uniquifier = env.createReverseDepSkyKeyUniquifier();
         this.callback = callback;
         this.packageSemaphore = packageSemaphore;
@@ -213,8 +211,7 @@
 
       @Override
       public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
-        return new AllRdepsUnboundedVisitor(
-            env, forkJoinPool, uniquifier, callback, packageSemaphore);
+        return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore);
       }
     }
 
@@ -267,29 +264,17 @@
       // recursive visitation.
       Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys);
 
-      // Build a collection of Pairs and group by package id so we can partition them efficiently
-      // later.
-      ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> rdepsByPackage =
-          ArrayListMultimap.create();
+      ImmutableList.Builder<Pair<SkyKey, SkyKey>> builder = ImmutableList.builder();
       for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
         for (SkyKey rdep : rdeps.getValue()) {
           Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
           if (label != null) {
-            rdepsByPackage.put(label.getPackageIdentifier(), Pair.of(rdeps.getKey(), rdep));
+            builder.add(Pair.of(rdeps.getKey(), rdep));
           }
         }
       }
 
-      // A couple notes here:
-      // (i)  ArrayListMultimap#values returns the values grouped by key, which is exactly what we
-      //      want.
-      // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
-      //      accidentally retaining the entire ArrayListMultimap object.
-      Iterable<Pair<SkyKey, SkyKey>> keysToVisit = ImmutableList.copyOf(rdepsByPackage.values());
-
-      // TODO(shazh): Use a global pool to store keys to be returned and keys to be processed, and
-      // assign them to VisitTasks. It allows us to better optimize package retrieval.
-      return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ keysToVisit);
+      return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ builder.build());
     }
 
     @Override
@@ -325,6 +310,33 @@
             }
           });
     }
+
+    @Override
+    protected Iterable<Task> getVisitTasks(Collection<Pair<SkyKey, SkyKey>> pendingKeysToVisit) {
+      // Group pending visits by package.
+      ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> visitsByPackage =
+          ArrayListMultimap.create();
+      for (Pair<SkyKey, SkyKey> visit : pendingKeysToVisit) {
+        Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(visit.second);
+        if (label != null) {
+          visitsByPackage.put(label.getPackageIdentifier(), visit);
+        }
+      }
+
+      ImmutableList.Builder<Task> builder = ImmutableList.builder();
+
+      // A couple notes here:
+      // (i)  ArrayListMultimap#values returns the values grouped by key, which is exactly what we
+      //      want.
+      // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
+      //      accidentally retaining the entire ArrayListMultimap object.
+      for (Iterable<Pair<SkyKey, SkyKey>> keysToVisitBatch :
+          Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
+        builder.add(new VisitTask(keysToVisitBatch));
+      }
+
+      return builder.build();
+    }
   }
 
   /**
@@ -349,28 +361,90 @@
 
   /**
    * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
-   * ForkJoinPool}.
+   * QuiescingExecutor}.
    *
-   * <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a
-   * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a
-   * visitation and blocks on completion of it. But this visitation may never complete if there are
-   * a bounded number of threads in the global thread pool used for query evaluation!
+   * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool
+   * NOT part of the global query evaluation pool to avoid starvation.
    */
   @ThreadSafe
   private abstract static class AbstractSkyKeyBFSVisitor<T> {
-    private final ForkJoinPool forkJoinPool;
+    protected final SkyQueryEnvironment env;
     private final ThreadSafeUniquifier<T> uniquifier;
     private final Callback<Target> callback;
-    /** The maximum number of keys to visit at once. */
-    private static final int VISIT_BATCH_SIZE = 10000;
+
+    private final QuiescingExecutor executor;
+
+    /** A queue to store pending visits. */
+    private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
+
+    /**
+     * The max time interval between two scheduling passes in milliseconds. A scheduling pass is
+     * defined as the scheduler thread determining whether to drain all pending visits from the
+     * queue and submitting tasks to perform the visits.
+     *
+     * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a
+     * few facts about the scheduling interval:
+     *
+     * <p>1. A large interval adds systematic delay. In an extreme case, a BFS visit which is
+     * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few
+     * hundred milliseconds, it should not be noticeable.
+     *
+     * <p>2. A zero-interval config eats too much CPU.
+     *
+     * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time.
+     * Pending visits are drained only certain criteria are met.
+     */
+    private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1;
+
+    /**
+     * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on
+     * experiments. We do not want to schedule tasks too frequently to miss the benefits of large
+     * number of keys being grouped by packages. On the other hand, we want to keep all threads in
+     * the pool busy to achieve full capacity. A low number here will cause some of the worker
+     * threads to go idle at times before the next scheduling cycle.
+     *
+     * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
+     */
+    private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
+
+    /**
+     * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code
+     * RuntimeQueryException}, which are resulted from InterruptedException and QueryException.
+     */
+    static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER =
+        new ErrorClassifier() {
+          @Override
+          protected ErrorClassification classifyException(Exception e) {
+            return (e instanceof RuntimeException)
+                ? ErrorClassification.CRITICAL_AND_LOG
+                : ErrorClassification.NOT_CRITICAL;
+          }
+        };
+
+    /** All BFS visitors share a single global fixed thread pool. */
+    private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR =
+        new ThreadPoolExecutor(
+            // Must be at least 2 worker threads in the pool (1 for the scheduler thread).
+            /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+            /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+            /*keepAliveTime=*/ 1,
+            /*units=*/ TimeUnit.SECONDS,
+            /*workQueue=*/ new BlockingStack<Runnable>(),
+            new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build());
 
     private AbstractSkyKeyBFSVisitor(
-        ForkJoinPool forkJoinPool,
-        ThreadSafeUniquifier<T> uniquifier,
-        Callback<Target> callback) {
-      this.forkJoinPool = forkJoinPool;
+        SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+      this.env = env;
       this.uniquifier = uniquifier;
       this.callback = callback;
+      this.executor =
+          new AbstractQueueVisitor(
+              /*concurrent=*/ true,
+              /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR,
+              // Leave the thread pool active for other current and future callers.
+              /*shutdownOnCompletion=*/ false,
+              /*failFastOnException=*/ true,
+              /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
     }
 
     /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
@@ -390,44 +464,103 @@
 
     void visitAndWaitForCompletion(Iterable<SkyKey> keys)
         throws QueryException, InterruptedException {
-      Iterable<ForkJoinTask<?>> tasks =
-          getTasks(
-              new Visit(
-                  /*keysToUseForResult=*/ ImmutableList.<SkyKey>of(),
-                  /*keysToVisit=*/ preprocessInitialVisit(keys)));
-      for (ForkJoinTask<?> task : tasks) {
-        forkJoinPool.execute(task);
-      }
+      processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
+      // We add the scheduler to the pool, allowing it (as well as any submitted tasks later)
+      // to be failed fast if any QueryException or InterruptedException is received.
+      executor.execute(new Scheduler());
       try {
-        MoreFutures.waitForAllInterruptiblyFailFast(tasks);
-      } catch (ExecutionException ee) {
-        Throwable cause = ee.getCause();
-        if (cause instanceof RuntimeQueryException) {
-          throw (QueryException) cause.getCause();
-        } else if (cause instanceof RuntimeInterruptedException) {
-          throw (InterruptedException) cause.getCause();
-        } else {
-          throw new IllegalStateException(cause);
+        executor.awaitQuiescence(true);
+      } catch (RuntimeQueryException e) {
+        throw (QueryException) e.getCause();
+      } catch (RuntimeInterruptedException e) {
+        throw (InterruptedException) e.getCause();
+      }
+    }
+
+    /**
+     * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
+     * the full visitation to the given {@link Callback}.
+     */
+    protected abstract void processResultantTargets(
+        Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+        throws QueryException, InterruptedException;
+
+    /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
+    protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
+
+    /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
+    protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
+
+    protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) {
+      ImmutableList.Builder<Task> builder = ImmutableList.builder();
+      for (Iterable<T> keysToVisitBatch :
+          Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) {
+        builder.add(new VisitTask(keysToVisitBatch));
+      }
+
+      return builder.build();
+    }
+
+    private class Scheduler implements Runnable {
+      @Override
+      public void run() {
+        // The scheduler keeps running until both the following two conditions are met.
+        //
+        // 1. There is no pending visit in the queue.
+        // 2. There is no pending task (other than itself) in the pool.
+        if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) {
+          return;
+        }
+
+        // To achieve maximum efficiency, queue is drained in either of the following 2 conditions:
+        //
+        // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+        // 2. The process queue size is large.
+        if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS
+            || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+          drainProcessingQueue();
+        }
+
+        try {
+          // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds.
+          Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeInterruptedException(e);
+        }
+
+        executor.execute(new Scheduler());
+      }
+
+      private void drainProcessingQueue() {
+        Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+        processingQueue.drainTo(pendingKeysToVisit);
+        if (pendingKeysToVisit.isEmpty()) {
+          return;
+        }
+
+        for (Task task : getVisitTasks(pendingKeysToVisit)) {
+          executor.execute(task);
         }
       }
     }
 
-    private abstract static class AbstractInternalRecursiveAction extends RecursiveAction {
-      protected abstract void computeImpl() throws QueryException, InterruptedException;
+    abstract static class Task implements Runnable {
 
       @Override
-      public final void compute() {
+      public void run() {
         try {
-          computeImpl();
-        } catch (QueryException queryException) {
-          throw new RuntimeQueryException(queryException);
-        } catch (InterruptedException interruptedException) {
-          throw new RuntimeInterruptedException(interruptedException);
+          process();
+        } catch (QueryException e) {
+          throw new RuntimeQueryException(e);
+        } catch (InterruptedException e) {
+          throw new RuntimeInterruptedException(e);
         }
       }
+
+      abstract void process() throws QueryException, InterruptedException;
     }
 
-    private class VisitTask extends AbstractInternalRecursiveAction {
+    class VisitTask extends Task {
       private final Iterable<T> keysToVisit;
 
       private VisitTask(Iterable<T> keysToVisit) {
@@ -435,22 +568,24 @@
       }
 
       @Override
-      protected void computeImpl() throws InterruptedException {
+      void process() throws InterruptedException {
         ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit);
         if (uniqueKeys.isEmpty()) {
           return;
         }
-        Iterable<ForkJoinTask<?>> tasks = getTasks(getVisitResult(uniqueKeys));
-        for (ForkJoinTask<?> task : tasks) {
-          task.fork();
+
+        Visit visit = getVisitResult(uniqueKeys);
+        for (Iterable<SkyKey> keysToUseForResultBatch :
+            Iterables.partition(
+                visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
+          executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch));
         }
-        for (ForkJoinTask<?> task : tasks) {
-          task.join();
-        }
+
+        processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit));
       }
     }
 
-    private class GetAndProcessResultsTask extends AbstractInternalRecursiveAction {
+    private class GetAndProcessResultsTask extends Task {
       private final Iterable<SkyKey> keysToUseForResult;
 
       private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) {
@@ -458,42 +593,10 @@
       }
 
       @Override
-      protected void computeImpl() throws QueryException, InterruptedException {
+      protected void process() throws QueryException, InterruptedException {
         processResultantTargets(keysToUseForResult, callback);
       }
     }
-
-    private Iterable<ForkJoinTask<?>> getTasks(Visit visit) {
-      // Split the given visit request into ForkJoinTasks for visiting keys and ForkJoinTasks for
-      // getting and outputting results, each of which obeys the separate batch limits.
-      // TODO(bazel-team): Attempt to group work on targets within the same package.
-      ImmutableList.Builder<ForkJoinTask<?>> tasksBuilder = ImmutableList.builder();
-      // Fork the tasks for getting and outputting results first - this way we maximize for
-      // throughput to the underlying callback.
-      for (Iterable<SkyKey> keysToUseForResultBatch : Iterables.partition(
-          visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
-        tasksBuilder.add(new GetAndProcessResultsTask(keysToUseForResultBatch));
-      }
-      for (Iterable<T> keysToVisitBatch :
-          Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) {
-        tasksBuilder.add(new VisitTask(keysToVisitBatch));
-      }
-      return tasksBuilder.build();
-    }
-
-    /**
-     * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s
-     * in the full visitation to the given {@link Callback}.
-     */
-    protected abstract void processResultantTargets(
-        Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
-            throws QueryException, InterruptedException;
-
-    /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
-    protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
-
-    /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
-    protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
   }
 
   private static class RuntimeQueryException extends RuntimeException {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 705738c..87a57af 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -853,17 +853,20 @@
   public Map<SkyKey, Target> makeTargetsFromPackageKeyToTargetKeyMap(
       Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
     ImmutableMap.Builder<SkyKey, Target> result = ImmutableMap.builder();
+    Set<SkyKey> processedTargets = new HashSet<>();
     Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet());
     for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) {
       for (SkyKey targetKey : packageKeyToTargetKeyMap.get(entry.getKey())) {
-        try {
-          result.put(
-              targetKey,
-              ((PackageValue) entry.getValue())
-                  .getPackage()
-                  .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
-        } catch (NoSuchTargetException e) {
-          // Skip missing target.
+        if (processedTargets.add(targetKey)) {
+          try {
+            result.put(
+                targetKey,
+                ((PackageValue) entry.getValue())
+                    .getPackage()
+                    .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
+          } catch (NoSuchTargetException e) {
+            // Skip missing target.
+          }
         }
       }
     }
@@ -1013,8 +1016,7 @@
       ThreadSafeCallback<Target> callback,
       ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getRBuildFilesParallel(
-        this, fileIdentifiers, callback, forkJoinPool, packageSemaphore);
+    ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
   }
 
   /**
@@ -1199,7 +1201,7 @@
       ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
     ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
-        this, expression, context, callback, forkJoinPool, packageSemaphore);
+        this, expression, context, callback, packageSemaphore);
   }
 
   @ThreadSafe