Use ForkJoinPool, rather than ListeningExecutorService, for parallel query evaluation in SkyQueryEnvironment. ForkJoinPool (FJP) is nicer to program against, IMO.

--
MOS_MIGRATED_REVID=133844508
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 4de4f87..39385dd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -28,16 +28,13 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.EventHandler;
@@ -103,7 +100,7 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
@@ -136,14 +133,7 @@
   private final List<String> universeScope;
   protected final String parserPrefix;
   private final PathPackageLocator pkgPath;
-
-  // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads
-  // unless work is submitted to it.
-  private final ListeningExecutorService threadPool =
-      MoreExecutors.listeningDecorator(
-          Executors.newFixedThreadPool(
-              DEFAULT_THREAD_COUNT,
-              new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build()));
+  private final ForkJoinPool forkJoinPool;
 
   // The following fields are set in the #beforeEvaluateQuery method.
   protected WalkableGraph graph;
@@ -161,6 +151,34 @@
       WalkableGraphFactory graphFactory,
       List<String> universeScope,
       PathPackageLocator pkgPath) {
+    this(
+        keepGoing,
+        loadingPhaseThreads,
+        // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query
+        // evaluation is completely CPU-bound.
+        /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT,
+        eventHandler,
+        settings,
+        extraFunctions,
+        evalListener,
+        parserPrefix,
+        graphFactory,
+        universeScope,
+        pkgPath);
+  }
+
+  protected SkyQueryEnvironment(
+      boolean keepGoing,
+      int loadingPhaseThreads,
+      int queryEvaluationParallelismLevel,
+      EventHandler eventHandler,
+      Set<Setting> settings,
+      Iterable<QueryFunction> extraFunctions,
+      QueryExpressionEvalListener<Target> evalListener,
+      String parserPrefix,
+      WalkableGraphFactory graphFactory,
+      List<String> universeScope,
+      PathPackageLocator pkgPath) {
     super(
         keepGoing,
         /*strictScope=*/ true,
@@ -170,6 +188,9 @@
         extraFunctions,
         evalListener);
     this.loadingPhaseThreads = loadingPhaseThreads;
+    // Note that ForkJoinPool doesn't start any thread until work is submitted to it.
+    this.forkJoinPool = NamedForkJoinPool.newNamedPool(
+        "QueryEnvironment", queryEvaluationParallelismLevel);
     this.graphFactory = graphFactory;
     this.pkgPath = pkgPath;
     this.universeScope = Preconditions.checkNotNull(universeScope);
@@ -201,7 +222,7 @@
             graphBackedRecursivePackageProvider,
             eventHandler,
             TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
-            threadPool);
+            forkJoinPool);
   }
 
   /**
@@ -265,7 +286,7 @@
 
   @Override
   public void close() {
-    ExecutorUtil.interruptibleShutdown(threadPool);
+    forkJoinPool.shutdownNow();
   }
 
   @Override
@@ -505,7 +526,7 @@
     // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
     // all QueryEnvironment implementations.
     if (callback instanceof ThreadSafeCallback) {
-      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, threadPool);
+      expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
     } else {
       expr.eval(this, context, callback);
     }