Provide parallel implementations of bounded allrdeps and rdeps.

RELNOTES: None
PiperOrigin-RevId: 192681579
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 585b39d..e0a0985 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -16,21 +16,31 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.MapMaker;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Streams;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.QueryUtil;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -40,7 +50,8 @@
 import com.google.devtools.build.skyframe.SkyKey;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -63,10 +74,6 @@
   private ParallelSkyQueryUtils() {
   }
 
-  /**
-   * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate
-   * when there is no depth-bound.
-   */
   static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
       SkyQueryEnvironment env,
       QueryExpression expression,
@@ -77,7 +84,95 @@
         expression,
         context,
         ParallelVisitor.createParallelVisitorCallback(
-            new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
+            new RdepsUnboundedVisitor.Factory(
+                env,
+                /*universe=*/ Predicates.alwaysTrue(),
+                callback,
+                packageSemaphore)));
+  }
+
+  static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
+      SkyQueryEnvironment env,
+      QueryExpression expression,
+      int depth,
+      VariableContext<Target> context,
+      Callback<Target> callback,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+    return env.eval(
+        expression,
+        context,
+        ParallelVisitor.createParallelVisitorCallback(
+            new RdepsBoundedVisitor.Factory(
+                env,
+                depth,
+                /*universe=*/ Predicates.alwaysTrue(),
+                callback,
+                packageSemaphore)));
+  }
+
+  static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
+      SkyQueryEnvironment env,
+      QueryExpression expression,
+      Predicate<SkyKey> universe,
+      VariableContext<Target> context,
+      Callback<Target> callback,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+    return env.eval(
+        expression,
+        context,
+        ParallelVisitor.createParallelVisitorCallback(
+            new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore)));
+  }
+
+  static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
+      SkyQueryEnvironment env,
+      QueryExpression expression,
+      VariableContext<Target> context,
+      int processResultsBatchSize,
+      int concurrencyLevel) {
+    QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
+        QueryUtil.evalAll(env, context, expression);
+
+    Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
+        getTransitiveClosureAsyncFunction =
+        universeValue -> {
+          ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
+              new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
+          return env.executeAsync(
+              () -> {
+                Callback<Target> visitorCallback =
+                    ParallelVisitor.createParallelVisitorCallback(
+                        new TransitiveClosureVisitor.Factory(
+                            env,
+                            env.createSkyKeyUniquifier(),
+                            processResultsBatchSize,
+                            aggregateAllCallback));
+                visitorCallback.process(universeValue);
+                return Predicates.in(aggregateAllCallback.getResult());
+              });
+        };
+
+    return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
+  }
+
+  static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
+      SkyQueryEnvironment env,
+      QueryExpression expression,
+      int depth,
+      Predicate<SkyKey> universe,
+      VariableContext<Target> context,
+      Callback<Target> callback,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+    return env.eval(
+        expression,
+        context,
+        ParallelVisitor.createParallelVisitorCallback(
+            new RdepsBoundedVisitor.Factory(
+                env,
+                depth,
+                universe,
+                callback,
+                packageSemaphore)));
   }
 
   /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -191,146 +286,21 @@
     }
   }
 
-  /**
-   * A helper class that computes 'allrdeps(<blah>)' via BFS.
-   *
-   * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
-   * targetification of reverse deps until they are needed. The rdep node itself is needed to filter
-   * out disallowed deps later. Compared against the approach using a single SkyKey, it consumes 16
-   * more bytes in a 64-bit environment for each edge. However it defers the need to load all the
-   * packages which have at least a target as a rdep of the current batch, thus greatly reduces the
-   * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
-   * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
-   */
-  private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> {
+  private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> {
     private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
-    private final SkyQueryEnvironment env;
-    private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
-    /**
-     * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
-     * actually isn't that useful. See the method javadoc.
-     */
-    private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
-    /**
-     * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
-     * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
-     * visitation may actually be invalid. Because the same rdep can be reached through more than
-     * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep.
-     */
-    private final Uniquifier<SkyKey> validRdepUniquifier;
 
-    private AllRdepsUnboundedVisitor(
+    protected final SkyQueryEnvironment env;
+    protected final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+    protected AbstractRdepsVisitor(
         SkyQueryEnvironment env,
-        Uniquifier<DepAndRdep> depAndRdepUniquifier,
-        Uniquifier<SkyKey> validRdepUniquifier,
         Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
       super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
       this.env = env;
-      this.depAndRdepUniquifier = depAndRdepUniquifier;
-      this.validRdepUniquifier = validRdepUniquifier;
       this.packageSemaphore = packageSemaphore;
     }
 
-    /**
-     * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used
-     * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
-     * single {@link Callback#process} call. Note that all the created instances share the same
-     * {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
-     */
-    private static class Factory implements ParallelVisitor.Factory {
-      private final SkyQueryEnvironment env;
-      private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
-      private final Uniquifier<SkyKey> validRdepUniquifier;
-      private final Callback<Target> callback;
-      private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
-
-      private Factory(
-        SkyQueryEnvironment env,
-        Callback<Target> callback,
-        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
-        this.env = env;
-        this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
-        this.validRdepUniquifier = env.createSkyKeyUniquifier();
-        this.callback = callback;
-        this.packageSemaphore = packageSemaphore;
-      }
-
-      @Override
-      public ParallelVisitor<DepAndRdep, Target> create() {
-        return new AllRdepsUnboundedVisitor(
-            env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore);
-      }
-    }
-
-    @Override
-    protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
-      Collection<SkyKey> filteredUniqueKeys = new ArrayList<>();
-
-      // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps.
-      Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap();
-      for (DepAndRdep depAndRdep : depAndRdeps) {
-        // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
-        if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) {
-          filteredUniqueKeys.add(depAndRdep.rdep);
-          continue;
-        }
-
-        reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>());
-
-        reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep);
-      }
-
-      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
-          env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values()));
-      Set<PackageIdentifier> pkgIdsNeededForTargetification =
-          packageKeyToTargetKeyMap
-              .keySet()
-              .stream()
-              .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
-              .collect(toImmutableSet());
-      packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
-
-      try {
-        // Filter out disallowed deps. We cannot defer the targetification any further as we do not
-        // want to retrieve the rdeps of unwanted nodes (targets).
-        if (!reverseDepsMap.isEmpty()) {
-          Collection<Target> filteredTargets =
-              env.filterRawReverseDepsOfTransitiveTraversalKeys(
-                  reverseDepsMap, packageKeyToTargetKeyMap);
-          filteredTargets
-              .stream()
-              .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
-              .forEachOrdered(
-                  rdep -> {
-                    if (validRdepUniquifier.unique(rdep)) {
-                      filteredUniqueKeys.add(rdep);
-                    }
-                  });
-          }
-      } finally {
-        packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
-      }
-
-      // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
-      // recursive visitation.
-      Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps =
-          env.graph.getReverseDeps(filteredUniqueKeys);
-
-      ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder();
-      for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
-        for (SkyKey rdep : rdeps.getValue()) {
-          Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
-          if (label != null) {
-            builder.add(new DepAndRdep(rdeps.getKey(), rdep));
-          }
-        }
-      }
-
-      return new Visit(
-          /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build());
-    }
-
     @Override
     protected void processPartialResults(
         Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
@@ -352,21 +322,17 @@
       }
     }
 
-    @Override
-    protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
-      return Iterables.transform(keys, key -> new DepAndRdep(null, key));
-    }
+    protected abstract SkyKey getRdepOfVisit(T visit);
 
     @Override
-    protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) {
-      // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the
+    protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) {
+      // Group pending visitation by the package of the rdep, since we'll be targetfying the
       // rdep during the visitation.
-      ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage =
-          ArrayListMultimap.create();
-      for (DepAndRdep depAndRdep : pendingKeysToVisit) {
-        Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep);
+      ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create();
+      for (T visit : pendingVisits) {
+        Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit));
         if (label != null) {
-          visitsByPackage.put(label.getPackageIdentifier(), depAndRdep);
+          visitsByPackage.put(label.getPackageIdentifier(), visit);
         }
       }
 
@@ -377,13 +343,162 @@
       //      want.
       // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
       //      accidentally retaining the entire ArrayListMultimap object.
-      for (Iterable<DepAndRdep> depAndRdepBatch :
+      for (Iterable<T> visitBatch :
           Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
-        builder.add(new VisitTask(depAndRdepBatch));
+        builder.add(new VisitTask(visitBatch));
       }
 
       return builder.build();
     }
+  }
+
+  /**
+   * A helper class that computes unbounded 'allrdeps(<expr>)' or
+   * 'rdeps(<precomputed-universe>, <expr>)' via BFS.
+   *
+   * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
+   * targetification of reverse deps until they are needed. The rdep node itself is needed to filter
+   * out disallowed deps later. Compared against the approach using a single SkyKey, it consumes 16
+   * more bytes in a 64-bit environment for each edge. However it defers the need to load all the
+   * packages which have at least a target as a rdep of the current batch, thus greatly reduces the
+   * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
+   * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
+   */
+  private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> {
+    /**
+     * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
+     * actually isn't that useful. See the method javadoc.
+     */
+    private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
+    /**
+     * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
+     * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
+     * visitation may actually be invalid. Because the same rdep can be reached through more than
+     * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep.
+     */
+    private final Uniquifier<SkyKey> validRdepUniquifier;
+    private final Predicate<SkyKey> universe;
+
+    private RdepsUnboundedVisitor(
+        SkyQueryEnvironment env,
+        Uniquifier<DepAndRdep> depAndRdepUniquifier,
+        Uniquifier<SkyKey> validRdepUniquifier,
+        Predicate<SkyKey> universe,
+        Callback<Target> callback,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+      super(env, callback, packageSemaphore);
+      this.depAndRdepUniquifier = depAndRdepUniquifier;
+      this.validRdepUniquifier = validRdepUniquifier;
+      this.universe = universe;
+    }
+
+    /**
+     * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used
+     * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
+     * single {@link Callback#process} call. Note that all the created instances share the same
+     * {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
+     */
+    private static class Factory implements ParallelVisitor.Factory {
+      private final SkyQueryEnvironment env;
+      private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
+      private final Uniquifier<SkyKey> validRdepUniquifier;
+      private final Predicate<SkyKey> universe;
+      private final Callback<Target> callback;
+      private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+      private Factory(
+        SkyQueryEnvironment env,
+        Predicate<SkyKey> universe,
+        Callback<Target> callback,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+        this.env = env;
+        this.universe = universe;
+        this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
+        this.validRdepUniquifier = env.createSkyKeyUniquifier();
+        this.callback = callback;
+        this.packageSemaphore = packageSemaphore;
+      }
+
+      @Override
+      public ParallelVisitor<DepAndRdep, Target> create() {
+        return new RdepsUnboundedVisitor(
+            env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore);
+      }
+    }
+
+    @Override
+    protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
+      Collection<SkyKey> validRdeps = new ArrayList<>();
+
+      // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+      // disallowed deps.
+      Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
+      for (DepAndRdep depAndRdep : depAndRdeps) {
+        // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
+        if (depAndRdep.dep == null) {
+          validRdeps.add(depAndRdep.rdep);
+        } else {
+          reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep);
+        }
+      }
+
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+          env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
+      Set<PackageIdentifier> pkgIdsNeededForTargetification =
+          packageKeyToTargetKeyMap
+              .keySet()
+              .stream()
+              .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+              .collect(toImmutableSet());
+      packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+      try {
+        // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+        // want to retrieve the rdeps of unwanted nodes (targets).
+        if (!reverseDepMultimap.isEmpty()) {
+          Collection<Target> filteredTargets =
+              env.filterRawReverseDepsOfTransitiveTraversalKeys(
+                  reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
+          filteredTargets
+              .stream()
+              .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
+              .forEachOrdered(validRdeps::add);
+        }
+      } finally {
+        packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
+      }
+
+      ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+          .filter(validRdepUniquifier::unique)
+          .collect(ImmutableList.toImmutableList());
+
+      // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
+      // recursive visitation.
+      ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder();
+      env.graph.getReverseDeps(uniqueValidRdeps).entrySet()
+          .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll(
+              Iterables.transform(
+                  Iterables.filter(
+                      reverseDepsEntry.getValue(),
+                      Predicates.and(SkyQueryEnvironment.IS_TTV, universe)),
+                  rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep))));
+
+      return new Visit(
+          /*keysToUseForResult=*/ uniqueValidRdeps,
+          /*keysToVisit=*/ depAndRdepsToVisitBuilder.build());
+    }
+
+    @Override
+    protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
+      return Iterables.transform(
+          Iterables.filter(keys, k -> universe.apply(k)),
+          key -> new DepAndRdep(null, key));
+    }
+
+    @Override
+    protected SkyKey getRdepOfVisit(DepAndRdep visit) {
+      return visit.rdep;
+    }
 
     @Override
     protected ImmutableList<DepAndRdep> getUniqueValues(Iterable<DepAndRdep> depAndRdeps) {
@@ -396,7 +511,7 @@
       // Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as
       // to not be depending on implementation details of 'ParallelVisitor'.
       //
-      // Even so, there's value in not visiting a rdep if it's already been visiting *validly*
+      // Even so, there's value in not visiting a rdep if it's already been visited *validly*
       // before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this.
       return depAndRdepUniquifier.unique(
           Iterables.filter(
@@ -404,5 +519,278 @@
               depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep)));
     }
   }
+
+  private static class DepAndRdepAtDepth {
+    private final DepAndRdep depAndRdep;
+    private final int rdepDepth;
+
+    private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
+      this.depAndRdep = depAndRdep;
+      this.rdepDepth = rdepDepth;
+    }
+  }
+
+  /**
+   * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or
+   * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS.
+   *
+   * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here
+   * but there are additional subtle concerns about the correctness of the bounded traversal: just
+   * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}.
+   */
+  private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> {
+    private final int depth;
+    private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+    private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+    private final Predicate<SkyKey> universe;
+
+    private RdepsBoundedVisitor(
+        SkyQueryEnvironment env,
+        int depth,
+        Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier,
+        MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier,
+        Predicate<SkyKey> universe,
+        Callback<Target> callback,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+      super(env, callback, packageSemaphore);
+      this.depth = depth;
+      this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier;
+      this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier;
+      this.universe = universe;
+    }
+
+    private static class Factory implements ParallelVisitor.Factory {
+      private final SkyQueryEnvironment env;
+      private final int depth;
+      private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+      private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+      private final Predicate<SkyKey> universe;
+      private final Callback<Target> callback;
+      private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+      private Factory(
+          SkyQueryEnvironment env,
+          int depth,
+          Predicate<SkyKey> universe,
+          Callback<Target> callback,
+          MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+        this.env = env;
+        this.depth = depth;
+        this.universe = universe;
+        this.depAndRdepAtDepthUniquifier =
+            new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth);
+        this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier();
+        this.callback = callback;
+        this.packageSemaphore = packageSemaphore;
+      }
+
+      @Override
+      public ParallelVisitor<DepAndRdepAtDepth, Target> create() {
+        return new RdepsBoundedVisitor(
+            env,
+            depth,
+            depAndRdepAtDepthUniquifier,
+            validRdepMinDepthUniquifier,
+            universe,
+            callback,
+            packageSemaphore);
+      }
+    }
+
+    @Override
+    protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths)
+        throws InterruptedException {
+      Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>();
+      depAndRdepAtDepths.forEach(
+          depAndRdepAtDepth -> shallowestRdepDepthMap.merge(
+              depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min));
+
+      Collection<SkyKey> validRdeps = new ArrayList<>();
+
+      // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+      // disallowed deps.
+      Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
+      for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) {
+        // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
+        if (depAndRdepAtDepth.depAndRdep.dep == null) {
+          validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep);
+        } else {
+          reverseDepMultimap.put(
+              depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep);
+        }
+      }
+
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+          env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
+      Set<PackageIdentifier> pkgIdsNeededForTargetification =
+          packageKeyToTargetKeyMap
+              .keySet()
+              .stream()
+              .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+              .collect(toImmutableSet());
+      packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+      try {
+        // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+        // want to retrieve the rdeps of unwanted nodes (targets).
+        if (!reverseDepMultimap.isEmpty()) {
+          Collection<Target> filteredTargets =
+              env.filterRawReverseDepsOfTransitiveTraversalKeys(
+                  reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
+          filteredTargets
+              .stream()
+              .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
+              .forEachOrdered(validRdeps::add);
+        }
+      } finally {
+        packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
+      }
+
+      ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+          .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(
+              validRdep, shallowestRdepDepthMap.get(validRdep)))
+          .collect(ImmutableList.toImmutableList());
+
+      // Don't bother getting the rdeps of the rdeps that are already at the depth bound.
+      Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter(
+          uniqueValidRdeps,
+          uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth);
+
+      // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
+      // recursive visitation.
+      Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps =
+          env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound);
+
+      ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder =
+          ImmutableList.builder();
+      unfilteredRdepsOfRdeps.entrySet().forEach(entry -> {
+        SkyKey rdep = entry.getKey();
+        int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1;
+        Streams.stream(entry.getValue())
+            .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe))
+            .forEachOrdered(rdepOfRdep -> {
+          depAndRdepAtDepthsToVisitBuilder.add(
+              new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep));
+        });
+      });
+
+      return new Visit(
+          /*keysToUseForResult=*/ uniqueValidRdeps,
+          /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build());
+    }
+
+    @Override
+    protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) {
+      return Iterables.transform(
+          Iterables.filter(keys, k -> universe.apply(k)),
+          key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0));
+    }
+
+    @Override
+    protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) {
+      return visit.depAndRdep.rdep;
+    }
+
+    @Override
+    protected ImmutableList<DepAndRdepAtDepth> getUniqueValues(
+        Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) {
+      // See the comment in RdepsUnboundedVisitor#getUniqueValues.
+      return depAndRdepAtDepthUniquifier.unique(
+          Iterables.filter(
+              depAndRdepAtDepths,
+              depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure(
+                  depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth)));
+    }
+  }
+
+  /** Helper class that computes DTC in the form of {@link SkyKey} via BFS. */
+  // TODO(nharmata): This should only be for the TTV-land DTC (i.e. only follow TTV -> TTV edges).
+  private static class TransitiveClosureVisitor extends ParallelVisitor<SkyKey, SkyKey> {
+    private final SkyQueryEnvironment env;
+    private final Uniquifier<SkyKey> uniquifier;
+
+    private TransitiveClosureVisitor(
+        SkyQueryEnvironment env,
+        Uniquifier<SkyKey> uniquifier,
+        int processResultsBatchSize,
+        AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+      super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize);
+      this.env = env;
+      this.uniquifier = uniquifier;
+    }
+
+    private static class Factory implements ParallelVisitor.Factory {
+      private final SkyQueryEnvironment env;
+      private final Uniquifier<SkyKey> uniquifier;
+      private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback;
+      private final int processResultsBatchSize;
+
+      private Factory(
+          SkyQueryEnvironment env,
+          Uniquifier<SkyKey> uniquifier,
+          int processResultsBatchSize,
+          AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+        this.env = env;
+        this.uniquifier = uniquifier;
+        this.processResultsBatchSize = processResultsBatchSize;
+        this.aggregateAllCallback = aggregateAllCallback;
+      }
+
+      @Override
+      public ParallelVisitor<SkyKey, SkyKey> create() {
+        return new TransitiveClosureVisitor(
+            env, uniquifier, processResultsBatchSize, aggregateAllCallback);
+      }
+    }
+
+    @Override
+    protected void processPartialResults(
+        Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback)
+        throws QueryException, InterruptedException {
+      callback.process(keysToUseForResult);
+    }
+
+    @Override
+    protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException {
+      Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(values);
+      return new Visit(
+          /*keysToUseForResult=*/ deps.keySet(),
+          /*keysToVisit=*/ ImmutableSet.copyOf(deps.values()));
+    }
+
+    @Override
+    protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
+      return keys;
+    }
+
+    @Override
+    protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
+      return uniquifier.unique(values);
+    }
+  }
+
+  /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
+  @ThreadSafe
+  private static class ThreadSafeAggregateAllSkyKeysCallback
+      implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
+
+    private final Set<SkyKey> results;
+
+    private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
+      this.results =
+          Collections.newSetFromMap(new MapMaker().concurrencyLevel(concurrencyLevel).makeMap());
+    }
+
+    @Override
+    public void process(Iterable<SkyKey> partialResult)
+        throws QueryException, InterruptedException {
+      Iterables.addAll(results, partialResult);
+    }
+
+    @Override
+    public ImmutableSet<SkyKey> getResult() {
+      return ImmutableSet.copyOf(results);
+    }
+  }
 }