Use ForkJoinPool, rather than ListeningExecutorService, for parallel query evaluation in SkyQueryEnvironment. FJP is nicer to program against, imo.

--
MOS_MIGRATED_REVID=133844508
diff --git a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
index 3077b68..d25717b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
@@ -16,7 +16,6 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
@@ -30,6 +29,7 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * An "rbuildfiles" query expression, which computes the set of packages (as represented by their
@@ -90,7 +90,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 4de4f87..39385dd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -28,16 +28,13 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.EventHandler;
@@ -103,7 +100,7 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
@@ -136,14 +133,7 @@
   private final List<String> universeScope;
   protected final String parserPrefix;
   private final PathPackageLocator pkgPath;
-
-  // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads
-  // unless work is submitted to it.
-  private final ListeningExecutorService threadPool =
-      MoreExecutors.listeningDecorator(
-          Executors.newFixedThreadPool(
-              DEFAULT_THREAD_COUNT,
-              new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build()));
+  private final ForkJoinPool forkJoinPool;
 
   // The following fields are set in the #beforeEvaluateQuery method.
   protected WalkableGraph graph;
@@ -161,6 +151,34 @@
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
       PathPackageLocator pkgPath) {
+    this(
+        keepGoing,
+        loadingPhaseThreads,
+        // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query
+        // evaluation is completely CPU-bound.
+        /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT,
+        eventHandler,
+        settings,
+        extraFunctions,
+        evalListener,
+        parserPrefix,
+        graphFactory,
+        universeScope,
+        pkgPath);
+  }
+
+  protected SkyQueryEnvironment(
+      boolean keepGoing,
+      int loadingPhaseThreads,
+      int queryEvaluationParallelismLevel,
+      EventHandler eventHandler,
+      Set<Setting> settings,
+      Iterable<QueryFunction> extraFunctions,
+      QueryExpressionEvalListener<Target> evalListener,
+      String parserPrefix,
+      WalkableGraphFactory graphFactory,
+      List<String> universeScope,
+      PathPackageLocator pkgPath) {
     super(
         keepGoing,
         /*strictScope=*/ true,
@@ -170,6 +188,9 @@
         extraFunctions,
         evalListener);
     this.loadingPhaseThreads = loadingPhaseThreads;
+    // Note that ForkJoinPool doesn't start any thread until work is submitted to it.
+    this.forkJoinPool = NamedForkJoinPool.newNamedPool(
+        "QueryEnvironment", queryEvaluationParallelismLevel);
     this.graphFactory = graphFactory;
     this.pkgPath = pkgPath;
     this.universeScope = Preconditions.checkNotNull(universeScope);
@@ -201,7 +222,7 @@
             graphBackedRecursivePackageProvider,
             eventHandler,
             TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
-            threadPool);
+            forkJoinPool);
   }
 
   /**
@@ -265,7 +286,7 @@
 
   @Override
   public void close() {
-    ExecutorUtil.interruptibleShutdown(threadPool);
+    forkJoinPool.shutdownNow();
   }
 
   @Override
@@ -505,7 +526,7 @@
     // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
     // all QueryEnvironment implementations.
     if (callback instanceof ThreadSafeCallback) {
-      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, threadPool);
+      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
     } else {
       expr.eval(this, context, callback);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
index b491097..adc12d2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
@@ -18,7 +18,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
@@ -27,6 +26,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * Implementation of the <code>allpaths()</code> function.
@@ -89,7 +89,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index 0da05aa..faa9977 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -17,13 +17,13 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * An "allrdeps" query expression, which computes the reverse dependencies of the argument within
@@ -111,7 +111,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
index 0b6a595..81f20d4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.collect.CompactHashSet;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
@@ -23,6 +22,7 @@
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A buildfiles(x) query expression, which computes the set of BUILD files and
@@ -72,7 +72,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
index 5de84a1..482b4cd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
@@ -23,6 +22,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A "deps" query expression, which computes the dependencies of the argument. An optional
@@ -92,7 +92,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
index 5a918cb..6c9ae89 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
@@ -14,13 +14,13 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A label(attr_name, argument) expression, which computes the set of targets
@@ -89,7 +89,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
index 5370b4a..509f2ff 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
@@ -15,11 +15,11 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.collect.CompactHashSet;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A loadfiles(x) query expression, which computes the set of .bzl files
@@ -72,7 +72,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index f415b41..5cf7c6e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -14,10 +14,10 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import javax.annotation.Nonnull;
 
 /**
@@ -124,7 +124,7 @@
 
     /**
      * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)},
-     * except that this {@link QueryFunction} may use {@code executorService} to achieve
+     * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve
      * parallelism.
      *
      * <p>The caller must ensure that {@code env} is thread safe.
@@ -135,7 +135,7 @@
         QueryExpression expression,
         List<Argument> args,
         ThreadSafeCallback<T> callback,
-        ListeningExecutorService executorService) throws QueryException, InterruptedException;
+        ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index 667fcec..e35e9e4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -13,10 +13,10 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 
 import java.util.Collection;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * Base class for expressions in the Blaze query language, revision 2.
@@ -86,7 +86,7 @@
 
   /**
    * Evaluates this query in the specified environment, as in
-   * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code executorService} to
+   * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to
    * achieve parallelism.
    *
    * <p>The caller must ensure that {@code env} is thread safe.
@@ -96,17 +96,17 @@
       QueryEnvironment<T> env,
       VariableContext<T> context,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService)
+      ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
-    env.getEvalListener().onParEval(this, env, context, callback, executorService);
-    parEvalImpl(env, context, callback, executorService);
+    env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool);
+    parEvalImpl(env, context, callback, forkJoinPool);
   }
 
   protected <T> void parEvalImpl(
       QueryEnvironment<T> env,
       VariableContext<T> context,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService)
+      ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
     evalImpl(env, context, callback);
   }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
index 7255a47..e6bdaef 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
@@ -13,8 +13,8 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import java.util.concurrent.ForkJoinPool;
 
 /** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */
 @ThreadSafe
@@ -32,7 +32,7 @@
       QueryEnvironment<T> env,
       VariableContext<T> context,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService);
+      ForkJoinPool forkJoinPool);
 
   /** A {@link QueryExpressionEvalListener} that does nothing. */
   class NullListener<T> implements QueryExpressionEvalListener<T> {
@@ -60,7 +60,7 @@
         QueryEnvironment<T> env,
         VariableContext<T> context,
         ThreadSafeCallback<T> callback,
-        ListeningExecutorService executorService) {
+        ForkJoinPool forkJoinPool) {
     }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
index 44a40c5..9b24014 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
@@ -16,11 +16,11 @@
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import java.util.regex.Pattern;
 
 /**
@@ -72,7 +72,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
index c482c2e..7913e2d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
@@ -16,12 +16,12 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -79,7 +79,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
index fd5a527..2d0df0e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
@@ -17,13 +17,13 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A somepath(x, y) query expression, which computes the set of nodes
@@ -94,7 +94,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
index ac83b78..d802edd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
@@ -28,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A tests(x) filter expression, which returns all the tests in set x,
@@ -93,7 +93,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
index 2506d85..532f331 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
@@ -15,12 +15,12 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A visible(x, y) query expression, which computes the subset of nodes in y
@@ -78,7 +78,7 @@
       QueryExpression expression,
       List<Argument> args,
       ThreadSafeCallback<T> callback,
-      ListeningExecutorService executorService) throws QueryException, InterruptedException {
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
     eval(env, context, expression, args, callback);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index a01a1f4..3ed2394 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -22,9 +22,6 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
@@ -47,9 +44,11 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -64,13 +63,13 @@
   private final RecursivePackageProvider recursivePackageProvider;
   private final EventHandler eventHandler;
   private final FilteringPolicy policy;
-  private final ListeningExecutorService executor;
+  private final ExecutorService executor;
 
   public RecursivePackageProviderBackedTargetPatternResolver(
       RecursivePackageProvider recursivePackageProvider,
       EventHandler eventHandler,
       FilteringPolicy policy,
-      ListeningExecutorService executor) {
+      ExecutorService executor) {
     this.recursivePackageProvider = recursivePackageProvider;
     this.eventHandler = eventHandler;
     this.policy = policy;
@@ -147,7 +146,7 @@
   private Map<PackageIdentifier, ResolvedTargets<Target>> bulkGetTargetsInPackage(
           String originalPattern,
           Iterable<PackageIdentifier> pkgIds, FilteringPolicy policy)
-          throws TargetParsingException, InterruptedException {
+          throws InterruptedException {
     try {
       Map<PackageIdentifier, Package> pkgs = bulkGetPackages(pkgIds);
       if (pkgs.size() != Iterables.size(pkgIds)) {
@@ -204,74 +203,51 @@
               }
             });
     final AtomicBoolean foundTarget = new AtomicBoolean(false);
-    final AtomicReference<InterruptedException> interrupt = new AtomicReference<>();
-    final AtomicReference<TargetParsingException> parsingException = new AtomicReference<>();
-    final AtomicReference<Exception> genericException = new AtomicReference<>();
-
     final Object callbackLock = new Object();
 
     // For very large sets of packages, we may not want to process all of them at once, so we split
     // into batches.
     List<List<PackageIdentifier>> partitions =
         ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET));
-    ArrayList<ListenableFuture<?>> futures = new ArrayList<>(partitions.size());
+    ArrayList<Callable<Void>> callables = new ArrayList<>(partitions.size());
     for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) {
-      futures.add(
-          executor.submit(
-              new Runnable() {
-                @Override
-                public void run() {
-                  Iterable<ResolvedTargets<Target>> resolvedTargets;
-                  try {
-                    resolvedTargets =
-                        bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values();
-                  } catch (InterruptedException e) {
-                    interrupt.compareAndSet(null, e);
-                    return;
-                  } catch (TargetParsingException e) {
-                    parsingException.compareAndSet(null, e);
-                    return;
-                  } catch (RuntimeException e) {
-                    // In particular, we're interested in remembering any thrown
-                    // MissingDepExceptions.
-                    genericException.compareAndSet(null, e);
-                    return;
-                  }
-
-                  List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
-                  for (ResolvedTargets<Target> targets : resolvedTargets) {
-                    for (Target target : targets.getTargets()) {
-                      // Perform the no-targets-found check before applying the filtering policy
-                      // so we only return the error if the input directory's subtree really
-                      // contains no targets.
-                      foundTarget.set(true);
-                      if (actualPolicy.shouldRetain(target, false)) {
-                        filteredTargets.add(target);
-                      }
-                    }
-                  }
-                  try {
-                    synchronized (callbackLock) {
-                      callback.process(filteredTargets);
-                    }
-                  } catch (InterruptedException e) {
-                    interrupt.compareAndSet(null, e);
-                  } catch (Exception e) {
-                    genericException.compareAndSet(e, null);
-                  }
+      callables.add(new Callable<Void>() {
+          @Override
+          public Void call() throws E, TargetParsingException, InterruptedException {
+            Iterable<ResolvedTargets<Target>> resolvedTargets =
+                bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values();
+            List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
+            for (ResolvedTargets<Target> targets : resolvedTargets) {
+              for (Target target : targets.getTargets()) {
+                // Perform the no-targets-found check before applying the filtering policy
+                // so we only return the error if the input directory's subtree really
+                // contains no targets.
+                foundTarget.set(true);
+                if (actualPolicy.shouldRetain(target, false)) {
+                  filteredTargets.add(target);
                 }
-              }));
+              }
+            }
+            synchronized (callbackLock) {
+              callback.process(filteredTargets);
+            }
+            return null;
+          }
+        });
     }
 
-    try {
-      Futures.allAsList(futures).get();
-    } catch (ExecutionException e) {
-      throw new IllegalStateException(e);
+    // Note that ExecutorService#invokeAll _does_ block until all the Callables have been run.
+    List<Future<Void>> futures = executor.invokeAll(callables);
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        Throwables.propagateIfPossible(e.getCause(), exceptionClass);
+        Throwables.propagateIfPossible(
+            e.getCause(), TargetParsingException.class, InterruptedException.class);
+        throw new IllegalStateException(e);
+      }
     }
-
-    Throwables.propagateIfInstanceOf(interrupt.get(), InterruptedException.class);
-    Throwables.propagateIfInstanceOf(parsingException.get(), TargetParsingException.class);
-    Throwables.propagateIfPossible(genericException.get(), exceptionClass);
     if (!foundTarget.get()) {
       throw new TargetParsingException("no targets found beneath '" + pathFragment + "'");
     }