Description redacted.
--
PiperOrigin-RevId: 149585165
MOS_MIGRATED_REVID=149585165
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 9a5ffd4..de5175d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -29,14 +29,20 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
-import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -58,19 +64,18 @@
 import com.google.devtools.build.lib.query2.engine.KeyExtractor;
 import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -110,7 +115,9 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -121,6 +128,10 @@
  * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
  * particular order. As well, this class eagerly loads the full transitive closure of targets, even
  * if the full closure isn't needed.
+ *
+ * <p>This class has concurrent implementations of the
+ * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the
+ * asynchronous evaluation model yields parallel query evaluation.
  */
 public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
     implements StreamableQueryEnvironment<Target> {
@@ -144,7 +155,7 @@
   protected WalkableGraph graph;
   private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
   private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider;
-  private ForkJoinPool forkJoinPool;
+  private ListeningExecutorService executor;
   private RecursivePackageProviderBackedTargetPatternResolver resolver;
   private final SkyKey universeKey;
   private final ImmutableList<TargetPatternKey> universeTargetPatternKeys;
@@ -155,7 +166,6 @@
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
       Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<Target> evalListener,
       String parserPrefix,
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
@@ -169,7 +179,6 @@
         eventHandler,
         settings,
         extraFunctions,
-        evalListener,
         parserPrefix,
         graphFactory,
         universeScope,
@@ -183,7 +192,6 @@
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
       Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<Target> evalListener,
       String parserPrefix,
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
@@ -194,8 +202,7 @@
         /*labelFilter=*/ Rule.ALL_LABELS,
         eventHandler,
         settings,
-        extraFunctions,
-        evalListener);
+        extraFunctions);
     this.loadingPhaseThreads = loadingPhaseThreads;
     this.graphFactory = graphFactory;
     this.pkgPath = pkgPath;
@@ -228,9 +235,15 @@
       graphBackedRecursivePackageProvider =
           new GraphBackedRecursivePackageProvider(graph, universeTargetPatternKeys, pkgPath);
     }
-    if (forkJoinPool == null) {
-      forkJoinPool =
-          NamedForkJoinPool.newNamedPool("QueryEnvironment", queryEvaluationParallelismLevel);
+    if (executor == null) {
+      executor = MoreExecutors.listeningDecorator(
+          new ThreadPoolExecutor(
+            /*corePoolSize=*/ queryEvaluationParallelismLevel,
+            /*maximumPoolSize=*/ queryEvaluationParallelismLevel,
+            /*keepAliveTime=*/ 1,
+            /*units=*/ TimeUnit.SECONDS,
+            /*workQueue=*/ new BlockingStack<Runnable>(),
+            new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build()));
     }
     resolver =
         new RecursivePackageProviderBackedTargetPatternResolver(
@@ -340,16 +353,17 @@
     } catch (Throwable throwable) {
       throwableToThrow = throwable;
     } finally {
-      if (throwableToThrow  != null) {
-        LOG.log(Level.INFO, "About to shutdown FJP because of throwable", throwableToThrow);
+      if (throwableToThrow != null) {
+        LOG.log(
+            Level.INFO,
+            "About to shutdown query threadpool because of throwable",
+            throwableToThrow);
         // Force termination of remaining tasks if evaluation failed abruptly (e.g. was
         // interrupted). We don't want to leave any dangling threads running tasks.
-        forkJoinPool.shutdownNow();
-      }
-      forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-      if (throwableToThrow  != null) {
-        // Signal that pool must be recreated on the next invocation.
-        forkJoinPool = null;
+        executor.shutdownNow();
+        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // Signal that executor must be recreated on the next invocation.
+        executor = null;
         Throwables.propagateIfPossible(
             throwableToThrow, QueryException.class, InterruptedException.class);
       }
@@ -358,7 +372,7 @@
 
   @Override
   public QueryEvalResult evaluateQuery(
-      QueryExpression expr, OutputFormatterCallback<Target> callback)
+      QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback)
           throws QueryException, InterruptedException, IOException {
     // Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
     // result is set to have an error iff there were errors emitted during the query, so we reset
@@ -569,46 +583,85 @@
     return null;
   }
 
-  @ThreadSafe
-  @Override
-  public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
-      throws QueryException, InterruptedException {
-    // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
-    // all QueryEnvironment implementations.
-    if (callback instanceof ThreadSafeCallback) {
-      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
-    } else {
-      expr.eval(this, context, callback);
+  private <R> ListenableFuture<R> safeSubmit(Callable<R> callable) {
+    try {
+      return executor.submit(callable);
+    } catch (RejectedExecutionException e) {
+      return Futures.immediateCancelledFuture();
     }
   }
 
   @ThreadSafe
   @Override
-  public ThreadSafeUniquifier<Target> createUniquifier() {
+  public QueryTaskFuture<Void> eval(
+      final QueryExpression expr,
+      final VariableContext<Target> context,
+      final Callback<Target> callback) {
+    // TODO(bazel-team): As in here, use concurrency for the async #eval of other QueryEnvironment
+    // implementations.
+    Callable<QueryTaskFutureImpl<Void>> task = new Callable<QueryTaskFutureImpl<Void>>() {
+      @Override
+      public QueryTaskFutureImpl<Void> call() {
+        return (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback);
+      }
+    };
+    ListenableFuture<QueryTaskFutureImpl<Void>> futureFuture = safeSubmit(task);
+    return QueryTaskFutureImpl.ofDelegate(Futures.dereference(futureFuture));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+    return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable));
+  }
+
+  @Override
+  public <T1, T2> QueryTaskFuture<T2> transformAsync(
+      QueryTaskFuture<T1> future,
+      final Function<T1, QueryTaskFuture<T2>> function) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.transformAsync(
+            (QueryTaskFutureImpl<T1>) future,
+            new AsyncFunction<T1, T2>() {
+              @Override
+              public ListenableFuture<T2> apply(T1 input) {
+                return (QueryTaskFutureImpl<T2>) function.apply(input);
+              }
+            },
+            executor));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> whenAllSucceedCall(
+      Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.whenAllSucceed(cast(futures)).call(callable, executor));
+  }
+
+  @ThreadSafe
+  @Override
+  public Uniquifier<Target> createUniquifier() {
     return createTargetUniquifier();
   }
 
   @ThreadSafe
   @Override
-  public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
-    return new ThreadSafeMinDepthUniquifierImpl<>(
-        TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  public MinDepthUniquifier<Target> createMinDepthUniquifier() {
+    return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<Target> createTargetUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<Target> createTargetUniquifier() {
+    return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<SkyKey> createSkyKeyUniquifier() {
+    return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(
-        ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
+    return new UniquifierImpl<>(ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -625,41 +678,44 @@
 
   @ThreadSafe
   @Override
-  public void getTargetsMatchingPattern(
-      QueryExpression owner, String pattern, Callback<Target> callback)
-      throws QueryException, InterruptedException {
+  public QueryTaskFuture<Void> getTargetsMatchingPattern(
+      final QueryExpression owner, String pattern, Callback<Target> callback) {
     // Directly evaluate the target pattern, making use of packages in the graph.
+    Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude;
     try {
-      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
-          getPatternAndExcludes(pattern);
-      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
-      ImmutableSet<PathFragment> subdirectoriesToExclude =
-          patternToEvalAndSubdirectoriesToExclude.getSecond();
-      patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
-    } catch (TargetParsingException e) {
-      reportBuildFileError(owner, e.getMessage());
+      patternToEvalAndSubdirectoriesToExclude = getPatternAndExcludes(pattern);
+    } catch (TargetParsingException tpe) {
+      try {
+        reportBuildFileError(owner, tpe.getMessage());
+      } catch (QueryException qe) {
+        return immediateFailedFuture(qe);
+      }
+      return immediateSuccessfulFuture(null);
+    } catch (InterruptedException ie) {
+      return immediateCancelledFuture();
     }
-  }
-
-  @Override
-  public void getTargetsMatchingPatternPar(
-      QueryExpression owner,
-      String pattern,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    // Directly evaluate the target pattern, making use of packages in the graph.
-    try {
-      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
-          getPatternAndExcludes(pattern);
-      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
-      ImmutableSet<PathFragment> subdirectoriesToExclude =
-          patternToEvalAndSubdirectoriesToExclude.getSecond();
-      patternToEval.parEval(
-          resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool);
-    } catch (TargetParsingException e) {
-      reportBuildFileError(owner, e.getMessage());
-    }
+    TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
+    ImmutableSet<PathFragment> subdirectoriesToExclude =
+        patternToEvalAndSubdirectoriesToExclude.getSecond();
+    AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction =
+        new AsyncFunction<TargetParsingException, Void>() {
+      @Override
+      public ListenableFuture<Void> apply(TargetParsingException exn) throws QueryException {
+        reportBuildFileError(owner, exn.getMessage());
+        return Futures.immediateFuture(null);
+      }
+    };
+    ListenableFuture<Void> evalFuture = patternToEval.evalAsync(
+        resolver,
+        subdirectoriesToExclude,
+        callback,
+        QueryException.class,
+        executor);
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.catchingAsync(
+            evalFuture,
+            TargetParsingException.class,
+            reportBuildFileErrorAsyncFunction));
   }
 
   @ThreadSafe
@@ -1030,12 +1086,17 @@
   }
 
   @ThreadSafe
-  void getRBuildFilesParallel(
-      Collection<PathFragment> fileIdentifiers,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
+  QueryTaskFuture<Void> getRBuildFilesParallel(
+      final Collection<PathFragment> fileIdentifiers,
+      final Callback<Target> callback) {
+    return QueryTaskFutureImpl.ofDelegate(safeSubmit(new Callable<Void>() {
+      @Override
+      public Void call() throws QueryException, InterruptedException {
+        ParallelSkyQueryUtils.getRBuildFilesParallel(
+            SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore);
+        return null;
+      }
+    }));
   }
 
   /**
@@ -1043,42 +1104,51 @@
    * on the given list of BUILD files and subincludes (other files are filtered out).
    */
   @ThreadSafe
-  void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
-      throws QueryException, InterruptedException {
-    Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
-    Uniquifier<SkyKey> keyUniquifier =
-        new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
-    Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
-    Set<SkyKey> resultKeys = CompactHashSet.create();
-    while (!current.isEmpty()) {
-      Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
-      current = new HashSet<>();
-      for (SkyKey rdep : Iterables.concat(reverseDeps)) {
-        if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
-          resultKeys.add(rdep);
-          // Every package has a dep on the external package, so we need to include those edges too.
-          if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+  QueryTaskFuture<Void> getRBuildFiles(
+      Collection<PathFragment> fileIdentifiers, Callback<Target> callback) {
+    try {
+      Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
+      Uniquifier<SkyKey> keyUniquifier =
+          new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
+      Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
+      Set<SkyKey> resultKeys = CompactHashSet.create();
+      while (!current.isEmpty()) {
+        Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
+        current = new HashSet<>();
+        for (SkyKey rdep : Iterables.concat(reverseDeps)) {
+          if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
+            resultKeys.add(rdep);
+            // Every package has a dep on the external package, so we need to include those edges
+            // too.
+            if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+              if (keyUniquifier.unique(rdep)) {
+                current.add(rdep);
+              }
+            }
+          } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
+            // Packages may depend on the existence of subpackages, but these edges aren't relevant
+            // to rbuildfiles.
             if (keyUniquifier.unique(rdep)) {
               current.add(rdep);
             }
           }
-        } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
-          // Packages may depend on the existence of subpackages, but these edges aren't relevant to
-          // rbuildfiles.
-          if (keyUniquifier.unique(rdep)) {
-            current.add(rdep);
+        }
+        if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
+          for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
+            callback.process(
+                getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
           }
+          resultKeys.clear();
         }
       }
-      if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
-        for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
-          callback.process(
-              getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
-        }
-        resultKeys.clear();
-      }
+      callback.process(
+          getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
+      return immediateSuccessfulFuture(null);
+    } catch (QueryException e) {
+      return immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return immediateCancelledFuture();
     }
-    callback.process(getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
   }
 
   @Override
@@ -1148,18 +1218,26 @@
    * <p>This callback may be called from multiple threads concurrently. At most one thread will call
    * the wrapped {@code callback} concurrently.
    */
-  @ThreadSafe
-  private static class BatchStreamedCallback extends OutputFormatterCallback<Target>
-      implements ThreadSafeCallback<Target> {
+  // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching
+  // strategy probably hurts performance since we can only start formatting results once the entire
+  // query is finished.
+  private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
+      implements Callback<Target> {
 
-    private final OutputFormatterCallback<Target> callback;
-    private final ThreadSafeUniquifier<Target> uniquifier =
-        new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+    // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness
+    // concern that requires the prohibition of concurrent uses of the callback; the only concern is
+    // memory. We should have a threshold for when to invoke the callback with a batch, and also a
+    // separate, larger, bound on the number of targets being processed at the same time. 
+    private final ThreadSafeOutputFormatterCallback<Target> callback;
+    private final Uniquifier<Target> uniquifier =
+        new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
 
-    private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
+    private BatchStreamedCallback(
+        ThreadSafeOutputFormatterCallback<Target> callback,
+        int batchThreshold) {
       this.callback = callback;
       this.batchThreshold = batchThreshold;
     }
@@ -1203,26 +1281,23 @@
 
   @ThreadSafe
   @Override
-  public void getAllRdepsUnboundedParallel(
+  public QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
       QueryExpression expression,
       VariableContext<Target> context,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
+      Callback<Target> callback) {
+    return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
         this, expression, context, callback, packageSemaphore);
   }
 
   @ThreadSafe
   @Override
-  public void getAllRdeps(
+  public QueryTaskFuture<Void> getAllRdeps(
       QueryExpression expression,
       Predicate<Target> universe,
       VariableContext<Target> context,
       Callback<Target> callback,
-      int depth)
-      throws QueryException, InterruptedException {
-    getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+      int depth) {
+    return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
   }
 
   /**
@@ -1232,16 +1307,15 @@
    * nodes are directly depended on by a large number of other nodes.
    */
   @VisibleForTesting
-  protected void getAllRdeps(
+  protected QueryTaskFuture<Void> getAllRdeps(
       QueryExpression expression,
       Predicate<Target> universe,
       VariableContext<Target> context,
       Callback<Target> callback,
       int depth,
-      int batchSize)
-      throws QueryException, InterruptedException {
+      int batchSize) {
     MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
-    eval(
+    return eval(
         expression,
         context,
         new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));