Description redacted.
--
PiperOrigin-RevId: 149585165
MOS_MIGRATED_REVID=149585165
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
index c0fce7a..b467448 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
@@ -21,10 +21,10 @@
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.pkgcache.PackageCacheOptions;
 import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.runtime.BlazeCommand;
 import com.google.devtools.build.lib.runtime.BlazeRuntime;
 import com.google.devtools.build.lib.runtime.Command;
@@ -109,7 +109,7 @@
 
     // 2. Evaluate expression:
     try {
-      queryEnv.evaluateQuery(expr, new OutputFormatterCallback<Target>() {
+      queryEnv.evaluateQuery(expr, new ThreadSafeOutputFormatterCallback<Target>() {
         @Override
         public void processOutput(Iterable<Target> partialResult) {
           // Throw away the result.
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
index e997b9f..88f891c 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
@@ -19,6 +19,9 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.cmdline.LabelValidator.BadLabelException;
 import com.google.devtools.build.lib.cmdline.LabelValidator.PackageAndTarget;
 import com.google.devtools.build.lib.util.BatchCallback;
@@ -26,15 +29,12 @@
 import com.google.devtools.build.lib.util.StringUtilities;
 import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.PathFragment;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ForkJoinPool;
 import java.util.regex.Pattern;
-
 import javax.annotation.concurrent.Immutable;
 
 /**
@@ -157,17 +157,48 @@
       throws TargetParsingException, E, InterruptedException;
 
   /**
-   * Same as {@link #eval}, but optionally making use of the given {@link ForkJoinPool} to achieve
-   * parallelism.
+   * Evaluates this {@link TargetPattern} synchronously, feeding the result to the given
+   * {@code callback}, and then returns an appropriate immediate {@link ListenableFuture}.
+   *
+   * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an
+   * {@link ExecutionException}, the cause will be an instance of either
+   * {@link TargetParsingException} or the given {@code exceptionClass}.
    */
-  public <T, E extends Exception> void parEval(
+  public final <T, E extends Exception> ListenableFuture<Void> evalAdaptedForAsync(
+      TargetPatternResolver<T> resolver,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      ThreadSafeBatchCallback<T, E> callback,
+      Class<E> exceptionClass) {
+    try {
+      eval(resolver, excludedSubdirectories, callback, exceptionClass);
+      return Futures.immediateFuture(null);
+    } catch (TargetParsingException e) {
+      return Futures.immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return Futures.immediateCancelledFuture();
+    } catch (Exception e) {
+      if (exceptionClass.isInstance(e)) {
+        return Futures.immediateFailedFuture(exceptionClass.cast(e));
+      }
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Returns a {@link ListenableFuture} representing the asynchronous evaluation of this
+   * {@link TargetPattern} that feeds the results to the given {@code callback}.
+   *
+   * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an
+   * {@link ExecutionException}, the cause will be an instance of either
+   * {@link TargetParsingException} or the given {@code exceptionClass}.
+   */
+  public <T, E extends Exception> ListenableFuture<Void> evalAsync(
       TargetPatternResolver<T> resolver,
       ImmutableSet<PathFragment> excludedSubdirectories,
       ThreadSafeBatchCallback<T, E> callback,
       Class<E> exceptionClass,
-      ForkJoinPool forkJoinPool)
-      throws TargetParsingException, E, InterruptedException {
-    eval(resolver, excludedSubdirectories, callback, exceptionClass);
+      ListeningExecutorService executor) {
+    return evalAdaptedForAsync(resolver, excludedSubdirectories, callback, exceptionClass);
   }
 
   /**
@@ -252,8 +283,8 @@
     public <T, E extends Exception> void eval(
         TargetPatternResolver<T> resolver,
         ImmutableSet<PathFragment> excludedSubdirectories,
-        BatchCallback<T, E> callback, Class<E> exceptionClass)
-        throws TargetParsingException, E, InterruptedException {
+        BatchCallback<T, E> callback,
+        Class<E> exceptionClass) throws TargetParsingException, E, InterruptedException {
       Preconditions.checkArgument(excludedSubdirectories.isEmpty(),
           "Target pattern \"%s\" of type %s cannot be evaluated with excluded subdirectories: %s.",
           getOriginalPattern(), getType(), excludedSubdirectories);
@@ -518,14 +549,13 @@
     }
 
     @Override
-    public <T, E extends Exception> void parEval(
+    public <T, E extends Exception> ListenableFuture<Void> evalAsync(
         TargetPatternResolver<T> resolver,
         ImmutableSet<PathFragment> excludedSubdirectories,
         ThreadSafeBatchCallback<T, E> callback,
         Class<E> exceptionClass,
-        ForkJoinPool forkJoinPool)
-        throws TargetParsingException, E, InterruptedException {
-      resolver.findTargetsBeneathDirectoryPar(
+        ListeningExecutorService executor) {
+      return resolver.findTargetsBeneathDirectoryAsync(
           directory.getRepository(),
           getOriginalPattern(),
           directory.getPackageFragment().getPathString(),
@@ -533,7 +563,7 @@
           excludedSubdirectories,
           callback,
           exceptionClass,
-          forkJoinPool);
+          executor);
     }
 
     @Override
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
index b6b384c..38b866b 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
@@ -15,35 +15,38 @@
 package com.google.devtools.build.lib.cmdline;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.util.BatchCallback;
 import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import java.util.concurrent.ForkJoinPool;
 
 /**
- * A callback interface that is used during the process of converting target patterns (such as
+ * A callback that is used during the process of converting target patterns (such as
  * <code>//foo:all</code>) into one or more lists of targets (such as <code>//foo:foo,
  * //foo:bar</code>). During a call to {@link TargetPattern#eval}, the {@link TargetPattern} makes
  * calls to this interface to implement the target pattern semantics. The generic type {@code T} is
  * only for compile-time type safety; there are no requirements to the actual type.
  */
-public interface TargetPatternResolver<T> {
+public abstract class TargetPatternResolver<T> {
 
   /**
    * Reports the given warning.
    */
-  void warn(String msg);
+  public abstract void warn(String msg);
 
   /**
    * Returns a single target corresponding to the given label, or null. This method may only throw
    * an exception if the current thread was interrupted.
    */
-  T getTargetOrNull(Label label) throws InterruptedException;
+  public abstract T getTargetOrNull(Label label) throws InterruptedException;
 
   /**
    * Returns a single target corresponding to the given label, or an empty or failed result.
    */
-  ResolvedTargets<T> getExplicitTarget(Label label)
+  public abstract ResolvedTargets<T> getExplicitTarget(Label label)
       throws TargetParsingException, InterruptedException;
 
   /**
@@ -55,7 +58,7 @@
    * @param packageIdentifier the identifier of the package
    * @param rulesOnly whether to return rules only
    */
-  ResolvedTargets<T> getTargetsInPackage(String originalPattern,
+  public abstract ResolvedTargets<T> getTargetsInPackage(String originalPattern,
       PackageIdentifier packageIdentifier, boolean rulesOnly)
       throws TargetParsingException, InterruptedException;
 
@@ -84,7 +87,7 @@
    * @param exceptionClass The class type of the parameterized exception.
    * @throws TargetParsingException under implementation-specific failure conditions
    */
-  <E extends Exception> void findTargetsBeneathDirectory(
+  public abstract <E extends Exception> void findTargetsBeneathDirectory(
       RepositoryName repository,
       String originalPattern,
       String directory,
@@ -98,7 +101,7 @@
    * Same as {@link #findTargetsBeneathDirectory}, but optionally making use of the given
    * {@link ForkJoinPool} to achieve parallelism.
    */
-  <E extends Exception> void findTargetsBeneathDirectoryPar(
+  public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync(
       RepositoryName repository,
       String originalPattern,
       String directory,
@@ -106,19 +109,38 @@
       ImmutableSet<PathFragment> excludedSubdirectories,
       ThreadSafeBatchCallback<T, E> callback,
       Class<E> exceptionClass,
-      ForkJoinPool forkJoinPool)
-      throws TargetParsingException, E, InterruptedException;
+      ListeningExecutorService executor) {
+      try {
+        findTargetsBeneathDirectory(
+            repository,
+            originalPattern,
+            directory,
+            rulesOnly,
+            excludedSubdirectories,
+            callback,
+            exceptionClass);
+        return Futures.immediateFuture(null);
+      } catch (TargetParsingException e) {
+        return Futures.immediateFailedFuture(e);
+      } catch (InterruptedException e) {
+        return Futures.immediateCancelledFuture();
+      } catch (Exception e) {
+        if (exceptionClass.isInstance(e)) {
+          return Futures.immediateFailedFuture(e);
+        }
+        throw new IllegalStateException(e);
+      }
+  }
 
   /**
    * Returns true, if and only if the given package identifier corresponds to a package, i.e., a
-   * file with the name {@code packageName/BUILD} exists in the appropriat repository.
+   * file with the name {@code packageName/BUILD} exists in the appropriate repository.
    */
-  boolean isPackage(PackageIdentifier packageIdentifier) throws InterruptedException;
+  public abstract boolean isPackage(PackageIdentifier packageIdentifier)
+      throws InterruptedException;
 
   /**
    * Returns the target kind of the given target, for example {@code cc_library rule}.
    */
-  String getTargetKind(T target);
-
-
+  public abstract String getTargetKind(T target);
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
index 307e73c..4b1a1d5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
@@ -24,16 +24,16 @@
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.packages.DependencyFilter;
 import com.google.devtools.build.lib.packages.Target;
+import com.google.devtools.build.lib.query2.engine.AbstractQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.KeyExtractor;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryUtil;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.util.Preconditions;
 import java.io.IOException;
@@ -48,8 +48,7 @@
  * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much of
  * QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored.
  */
-public abstract class AbstractBlazeQueryEnvironment<T>
-    implements QueryEnvironment<T> {
+public abstract class AbstractBlazeQueryEnvironment<T> extends AbstractQueryEnvironment<T> {
   protected ErrorSensingEventHandler eventHandler;
   protected final boolean keepGoing;
   protected final boolean strictScope;
@@ -59,7 +58,6 @@
 
   protected final Set<Setting> settings;
   protected final List<QueryFunction> extraFunctions;
-  private final QueryExpressionEvalListener<T> evalListener;
 
   private static final Logger logger =
       Logger.getLogger(AbstractBlazeQueryEnvironment.class.getName());
@@ -70,8 +68,7 @@
       Predicate<Label> labelFilter,
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
-      Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<T> evalListener) {
+      Iterable<QueryFunction> extraFunctions) {
     this.eventHandler = new ErrorSensingEventHandler(eventHandler);
     this.keepGoing = keepGoing;
     this.strictScope = strictScope;
@@ -79,7 +76,6 @@
     this.labelFilter = labelFilter;
     this.settings = Sets.immutableEnumSet(settings);
     this.extraFunctions = ImmutableList.copyOf(extraFunctions);
-    this.evalListener = evalListener;
   }
 
   private static DependencyFilter constructDependencyFilter(
@@ -104,7 +100,7 @@
    */
   protected void evalTopLevelInternal(QueryExpression expr, OutputFormatterCallback<T> callback)
       throws QueryException, InterruptedException {
-    eval(expr, VariableContext.<T>empty(), callback);
+    ((QueryTaskFutureImpl<Void>) eval(expr, VariableContext.<T>empty(), callback)).getChecked();
   }
 
   /**
@@ -121,9 +117,9 @@
    */
   public QueryEvalResult evaluateQuery(
       QueryExpression expr,
-      final OutputFormatterCallback<T> callback)
+      ThreadSafeOutputFormatterCallback<T> callback)
           throws QueryException, InterruptedException, IOException {
-    EmptinessSensingCallback<T> emptySensingCallback = createEmptinessSensingCallback(callback);
+    EmptinessSensingCallback<T> emptySensingCallback = new EmptinessSensingCallback<>(callback);
     long startTime = System.currentTimeMillis();
     // In the --nokeep_going case, errors are reported in the order in which the patterns are
     // specified; using a linked hash set here makes sure that the left-most error is reported.
@@ -176,13 +172,6 @@
     return new QueryEvalResult(!eventHandler.hasErrors(), emptySensingCallback.isEmpty());
   }
 
-  private static <T> EmptinessSensingCallback<T> createEmptinessSensingCallback(
-      OutputFormatterCallback<T> callback) {
-    return (callback instanceof ThreadSafeCallback)
-        ? new ThreadSafeEmptinessSensingCallback<>(callback)
-        : new EmptinessSensingCallback<>(callback);
-  }
-
   private static class EmptinessSensingCallback<T> extends OutputFormatterCallback<T> {
     private final OutputFormatterCallback<T> callback;
     private final AtomicBoolean empty = new AtomicBoolean(true);
@@ -213,19 +202,11 @@
     }
   }
 
-  private static class ThreadSafeEmptinessSensingCallback<T>
-      extends EmptinessSensingCallback<T> implements ThreadSafeCallback<T> {
-    private ThreadSafeEmptinessSensingCallback(OutputFormatterCallback<T> callback) {
-      super(callback);
-      Preconditions.checkState(callback instanceof ThreadSafeCallback);
-    }
-  }
-
   public QueryExpression transformParsedQuery(QueryExpression queryExpression) {
     return queryExpression;
   }
 
-  public QueryEvalResult evaluateQuery(String query, OutputFormatterCallback<T> callback)
+  public QueryEvalResult evaluateQuery(String query, ThreadSafeOutputFormatterCallback<T> callback)
       throws QueryException, InterruptedException, IOException {
     return evaluateQuery(
         QueryExpression.parse(query, this), callback);
@@ -257,17 +238,32 @@
     return true;
   }
 
-  public Set<T> evalTargetPattern(QueryExpression caller, String pattern)
-      throws QueryException, InterruptedException {
+  public QueryTaskFuture<Set<T>> evalTargetPattern(QueryExpression caller, String pattern) {
     try {
       preloadOrThrow(caller, ImmutableList.of(pattern));
-    } catch (TargetParsingException e) {
-      // Will skip the target and keep going if -k is specified.
-      reportBuildFileError(caller, e.getMessage());
+    } catch (TargetParsingException tpe) {
+      try {
+        // Will skip the target and keep going if -k is specified.
+        reportBuildFileError(caller, tpe.getMessage());
+      } catch (QueryException qe) {
+        return immediateFailedFuture(qe);
+      }
+    } catch (QueryException qe) {
+      return immediateFailedFuture(qe);
+    } catch (InterruptedException e) {
+      return immediateCancelledFuture();
     }
-    AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback();
-    getTargetsMatchingPattern(caller, pattern, aggregatingCallback);
-    return aggregatingCallback.getResult();
+    final AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback();
+    QueryTaskFuture<Void> evalFuture =
+        getTargetsMatchingPattern(caller, pattern, aggregatingCallback);
+    return whenSucceedsCall(
+        evalFuture,
+        new QueryTaskCallable<Set<T>>() {
+          @Override
+          public Set<T> call() {
+            return aggregatingCallback.getResult();
+          }
+        });
   }
 
   /**
@@ -291,11 +287,6 @@
     return builder.build();
   }
 
-  @Override
-  public QueryExpressionEvalListener<T> getEvalListener() {
-    return evalListener;
-  }
-
   /** A {@link KeyExtractor} that extracts {@code Label}s out of {@link Target}s. */
   protected static class TargetKeyExtractor implements KeyExtractor<Target, Label> {
     protected static final TargetKeyExtractor INSTANCE = new TargetKeyExtractor();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
index e9a9dca..b09abae 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
@@ -38,19 +38,14 @@
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
-import com.google.devtools.build.lib.query2.engine.QueryUtil;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
-import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.util.Preconditions;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import java.io.IOException;
@@ -63,7 +58,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * The environment of a Blaze query. Not thread-safe.
@@ -110,10 +104,8 @@
       Predicate<Label> labelFilter,
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
-      Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<Target> evalListener) {
-    super(
-        keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions, evalListener);
+      Iterable<QueryFunction> extraFunctions) {
+    super(keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions);
     this.targetPatternEvaluator = targetPatternEvaluator;
     this.transitivePackageLoader = transitivePackageLoader;
     this.targetProvider = targetProvider;
@@ -125,7 +117,7 @@
   @Override
   public DigraphQueryEvalResult<Target> evaluateQuery(
       QueryExpression expr,
-      final OutputFormatterCallback<Target> callback)
+      ThreadSafeOutputFormatterCallback<Target> callback)
           throws QueryException, InterruptedException, IOException {
     eventHandler.resetErrors();
     resolvedTargetPatterns.clear();
@@ -135,8 +127,19 @@
   }
 
   @Override
-  public void getTargetsMatchingPattern(
-      QueryExpression caller, String pattern, Callback<Target> callback)
+  public QueryTaskFuture<Void> getTargetsMatchingPattern(
+      QueryExpression owner, String pattern, Callback<Target> callback) {
+    try {
+      getTargetsMatchingPatternImpl(pattern, callback);
+      return immediateSuccessfulFuture(null);
+    } catch (QueryException e) {
+      return immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return immediateCancelledFuture();
+    }
+  }
+
+  private void getTargetsMatchingPatternImpl(String pattern, Callback<Target> callback)
       throws QueryException, InterruptedException {
     // We can safely ignore the boolean error flag. The evaluateQuery() method above wraps the
     // entire query computation in an error sensor.
@@ -194,15 +197,6 @@
   }
 
   @Override
-  public void getTargetsMatchingPatternPar(
-      QueryExpression caller,
-      String pattern,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    getTargetsMatchingPattern(caller, pattern, callback);
-  }
-
-  @Override
   public Target getTarget(Label label)
       throws TargetNotFoundException, QueryException, InterruptedException {
     // Can't use strictScope here because we are expecting a target back.
@@ -295,22 +289,13 @@
   }
 
   @Override
-  public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
-      throws QueryException, InterruptedException {
-    AggregateAllCallback<Target> aggregator = QueryUtil.newAggregateAllCallback();
-    expr.eval(this, context, aggregator);
-    callback.process(aggregator.getResult());
-  }
-
-  @Override
   public Uniquifier<Target> createUniquifier() {
     return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
   }
 
   @Override
   public MinDepthUniquifier<Target> createMinDepthUniquifier() {
-    return new ThreadSafeMinDepthUniquifierImpl<>(
-        TargetKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
+    return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
   }
 
   private void preloadTransitiveClosure(Set<Target> targets, int maxDepth)
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 95486c6..0052892 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -36,10 +36,10 @@
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.skyframe.PackageValue;
 import com.google.devtools.build.lib.skyframe.SkyFunctions;
@@ -77,14 +77,13 @@
    * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate
    * when there is no depth-bound.
    */
-  static void getAllRdepsUnboundedParallel(
+  static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
       SkyQueryEnvironment env,
       QueryExpression expression,
       VariableContext<Target> context,
-      ThreadSafeCallback<Target> callback,
-      MultisetSemaphore<PackageIdentifier> packageSemaphore)
-          throws QueryException, InterruptedException {
-    env.eval(
+      Callback<Target> callback,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+    return env.eval(
         expression,
         context,
         new SkyKeyBFSVisitorCallback(
@@ -95,10 +94,10 @@
   static void getRBuildFilesParallel(
       SkyQueryEnvironment env,
       Collection<PathFragment> fileIdentifiers,
-      ThreadSafeCallback<Target> callback,
+      Callback<Target> callback,
       MultisetSemaphore<PackageIdentifier> packageSemaphore)
           throws QueryException, InterruptedException {
-    ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
+    Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
     RBuildFilesVisitor visitor =
         new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore);
     visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
@@ -110,7 +109,7 @@
 
     private RBuildFilesVisitor(
         SkyQueryEnvironment env,
-        ThreadSafeUniquifier<SkyKey> uniquifier,
+        Uniquifier<SkyKey> uniquifier,
         Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
       super(env, uniquifier, callback);
@@ -180,8 +179,8 @@
 
     private AllRdepsUnboundedVisitor(
         SkyQueryEnvironment env,
-        ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
-        ThreadSafeCallback<Target> callback,
+        Uniquifier<Pair<SkyKey, SkyKey>> uniquifier,
+        Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
       super(env, uniquifier, callback);
       this.packageSemaphore = packageSemaphore;
@@ -190,19 +189,18 @@
     /**
      * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used
      * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
-     * single {@link ThreadSafeCallback#process} call. Note that all the created
-     * instances share the same {@code ThreadSafeUniquifier<SkyKey>} so that we don't visit the
-     * same Skyframe node more than once.
+     * single {@link Callback#process} call. Note that all the created instances share the same
+     * {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
      */
     private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
       private final SkyQueryEnvironment env;
-      private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
-      private final ThreadSafeCallback<Target> callback;
+      private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier;
+      private final Callback<Target> callback;
       private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
       private Factory(
         SkyQueryEnvironment env,
-        ThreadSafeCallback<Target> callback,
+        Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
         this.env = env;
         this.uniquifier = env.createReverseDepSkyKeyUniquifier();
@@ -341,10 +339,10 @@
   }
 
   /**
-   * A {@link ThreadSafeCallback} whose {@link ThreadSafeCallback#process} method kicks off a BFS
-   * visitation via a fresh {@link AbstractSkyKeyBFSVisitor} instance.
+   * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh
+   * {@link AbstractSkyKeyBFSVisitor} instance.
    */
-  private static class SkyKeyBFSVisitorCallback implements ThreadSafeCallback<Target> {
+  private static class SkyKeyBFSVisitorCallback implements Callback<Target> {
     private final AbstractSkyKeyBFSVisitor.Factory visitorFactory;
 
     private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) {
@@ -355,6 +353,8 @@
     public void process(Iterable<Target> partialResult)
         throws QueryException, InterruptedException {
       AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create();
+      // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
+      // another, potentially expensive computation. Refactor to something like "processAsync".
       visitor.visitAndWaitForCompletion(
           SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
     }
@@ -370,7 +370,7 @@
   @ThreadSafe
   private abstract static class AbstractSkyKeyBFSVisitor<T> {
     protected final SkyQueryEnvironment env;
-    private final ThreadSafeUniquifier<T> uniquifier;
+    private final Uniquifier<T> uniquifier;
     private final Callback<Target> callback;
 
     private final BFSVisitingTaskExecutor executor;
@@ -434,7 +434,7 @@
             new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build());
 
     private AbstractSkyKeyBFSVisitor(
-        SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+        SkyQueryEnvironment env, Uniquifier<T> uniquifier, Callback<Target> callback) {
       this.env = env;
       this.uniquifier = uniquifier;
       this.callback = callback;
diff --git a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
index f8c8327..d43f71b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
@@ -24,7 +24,6 @@
 import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.util.Preconditions;
 import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
 import java.util.List;
@@ -48,7 +47,6 @@
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
       Iterable<QueryFunction> functions,
-      QueryExpressionEvalListener<Target> evalListener,
       @Nullable PathPackageLocator packagePath) {
     Preconditions.checkNotNull(universeScope);
     if (canUseSkyQuery(orderedResults, universeScope, packagePath, strictScope, labelFilter)) {
@@ -58,7 +56,6 @@
           eventHandler,
           settings,
           functions,
-          evalListener,
           targetPatternEvaluator.getOffset(),
           graphFactory,
           universeScope,
@@ -66,7 +63,7 @@
     } else {
       return new BlazeQueryEnvironment(transitivePackageLoader, targetProvider,
           targetPatternEvaluator, keepGoing, strictScope, loadingPhaseThreads, labelFilter,
-          eventHandler, settings, functions, evalListener);
+          eventHandler, settings, functions);
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
index 79614c2..e2fb0e4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
@@ -22,14 +22,12 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.vfs.PathFragment;
-
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * An "rbuildfiles" query expression, which computes the set of packages (as represented by their
@@ -69,36 +67,19 @@
 
   @Override
   @SuppressWarnings("unchecked") // Cast from <Target> to <T>. This will only be used with <Target>.
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       QueryEnvironment<T> env,
       VariableContext<T> context,
       QueryExpression expression,
       List<Argument> args,
-      Callback<T> callback) throws QueryException, InterruptedException {
+      Callback<T> callback) {
     if (!(env instanceof SkyQueryEnvironment)) {
-      throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment");
+      return env.immediateFailedFuture(
+          new QueryException("rbuildfiles can only be used with SkyQueryEnvironment"));
     }
-    ((SkyQueryEnvironment) env)
-        .getRBuildFiles(
-            Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT), (Callback<Target>) callback);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    if (!(env instanceof SkyQueryEnvironment)) {
-      throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment");
-    }
-    ((SkyQueryEnvironment) env)
-        .getRBuildFilesParallel(
-            Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT),
-            (ThreadSafeCallback<Target>) callback,
-            forkJoinPool);
+    SkyQueryEnvironment skyEnv = ((SkyQueryEnvironment) env);
+    return skyEnv.getRBuildFilesParallel(
+        Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT),
+        (Callback<Target>) callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 9a5ffd4..de5175d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -29,14 +29,20 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
-import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -58,19 +64,18 @@
 import com.google.devtools.build.lib.query2.engine.KeyExtractor;
 import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -110,7 +115,9 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -121,6 +128,10 @@
  * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
  * particular order. As well, this class eagerly loads the full transitive closure of targets, even
  * if the full closure isn't needed.
+ *
+ * <p>This class has concurrent implementations of the
+ * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the
+ * asynchronous evaluation model yields parallel query evaluation.
  */
 public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
     implements StreamableQueryEnvironment<Target> {
@@ -144,7 +155,7 @@
   protected WalkableGraph graph;
   private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
   private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider;
-  private ForkJoinPool forkJoinPool;
+  private ListeningExecutorService executor;
   private RecursivePackageProviderBackedTargetPatternResolver resolver;
   private final SkyKey universeKey;
   private final ImmutableList<TargetPatternKey> universeTargetPatternKeys;
@@ -155,7 +166,6 @@
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
       Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<Target> evalListener,
       String parserPrefix,
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
@@ -169,7 +179,6 @@
         eventHandler,
         settings,
         extraFunctions,
-        evalListener,
         parserPrefix,
         graphFactory,
         universeScope,
@@ -183,7 +192,6 @@
       ExtendedEventHandler eventHandler,
       Set<Setting> settings,
       Iterable<QueryFunction> extraFunctions,
-      QueryExpressionEvalListener<Target> evalListener,
       String parserPrefix,
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
@@ -194,8 +202,7 @@
         /*labelFilter=*/ Rule.ALL_LABELS,
         eventHandler,
         settings,
-        extraFunctions,
-        evalListener);
+        extraFunctions);
     this.loadingPhaseThreads = loadingPhaseThreads;
     this.graphFactory = graphFactory;
     this.pkgPath = pkgPath;
@@ -228,9 +235,15 @@
       graphBackedRecursivePackageProvider =
           new GraphBackedRecursivePackageProvider(graph, universeTargetPatternKeys, pkgPath);
     }
-    if (forkJoinPool == null) {
-      forkJoinPool =
-          NamedForkJoinPool.newNamedPool("QueryEnvironment", queryEvaluationParallelismLevel);
+    if (executor == null) {
+      executor = MoreExecutors.listeningDecorator(
+          new ThreadPoolExecutor(
+            /*corePoolSize=*/ queryEvaluationParallelismLevel,
+            /*maximumPoolSize=*/ queryEvaluationParallelismLevel,
+            /*keepAliveTime=*/ 1,
+            /*units=*/ TimeUnit.SECONDS,
+            /*workQueue=*/ new BlockingStack<Runnable>(),
+            new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build()));
     }
     resolver =
         new RecursivePackageProviderBackedTargetPatternResolver(
@@ -340,16 +353,17 @@
     } catch (Throwable throwable) {
       throwableToThrow = throwable;
     } finally {
-      if (throwableToThrow  != null) {
-        LOG.log(Level.INFO, "About to shutdown FJP because of throwable", throwableToThrow);
+      if (throwableToThrow != null) {
+        LOG.log(
+            Level.INFO,
+            "About to shutdown query threadpool because of throwable",
+            throwableToThrow);
         // Force termination of remaining tasks if evaluation failed abruptly (e.g. was
         // interrupted). We don't want to leave any dangling threads running tasks.
-        forkJoinPool.shutdownNow();
-      }
-      forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-      if (throwableToThrow  != null) {
-        // Signal that pool must be recreated on the next invocation.
-        forkJoinPool = null;
+        executor.shutdownNow();
+        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // Signal that executor must be recreated on the next invocation.
+        executor = null;
         Throwables.propagateIfPossible(
             throwableToThrow, QueryException.class, InterruptedException.class);
       }
@@ -358,7 +372,7 @@
 
   @Override
   public QueryEvalResult evaluateQuery(
-      QueryExpression expr, OutputFormatterCallback<Target> callback)
+      QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback)
           throws QueryException, InterruptedException, IOException {
     // Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
     // result is set to have an error iff there were errors emitted during the query, so we reset
@@ -569,46 +583,85 @@
     return null;
   }
 
-  @ThreadSafe
-  @Override
-  public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
-      throws QueryException, InterruptedException {
-    // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
-    // all QueryEnvironment implementations.
-    if (callback instanceof ThreadSafeCallback) {
-      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
-    } else {
-      expr.eval(this, context, callback);
+  private <R> ListenableFuture<R> safeSubmit(Callable<R> callable) {
+    try {
+      return executor.submit(callable);
+    } catch (RejectedExecutionException e) {
+      return Futures.immediateCancelledFuture();
     }
   }
 
   @ThreadSafe
   @Override
-  public ThreadSafeUniquifier<Target> createUniquifier() {
+  public QueryTaskFuture<Void> eval(
+      final QueryExpression expr,
+      final VariableContext<Target> context,
+      final Callback<Target> callback) {
+    // TODO(bazel-team): As in here, use concurrency for the async #eval of other QueryEnvironment
+    // implementations.
+    Callable<QueryTaskFutureImpl<Void>> task = new Callable<QueryTaskFutureImpl<Void>>() {
+      @Override
+      public QueryTaskFutureImpl<Void> call() {
+        return (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback);
+      }
+    };
+    ListenableFuture<QueryTaskFutureImpl<Void>> futureFuture = safeSubmit(task);
+    return QueryTaskFutureImpl.ofDelegate(Futures.dereference(futureFuture));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+    return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable));
+  }
+
+  @Override
+  public <T1, T2> QueryTaskFuture<T2> transformAsync(
+      QueryTaskFuture<T1> future,
+      final Function<T1, QueryTaskFuture<T2>> function) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.transformAsync(
+            (QueryTaskFutureImpl<T1>) future,
+            new AsyncFunction<T1, T2>() {
+              @Override
+              public ListenableFuture<T2> apply(T1 input) {
+                return (QueryTaskFutureImpl<T2>) function.apply(input);
+              }
+            },
+            executor));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> whenAllSucceedCall(
+      Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.whenAllSucceed(cast(futures)).call(callable, executor));
+  }
+
+  @ThreadSafe
+  @Override
+  public Uniquifier<Target> createUniquifier() {
     return createTargetUniquifier();
   }
 
   @ThreadSafe
   @Override
-  public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
-    return new ThreadSafeMinDepthUniquifierImpl<>(
-        TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  public MinDepthUniquifier<Target> createMinDepthUniquifier() {
+    return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<Target> createTargetUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<Target> createTargetUniquifier() {
+    return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<SkyKey> createSkyKeyUniquifier() {
+    return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
-  ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
-    return new ThreadSafeUniquifierImpl<>(
-        ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  Uniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
+    return new UniquifierImpl<>(ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -625,41 +678,44 @@
 
   @ThreadSafe
   @Override
-  public void getTargetsMatchingPattern(
-      QueryExpression owner, String pattern, Callback<Target> callback)
-      throws QueryException, InterruptedException {
+  public QueryTaskFuture<Void> getTargetsMatchingPattern(
+      final QueryExpression owner, String pattern, Callback<Target> callback) {
     // Directly evaluate the target pattern, making use of packages in the graph.
+    Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude;
     try {
-      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
-          getPatternAndExcludes(pattern);
-      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
-      ImmutableSet<PathFragment> subdirectoriesToExclude =
-          patternToEvalAndSubdirectoriesToExclude.getSecond();
-      patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
-    } catch (TargetParsingException e) {
-      reportBuildFileError(owner, e.getMessage());
+      patternToEvalAndSubdirectoriesToExclude = getPatternAndExcludes(pattern);
+    } catch (TargetParsingException tpe) {
+      try {
+        reportBuildFileError(owner, tpe.getMessage());
+      } catch (QueryException qe) {
+        return immediateFailedFuture(qe);
+      }
+      return immediateSuccessfulFuture(null);
+    } catch (InterruptedException ie) {
+      return immediateCancelledFuture();
     }
-  }
-
-  @Override
-  public void getTargetsMatchingPatternPar(
-      QueryExpression owner,
-      String pattern,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    // Directly evaluate the target pattern, making use of packages in the graph.
-    try {
-      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
-          getPatternAndExcludes(pattern);
-      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
-      ImmutableSet<PathFragment> subdirectoriesToExclude =
-          patternToEvalAndSubdirectoriesToExclude.getSecond();
-      patternToEval.parEval(
-          resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool);
-    } catch (TargetParsingException e) {
-      reportBuildFileError(owner, e.getMessage());
-    }
+    TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
+    ImmutableSet<PathFragment> subdirectoriesToExclude =
+        patternToEvalAndSubdirectoriesToExclude.getSecond();
+    AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction =
+        new AsyncFunction<TargetParsingException, Void>() {
+      @Override
+      public ListenableFuture<Void> apply(TargetParsingException exn) throws QueryException {
+        reportBuildFileError(owner, exn.getMessage());
+        return Futures.immediateFuture(null);
+      }
+    };
+    ListenableFuture<Void> evalFuture = patternToEval.evalAsync(
+        resolver,
+        subdirectoriesToExclude,
+        callback,
+        QueryException.class,
+        executor);
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.catchingAsync(
+            evalFuture,
+            TargetParsingException.class,
+            reportBuildFileErrorAsyncFunction));
   }
 
   @ThreadSafe
@@ -1030,12 +1086,17 @@
   }
 
   @ThreadSafe
-  void getRBuildFilesParallel(
-      Collection<PathFragment> fileIdentifiers,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
+  QueryTaskFuture<Void> getRBuildFilesParallel(
+      final Collection<PathFragment> fileIdentifiers,
+      final Callback<Target> callback) {
+    return QueryTaskFutureImpl.ofDelegate(safeSubmit(new Callable<Void>() {
+      @Override
+      public Void call() throws QueryException, InterruptedException {
+        ParallelSkyQueryUtils.getRBuildFilesParallel(
+            SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore);
+        return null;
+      }
+    }));
   }
 
   /**
@@ -1043,42 +1104,51 @@
    * on the given list of BUILD files and subincludes (other files are filtered out).
    */
   @ThreadSafe
-  void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
-      throws QueryException, InterruptedException {
-    Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
-    Uniquifier<SkyKey> keyUniquifier =
-        new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
-    Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
-    Set<SkyKey> resultKeys = CompactHashSet.create();
-    while (!current.isEmpty()) {
-      Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
-      current = new HashSet<>();
-      for (SkyKey rdep : Iterables.concat(reverseDeps)) {
-        if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
-          resultKeys.add(rdep);
-          // Every package has a dep on the external package, so we need to include those edges too.
-          if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+  QueryTaskFuture<Void> getRBuildFiles(
+      Collection<PathFragment> fileIdentifiers, Callback<Target> callback) {
+    try {
+      Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
+      Uniquifier<SkyKey> keyUniquifier =
+          new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
+      Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
+      Set<SkyKey> resultKeys = CompactHashSet.create();
+      while (!current.isEmpty()) {
+        Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
+        current = new HashSet<>();
+        for (SkyKey rdep : Iterables.concat(reverseDeps)) {
+          if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
+            resultKeys.add(rdep);
+            // Every package has a dep on the external package, so we need to include those edges
+            // too.
+            if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+              if (keyUniquifier.unique(rdep)) {
+                current.add(rdep);
+              }
+            }
+          } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
+            // Packages may depend on the existence of subpackages, but these edges aren't relevant
+            // to rbuildfiles.
             if (keyUniquifier.unique(rdep)) {
               current.add(rdep);
             }
           }
-        } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
-          // Packages may depend on the existence of subpackages, but these edges aren't relevant to
-          // rbuildfiles.
-          if (keyUniquifier.unique(rdep)) {
-            current.add(rdep);
+        }
+        if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
+          for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
+            callback.process(
+                getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
           }
+          resultKeys.clear();
         }
       }
-      if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
-        for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
-          callback.process(
-              getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
-        }
-        resultKeys.clear();
-      }
+      callback.process(
+          getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
+      return immediateSuccessfulFuture(null);
+    } catch (QueryException e) {
+      return immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return immediateCancelledFuture();
     }
-    callback.process(getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
   }
 
   @Override
@@ -1148,18 +1218,26 @@
    * <p>This callback may be called from multiple threads concurrently. At most one thread will call
    * the wrapped {@code callback} concurrently.
    */
-  @ThreadSafe
-  private static class BatchStreamedCallback extends OutputFormatterCallback<Target>
-      implements ThreadSafeCallback<Target> {
+  // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching
+  // strategy probably hurts performance since we can only start formatting results once the entire
+  // query is finished.
+  private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
+      implements Callback<Target> {
 
-    private final OutputFormatterCallback<Target> callback;
-    private final ThreadSafeUniquifier<Target> uniquifier =
-        new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+    // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness
+    // concern that requires the prohibition of concurrent uses of the callback; the only concern is
+    // memory. We should have a threshold for when to invoke the callback with a batch, and also a
+    // separate, larger, bound on the number of targets being processed at the same time. 
+    private final ThreadSafeOutputFormatterCallback<Target> callback;
+    private final Uniquifier<Target> uniquifier =
+        new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
 
-    private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
+    private BatchStreamedCallback(
+        ThreadSafeOutputFormatterCallback<Target> callback,
+        int batchThreshold) {
       this.callback = callback;
       this.batchThreshold = batchThreshold;
     }
@@ -1203,26 +1281,23 @@
 
   @ThreadSafe
   @Override
-  public void getAllRdepsUnboundedParallel(
+  public QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
       QueryExpression expression,
       VariableContext<Target> context,
-      ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
+      Callback<Target> callback) {
+    return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
         this, expression, context, callback, packageSemaphore);
   }
 
   @ThreadSafe
   @Override
-  public void getAllRdeps(
+  public QueryTaskFuture<Void> getAllRdeps(
       QueryExpression expression,
       Predicate<Target> universe,
       VariableContext<Target> context,
       Callback<Target> callback,
-      int depth)
-      throws QueryException, InterruptedException {
-    getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+      int depth) {
+    return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
   }
 
   /**
@@ -1232,16 +1307,15 @@
    * nodes are directly depended on by a large number of other nodes.
    */
   @VisibleForTesting
-  protected void getAllRdeps(
+  protected QueryTaskFuture<Void> getAllRdeps(
       QueryExpression expression,
       Predicate<Target> universe,
       VariableContext<Target> context,
       Callback<Target> callback,
       int depth,
-      int batchSize)
-      throws QueryException, InterruptedException {
+      int batchSize) {
     MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
-    eval(
+    return eval(
         expression,
         context,
         new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
new file mode 100644
index 0000000..62fd91b
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
@@ -0,0 +1,194 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations
+ * of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods.
+ */
+public abstract class AbstractQueryEnvironment<T> implements QueryEnvironment<T> {
+  /** Concrete implementation of {@link QueryTaskFuture}. */
+  protected static final class QueryTaskFutureImpl<T>
+      extends QueryTaskFutureImplBase<T> implements ListenableFuture<T> {
+    private final ListenableFuture<T> delegate;
+
+    private QueryTaskFutureImpl(ListenableFuture<T> delegate) {
+      this.delegate = delegate;
+    }
+
+    public static <R> QueryTaskFutureImpl<R> ofDelegate(ListenableFuture<R> delegate) {
+      return (delegate instanceof QueryTaskFutureImpl)
+          ? (QueryTaskFutureImpl<R>) delegate
+          : new QueryTaskFutureImpl<>(delegate);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return delegate.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return delegate.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+      return delegate.isDone();
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      return delegate.get();
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return delegate.get(timeout, unit);
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+      delegate.addListener(listener, executor);
+    }
+
+    @Override
+    public T getIfSuccessful() {
+      Preconditions.checkState(delegate.isDone());
+      try {
+        return delegate.get();
+      } catch (CancellationException | InterruptedException | ExecutionException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    public T getChecked() throws InterruptedException, QueryException {
+      try {
+        return get();
+      } catch (CancellationException e) {
+        throw new InterruptedException();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, QueryException.class);
+        Throwables.propagateIfPossible(cause, InterruptedException.class);
+        throw new IllegalStateException(e.getCause());
+      }
+    }
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value) {
+    return new QueryTaskFutureImpl<>(Futures.immediateFuture(value));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e) {
+    return new QueryTaskFutureImpl<>(Futures.<R>immediateFailedFuture(e));
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> immediateCancelledFuture() {
+    return new QueryTaskFutureImpl<>(Futures.<R>immediateCancelledFuture());
+  }
+
+  @Override
+  public QueryTaskFuture<Void> eval(
+      QueryExpression expr, VariableContext<T> context, Callback<T> callback) {
+    return expr.eval(this, context, callback);
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+    try {
+      return immediateSuccessfulFuture(callable.call());
+    } catch (QueryException e) {
+      return immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return immediateCancelledFuture();
+    }
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> whenSucceedsCall(
+      QueryTaskFuture<?> future, QueryTaskCallable<R> callable) {
+    return whenAllSucceedCall(ImmutableList.of(future), callable);
+  }
+
+  private static class Dummy implements QueryTaskCallable<Void> {
+    public static final Dummy INSTANCE = new Dummy();
+
+    private Dummy() {}
+
+    @Override
+    public Void call() {
+      return null;
+    }
+  }
+
+  @Override
+  public QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures) {
+    return whenAllSucceedCall(futures, Dummy.INSTANCE);
+  }
+
+  @Override
+  public <R> QueryTaskFuture<R> whenAllSucceedCall(
+      Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.whenAllSucceed(cast(futures)).call(callable));
+  }
+
+  @Override
+  public <T1, T2> QueryTaskFuture<T2> transformAsync(
+      QueryTaskFuture<T1> future,
+      final Function<T1, QueryTaskFuture<T2>> function) {
+    return QueryTaskFutureImpl.ofDelegate(
+        Futures.transformAsync(
+            (QueryTaskFutureImpl<T1>) future,
+            new AsyncFunction<T1, T2>() {
+              @Override
+              public ListenableFuture<T2> apply(T1 input) throws Exception {
+                return (QueryTaskFutureImpl<T2>) function.apply(input);
+              }
+            }));
+  }
+
+  protected static Iterable<QueryTaskFutureImpl<?>> cast(
+      Iterable<? extends QueryTaskFuture<?>> futures) {
+    return Iterables.transform(
+        futures,
+        new Function<QueryTaskFuture<?>, QueryTaskFutureImpl<?>>() {
+          @Override
+          public QueryTaskFutureImpl<?> apply(QueryTaskFuture<?> future) {
+            return (QueryTaskFutureImpl<?>) future;
+          }
+        });
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
index adc12d2..81be4c8 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
@@ -21,12 +21,12 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * Implementation of the <code>allpaths()</code> function.
@@ -51,46 +51,47 @@
   }
 
   @Override
-  public <T> void eval(
-      QueryEnvironment<T> env,
+  public <T> QueryTaskFuture<Void> eval(
+      final QueryEnvironment<T> env,
       VariableContext<T> context,
-      QueryExpression expression,
+      final QueryExpression expression,
       List<Argument> args,
-      Callback<T> callback) throws QueryException, InterruptedException {
+      final Callback<T> callback) {
+    final QueryTaskFuture<Set<T>> fromValueFuture =
+        QueryUtil.evalAll(env, context, args.get(0).getExpression());
+    final QueryTaskFuture<Set<T>> toValueFuture =
+        QueryUtil.evalAll(env, context, args.get(1).getExpression());
 
-    Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
-    Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
+    return env.whenAllSucceedCall(
+        ImmutableList.of(fromValueFuture, toValueFuture),
+        new QueryTaskCallable<Void>() {
+          @Override
+          public Void call() throws QueryException, InterruptedException {
+            // Algorithm: compute "reachableFromX", the forward transitive closure of
+            // the "from" set, then find the intersection of "reachableFromX" with the
+            // reverse transitive closure of the "to" set.  The reverse transitive
+            // closure and intersection operations are interleaved for efficiency.
+            // "result" holds the intersection.
 
-    // Algorithm: compute "reachableFromX", the forward transitive closure of
-    // the "from" set, then find the intersection of "reachableFromX" with the
-    // reverse transitive closure of the "to" set.  The reverse transitive
-    // closure and intersection operations are interleaved for efficiency.
-    // "result" holds the intersection.
+            Set<T> fromValue = fromValueFuture.getIfSuccessful();
+            Set<T> toValue = toValueFuture.getIfSuccessful();
 
-    env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+            env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
 
-    Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
-    Predicate<T> reachable = Predicates.in(reachableFromX);
-    Uniquifier<T> uniquifier = env.createUniquifier();
-    Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
-    callback.process(result);
-    Collection<T> worklist = result;
-    while (!worklist.isEmpty()) {
-      Collection<T> reverseDeps = env.getReverseDeps(worklist);
-      worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
-      callback.process(worklist);
-    }
-  }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
+            Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
+            Predicate<T> reachable = Predicates.in(reachableFromX);
+            Uniquifier<T> uniquifier = env.createUniquifier();
+            Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
+            callback.process(result);
+            Collection<T> worklist = result;
+            while (!worklist.isEmpty()) {
+              Collection<T> reverseDeps = env.getReverseDeps(worklist);
+              worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
+              callback.process(worklist);
+            }
+            return null;
+          }
+        });
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index ca3db15..d7123a4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
@@ -20,10 +21,9 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * An "allrdeps" query expression, which computes the reverse dependencies of the argument within
@@ -52,30 +52,34 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       QueryEnvironment<T> env,
       VariableContext<T> context,
       QueryExpression expression,
       List<Argument> args,
-      Callback<T> callback) throws QueryException, InterruptedException {
-    eval(env, context, args, callback, Predicates.<T>alwaysTrue());
+      Callback<T> callback) {
+    return eval(env, context, args, callback, Optional.<Predicate<T>>absent());
   }
 
-  protected <T> void eval(
+  protected <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       final List<Argument> args,
       final Callback<T> callback,
-      final Predicate<T> universe)
-      throws QueryException, InterruptedException {
-
+      Optional<Predicate<T>> universeMaybe) {
     final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
+    final Predicate<T> universe = universeMaybe.isPresent()
+        ? universeMaybe.get()
+        : Predicates.<T>alwaysTrue();
     if (env instanceof StreamableQueryEnvironment<?>) {
-      ((StreamableQueryEnvironment<T>) env)
-          .getAllRdeps(args.get(0).getExpression(), universe, context, callback, depth);
+      StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env);
+      return depth == Integer.MAX_VALUE && !universeMaybe.isPresent()
+        ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback)
+        : streamableEnv.getAllRdeps(
+            args.get(0).getExpression(), universe, context, callback, depth);
     } else {
       final MinDepthUniquifier<T> minDepthUniquifier = env.createMinDepthUniquifier();
-      env.eval(
+      return env.eval(
           args.get(0).getExpression(),
           context,
           new Callback<T>() {
@@ -104,21 +108,4 @@
           });
     }
   }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    boolean unbounded = args.size() == 1;
-    if (unbounded && env instanceof StreamableQueryEnvironment<?>) {
-      ((StreamableQueryEnvironment<T>) env).getAllRdepsUnboundedParallel(
-          args.get(0).getExpression(), context, callback, forkJoinPool);
-    } else {
-      eval(env, context, expression, args, callback);
-    }
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index 89374d0..f9d20db 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -13,16 +13,16 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
-import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.util.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A binary algebraic set operation.
@@ -56,40 +56,84 @@
   }
 
   @Override
-  protected <T> void evalImpl(
-      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
-          throws QueryException, InterruptedException {
-
-    if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
-      for (QueryExpression operand : operands) {
-        env.eval(operand, context, callback);
-      }
-      return;
+  public <T> QueryTaskFuture<Void> eval(
+      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+    switch (operator) {
+      case PLUS:
+      case UNION:
+        return evalPlus(operands, env, context, callback);
+      case MINUS:
+      case EXCEPT:
+        return evalMinus(operands, env, context, callback);
+      case INTERSECT:
+      case CARET:
+        return evalIntersect(env, context, callback);
+      default:
+        throw new IllegalStateException(operator.toString());
     }
+  }
 
-    // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side
-    // for minus operations. Note that this is suboptimal if the left-hand side results are very
-    // large compared to the right-hand side. Which is the case is hard to know before evaluating.
-    // We could consider determining this dynamically, however, by evaluating both the left and
-    // right hand side partially until one side finishes sooner.
-    final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0));
-    if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
-      for (int i = 1; i < operands.size(); i++) {
-        env.eval(operands.get(i), context,
-            new Callback<T>() {
+  /**
+   * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
+   * separately.
+   *
+   * <p>N.B. {@code operands.size()} may be {@code 1}.
+   */
+  private static <T> QueryTaskFuture<Void> evalPlus(
+      ImmutableList<QueryExpression> operands,
+      QueryEnvironment<T> env,
+      VariableContext<T> context,
+      Callback<T> callback) {
+    ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size());
+    for (QueryExpression operand : operands) {
+      queryTasks.add(env.eval(operand, context, callback));
+    }
+    return env.whenAllSucceed(queryTasks);
+  }
+
+  /**
+   * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
+   * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately.
+   */
+  private static <T> QueryTaskFuture<Void> evalMinus(
+      final ImmutableList<QueryExpression> operands,
+      final QueryEnvironment<T> env,
+      final VariableContext<T> context,
+      final Callback<T> callback) {
+    QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0));
+    Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction =
+        new Function<Set<T>, QueryTaskFuture<Void>>() {
+      @Override
+      public QueryTaskFuture<Void> apply(Set<T> lhsValue) {
+        final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue);
+        Callback<T> subtractionCallback = new Callback<T>() {
+          @Override
+          public void process(Iterable<T> partialResult) {
+            for (T target : partialResult) {
+              threadSafeLhsValue.remove(target);
+            }
+          }
+        };
+        QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus(
+            operands.subList(1, operands.size()), env, context, subtractionCallback);
+        return env.whenSucceedsCall(
+            rhsEvaluatedFuture,
+            new QueryTaskCallable<Void>() {
               @Override
-              public void process(Iterable<T> partialResult)
-                  throws QueryException, InterruptedException {
-                for (T target : partialResult) {
-                  lhsValue.remove(target);
-                }
+              public Void call() throws QueryException, InterruptedException {
+                callback.process(threadSafeLhsValue);
+                return null;
               }
             });
       }
-      callback.process(lhsValue);
-      return;
-    }
+    };
+    return env.transformAsync(lhsValueFuture, substractAsyncFunction);
+  }
 
+  private <T> QueryTaskFuture<Void> evalIntersect(
+      final QueryEnvironment<T> env,
+      final VariableContext<T> context,
+      final Callback<T> callback) {
     // For each right-hand side operand, intersection cannot be performed in a streaming manner; the
     // entire result of that operand is needed. So, in order to avoid pinning too much in memory at
     // once, we process each right-hand side operand one at a time and throw away that operand's
@@ -97,77 +141,39 @@
     // TODO(bazel-team): Consider keeping just the name / label of the right-hand side results
     // instead of the potentially heavy-weight instances of type T. This would let us process all
     // right-hand side operands in parallel without worrying about memory usage.
-    Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET,
-        operator);
+    QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0));
     for (int i = 1; i < operands.size(); i++) {
-      lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i)));
+      final int index = i;
+      Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction =
+          new Function<Set<T>, QueryTaskFuture<Set<T>>>() {
+            @Override
+            public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) {
+              final QueryTaskFuture<Set<T>> rhsOperandValueFuture =
+                  QueryUtil.evalAll(env, context, operands.get(index));
+              return env.whenSucceedsCall(
+                  rhsOperandValueFuture,
+                  new QueryTaskCallable<Set<T>>() {
+                    @Override
+                    public Set<T> call() throws QueryException, InterruptedException {
+                      rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful());
+                      return rollingResult;
+                    }
+                  });
+            }
+      };
+      rollingResultFuture =
+          env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction);
     }
-    callback.process(lhsValue);
-  }
-
-  @Override
-  protected <T> void parEvalImpl(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
-      parEvalPlus(operands, env, context, callback, forkJoinPool);
-    } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
-      parEvalMinus(operands, env, context, callback, forkJoinPool);
-    } else {
-      evalImpl(env, context, callback);
-    }
-  }
-
-  /**
-   * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
-   * in parallel.
-   */
-  private static <T> void parEvalPlus(
-      ImmutableList<QueryExpression> operands,
-      final QueryEnvironment<T> env,
-      final VariableContext<T> context,
-      final ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-          throws QueryException, InterruptedException {
-    ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size());
-    for (final QueryExpression operand : operands) {
-      queryTasks.add(new QueryTask() {
-        @Override
-        public void execute() throws QueryException, InterruptedException {
-          env.eval(operand, context, callback);
-        }
-      });
-    }
-    ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool);
-  }
-
-  /**
-   * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
-   * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel.
-   */
-  private static <T> void parEvalMinus(
-      ImmutableList<QueryExpression> operands,
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-          throws QueryException, InterruptedException {
-    final Set<T> lhsValue =
-        Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0)));
-    ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() {
-      @Override
-      public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
-        for (T target : partialResult) {
-          lhsValue.remove(target);
-        }
-      }
-    };
-    parEvalPlus(
-        operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool);
-    callback.process(lhsValue);
+    final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture;
+    return env.whenSucceedsCall(
+        resultFuture,
+        new QueryTaskCallable<Void>() {
+          @Override
+          public Void call() throws QueryException, InterruptedException {
+            callback.process(resultFuture.getIfSuccessful());
+            return null;
+          }
+        });
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
index d2a2eb0..cbc0ae8 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
@@ -19,10 +19,9 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A buildfiles(x) query expression, which computes the set of BUILD files and
@@ -42,18 +41,17 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       final QueryExpression expression,
       List<Argument> args,
-      final Callback<T> callback)
-      throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final Uniquifier<T> uniquifier = env.createUniquifier();
-    env.eval(
+    return env.eval(
         args.get(0).getExpression(),
         context,
-        new ThreadSafeCallback<T>() {
+        new Callback<T>() {
           @Override
           public void process(Iterable<T> partialResult)
               throws QueryException, InterruptedException {
@@ -67,18 +65,6 @@
   }
 
   @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    // 'eval' is written in such a way that it enables parallel evaluation of 'expression'.
-    eval(env, context, expression, args, callback);
-  }
-
-  @Override
   public int getMandatoryArguments() {
     return 1;
   }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
index 0f43211..51c51fa 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
@@ -13,14 +13,17 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.util.BatchCallback;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 
 /**
  * Query callback to be called by a {@link QueryExpression} when it has part of the computation
  * result. Assuming the {@code QueryEnvironment} supports it, it would allow the caller
  * to stream the results.
  */
-public interface Callback<T> extends BatchCallback<T, QueryException> {
+@ThreadSafe
+public interface Callback<T> extends ThreadSafeBatchCallback<T, QueryException> {
 
   /**
    * According to the {@link BatchCallback} interface, repeated elements may be passed in here.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
index 0e618fb..8b1fc37 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
@@ -18,10 +18,10 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A "deps" query expression, which computes the dependencies of the argument. An optional
@@ -53,15 +53,15 @@
    * Breadth-first search from the arguments.
    */
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       final QueryExpression expression,
       List<Argument> args,
-      final Callback<T> callback) throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
     final MinDepthUniquifier<T> minDepthUniquifier = env.createMinDepthUniquifier();
-    env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+    return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
       @Override
       public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
         Collection<T> current = Sets.newHashSet(partialResult);
@@ -84,15 +84,4 @@
       }
     });
   }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
index a31196ab..85cfe9f 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
@@ -20,10 +20,9 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A query expression for user-defined query functions.
@@ -46,19 +45,9 @@
   }
 
   @Override
-  protected <T> void evalImpl(
-      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
-          throws QueryException, InterruptedException {
-    function.eval(env, context, this, args, callback);
-  }
-
-  @Override
-  protected <T> void parEvalImpl(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    function.parEval(env, context, this, args, callback, forkJoinPool);
+  public <T> QueryTaskFuture<Void> eval(
+      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+    return function.eval(env, context, this, args, callback);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
index 4fa428a..1d68573 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
@@ -17,9 +17,9 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A label(attr_name, argument) expression, which computes the set of targets
@@ -52,16 +52,15 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       final QueryExpression expression,
       final List<Argument> args,
-      final Callback<T> callback)
-      throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final String attrName = args.get(0).getWord();
     final Uniquifier<T> uniquifier = env.createUniquifier();
-    env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+    return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
       @Override
       public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
         for (T input : partialResult) {
@@ -80,15 +79,4 @@
       }
     });
   }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
index 64d94da..a7c3abe 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
@@ -13,6 +13,8 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Function;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collection;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -64,15 +66,24 @@
   }
 
   @Override
-  protected <T> void evalImpl(
-      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
-          throws QueryException, InterruptedException {
+  public <T> QueryTaskFuture<Void> eval(
+      final QueryEnvironment<T> env,
+      final VariableContext<T> context,
+      final Callback<T> callback) {
     if (!NAME_PATTERN.matcher(varName).matches()) {
-      throw new QueryException(this, "invalid variable name '" + varName + "' in let expression");
+      return env.immediateFailedFuture(
+          new QueryException(this, "invalid variable name '" + varName + "' in let expression"));
     }
-    Set<T> varValue = QueryUtil.evalAll(env, context, varExpr);
-    VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
-    env.eval(bodyExpr, bodyContext, callback);
+    QueryTaskFuture<Set<T>> varValueFuture = QueryUtil.evalAll(env, context, varExpr);
+    Function<Set<T>, QueryTaskFuture<Void>> evalBodyAsyncFunction =
+        new Function<Set<T>, QueryTaskFuture<Void>>() {
+          @Override
+          public QueryTaskFuture<Void> apply(Set<T> varValue) {
+            VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
+            return env.eval(bodyExpr, bodyContext, callback);
+          }
+    };
+    return env.transformAsync(varValueFuture, evalBodyAsyncFunction);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
index 311a6af..80b912f 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
@@ -16,10 +16,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A loadfiles(x) query expression, which computes the set of .bzl files
@@ -38,15 +37,14 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       final QueryExpression expression,
       List<QueryEnvironment.Argument> args,
-      final Callback<T> callback)
-      throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final Uniquifier<T> uniquifier = env.createUniquifier();
-    env.eval(
+    return env.eval(
         args.get(0).getExpression(),
         context,
         new Callback<T>() {
@@ -67,17 +65,6 @@
   }
 
   @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
-  }
-
-  @Override
   public int getMandatoryArguments() {
     return 1;
   }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
index 8617004..c62bea5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
@@ -14,16 +14,25 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 
 /**
  * A helper for deduping values that have already been seen at certain "depths".
  *
  * <p>This is similar to {@link Uniquifier}.
  */
+@ThreadSafe
 public interface MinDepthUniquifier<T> {
   /**
    * Returns the subset of {@code newElements} that haven't been seen before at depths less than or
    * equal to {@code depth}
+   *
+   * <p> There's a natural benign check-then-act race in all concurrent uses of this interface.
+   * Imagine we have an element e, two depths d1 and d2 (with d2 < d1), and two threads T1 and T2.
+   * T1 may think it's about to be the first one to process e at a depth less than or equal to d1.
+   * But before T1 finishes processing e, T2 may think _it's_ about to be first one to process an
+   * element at a depth less than or equal to than d2. T1's work is probably wasted.
    */
   ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(Iterable<T> newElements, int depth);
 }
+
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
index 5d21c87..50708d6 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
@@ -46,7 +46,7 @@
    * disambiguate between real interruptions or IO Exceptions.
    */
   @Override
-  public final void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+  public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
     try {
       processOutput(partialResult);
     } catch (IOException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
deleted file mode 100644
index 6e22709..0000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.common.collect.Iterables;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.Future;
-
-/** Several utilities to aid in writing {@link QueryExpression#parEvalImpl} implementations. */
-public class ParallelQueryUtils {
-  /**
-   * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See
-   * {@link #executeQueryTasksAndWaitInterruptiblyFailFast}.
-   */
-  @ThreadSafe
-  public interface QueryTask {
-    void execute() throws QueryException, InterruptedException;
-  }
-
-  /**
-   * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly
-   * waits for their completion. Throws the first {@link QueryException} encountered during parallel
-   * execution or an {@link InterruptedException} if the calling thread is interrupted.
-   *
-   * <p>These "fail-fast" semantics are desirable to avoid doing unneeded work when evaluating
-   * multiple {@link QueryTask}s in parallel: if serial execution of the tasks would result in a
-   * {@link QueryException} then we want parallel execution to do so as well, but there's no need to
-   * continue waiting for completion of the tasks after at least one of them results in a
-   * {@link QueryException}.
-   */
-  public static void executeQueryTasksAndWaitInterruptiblyFailFast(
-      List<QueryTask> queryTasks,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    int numTasks = queryTasks.size();
-    if (numTasks == 1) {
-      Iterables.getOnlyElement(queryTasks).execute();
-      return;
-    }
-    FailFastCountDownLatch failFastLatch = new FailFastCountDownLatch(numTasks);
-    ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(numTasks);
-    for (QueryTask queryTask : queryTasks) {
-      QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask, failFastLatch);
-      forkJoinTasks.add(forkJoinTask);
-      @SuppressWarnings("unused") 
-      Future<?> possiblyIgnoredError = forkJoinPool.submit(forkJoinTask);
-    }
-    failFastLatch.await();
-    try {
-      MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks);
-    } catch (ExecutionException e) {
-      throw rethrowCause(e);
-    }
-  }
-
-  private static QueryTaskForkJoinTask adaptAsForkJoinTask(
-      QueryTask queryTask,
-      FailFastCountDownLatch failFastLatch) {
-    return new QueryTaskForkJoinTask(queryTask, failFastLatch);
-  }
-
-  private static RuntimeException rethrowCause(ExecutionException e)
-      throws QueryException, InterruptedException {
-    Throwable cause = e.getCause();
-    if (cause instanceof ParallelRuntimeException) {
-      ((ParallelRuntimeException) cause).rethrow();
-    }
-    throw new IllegalStateException(e);
-  }
-
-  /**
-   * Wrapper around a {@link CountDownLatch} with initial count {@code n} that counts down once on
-   * "success" and {@code n} times on "failure".
-   *
-   * <p>This can be used in a concurrent context to wait until either {@code n} tasks are successful
-   * or at least one of them fails.
-   */
-  @ThreadSafe
-  private static class FailFastCountDownLatch {
-    private final int n;
-    private final CountDownLatch completionLatch;
-
-    private FailFastCountDownLatch(int n) {
-      this.n = n;
-      this.completionLatch = new CountDownLatch(n);
-    }
-
-    private void await() throws InterruptedException {
-      completionLatch.await();
-    }
-
-    private void countDown(boolean success) {
-      if (success) {
-        completionLatch.countDown();
-      } else {
-        for (int i = 0; i < n; i++) {
-          completionLatch.countDown();
-        }
-      }
-    }
-  }
-
-  // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid
-  // having to think about that messiness (which is inconsistent with other Future implementations)
-  // by having our own ForkJoinTask subclass and managing checked exceptions ourselves.
-  @ThreadSafe
-  private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> {
-    private final QueryTask queryTask;
-    private final FailFastCountDownLatch completionLatch;
-
-    private QueryTaskForkJoinTask(QueryTask queryTask, FailFastCountDownLatch completionLatch) {
-      this.queryTask = queryTask;
-      this.completionLatch = completionLatch;
-    }
-
-    @Override
-    public Void getRawResult() {
-      return null;
-    }
-
-    @Override
-    protected void setRawResult(Void value) {
-    }
-
-    @Override
-    protected boolean exec() {
-      boolean successful = false;
-      try {
-        queryTask.execute();
-        successful = true;
-        return true;
-      } catch (QueryException queryException) {
-        throw new ParallelRuntimeQueryException(queryException);
-      } catch (InterruptedException interruptedException) {
-        throw new ParallelInterruptedQueryException(interruptedException);
-      } finally {
-        completionLatch.countDown(successful);
-      }
-    }
-  }
-
-  private abstract static class ParallelRuntimeException extends RuntimeException {
-    abstract void rethrow() throws QueryException, InterruptedException;
-  }
-
-  private static class ParallelRuntimeQueryException extends ParallelRuntimeException {
-    private final QueryException queryException;
-
-    private ParallelRuntimeQueryException(QueryException queryException) {
-      this.queryException = queryException;
-    }
-
-    @Override
-    void rethrow() throws QueryException, InterruptedException {
-      throw queryException;
-    }
-  }
-
-  private static class ParallelInterruptedQueryException extends ParallelRuntimeException {
-    private final InterruptedException interruptedException;
-
-    private ParallelInterruptedQueryException(InterruptedException interruptedException) {
-      this.interruptedException = interruptedException;
-    }
-
-    @Override
-    void rethrow() throws QueryException, InterruptedException {
-      throw interruptedException;
-    }
-  }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index ce0ae83..8281e5b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -13,11 +13,13 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
 import javax.annotation.Nonnull;
 
 /**
@@ -91,16 +93,14 @@
 
   /** A user-defined query function. */
   interface QueryFunction {
-    /**
-     * Name of the function as it appears in the query language.
-     */
+    /** Name of the function as it appears in the query language. */
     String getName();
 
     /**
      * The number of arguments that are required. The rest is optional.
      *
-     * <p>This should be greater than or equal to zero and at smaller than or equal to the length
-     * of the list returned by {@link #getArgumentTypes}.
+     * <p>This should be greater than or equal to zero and at smaller than or equal to the length of
+     * the list returned by {@link #getArgumentTypes}.
      */
     int getMandatoryArguments();
 
@@ -108,34 +108,21 @@
     Iterable<ArgumentType> getArgumentTypes();
 
     /**
-     * Called when a user-defined function is to be evaluated.
+     * Returns a {@link QueryTaskFuture} representing the asynchronous application of this
+     * {@link QueryFunction} to the given {@code args}, feeding the results to the given
+     * {@code callback}.
      *
      * @param env the query environment this function is evaluated in.
      * @param expression the expression being evaluated.
-     * @param args the input arguments. These are type-checked against the specification returned
-     *     by {@link #getArgumentTypes} and {@link #getMandatoryArguments}
+     * @param args the input arguments. These are type-checked against the specification returned by
+     *     {@link #getArgumentTypes} and {@link #getMandatoryArguments}
      */
-    <T> void eval(
+    <T> QueryTaskFuture<Void> eval(
         QueryEnvironment<T> env,
         VariableContext<T> context,
         QueryExpression expression,
         List<Argument> args,
-        Callback<T> callback) throws QueryException, InterruptedException;
-
-    /**
-     * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)},
-     * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve
-     * parallelism.
-     *
-     * <p>The caller must ensure that {@code env} is thread safe.
-     */
-    <T> void parEval(
-        QueryEnvironment<T> env,
-        VariableContext<T> context,
-        QueryExpression expression,
-        List<Argument> args,
-        ThreadSafeCallback<T> callback,
-        ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
+        Callback<T> callback);
   }
 
   /**
@@ -156,18 +143,8 @@
    * Invokes {@code callback} with the set of target nodes in the graph for the specified target
    * pattern, in 'blaze build' syntax.
    */
-  void getTargetsMatchingPattern(QueryExpression owner, String pattern, Callback<T> callback)
-      throws QueryException, InterruptedException;
-
-  /**
-   * Same as {@link #getTargetsMatchingPattern}, but optionally making use of the given
-   * {@link ForkJoinPool} to achieve parallelism.
-   */
-  void getTargetsMatchingPatternPar(
-      QueryExpression owner,
-      String pattern,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
+  QueryTaskFuture<Void> getTargetsMatchingPattern(
+      QueryExpression owner, String pattern, Callback<T> callback);
 
   /** Ensures the specified target exists. */
   // NOTE(bazel-team): this method is left here as scaffolding from a previous refactoring. It may
@@ -203,14 +180,159 @@
   Set<T> getNodesOnPath(T from, T to) throws InterruptedException;
 
   /**
-   * Eval an expression {@code expr} and pass the results to the {@code callback}.
+   * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of the given
+   * {@code expr} and passing of the results to the given {@code callback}.
    *
    * <p>Note that this method should guarantee that the callback does not see repeated elements.
+   *
    * @param expr The expression to evaluate
    * @param callback The caller callback to notify when results are available
    */
-  void eval(QueryExpression expr, VariableContext<T> context, Callback<T> callback)
-      throws QueryException, InterruptedException;
+  QueryTaskFuture<Void> eval(
+      QueryExpression expr, VariableContext<T> context, Callback<T> callback);
+
+  /**
+   * An asynchronous computation of part of a query evaluation.
+   *
+   * <p>A {@link QueryTaskFuture} can only be produced from scratch via {@link #eval},
+   * {@link #executeAsync}, {@link #immediateSuccessfulFuture}, {@link #immediateFailedFuture}, and
+   * {@link #immediateCancelledFuture}.
+   *
+   * <p>Combined with the helper methods like {@link #whenSucceedsCall} below, this is very similar
+   * to Guava's {@link ListenableFuture}.
+   *
+   * <p>This class is deliberately opaque; the only ways to compose/use {@link #QueryTaskFuture}
+   * instances are the helper methods like {@link #whenSucceedsCall} below. A crucial consequence of
+   * this is there is no way for a {@link QueryExpression} or {@link QueryFunction} implementation
+   * to block on the result of a {@link #QueryTaskFuture}. This eliminates a large class of
+   * deadlocks by design!
+   */
+  @ThreadSafe
+  public abstract class QueryTaskFuture<T> {
+    // We use a public abstract class with a private constructor so that this type is visible to all
+    // the query codebase, but yet the only possible implementation is under our control in this
+    // file.
+    private QueryTaskFuture() {}
+
+    /**
+     * If this {@link QueryTasksFuture}'s encapsulated computation is currently complete and
+     * successful, returns the result. This method is intended to be used in combination with
+     * {@link #whenSucceedsCall}.
+     *
+     * <p>See the javadoc for the various helper methods that produce {@link QueryTasksFuture} for
+     * the precise definition of "successful".
+     */
+    public abstract T getIfSuccessful();
+  }
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the successful computation of {@code value}.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful}.
+   */
+  abstract <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing a computation that was unsuccessful because of
+   * {@code e}.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful}.
+   */
+  abstract <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing a cancelled computation.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful}.
+   */
+  abstract <R> QueryTaskFuture<R> immediateCancelledFuture();
+
+  /** A {@link ThreadSafe} {@link Callable} for computations during query evaluation. */
+  @ThreadSafe
+  public interface QueryTaskCallable<T> extends Callable<T> {
+    /**
+     * Returns the computed value or throws a {@link QueryException} on failure or a
+     * {@link InterruptedException} on interruption.
+     */
+    @Override
+    T call() throws QueryException, InterruptedException;
+  }
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+   * performed asynchronously.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful} iff {@code callable#call} does not throw an exception.
+   */
+  <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+   * performed after the successful completion of the computation encapsulated by the given
+   * {@code future} has completed successfully.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful} iff {@code future} is successful and
+   * {@code callable#call} does not throw an exception.
+   */
+  <R> QueryTaskFuture<R> whenSucceedsCall(QueryTaskFuture<?> future, QueryTaskCallable<R> callable);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the successful completion of all the
+   * computations encapsulated by the given {@code futures}.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful".
+   */
+  QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+   * performed after the successful completion of all the computations encapsulated by the given
+   * {@code futures}.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful" and
+   * {@code callable#call} does not throw an exception.
+   */
+  <R> QueryTaskFuture<R> whenAllSucceedCall(
+      Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable);
+
+  /**
+   * Returns a {@link QueryTaskFuture} representing the asynchronous application of the given
+   * {@code function} to the value produced by the computation encapsulated by the given
+   * {@code future}.
+   *
+   * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+   * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+   * {@link QueryTaskFuture#getIfSuccessful} iff {@code} future is "successful".
+   */
+  <T1, T2> QueryTaskFuture<T2> transformAsync(
+      QueryTaskFuture<T1> future, Function<T1, QueryTaskFuture<T2>> function);
+
+  /**
+   * The sole package-protected subclass of {@link QueryTaskFuture}.
+   *
+   * <p>Do not subclass this class; it's an implementation detail. {@link QueryExpression} and
+   * {@link QueryFunction} implementations should use {@link #eval} and {@link #executeAsync} to get
+   * access to {@link QueryTaskFuture} instances and the then use the helper methods like
+   * {@link #whenSucceedsCall} to transform them.
+   */
+  abstract class QueryTaskFutureImplBase<T> extends QueryTaskFuture<T> {
+    protected QueryTaskFutureImplBase() {
+    }
+  }
 
   /**
    * Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this
@@ -380,9 +502,6 @@
     Set<QueryVisibility<T>> getVisibility(T from) throws QueryException, InterruptedException;
   }
 
-  /** Returns the {@link QueryExpressionEvalListener} that this {@link QueryEnvironment} uses. */
-  QueryExpressionEvalListener<T> getEvalListener();
-
   /** List of the default query functions. */
   ImmutableList<QueryFunction> DEFAULT_QUERY_FUNCTIONS =
       ImmutableList.of(
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index e35e9e4..920722d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -14,9 +14,8 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collection;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * Base class for expressions in the Blaze query language, revision 2.
@@ -59,9 +58,9 @@
   protected QueryExpression() {}
 
   /**
-   * Evaluates this query in the specified environment, and notifies the callback with a result.
-   * Note that it is allowed to notify the callback with partial results instead of just one final
-   * result.
+   * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of this query in the
+   * specified environment, notifying the callback with a result. Note that it is allowed to notify
+   * the callback with partial results instead of just one final result.
    *
    * <p>Failures resulting from evaluation of an ill-formed query cause
    * QueryException to be thrown.
@@ -71,45 +70,10 @@
    * thrown.  If disabled, evaluation will stumble on to produce a (possibly
    * inaccurate) result, but a result nonetheless.
    */
-  public final <T> void eval(
+  public abstract <T> QueryTaskFuture<Void> eval(
       QueryEnvironment<T> env,
       VariableContext<T> context,
-      Callback<T> callback) throws QueryException, InterruptedException {
-    env.getEvalListener().onEval(this, env, context, callback);
-    evalImpl(env, context, callback);
-  }
-
-  protected abstract <T> void evalImpl(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      Callback<T> callback) throws QueryException, InterruptedException;
-
-  /**
-   * Evaluates this query in the specified environment, as in
-   * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to
-   * achieve parallelism.
-   *
-   * <p>The caller must ensure that {@code env} is thread safe.
-   */
-  @ThreadSafe
-  public final <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool);
-    parEvalImpl(env, context, callback, forkJoinPool);
-  }
-
-  protected <T> void parEvalImpl(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
-    evalImpl(env, context, callback);
-  }
+      Callback<T> callback);
 
   /**
    * Collects all target patterns that are referenced anywhere within this query expression and adds
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
deleted file mode 100644
index e6bdaef..0000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import java.util.concurrent.ForkJoinPool;
-
-/** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */
-@ThreadSafe
-public interface QueryExpressionEvalListener<T> {
-  /** Called right before {@link QueryExpression#evalImpl} is called. */
-  void onEval(
-      QueryExpression expr,
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      Callback<T> callback);
-
-  /** Called right before {@link QueryExpression#parEvalImpl} is called. */
-  void onParEval(
-      QueryExpression expr,
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool);
-
-  /** A {@link QueryExpressionEvalListener} that does nothing. */
-  class NullListener<T> implements QueryExpressionEvalListener<T> {
-    private static final NullListener<?> INSTANCE = new NullListener<>();
-
-    private NullListener() {
-    }
-
-    @SuppressWarnings("unchecked")
-    public static <T> NullListener<T> instance() {
-      return (NullListener<T>) INSTANCE;
-    }
-
-    @Override
-    public void onEval(
-        QueryExpression expr,
-        QueryEnvironment<T> env,
-        VariableContext<T> context,
-        Callback<T> callback) {
-    }
-
-    @Override
-    public void onParEval(
-        QueryExpression expr,
-        QueryEnvironment<T> env,
-        VariableContext<T> context,
-        ThreadSafeCallback<T> callback,
-        ForkJoinPool forkJoinPool) {
-    }
-  }
-}
-
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index a558e45..b423803 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -16,7 +16,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
 import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -29,17 +32,18 @@
 
   /** A {@link Callback} that can aggregate all the partial results into one set. */
   public interface AggregateAllCallback<T> extends Callback<T> {
+    /** Returns a (mutable) set of all the results. */
     Set<T> getResult();
   }
 
-  /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
+  /** A {@link OutputFormatterCallback} that is also a {@link AggregateAllCallback}. */
   public abstract static class AggregateAllOutputFormatterCallback<T>
-      extends OutputFormatterCallback<T> implements AggregateAllCallback<T>  {
+      extends ThreadSafeOutputFormatterCallback<T> implements AggregateAllCallback<T>  {
   }
 
   private static class AggregateAllOutputFormatterCallbackImpl<T>
       extends AggregateAllOutputFormatterCallback<T> {
-    private final Set<T> result = CompactHashSet.create();
+    private final Set<T> result = Sets.newConcurrentHashSet();
 
     @Override
     public final void processOutput(Iterable<T> partialResult) {
@@ -52,44 +56,77 @@
     }
   }
 
-  /**
-   * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
-   * results into one set.
-   *
-   * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
-   * {@link #newAggregateAllCallback}.
-   */
-  public static <T> AggregateAllOutputFormatterCallback<T>
-      newAggregateAllOutputFormatterCallback() {
-    return new AggregateAllOutputFormatterCallbackImpl<>();
+  private static class OrderedAggregateAllOutputFormatterCallbackImpl<T>
+      extends AggregateAllOutputFormatterCallback<T> {
+    private final Set<T> result = CompactHashSet.create();
+
+    @Override
+    public final synchronized void processOutput(Iterable<T> partialResult) {
+      Iterables.addAll(result, partialResult);
+    }
+
+    @Override
+    public synchronized Set<T> getResult() {
+      return result;
+    }
   }
 
   /**
-   * Returns a fresh {@link AggregateAllCallback}.
-   *
-   * <p>Intended to be used by {@link QueryExpression} implementations; contrast with
-   * {@link #newAggregateAllOutputFormatterCallback}.
+   * Returns a fresh {@link AggregateAllOutputFormatterCallback} instance whose
+   * {@link AggregateAllCallback#getResult} returns all the elements of the result in the order they
+   * were processed.
    */
+  public static <T> AggregateAllOutputFormatterCallback<T>
+      newOrderedAggregateAllOutputFormatterCallback() {
+    return new OrderedAggregateAllOutputFormatterCallbackImpl<>();
+  }
+
+  /** Returns a fresh {@link AggregateAllCallback} instance. */
   public static <T> AggregateAllCallback<T> newAggregateAllCallback() {
     return new AggregateAllOutputFormatterCallbackImpl<>();
   }
 
   /**
-   * Fully evaluate a {@code QueryExpression} and return a set with all the results.
+   * Returns a {@link QueryTaskFuture} representing the evaluation of {@code expr} as a (mutable)
+   * {@link Set} comprised of all the results.
    *
    * <p>Should only be used by QueryExpressions when it is the only way of achieving correctness.
    */
-  public static <T> Set<T> evalAll(
-      QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
-          throws QueryException, InterruptedException {
-    AggregateAllCallback<T> callback = newAggregateAllCallback();
-    env.eval(expr, context, callback);
-    return callback.getResult();
+  public static <T> QueryTaskFuture<Set<T>> evalAll(
+      QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) {
+    final AggregateAllCallback<T> callback = newAggregateAllCallback();
+    return env.whenSucceedsCall(
+        env.eval(expr, context, callback),
+        new QueryTaskCallable<Set<T>>() {
+          @Override
+          public Set<T> call() {
+            return callback.getResult();
+          }
+        });
   }
 
-  private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
+  /** A trivial {@link Uniquifier} implementation. */
+  public static class UniquifierImpl<T, K> implements Uniquifier<T> {
+    private final KeyExtractor<T, K> extractor;
+    private final Set<K> alreadySeen;
+
+    public UniquifierImpl(KeyExtractor<T, K> extractor) {
+      this(extractor, /*concurrencyLevel=*/ 1);
+    }
+
+    public UniquifierImpl(KeyExtractor<T, K> extractor, int concurrencyLevel) {
+      this.extractor = extractor;
+      this.alreadySeen = Collections.newSetFromMap(
+          new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
+    }
+
     @Override
-    public final ImmutableList<T> unique(Iterable<T> newElements) {
+    public boolean unique(T element) {
+      return alreadySeen.add(extractor.extractKey(element));
+    }
+
+    @Override
+    public ImmutableList<T> unique(Iterable<T> newElements) {
       ImmutableList.Builder<T> result = ImmutableList.builder();
       for (T element : newElements) {
         if (unique(element)) {
@@ -100,47 +137,12 @@
     }
   }
 
-  /** A trivial {@link Uniquifier} implementation. */
-  public static class UniquifierImpl<T, K> extends AbstractUniquifierBase<T, K> {
-    private final KeyExtractor<T, K> extractor;
-    private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
-
-    public UniquifierImpl(KeyExtractor<T, K> extractor) {
-      this.extractor = extractor;
-    }
-
-    @Override
-    public final boolean unique(T element) {
-      return alreadySeen.add(extractor.extractKey(element));
-    }
-  }
-
-  /** A trvial {@link ThreadSafeUniquifier} implementation. */
-  public static class ThreadSafeUniquifierImpl<T, K>
-      extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
-    private final KeyExtractor<T, K> extractor;
-    private final Set<K> alreadySeen;
-
-    public ThreadSafeUniquifierImpl(KeyExtractor<T, K> extractor, int concurrencyLevel) {
-      this.extractor = extractor;
-      this.alreadySeen = Collections.newSetFromMap(
-          new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
-    }
-
-    @Override
-    public final boolean unique(T element) {
-      return alreadySeen.add(extractor.extractKey(element));
-    }
-  }
-
-  /** A trivial {@link ThreadSafeMinDepthUniquifier} implementation. */
-  public static class ThreadSafeMinDepthUniquifierImpl<T, K>
-      implements ThreadSafeMinDepthUniquifier<T> {
+  /** A trivial {@link MinDepthUniquifier} implementation. */
+  public static class MinDepthUniquifierImpl<T, K> implements MinDepthUniquifier<T> {
     private final KeyExtractor<T, K> extractor;
     private final ConcurrentMap<K, AtomicInteger> alreadySeenAtDepth;
 
-    public ThreadSafeMinDepthUniquifierImpl(
-        KeyExtractor<T, K> extractor, int concurrencyLevel) {
+    public MinDepthUniquifierImpl(KeyExtractor<T, K> extractor, int concurrencyLevel) {
       this.extractor = extractor;
       this.alreadySeenAtDepth = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap();
     }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
index 7d691c0b..82faf72 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
@@ -13,12 +13,14 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
 import java.util.Set;
 
@@ -54,16 +56,31 @@
    * towards the universe while staying within the transitive closure.
    */
   @Override
-  public <T> void eval(QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args, Callback<T> callback)
-      throws QueryException,
-      InterruptedException {
-    Set<T> universeValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
-    env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
-
-    Predicate<T> universe = Predicates.in(env.getTransitiveClosure(universeValue));
-    eval(env, context, args.subList(1, args.size()), callback, universe);
+  public <T> QueryTaskFuture<Void> eval(
+      final QueryEnvironment<T> env,
+      final VariableContext<T> context,
+      final QueryExpression expression,
+      final List<Argument> args,
+      final Callback<T> callback) {
+    QueryTaskFuture<Set<T>> universeValueFuture =
+        QueryUtil.evalAll(env, context, args.get(0).getExpression());
+    Function<Set<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction =
+        new Function<Set<T>, QueryTaskFuture<Void>>() {
+          @Override
+          public QueryTaskFuture<Void> apply(Set<T> universeValue) {
+            Predicate<T> universe;
+            try {
+              env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
+              universe = Predicates.in(env.getTransitiveClosure(universeValue));
+            } catch (InterruptedException e) {
+              return env.immediateCancelledFuture();
+            } catch (QueryException e) {
+              return env.immediateFailedFuture(e);
+            }
+            return RdepsFunction.this.eval(
+                env, context, args.subList(1, args.size()), callback, Optional.of(universe));
+          }
+        };
+    return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
index 9dc75a4..6b182ee 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
@@ -18,9 +18,8 @@
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -33,25 +32,24 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       QueryExpression expression,
       final List<Argument> args,
-      Callback<T> callback)
-      throws QueryException, InterruptedException {
+      Callback<T> callback) {
     String rawPattern = getPattern(args);
     final Pattern compiledPattern;
     try {
       compiledPattern = Pattern.compile(rawPattern);
     } catch (PatternSyntaxException e) {
-      throw new QueryException(
+      return env.immediateFailedFuture(new QueryException(
           expression,
           String.format(
               "illegal '%s' pattern regexp '%s': %s",
               getName(),
               rawPattern,
-              e.getMessage()));
+              e.getMessage())));
     }
 
     // Note that Patttern#matcher is thread-safe and so this Predicate can safely be used
@@ -68,21 +66,10 @@
       }
     };
 
-    env.eval(
+    return env.eval(
         Iterables.getLast(args).getExpression(),
         context,
-        filteredCallback(callback, matchFilter));
-  }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
+        new FilteredCallback<>(callback, matchFilter));
   }
 
   /**
@@ -111,21 +98,6 @@
 
   protected abstract String getPattern(List<Argument> args);
 
-  /**
-   * Returns a new {@link Callback} that forwards values that satisfies the given {@link Predicate}
-   * to the given {@code parentCallback}.
-   *
-   * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} iff
-   * {@code parentCallback} is as well.
-   */
-  private static <T> Callback<T> filteredCallback(
-      final Callback<T> parentCallback,
-      final Predicate<T> retainIfTrue) {
-    return (parentCallback instanceof ThreadSafeCallback)
-        ? new ThreadSafeFilteredCallback<>((ThreadSafeCallback<T>) parentCallback, retainIfTrue)
-        : new FilteredCallback<>(parentCallback, retainIfTrue);
-  }
-
   private static class FilteredCallback<T> implements Callback<T> {
     private final Callback<T> parentCallback;
     private final Predicate<T> retainIfTrue;
@@ -148,12 +120,4 @@
       return "filtered parentCallback of : " + retainIfTrue;
     }
   }
-
-  private static class ThreadSafeFilteredCallback<T>
-      extends FilteredCallback<T> implements ThreadSafeCallback<T> {
-    private ThreadSafeFilteredCallback(
-        ThreadSafeCallback<T> parentCallback, Predicate<T> retainIfTrue) {
-      super(parentCallback, retainIfTrue);
-    }
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
index ac4b460..e1eadf3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
@@ -14,7 +14,8 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.base.Joiner;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -46,12 +47,13 @@
   }
 
   @Override
-  protected <T> void evalImpl(
-      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
-          throws QueryException, InterruptedException {
+  public <T> QueryTaskFuture<Void> eval(
+      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+    ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(words.size());
     for (TargetLiteral expr : words) {
-      env.eval(expr, context, callback);
+      queryTasks.add(env.eval(expr, context, callback));
     }
+    return env.whenAllSucceed(queryTasks);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
index 8dc0442..4b07a99 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
@@ -19,8 +19,9 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -49,36 +50,37 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       QueryEnvironment<T> env,
       VariableContext<T> context,
-      QueryExpression expression,
+      final QueryExpression expression,
       List<Argument> args,
-      final Callback<T> callback) throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final AtomicBoolean someFound = new AtomicBoolean(false);
-    env.eval(args.get(0).getExpression(), context, new Callback<T>() {
-      @Override
-      public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
-        if (someFound.get() || Iterables.isEmpty(partialResult)) {
-          return;
-        }
-        callback.process(ImmutableSet.of(partialResult.iterator().next()));
-        someFound.set(true);
-      }
-    });
-    if (!someFound.get()) {
-      throw new QueryException(expression, "argument set is empty");
-    }
-  }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
+    QueryTaskFuture<Void> operandEvalFuture = env.eval(
+        args.get(0).getExpression(),
+        context,
+        new Callback<T>() {
+          @Override
+          public void process(Iterable<T> partialResult)
+              throws QueryException, InterruptedException {
+            if (someFound.get() || Iterables.isEmpty(partialResult)) {
+              return;
+            }
+            callback.process(ImmutableSet.of(partialResult.iterator().next()));
+            someFound.set(true);
+          }
+        });
+    return env.whenSucceedsCall(
+        operandEvalFuture,
+        new QueryTaskCallable<Void>() {
+          @Override
+          public Void call() throws QueryException {
+            if (!someFound.get()) {
+              throw new QueryException(expression, "argument set is empty");
+            }
+            return null;
+          }
+        });
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
index 2d0df0e..229863c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
@@ -20,10 +20,10 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A somepath(x, y) query expression, which computes the set of nodes
@@ -51,50 +51,52 @@
   }
 
   @Override
-  public <T> void eval(
-      QueryEnvironment<T> env,
+  public <T> QueryTaskFuture<Void> eval(
+      final QueryEnvironment<T> env,
       VariableContext<T> context,
-      QueryExpression expression,
+      final QueryExpression expression,
       List<Argument> args,
-      final Callback<T> callback) throws QueryException, InterruptedException {
-    Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
-    Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
+      final Callback<T> callback) {
+    final QueryTaskFuture<Set<T>> fromValueFuture =
+        QueryUtil.evalAll(env, context, args.get(0).getExpression());
+    final QueryTaskFuture<Set<T>> toValueFuture =
+        QueryUtil.evalAll(env, context, args.get(1).getExpression());
 
-    // Implementation strategy: for each x in "from", compute its forward
-    // transitive closure.  If it intersects "to", then do a path search from x
-    // to an arbitrary node in the intersection, and return the path.  This
-    // avoids computing the full transitive closure of "from" in some cases.
+    return env.whenAllSucceedCall(
+        ImmutableList.of(fromValueFuture, toValueFuture),
+        new QueryTaskCallable<Void>() {
+          @Override
+          public Void call() throws QueryException, InterruptedException {
+            // Implementation strategy: for each x in "from", compute its forward
+            // transitive closure.  If it intersects "to", then do a path search from x
+            // to an arbitrary node in the intersection, and return the path.  This
+            // avoids computing the full transitive closure of "from" in some cases.
 
-    env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+            Set<T> fromValue = fromValueFuture.getIfSuccessful();
+            Set<T> toValue = toValueFuture.getIfSuccessful();
 
-    // This set contains all nodes whose TC does not intersect "toValue".
-    Uniquifier<T> uniquifier = env.createUniquifier();
+            env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
 
-    for (T x : uniquifier.unique(fromValue)) {
-      Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
-      SetView<T> result;
-      if (xtc.size() > toValue.size()) {
-        result = Sets.intersection(toValue, xtc);
-      } else {
-        result = Sets.intersection(xtc, toValue);
-      }
-      if (!result.isEmpty()) {
-        callback.process(env.getNodesOnPath(x, result.iterator().next()));
-        return;
-      }
-      uniquifier.unique(xtc);
-    }
-    callback.process(ImmutableSet.<T>of());
-  }
+            // This set contains all nodes whose TC does not intersect "toValue".
+            Uniquifier<T> uniquifier = env.createUniquifier();
 
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
+            for (T x : uniquifier.unique(fromValue)) {
+              Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
+              SetView<T> result;
+              if (xtc.size() > toValue.size()) {
+                result = Sets.intersection(toValue, xtc);
+              } else {
+                result = Sets.intersection(xtc, toValue);
+              }
+              if (!result.isEmpty()) {
+                callback.process(env.getNodesOnPath(x, result.iterator().next()));
+                return null;
+              }
+              uniquifier.unique(xtc);
+            }
+            callback.process(ImmutableSet.<T>of());
+            return null;
+          }
+        });
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
index eda505a..bb67e93 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -14,7 +14,6 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.base.Predicate;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * The environment of a Blaze query which supports predefined streaming operations.
@@ -24,22 +23,19 @@
 public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
 
   /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */
-  void getAllRdeps(
+  QueryTaskFuture<Void> getAllRdeps(
       QueryExpression expression,
       Predicate<T> universe,
       VariableContext<T> context,
       Callback<T> callback,
-      int depth)
-      throws QueryException, InterruptedException;
+      int depth);
 
   /**
    * Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the
    * provided {@code forkJoinPool}.
    */
-  void getAllRdepsUnboundedParallel(
+  QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
       QueryExpression expression,
       VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException;
+      Callback<T> callback);
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
new file mode 100644
index 0000000..68a7933
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
@@ -0,0 +1,58 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link ThreadSafeOutputFormatterCallback} wrapper around a {@link OutputFormatterCallback}
+ * delegate.
+ */
+public final class SynchronizedDelegatingOutputFormatterCallback<T>
+    extends ThreadSafeOutputFormatterCallback<T> {
+  private final OutputFormatterCallback<T> delegate;
+
+  public SynchronizedDelegatingOutputFormatterCallback(OutputFormatterCallback<T> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public synchronized void start() throws IOException  {
+    delegate.start();
+  }
+
+  @Override
+  public synchronized void close(boolean failFast) throws InterruptedException, IOException {
+    delegate.close(failFast);
+  }
+
+  @Override
+  public synchronized void process(Iterable<T> partialResult)
+      throws QueryException, InterruptedException {
+    delegate.process(partialResult);
+  }
+
+  @Override
+  public synchronized void processOutput(Iterable<T> partialResult)
+      throws IOException, InterruptedException {
+    delegate.processOutput(partialResult);
+  }
+
+  @Override
+  @Nullable
+  public IOException getIoException() {
+    return delegate.getIoException();
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
index aeace9a..733bffb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
@@ -13,11 +13,10 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.util.Preconditions;
-
 import java.util.Collection;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A literal set of targets, using 'blaze build' syntax.  Or, a reference to a
@@ -45,38 +44,31 @@
     return LetExpression.isValidVarReference(pattern);
   }
 
-  private <T> void evalVarReference(VariableContext<T> context, Callback<T> callback)
-      throws QueryException, InterruptedException {
+  private <T> QueryTaskFuture<Void> evalVarReference(
+      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
     String varName = LetExpression.getNameFromReference(pattern);
     Set<T> value = context.get(varName);
     if (value == null) {
-      throw new QueryException(this, "undefined variable '" + varName + "'");
+      return env.immediateFailedFuture(
+          new QueryException(this, "undefined variable '" + varName + "'"));
     }
-    callback.process(value);
-  }
-
-  @Override
-  protected <T> void evalImpl(
-      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
-          throws QueryException, InterruptedException {
-    if (isVariableReference()) {
-      evalVarReference(context, callback);
-    } else {
-      env.getTargetsMatchingPattern(this, pattern, callback);
+    try {
+      callback.process(value);
+      return env.immediateSuccessfulFuture(null);
+    } catch (QueryException e) {
+      return env.immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return env.immediateCancelledFuture();
     }
   }
 
   @Override
-  protected <T> void parEvalImpl(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool)
-      throws QueryException, InterruptedException {
+  public <T> QueryTaskFuture<Void> eval(
+      QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
     if (isVariableReference()) {
-      evalVarReference(context, callback);
+      return evalVarReference(env, context, callback);
     } else {
-      env.getTargetsMatchingPatternPar(this, pattern, callback, forkJoinPool);
+      return env.getTargetsMatchingPattern(this, pattern, callback);
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
index 956e604..d9ed576 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
@@ -15,9 +15,11 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,7 +29,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A tests(x) filter expression, which returns all the tests in set x,
@@ -62,15 +63,15 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
       VariableContext<T> context,
       QueryExpression expression,
       List<Argument> args,
-      final Callback<T> callback) throws QueryException, InterruptedException {
+      final Callback<T> callback) {
     final Closure<T> closure = new Closure<>(expression, env);
 
-    env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+    return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
       @Override
       public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
         for (T target : partialResult) {
@@ -86,17 +87,6 @@
     });
   }
 
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
-  }
-
   /**
    * Decides whether to include a test in a test_suite or not.
    * @param testTags Collection of all tags exhibited by a given test.
@@ -151,10 +141,8 @@
     }
   }
 
-  /**
-   * A closure over the temporary state needed to compute the expression. This makes the evaluation
-   * thread-safe, as long as instances of this class are used only within a single thread.
-   */
+  /** A closure over the temporary state needed to compute the expression. */
+  @ThreadSafe
   private static final class Closure<T> {
     private final QueryExpression expression;
     /** A dynamically-populated mapping from test_suite rules to their tests. */
@@ -177,7 +165,8 @@
      *
      * @precondition env.getAccessor().isTestSuite(testSuite)
      */
-    private Set<T> getTestsInSuite(T testSuite) throws QueryException, InterruptedException {
+    private synchronized Set<T> getTestsInSuite(T testSuite)
+        throws QueryException, InterruptedException {
       Set<T> tests = testsInSuite.get(testSuite);
       if (tests == null) {
         tests = Sets.newHashSet();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
deleted file mode 100644
index 950335e..0000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2014 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
-
-/** Marker interface for a {@link Callback} that is {@link ThreadSafe}. */
-@ThreadSafe
-public interface ThreadSafeCallback<T>
-    extends Callback<T>, ThreadSafeBatchCallback<T, QueryException> {
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java
deleted file mode 100644
index 9c2b429..0000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-
-/** Marker interface for a {@link ThreadSafe} {@link MinDepthUniquifier}. */
-@ThreadSafe
-public interface ThreadSafeMinDepthUniquifier<T> extends MinDepthUniquifier<T> {
-  // There's a natural benign check-then-act race in all concurrent uses of this interface. Thread
-  // T1 may think it's about to be the first one to process an element at a depth no greater than
-  // d1. But before t1 finishes processing the element, Thread T2 may think _it's_ about to be first
-  // one to process an element at a depth no greater than d2. If d2 < d1, then T1's work is probably
-  // wasted.
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
similarity index 74%
rename from src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java
rename to src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
index 7471855..bc3eb59 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
@@ -1,4 +1,4 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
+// Copyright 2017 The Bazel Authors. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,8 +15,7 @@
 
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 
-/** Marker interface for a {@link ThreadSafe} {@link Uniquifier}. */
+/** A marker parent class for a {@link ThreadSafe} {@link OutputFormatterCallback}. */
 @ThreadSafe
-public interface ThreadSafeUniquifier<T> extends Uniquifier<T> {
-}
-
+public abstract class ThreadSafeOutputFormatterCallback<T> extends OutputFormatterCallback<T> {
+}
\ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
index ed2b237..5f8faf5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
@@ -14,8 +14,10 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 
 /** A helper for deduping values. */
+@ThreadSafe
 public interface Uniquifier<T> {
   /** Returns whether {@code newElement} has been seen before. */
   boolean unique(T newElement);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
index 532f331..b09910c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
@@ -14,13 +14,14 @@
 
 package com.google.devtools.build.lib.query2.engine;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  * A visible(x, y) query expression, which computes the subset of nodes in y
@@ -52,34 +53,32 @@
   }
 
   @Override
-  public <T> void eval(
+  public <T> QueryTaskFuture<Void> eval(
       final QueryEnvironment<T> env,
-      VariableContext<T> context,
+      final VariableContext<T> context,
       QueryExpression expression,
-      List<Argument> args,
-      final Callback<T> callback) throws QueryException, InterruptedException {
-    final Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression());
-    env.eval(args.get(1).getExpression(), context, new Callback<T>() {
-      @Override
-      public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
-        for (T t : partialResult) {
-          if (visibleToAll(env, toSet, t)) {
-            callback.process(ImmutableList.of(t));
+      final List<Argument> args,
+      final Callback<T> callback) {
+    final QueryTaskFuture<Set<T>> toSetFuture =
+        QueryUtil.evalAll(env, context, args.get(0).getExpression());
+    Function<Set<T>, QueryTaskFuture<Void>> computeVisibleNodesAsyncFunction =
+        new Function<Set<T>, QueryTaskFuture<Void>>() {
+          @Override
+          public QueryTaskFuture<Void> apply(final Set<T> toSet) {
+            return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+              @Override
+              public void process(Iterable<T> partialResult)
+                  throws QueryException, InterruptedException {
+                for (T t : partialResult) {
+                  if (visibleToAll(env, toSet, t)) {
+                    callback.process(ImmutableList.of(t));
+                  }
+                }
+              }
+            });
           }
-        }
-      }
-    });
-  }
-
-  @Override
-  public <T> void parEval(
-      QueryEnvironment<T> env,
-      VariableContext<T> context,
-      QueryExpression expression,
-      List<Argument> args,
-      ThreadSafeCallback<T> callback,
-      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
-    eval(env, context, expression, args, callback);
+        };
+    return env.transformAsync(toSetFuture, computeVisibleNodesAsyncFunction);
   }
 
   /** Returns true if {@code target} is visible to all targets in {@code toSet}. */
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
index 18890c3..5c314b6 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
@@ -35,6 +35,8 @@
 import com.google.devtools.build.lib.packages.TriState;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput;
 import com.google.devtools.build.lib.syntax.EvalUtils;
 import com.google.devtools.build.lib.syntax.Printer;
@@ -178,15 +180,16 @@
     void setOptions(QueryOptions options, AspectResolver aspectResolver);
 
     /**
-     * Returns a {@link OutputFormatterCallback} whose {@link OutputFormatterCallback#process}
-     * outputs formatted {@link Target}s to the given {@code out}.
+     * Returns a {@link ThreadSafeOutputFormatterCallback} whose
+     * {@link OutputFormatterCallback#process} outputs formatted {@link Target}s to the given
+     * {@code out}.
      *
      * <p>Takes any options specified via the most recent call to {@link #setOptions} into
      * consideration.
      *
      * <p>Intended to be use for streaming out during evaluation of a query.
      */
-    OutputFormatterCallback<Target> createStreamCallback(
+    ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
         OutputStream out, QueryOptions options, QueryEnvironment<?> env);
 
     /**
@@ -288,9 +291,10 @@
     }
 
     @Override
-    public OutputFormatterCallback<Target> createStreamCallback(
+    public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
         OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-      return createPostFactoStreamCallback(out, options);
+      return new SynchronizedDelegatingOutputFormatterCallback<>(
+          createPostFactoStreamCallback(out, options));
     }
   }
 
@@ -345,9 +349,10 @@
     }
 
     @Override
-    public OutputFormatterCallback<Target> createStreamCallback(
+    public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
         OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-      return createPostFactoStreamCallback(out, options);
+      return new SynchronizedDelegatingOutputFormatterCallback<>(
+          createPostFactoStreamCallback(out, options));
     }
   }
 
@@ -387,9 +392,10 @@
     }
 
     @Override
-    public OutputFormatterCallback<Target> createStreamCallback(
+    public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
         OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-      return createPostFactoStreamCallback(out, options);
+      return new SynchronizedDelegatingOutputFormatterCallback<>(
+          createPostFactoStreamCallback(out, options));
     }
   }
 
@@ -478,9 +484,10 @@
     }
 
     @Override
-    public OutputFormatterCallback<Target> createStreamCallback(
+    public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
         OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-      return createPostFactoStreamCallback(out, options);
+      return new SynchronizedDelegatingOutputFormatterCallback<>(
+          createPostFactoStreamCallback(out, options));
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
index 66e1182..f966afb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
@@ -41,6 +41,8 @@
 import com.google.devtools.build.lib.query2.FakeSubincludeTarget;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode;
 import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter;
 import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput;
@@ -50,7 +52,6 @@
 import com.google.devtools.build.lib.query2.proto.proto2api.Build.SourceFile;
 import com.google.devtools.build.lib.syntax.Environment;
 import com.google.devtools.build.lib.syntax.Type;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
@@ -130,9 +131,10 @@
   }
 
   @Override
-  public OutputFormatterCallback<Target> createStreamCallback(
+  public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
       OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-    return createPostFactoStreamCallback(out, options);
+    return new SynchronizedDelegatingOutputFormatterCallback<>(
+        createPostFactoStreamCallback(out, options));
   }
 
   private static Iterable<Target> getSortedLabels(Digraph<Target> result) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
index 193ef18..6784ed3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
@@ -29,10 +29,11 @@
 import com.google.devtools.build.lib.query2.FakeSubincludeTarget;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode;
 import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter;
 import com.google.devtools.build.lib.syntax.Type;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
@@ -62,9 +63,10 @@
   }
 
   @Override
-  public OutputFormatterCallback<Target> createStreamCallback(
+  public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
       OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
-    return createPostFactoStreamCallback(out, options);
+    return new SynchronizedDelegatingOutputFormatterCallback<>(
+        createPostFactoStreamCallback(out, options));
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
index 589822b..e01aa4f 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
@@ -55,7 +55,6 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
 import com.google.devtools.build.lib.query2.engine.QueryException;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryUtil;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
@@ -284,7 +283,7 @@
     DigraphQueryEvalResult<Target> queryResult;
     OutputFormatter formatter;
     AggregateAllOutputFormatterCallback<Target> targets =
-        QueryUtil.newAggregateAllOutputFormatterCallback();
+        QueryUtil.newOrderedAggregateAllOutputFormatterCallback();
     try {
       Set<Setting> settings = queryOptions.toSettings();
 
@@ -318,7 +317,6 @@
               getEventHandler(ruleContext),
               settings,
               ImmutableList.<QueryFunction>of(),
-              QueryExpressionEvalListener.NullListener.<Target>instance(),
               /*packagePath=*/null);
       queryResult = (DigraphQueryEvalResult<Target>) queryEnvironment.evaluateQuery(query, targets);
     } catch (SkyframeRestartQueryException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
index c52a3fe..118f529 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
@@ -23,14 +23,13 @@
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.pkgcache.PackageCacheOptions;
 import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryUtil;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
 import com.google.devtools.build.lib.query2.output.OutputFormatter;
 import com.google.devtools.build.lib.query2.output.OutputFormatter.StreamedFormatter;
 import com.google.devtools.build.lib.query2.output.QueryOptions;
@@ -150,7 +149,7 @@
     expr = queryEnv.transformParsedQuery(expr);
 
     OutputStream out = env.getReporter().getOutErr().getOutputStream();
-    OutputFormatterCallback<Target> callback;
+    ThreadSafeOutputFormatterCallback<Target> callback;
     if (streamResults) {
       disableAnsiCharactersFiltering(env);
 
@@ -161,7 +160,7 @@
           queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter()));
       callback = streamedFormatter.createStreamCallback(out, queryOptions, queryEnv);
     } else {
-      callback = QueryUtil.newAggregateAllOutputFormatterCallback();
+      callback = QueryUtil.newOrderedAggregateAllOutputFormatterCallback();
     }
     boolean catastrophe = true;
     try {
@@ -207,8 +206,7 @@
 
       // 3. Output results:
       try {
-        Set<Target> targets =
-            ((AggregateAllOutputFormatterCallback<Target>) callback).getResult();
+        Set<Target> targets = ((AggregateAllOutputFormatterCallback<Target>) callback).getResult();
         QueryOutputUtils.output(
             queryOptions,
             result,
@@ -277,7 +275,6 @@
             env.getReporter(),
             settings,
             env.getRuntime().getQueryFunctions(),
-            QueryExpressionEvalListener.NullListener.<Target>instance(),
             env.getPackageManager().getPackagePath());
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
index df6351a..3b4b768 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
@@ -37,7 +37,6 @@
 import com.google.devtools.build.lib.util.BatchCallback;
 import com.google.devtools.build.lib.util.BatchCallback.NullCallback;
 import com.google.devtools.build.lib.util.Preconditions;
-import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.RootedPath;
@@ -47,7 +46,6 @@
 import com.google.devtools.build.skyframe.SkyValue;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -131,7 +129,7 @@
    * transitive dependencies. Its methods may throw {@link MissingDepException} if the package
    * values this depends on haven't been calculated and added to its environment.
    */
-  static class DepsOfPatternPreparer implements TargetPatternResolver<Void> {
+  static class DepsOfPatternPreparer extends TargetPatternResolver<Void> {
 
     private final EnvironmentBackedRecursivePackageProvider packageProvider;
     private final Environment env;
@@ -230,7 +228,8 @@
         String directory,
         boolean rulesOnly,
         ImmutableSet<PathFragment> excludedSubdirectories,
-        BatchCallback<Void, E> callback, Class<E> exceptionClass)
+        BatchCallback<Void, E> callback,
+        Class<E> exceptionClass)
         throws TargetParsingException, E, InterruptedException {
       FilteringPolicy policy =
           rulesOnly ? FilteringPolicies.RULES_ONLY : FilteringPolicies.NO_FILTER;
@@ -261,26 +260,5 @@
         }
       }
     }
-
-    @Override
-    public <E extends Exception> void findTargetsBeneathDirectoryPar(
-        RepositoryName repository,
-        String originalPattern,
-        String directory,
-        boolean rulesOnly,
-        ImmutableSet<PathFragment> excludedSubdirectories,
-        ThreadSafeBatchCallback<Void, E> callback,
-        Class<E> exceptionClass,
-        ForkJoinPool forkJoinPool)
-        throws TargetParsingException, E, InterruptedException {
-      findTargetsBeneathDirectory(
-          repository,
-          originalPattern,
-          directory,
-          rulesOnly,
-          excludedSubdirectories,
-          callback,
-          exceptionClass);
-    }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index 5c6bd42..dc84f5b 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -22,6 +22,9 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
@@ -29,7 +32,6 @@
 import com.google.devtools.build.lib.cmdline.ResolvedTargets;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPatternResolver;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
 import com.google.devtools.build.lib.events.Event;
@@ -51,9 +53,6 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -61,7 +60,7 @@
  */
 @ThreadCompatible
 public class RecursivePackageProviderBackedTargetPatternResolver
-    implements TargetPatternResolver<Target> {
+    extends TargetPatternResolver<Target> {
 
   // TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value?
   private static final int MAX_PACKAGES_BULK_GET = 1000;
@@ -194,56 +193,64 @@
       BatchCallback<Target, E> callback,
       Class<E> exceptionClass)
       throws TargetParsingException, E, InterruptedException {
-    findTargetsBeneathDirectoryParImpl(
+    try {
+      findTargetsBeneathDirectoryAsyncImpl(
+          repository,
+          originalPattern,
+          directory,
+          rulesOnly,
+          excludedSubdirectories,
+          new SynchronizedBatchCallback<Target, E>(callback),
+          MoreExecutors.newDirectExecutorService()).get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, TargetParsingException.class, exceptionClass);
+      throw new IllegalStateException(e.getCause());
+    }
+  }
+
+  @Override
+  public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync(
+      RepositoryName repository,
+      String originalPattern,
+      String directory,
+      boolean rulesOnly,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      ThreadSafeBatchCallback<Target, E> callback,
+      Class<E> exceptionClass,
+      ListeningExecutorService executor) {
+    return findTargetsBeneathDirectoryAsyncImpl(
         repository,
         originalPattern,
         directory,
         rulesOnly,
         excludedSubdirectories,
         new SynchronizedBatchCallback<Target, E>(callback),
-        exceptionClass,
-        MoreExecutors.newDirectExecutorService());
+        executor);
   }
 
-  @Override
-  public <E extends Exception> void findTargetsBeneathDirectoryPar(
+  private <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsyncImpl(
       final RepositoryName repository,
       final String originalPattern,
       String directory,
       boolean rulesOnly,
       ImmutableSet<PathFragment> excludedSubdirectories,
       final ThreadSafeBatchCallback<Target, E> callback,
-      Class<E> exceptionClass,
-      ForkJoinPool forkJoinPool)
-      throws TargetParsingException, E, InterruptedException {
-    findTargetsBeneathDirectoryParImpl(
-        repository,
-        originalPattern,
-        directory,
-        rulesOnly,
-        excludedSubdirectories,
-        callback,
-        exceptionClass,
-        forkJoinPool);
-  }
-
-  private <E extends Exception> void findTargetsBeneathDirectoryParImpl(
-      final RepositoryName repository,
-      final String originalPattern,
-      String directory,
-      boolean rulesOnly,
-      ImmutableSet<PathFragment> excludedSubdirectories,
-      final ThreadSafeBatchCallback<Target, E> callback,
-      Class<E> exceptionClass,
-      ExecutorService executor)
-      throws TargetParsingException, E, InterruptedException {
+      ListeningExecutorService executor) {
     final FilteringPolicy actualPolicy = rulesOnly
         ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy)
         : policy;
-    PathFragment pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
-    Iterable<PathFragment> packagesUnderDirectory =
-        recursivePackageProvider.getPackagesUnderDirectory(
-            repository, pathFragment, excludedSubdirectories);
+    final PathFragment pathFragment;
+    Iterable<PathFragment> packagesUnderDirectory;
+    try {
+      pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
+      packagesUnderDirectory = recursivePackageProvider.getPackagesUnderDirectory(
+          repository, pathFragment, excludedSubdirectories);
+    } catch (TargetParsingException e) {
+      return Futures.immediateFailedFuture(e);
+    } catch (InterruptedException e) {
+      return Futures.immediateCancelledFuture();
+    }
 
     Iterable<PackageIdentifier> pkgIds = Iterables.transform(packagesUnderDirectory,
             new Function<PathFragment, PackageIdentifier>() {
@@ -258,9 +265,9 @@
     // into batches.
     List<List<PackageIdentifier>> partitions =
         ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET));
-    ArrayList<Future<Void>> tasks = new ArrayList<>(partitions.size());
+    ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(partitions.size());
     for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) {
-      tasks.add(executor.submit(new Callable<Void>() {
+      futures.add(executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws E, TargetParsingException, InterruptedException {
             ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch);
@@ -288,17 +295,15 @@
           }
         }));
     }
-    try {
-      MoreFutures.waitForAllInterruptiblyFailFast(tasks);
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause(), exceptionClass);
-      Throwables.propagateIfPossible(
-          e.getCause(), TargetParsingException.class, InterruptedException.class);
-      throw new IllegalStateException(e);
-    }
-    if (!foundTarget.get()) {
-      throw new TargetParsingException("no targets found beneath '" + pathFragment + "'");
-    }
+    return Futures.whenAllSucceed(futures).call(new Callable<Void>() {
+      @Override
+      public Void call() throws TargetParsingException {
+        if (!foundTarget.get()) {
+          throw new TargetParsingException("no targets found beneath '" + pathFragment + "'");
+        }
+        return null;
+      }
+    });
   }
 
   private static <T> int calculateSize(Iterable<ResolvedTargets<T>> resolvedTargets) {
@@ -308,5 +313,6 @@
     }
     return size;
   }
+
 }
 
diff --git a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
index bc74b7a..91b4be8 100644
--- a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
@@ -13,11 +13,14 @@
 // limitations under the License.
 package com.google.devtools.build.lib.util;
 
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
 /**
  * Callback to be invoked when part of a result has been computed. Allows a client interested in
  * the result to process it as it is computed, for instance by streaming it, if it is too big to
  * fit in memory.
  */
+@ThreadSafe
 public interface BatchCallback<T, E extends Exception> {
   /**
    * Called when part of a result has been computed.