Use a ForkJoinPool instead of a PriorityBlockingQueue-based ThreadPoolExecutor
for the analysis phase. During the analysis phase, many very short actions
are executed (File, FileState, ...), so contention in the executor has a
substantial performance impact.

Contention has always been a problem here, but the (justified) move to a
PriorityBlockingQueue can cause even more work to be executed under a lock.
While I couldn't measure substantial wall-time differences between
PriorityBlockingQueue and the previously used LinkedBlockingQueue, switching
to a ForkJoinPool improves performance by about 10% in a variety of
experiments I have conducted.

RELNOTES: None.
PiperOrigin-RevId: 210398382
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeExecutor.java
index 8d9309f..9efd99a 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeExecutor.java
@@ -85,6 +85,7 @@
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
 import com.google.devtools.build.lib.events.ErrorSensingEventHandler;
@@ -1881,7 +1882,11 @@
       keys.add(aspectKey);
     }
     EvaluationResult<ActionLookupValue> result =
-        buildDriver.evaluate(keys, keepGoing, numThreads, eventHandler);
+        buildDriver.evaluate(
+            keys,
+            keepGoing,
+            () -> NamedForkJoinPool.newNamedPool("skyframe-evaluator", numThreads),
+            eventHandler);
     // Get rid of any memory retained by the cache -- all loading is done.
     perBuildSyscallCache.clear();
     return result;
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
index bff776a..b4e43ad 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
@@ -14,7 +14,6 @@
 package com.google.devtools.build.skyframe;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -37,6 +36,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /**
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
index 2d806f6..78073f5 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
@@ -14,7 +14,6 @@
 package com.google.devtools.build.skyframe;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +46,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
 
diff --git a/src/main/java/com/google/devtools/build/skyframe/BuildDriver.java b/src/main/java/com/google/devtools/build/skyframe/BuildDriver.java
index b5a0ac2..d624fcf 100644
--- a/src/main/java/com/google/devtools/build/skyframe/BuildDriver.java
+++ b/src/main/java/com/google/devtools/build/skyframe/BuildDriver.java
@@ -17,6 +17,8 @@
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.util.AbruptExitException;
 import com.google.devtools.common.options.OptionsProvider;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /** A BuildDriver wraps a MemoizingEvaluator, passing along the proper Version. */
@@ -28,7 +30,18 @@
   <T extends SkyValue> EvaluationResult<T> evaluate(
       Iterable<? extends SkyKey> roots, boolean keepGoing, int numThreads,
       ExtendedEventHandler reporter)
-          throws InterruptedException;
+      throws InterruptedException;
+
+  /**
+   * See {@link MemoizingEvaluator#evaluate}, which has the same semantics except that it
+   * additionally takes a {@link Version} value.
+   */
+  <T extends SkyValue> EvaluationResult<T> evaluate(
+      Iterable<? extends SkyKey> roots,
+      boolean keepGoing,
+      Supplier<ExecutorService> executorService,
+      ExtendedEventHandler reporter)
+      throws InterruptedException;
 
   /**
    * Retrieve metadata about the computation over the given roots. Data returned is specific to the
diff --git a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
index 42ab317..d85decd 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.profiler.Profiler;
@@ -36,7 +37,9 @@
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /**
@@ -157,6 +160,22 @@
       int numThreads,
       ExtendedEventHandler eventHandler)
       throws InterruptedException {
+    return evaluate(
+        roots,
+        version,
+        keepGoing,
+        () -> AbstractQueueVisitor.createExecutorService(numThreads),
+        eventHandler);
+  }
+
+  @Override
+  public <T extends SkyValue> EvaluationResult<T> evaluate(
+      Iterable<? extends SkyKey> roots,
+      Version version,
+      boolean keepGoing,
+      Supplier<ExecutorService> executorService,
+      ExtendedEventHandler eventHandler)
+      throws InterruptedException {
     // NOTE: Performance critical code. See bug "Null build performance parity".
     IntVersion intVersion = (IntVersion) version;
     Preconditions.checkState((lastGraphVersion == null && intVersion.getVal() == 0)
@@ -184,21 +203,23 @@
         injectValues(intVersion);
       }
 
-      ParallelEvaluator evaluator =
-          new ParallelEvaluator(
-              graph,
-              intVersion,
-              skyFunctions,
-              eventHandler,
-              emittedEventState,
-              eventFilter,
-              ErrorInfoManager.UseChildErrorInfoIfNecessary.INSTANCE,
-              keepGoing,
-              numThreads,
-              progressReceiver,
-              graphInconsistencyReceiver);
       EvaluationResult<T> result;
       try (SilentCloseable c = Profiler.instance().profile("ParallelEvaluator.eval")) {
+        ParallelEvaluator evaluator =
+            new ParallelEvaluator(
+                graph,
+                version,
+                skyFunctions,
+                eventHandler,
+                emittedEventState,
+                eventFilter,
+                ErrorInfoManager.UseChildErrorInfoIfNecessary.INSTANCE,
+                keepGoing,
+                progressReceiver,
+                graphInconsistencyReceiver,
+                executorService,
+                new SimpleCycleDetector(),
+                EvaluationVersionBehavior.MAX_CHILD_VERSIONS);
         result = evaluator.eval(roots);
       }
       return EvaluationResult.<T>builder()
diff --git a/src/main/java/com/google/devtools/build/skyframe/MemoizingEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/MemoizingEvaluator.java
index 73f39ac..c30c2c6 100644
--- a/src/main/java/com/google/devtools/build/skyframe/MemoizingEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/MemoizingEvaluator.java
@@ -22,6 +22,8 @@
 import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable;
 import java.io.PrintStream;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /**
@@ -54,6 +56,23 @@
       throws InterruptedException;
 
   /**
+   * Computes the transitive closure of a given set of values at the given {@link Version}. See
+   * {@link EagerInvalidator#invalidate}.
+   *
+   * <p>The returned EvaluationResult is guaranteed to contain a result for at least one root if
+   * keepGoing is false. It will contain a result for every root if keepGoing is true, <i>unless</i>
+   * the evaluation failed with a "catastrophic" error. In that case, some or all results may be
+   * missing.
+   */
+  <T extends SkyValue> EvaluationResult<T> evaluate(
+      Iterable<? extends SkyKey> roots,
+      Version version,
+      boolean keepGoing,
+      Supplier<ExecutorService> executorService,
+      ExtendedEventHandler reporter)
+      throws InterruptedException;
+
+  /**
    * Ensures that after the next completed {@link #evaluate} call the current values of any value
    * matching this predicate (and all values that transitively depend on them) will be removed from
    * the value cache. All values that were already marked dirty in the graph will also be deleted,
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index e36e547..7ec802e 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -14,13 +14,13 @@
 package com.google.devtools.build.skyframe;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.skyframe.MemoizingEvaluator.EmittedEventState;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /**
@@ -42,37 +42,9 @@
       EventFilter storedEventFilter,
       ErrorInfoManager errorInfoManager,
       boolean keepGoing,
-      int threadCount,
-      DirtyTrackingProgressReceiver progressReceiver,
-      GraphInconsistencyReceiver graphInconsistencyReceiver) {
-    super(
-        graph,
-        graphVersion,
-        skyFunctions,
-        reporter,
-        emittedEventState,
-        storedEventFilter,
-        errorInfoManager,
-        keepGoing,
-        progressReceiver,
-        graphInconsistencyReceiver,
-        () -> AbstractQueueVisitor.createExecutorService(threadCount),
-        new SimpleCycleDetector(),
-        EvaluationVersionBehavior.MAX_CHILD_VERSIONS);
-  }
-
-  public ParallelEvaluator(
-      ProcessableGraph graph,
-      Version graphVersion,
-      ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions,
-      final ExtendedEventHandler reporter,
-      EmittedEventState emittedEventState,
-      EventFilter storedEventFilter,
-      ErrorInfoManager errorInfoManager,
-      boolean keepGoing,
       DirtyTrackingProgressReceiver progressReceiver,
       GraphInconsistencyReceiver graphInconsistencyReceiver,
-      ExecutorService executorService,
+      Supplier<ExecutorService> executorService,
       CycleDetector cycleDetector,
       EvaluationVersionBehavior evaluationVersionBehavior) {
     super(
@@ -86,7 +58,7 @@
         keepGoing,
         progressReceiver,
         graphInconsistencyReceiver,
-        () -> executorService,
+        executorService,
         cycleDetector,
         evaluationVersionBehavior);
   }
diff --git a/src/main/java/com/google/devtools/build/skyframe/SequentialBuildDriver.java b/src/main/java/com/google/devtools/build/skyframe/SequentialBuildDriver.java
index ffd882e..2947e29 100644
--- a/src/main/java/com/google/devtools/build/skyframe/SequentialBuildDriver.java
+++ b/src/main/java/com/google/devtools/build/skyframe/SequentialBuildDriver.java
@@ -14,8 +14,11 @@
 package com.google.devtools.build.skyframe;
 
 import com.google.common.base.Preconditions;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.common.options.OptionsProvider;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 /**
@@ -32,11 +35,24 @@
 
   @Override
   public <T extends SkyValue> EvaluationResult<T> evaluate(
-      Iterable<? extends SkyKey> roots, boolean keepGoing, int numThreads,
+      Iterable<? extends SkyKey> roots,
+      boolean keepGoing,
+      int numThreads,
       ExtendedEventHandler reporter)
-          throws InterruptedException {
+      throws InterruptedException {
+    return evaluate(
+        roots, keepGoing, () -> AbstractQueueVisitor.createExecutorService(numThreads), reporter);
+  }
+
+  @Override
+  public <T extends SkyValue> EvaluationResult<T> evaluate(
+      Iterable<? extends SkyKey> roots,
+      boolean keepGoing,
+      Supplier<ExecutorService> executorService,
+      ExtendedEventHandler reporter)
+      throws InterruptedException {
     try {
-      return memoizingEvaluator.evaluate(roots, curVersion, keepGoing, numThreads, reporter);
+      return memoizingEvaluator.evaluate(roots, curVersion, keepGoing, executorService, reporter);
     } finally {
       curVersion = curVersion.next();
     }