Parallelize evaluation of some BinaryOperatorExpressions

Adds evalConcurrently to QueryExpression so that expression
implementations that support concurrent evaluation can do so using the
supplied executor service.

Implements concurrent evaluation for the PLUS/UNION cases of
BinaryOperatorExpression.

evalConcurrently requires its callback to be thread-safe, but the
callback passed to evaluateQuery may only be called by one thread at a
time. This change therefore makes the BatchStreamedCallback constructed
by SkyQueryEnvironment thread-safe, including its uniquifier, while
still ensuring that at most one thread at a time calls the wrapped
callback.

However, there is still a thread-safety problem when the operands of a
BinaryOperatorExpression are LetExpressions, because evaluating them
mutates state in the query environment (the let bindings in
AbstractBlazeQueryEnvironment). A future change will fix that. Until
then, concurrent evaluation is only attempted when the query expression
is a BinaryOperatorExpression whose operands are all target literals.

--
MOS_MIGRATED_REVID=125505247
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2accdce..30a7aa6 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -36,6 +36,7 @@
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.EventHandler;
 import com.google.devtools.build.lib.graph.Digraph;
@@ -50,13 +51,13 @@
 import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
 import com.google.devtools.build.lib.profiler.AutoProfiler;
 import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
+import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.FunctionExpression;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -85,6 +86,7 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -93,6 +95,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
@@ -106,9 +109,11 @@
  * even if the full closure isn't needed.
  */
 public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+
   // 10k is likely a good balance between using batch efficiently and not blowing up memory.
   // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
   private static final int BATCH_CALLBACK_SIZE = 10000;
+  private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
 
   protected WalkableGraph graph;
   private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
@@ -134,7 +139,7 @@
   private final ListeningExecutorService threadPool =
       MoreExecutors.listeningDecorator(
           Executors.newFixedThreadPool(
-              Runtime.getRuntime().availableProcessors(),
+              DEFAULT_THREAD_COUNT,
               new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
   private RecursivePackageProviderBackedTargetPatternResolver resolver;
 
@@ -158,9 +163,11 @@
       int loadingPhaseThreads,
       EventHandler eventHandler,
       Set<Setting> settings,
-      Iterable<QueryFunction> extraFunctions, String parserPrefix,
+      Iterable<QueryFunction> extraFunctions,
+      String parserPrefix,
       WalkableGraphFactory graphFactory,
-      List<String> universeScope, PathPackageLocator pkgPath) {
+      List<String> universeScope,
+      PathPackageLocator pkgPath) {
     super(
         keepGoing,
         /*strictScope=*/ true,
@@ -268,21 +275,25 @@
     // This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
     // case of a race between the original callback and the eventHandler.
     final BatchStreamedCallback aggregator =
-        new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+        new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
 
     final AtomicBoolean empty = new AtomicBoolean(true);
+    Callback<Target> callbackWithEmptyCheck =
+        new Callback<Target>() {
+          @Override
+          public void process(Iterable<Target> partialResult)
+              throws QueryException, InterruptedException {
+            empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+            aggregator.process(partialResult);
+          }
+        };
     try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
       try {
-        expr.eval(
-            this,
-            new Callback<Target>() {
-              @Override
-              public void process(Iterable<Target> partialResult)
-                  throws QueryException, InterruptedException {
-                empty.compareAndSet(true, Iterables.isEmpty(partialResult));
-                aggregator.process(partialResult);
-              }
-            });
+        if (canEvalConcurrently(expr)) {
+          expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool);
+        } else {
+          expr.eval(this, callbackWithEmptyCheck);
+        }
       } catch (QueryException e) {
         throw new QueryException(e, expr);
       }
@@ -293,17 +304,37 @@
       if (!keepGoing) {
         // This case represents loading-phase errors reported during evaluation
         // of target patterns that don't cause evaluation to fail per se.
-        throw new QueryException("Evaluation of query \"" + expr
-            + "\" failed due to BUILD file errors");
+        throw new QueryException(
+            "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
       } else {
-        eventHandler.handle(Event.warn("--keep_going specified, ignoring errors.  "
-            + "Results may be inaccurate"));
+        eventHandler.handle(
+            Event.warn("--keep_going specified, ignoring errors.  " + "Results may be inaccurate"));
       }
     }
 
     return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
   }
 
+  // TODO(mschaller): This method and its use above are a quick, temporary fix for a
+  // thread-safety problem that arises when the operands of a BinaryOperatorExpression contain
+  // LetExpressions: concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may
+  // fail or produce wrong results.
+  //
+  // Until that is fixed, only a BinaryOperatorExpression whose operands are all TargetLiterals
+  // is passed to QueryExpression#evalConcurrently; every other expression is evaluated with
+  // QueryExpression#eval.
+  private static boolean canEvalConcurrently(QueryExpression expr) {
+    if (expr instanceof BinaryOperatorExpression) {
+      for (QueryExpression operand : ((BinaryOperatorExpression) expr).getOperands()) {
+        if (!(operand instanceof TargetLiteral)) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
   private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
     ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
     
@@ -447,18 +478,10 @@
     expr.eval(this, callback);
   }
 
-  private static Uniquifier<Target> uniquifier() {
-    return new AbstractUniquifier<Target, Label>() {
-      @Override
-      protected Label extractKey(Target target) {
-        return target.getLabel();
-      }
-    };
-  }
-
+  /** Returns a thread-safe uniquifier that deduplicates {@link Target}s by label. */
   @Override
   public Uniquifier<Target> createUniquifier() {
-    return uniquifier();
+    return new ConcurrentUniquifier();
   }
 
   @Override
@@ -841,6 +863,26 @@
         .build();
   }
 
+  @ThreadSafe
+  private static class ConcurrentUniquifier implements Uniquifier<Target> {
+    // Thread-safe set of labels already emitted; the initial capacity is only a heuristic.
+    private final Set<Label> seen =
+        Collections.newSetFromMap(
+            new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+
+    /** Returns, in input order, the elements whose labels this uniquifier has not seen before. */
+    @Override
+    public ImmutableList<Target> unique(Iterable<Target> newElements) {
+      ImmutableList.Builder<Target> firstSightings = ImmutableList.builder();
+      for (Target candidate : newElements) {
+        if (seen.add(candidate.getLabel())) {
+          firstSightings.add(candidate);
+        }
+      }
+      return firstSightings.build();
+    }
+  }
+
   /**
    * Wraps a {@link Callback} and guarantees that all calls to the original will have at least
    * {@code batchThreshold} {@link Target}s, except for the final such call.
@@ -850,36 +892,50 @@
-   * <p>After this object's {@link #process} has been called for the last time, {#link
+   * <p>After this object's {@link #process} has been called for the last time, {@link
    * #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
    * original.
+   *
+   * <p>This callback may be called from multiple threads concurrently. At most one thread will
+   * call the wrapped {@code callback} concurrently.
    */
+  @ThreadSafe
   private static class BatchStreamedCallback implements Callback<Target> {
 
     private final Callback<Target> callback;
-    private final Uniquifier<Target> uniquifier;
+    private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+    // Guards pending. Every call to the wrapped callback is made while holding this lock, which
+    // is what keeps those calls from overlapping.
+    private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
-    private int batchThreshold;
+    private final int batchThreshold;
 
-    private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
-        Uniquifier<Target> uniquifier) {
+    private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
       this.callback = callback;
       this.batchThreshold = batchThreshold;
-      this.uniquifier = uniquifier;
     }
 
     @Override
     public void process(Iterable<Target> partialResult)
         throws QueryException, InterruptedException {
-      Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
-      pending.addAll(uniquifier.unique(partialResult));
-      if (pending.size() >= batchThreshold) {
-        callback.process(pending);
-        pending = new ArrayList<>();
+      // The uniquifier is thread-safe, so deduplicate before taking the lock to keep the critical
+      // section (which may call the wrapped callback) short.
+      ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
+      synchronized (pendingLock) {
+        Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+        pending.addAll(uniquifiedTargets);
+        if (pending.size() >= batchThreshold) {
+          callback.process(pending);
+          pending = new ArrayList<>();
+        }
       }
     }
 
     private void processLastPending() throws QueryException, InterruptedException {
-      if (!pending.isEmpty()) {
-        callback.process(pending);
-        pending = null;
+      synchronized (pendingLock) {
+        // NOTE(review): pending is only retired (set to null) when it was non-empty, so a later
+        // #process call after flushing an empty batch is not rejected as reuse.
+        if (!pending.isEmpty()) {
+          callback.process(pending);
+          pending = null;
+        }
       }
     }
   }