Provide parallel implementations of bounded allrdeps and rdeps.

RELNOTES: None
PiperOrigin-RevId: 192681579
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 8b95172..b13ed47 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -16,7 +16,6 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ascii;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -27,6 +26,7 @@
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Iterables;
@@ -112,11 +112,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.RejectedExecutionException;
@@ -506,6 +504,21 @@
     return result;
   }
 
+  /**
+   * Returns deps in the form of {@link SkyKey}s.
+   *
+   * <p>The implementation of this method does not filter deps, therefore it is expected to be used
+   * only when {@link SkyQueryEnvironment#dependencyFilter} is set to {@link
+   * DependencyFilter#ALL_DEPS}.
+   */
+  Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys)
+      throws InterruptedException {
+    Preconditions.checkState(dependencyFilter == DependencyFilter.ALL_DEPS, dependencyFilter);
+    ImmutableMultimap.Builder<SkyKey, SkyKey> builder = ImmutableMultimap.builder();
+    graph.getDirectDeps(keys).forEach(builder::putAll);
+    return builder.build();
+  }
+
   @Override
   public Collection<Target> getReverseDeps(Iterable<Target> targets) throws InterruptedException {
     return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY));
@@ -648,6 +661,11 @@
   }
 
   @ThreadSafe
+  protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
+    return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  }
+
+  @ThreadSafe
   Uniquifier<Target> createTargetUniquifier() {
     return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
@@ -894,15 +912,11 @@
     // so no preloading of target patterns is necessary.
   }
 
+  static final Predicate<SkyKey> IS_TTV = SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL);
+
   static final Function<SkyKey, Label> SKYKEY_TO_LABEL =
-      skyKey -> {
-        SkyFunctionName functionName = skyKey.functionName();
-        if (!functionName.equals(Label.TRANSITIVE_TRAVERSAL)) {
-          // Skip non-targets.
-          return null;
-        }
-        return (Label) skyKey.argument();
-      };
+      skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null;
+
 
   static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER =
       skyKey -> (PackageIdentifier) skyKey.argument();
@@ -1266,176 +1280,53 @@
         this, expression, context, callback, packageSemaphore);
   }
 
+  @ThreadSafe
   @Override
-  public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
+  public QueryTaskFuture<Void> getAllRdepsBoundedParallel(
       QueryExpression expression,
+      int depth,
       VariableContext<Target> context,
-      List<Argument> args,
       Callback<Target> callback) {
-    return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback);
+    return ParallelSkyQueryUtils.getAllRdepsBoundedParallel(
+        this, expression, depth, context, callback, packageSemaphore);
+  }
+
+  protected QueryTaskFuture<Predicate<SkyKey>> getUniverseDTCSkyKeyPredicateFuture(
+      QueryExpression universe,
+      VariableContext<Target> context) {
+    return ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture(
+        this,
+        universe,
+        context,
+        BATCH_CALLBACK_SIZE,
+        DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
   @Override
-  public QueryTaskFuture<Void> getAllRdeps(
+  public QueryTaskFuture<Void> getRdepsUnboundedParallel(
       QueryExpression expression,
-      Predicate<Target> universe,
+      QueryExpression universe,
       VariableContext<Target> context,
-      Callback<Target> callback,
-      int depth) {
-    return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+      Callback<Target> callback) {
+    return transformAsync(
+        getUniverseDTCSkyKeyPredicateFuture(universe, context),
+        universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
+            this, expression, universePredicate, context, callback, packageSemaphore));
   }
 
-  /**
-   * Computes and applies the callback to the reverse dependencies of the expression.
-   *
-   * <p>Batch size is used to only populate at most N targets at one time, because some individual
-   * nodes are directly depended on by a large number of other nodes.
-   */
-  @VisibleForTesting
-  protected QueryTaskFuture<Void> getAllRdeps(
+  @ThreadSafe
+  @Override
+  public QueryTaskFuture<Void> getRdepsBoundedParallel(
       QueryExpression expression,
-      Predicate<Target> universe,
-      VariableContext<Target> context,
-      Callback<Target> callback,
       int depth,
-      int batchSize) {
-    MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
-    return eval(
-        expression,
-        context,
-        new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
-  }
-
-  private class BatchAllRdepsCallback implements Callback<Target> {
-    private final MinDepthUniquifier<Target> minDepthUniquifier;
-    private final Predicate<Target> universe;
-    private final Callback<Target> callback;
-    private final int depth;
-    private final int batchSize;
-
-    private BatchAllRdepsCallback(
-        MinDepthUniquifier<Target> minDepthUniquifier,
-        Predicate<Target> universe,
-        Callback<Target> callback,
-        int depth,
-        int batchSize) {
-      this.minDepthUniquifier = minDepthUniquifier;
-      this.universe = universe;
-      this.callback = callback;
-      this.depth = depth;
-      this.batchSize = batchSize;
-    }
-
-    @Override
-    public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
-      Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
-      ImmutableList<Target> uniqueTargets =
-          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
-      callback.process(uniqueTargets);
-
-      // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
-      // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets
-      // have a reference to their entire Package, which is really memory expensive.
-      Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>();
-      reverseDepsQueue.addAll(
-          graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet());
-
-      // In each iteration, we populate a size-limited (no more than batchSize) number of
-      // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once
-      // processed by callback, the targets are dequeued and not referenced any more, making
-      // them available for garbage collection.
-
-      for (int curDepth = 1; curDepth <= depth; curDepth++) {
-        // The mappings between nodes and their reverse deps must be preserved instead of the
-        // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
-        // check if their allowed deps contain the dependencies.
-        Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap();
-        int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap.
-        int processed = 0;
-        // Save current size as when we are process nodes in the current level, new mappings (for
-        // the next level) are added to the queue.
-        int size = reverseDepsQueue.size();
-        while (processed < size) {
-          // We always peek the first element in the queue without polling it, to determine if
-          // adding it to the pending list will break the limit of max size. If yes then we process
-          // and empty the pending list first, and poll the element in the next iteration.
-          Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek();
-
-          // The value of the entry is either a CompactHashSet or ImmutableList, which can return
-          // the size in O(1) time.
-          int rdepsSize = Iterables.size(entry.getValue());
-          if (rdepsSize == 0) {
-            reverseDepsQueue.poll();
-            processed++;
-            continue;
-          }
-
-          if ((rdepsSize + batch <= batchSize)) {
-            // If current size is less than batch size, dequeue the node, update the current
-            // batch size and map.
-            reverseDepsMap.put(entry.getKey(), entry.getValue());
-            batch += rdepsSize;
-            reverseDepsQueue.poll();
-            processed++;
-          } else {
-            if (batch == 0) {
-              // The (single) node has more rdeps than the limit, divide them up to process
-              // separately.
-              for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
-                reverseDepsMap.put(entry.getKey(), subList);
-                processReverseDepsMap(
-                    minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
-              }
-
-              reverseDepsQueue.poll();
-              processed++;
-            } else {
-              // There are some nodes in the pending process list. Process them first and come
-              // back to this node later (in next iteration).
-              processReverseDepsMap(
-                  minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
-              batch = 0;
-            }
-          }
-        }
-
-        if (!reverseDepsMap.isEmpty()) {
-          processReverseDepsMap(
-              minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
-        }
-
-        // If the queue is empty after all nodes in the current level are processed, stop
-        // processing as there are no more reverse deps.
-        if (reverseDepsQueue.isEmpty()) {
-          break;
-        }
-      }
-    }
-
-    /**
-     * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending
-     * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
-     */
-    private void processReverseDepsMap(
-        MinDepthUniquifier<Target> minDepthUniquifier,
-        Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
-        Callback<Target> callback,
-        Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
-        int depth)
-        throws QueryException, InterruptedException {
-      Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
-      Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
-      ImmutableList<Target> uniqueChildren =
-          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
-      reverseDepsMap.clear();
-
-      if (!uniqueChildren.isEmpty()) {
-        callback.process(uniqueChildren);
-        reverseDepsQueue.addAll(
-            graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet());
-      }
-    }
+      QueryExpression universe,
+      VariableContext<Target> context,
+      Callback<Target> callback) {
+    return transformAsync(
+        getUniverseDTCSkyKeyPredicateFuture(universe, context),
+        universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
+            this, expression, depth, universePredicate, context, callback, packageSemaphore));
   }
 
   /**
@@ -1465,4 +1356,32 @@
       }
     }
   }
+
+  /** Pair of a key and a depth, useful for driving usages of {@link MinDepthUniquifier}. */
+  public static class KeyAtDepth {
+    public final SkyKey key;
+    public final int depth;
+
+    public KeyAtDepth(SkyKey key, int depth) {
+      this.key = key;
+      this.depth = depth;
+    }
+
+    @Override
+    public int hashCode() {
+      // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
+      // Objects#hash). This method is very hot during large visitations done by
+      // ParallelSkyQueryUtils.
+      return 31 * key.hashCode() + Integer.hashCode(depth);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof KeyAtDepth)) {
+        return false;
+      }
+      KeyAtDepth other = (KeyAtDepth) obj;
+      return key.equals(other.key) && depth == other.depth;
+    }
+  }
 }