Fix bug in SkyQuery's parallel allrdeps with --nohost_deps.

The was that the presence of invalid dependency edges would incorrectly cause 'allrdeps' to not visit targets. Concrete example: if --nohost_deps was set and T1 depended on T via a host edge and T2 depended on T via a non-host edge then 'allrdeps(T)' might incorrectly not contain T2.

Along with fixing the bug, refactor ParallelVisitor's deduping logic: instead of deduping visitations inside #getVisitResult, dedupe them before adding them to #processingQueue. This should be a strict, small, performance win, especially for trivial visitations (e.g. RBuildFilesVisitor).

RELNOTES: None
PiperOrigin-RevId: 192327607
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 57485d9..585b39d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -16,7 +16,6 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -32,18 +31,20 @@
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.skyframe.PackageValue;
 import com.google.devtools.build.lib.skyframe.SkyFunctions;
-import com.google.devtools.build.lib.util.Pair;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.skyframe.SkyKey;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import javax.annotation.Nullable;
 
 /**
  * Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
@@ -54,7 +55,7 @@
  * in memory at any given time.
  */
 // TODO(bazel-team): Be more deliberate about bounding memory usage here.
-class ParallelSkyQueryUtils {
+public class ParallelSkyQueryUtils {
 
   /** The maximum number of keys to visit at once. */
   @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000;
@@ -90,8 +91,27 @@
     visitor.visitAndWaitForCompletion(env.getFileStateKeysForFileFragments(fileIdentifiers));
   }
 
+  /** A {@link ParallelVisitor} whose visitations occur on {@link SkyKey}s. */
+  public abstract static class AbstractSkyKeyParallelVisitor<T> extends ParallelVisitor<SkyKey, T> {
+    private final Uniquifier<SkyKey> uniquifier;
+
+    protected AbstractSkyKeyParallelVisitor(
+        Uniquifier<SkyKey> uniquifier,
+        Callback<T> callback,
+        int visitBatchSize,
+        int processResultsBatchSize) {
+      super(callback, visitBatchSize, processResultsBatchSize);
+      this.uniquifier = uniquifier;
+    }
+
+    @Override
+    protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
+      return uniquifier.unique(values);
+    }
+  }
+
   /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
-  private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey, Target> {
+  private static class RBuildFilesVisitor extends AbstractSkyKeyParallelVisitor<Target> {
     // Each target in the full output of 'rbuildfiles' corresponds to BUILD file InputFile of a
     // unique package. So the processResultsBatchSize we choose to pass to the ParallelVisitor ctor
     // influences how many packages each leaf task doing processPartialResults will have to
@@ -143,31 +163,72 @@
     }
   }
 
+  private static class DepAndRdep {
+    @Nullable
+    private final SkyKey dep;
+    private final SkyKey rdep;
+
+    private DepAndRdep(@Nullable SkyKey dep, SkyKey rdep) {
+      this.dep = dep;
+      this.rdep = rdep;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof DepAndRdep)) {
+        return false;
+      }
+      DepAndRdep other = (DepAndRdep) obj;
+      return Objects.equals(dep, other.dep) && rdep.equals(other.rdep);
+    }
+
+    @Override
+    public int hashCode() {
+      // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
+      // Objects#hash). Depending on the structure of the graph being traversed, this method can
+      // be very hot.
+      return 31 * Objects.hashCode(dep) + rdep.hashCode();
+    }
+  }
+
   /**
    * A helper class that computes 'allrdeps(<blah>)' via BFS.
    *
-   * <p>The visitor uses a pair of <node, reverse dep> to keep track the nodes to visit and avoid
-   * dealing with targetification of reverse deps until they are needed. The node itself is needed
-   * to filter out disallowed deps later. Compared against the approach using a single SkyKey, it
-   * consumes 16 more bytes in a 64-bit environment for each edge. However it defers the need to
-   * load all the packages which have at least a target as a rdep of the current batch, thus greatly
-   * reduces the risk of OOMs. The additional memory usage should not be a large concern here, as
-   * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by
-   * regular GC.
+   * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
+   * targetification of reverse deps until they are needed. The rdep node itself is needed to filter
+   * out disallowed deps later. Compared against the approach using a single SkyKey, it consumes 16
+   * more bytes in a 64-bit environment for each edge. However it defers the need to load all the
+   * packages which have at least a target as a rdep of the current batch, thus greatly reduces the
+   * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
+   * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
    */
-  private static class AllRdepsUnboundedVisitor
-      extends ParallelVisitor<Pair<SkyKey, SkyKey>, Target> {
+  private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> {
     private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
     private final SkyQueryEnvironment env;
     private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+    /**
+     * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
+     * actually isn't that useful. See the method javadoc.
+     */
+    private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
+    /**
+     * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
+     * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
+     * visitation may actually be invalid. Because the same rdep can be reached through more than
+     * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep.
+     */
+    private final Uniquifier<SkyKey> validRdepUniquifier;
 
     private AllRdepsUnboundedVisitor(
         SkyQueryEnvironment env,
-        Uniquifier<Pair<SkyKey, SkyKey>> uniquifier,
+        Uniquifier<DepAndRdep> depAndRdepUniquifier,
+        Uniquifier<SkyKey> validRdepUniquifier,
         Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
-      super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
+      super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
       this.env = env;
+      this.depAndRdepUniquifier = depAndRdepUniquifier;
+      this.validRdepUniquifier = validRdepUniquifier;
       this.packageSemaphore = packageSemaphore;
     }
 
@@ -179,7 +240,8 @@
      */
     private static class Factory implements ParallelVisitor.Factory {
       private final SkyQueryEnvironment env;
-      private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier;
+      private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
+      private final Uniquifier<SkyKey> validRdepUniquifier;
       private final Callback<Target> callback;
       private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
@@ -188,35 +250,35 @@
         Callback<Target> callback,
         MultisetSemaphore<PackageIdentifier> packageSemaphore) {
         this.env = env;
-        this.uniquifier = env.createReverseDepSkyKeyUniquifier();
+        this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
+        this.validRdepUniquifier = env.createSkyKeyUniquifier();
         this.callback = callback;
         this.packageSemaphore = packageSemaphore;
       }
 
       @Override
-      public ParallelVisitor<Pair<SkyKey, SkyKey>, Target> create() {
-        return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore);
+      public ParallelVisitor<DepAndRdep, Target> create() {
+        return new AllRdepsUnboundedVisitor(
+            env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore);
       }
     }
 
     @Override
-    protected Visit getVisitResult(Iterable<Pair<SkyKey, SkyKey>> keys)
-        throws InterruptedException {
-      Collection<SkyKey> filteredKeys = new ArrayList<>();
+    protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
+      Collection<SkyKey> filteredUniqueKeys = new ArrayList<>();
 
       // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps.
       Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap();
-      for (Pair<SkyKey, SkyKey> reverseDepPair : keys) {
-        // First-level nodes do not have a parent node (they may have one in Skyframe but we do not
-        // need to retrieve them.
-        if (reverseDepPair.first == null) {
-          filteredKeys.add(Preconditions.checkNotNull(reverseDepPair.second));
+      for (DepAndRdep depAndRdep : depAndRdeps) {
+        // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
+        if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) {
+          filteredUniqueKeys.add(depAndRdep.rdep);
           continue;
         }
 
-        reverseDepsMap.computeIfAbsent(reverseDepPair.first, k -> new LinkedList<SkyKey>());
+        reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>());
 
-        reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second);
+        reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep);
       }
 
       Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
@@ -239,27 +301,34 @@
           filteredTargets
               .stream()
               .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
-              .forEachOrdered(filteredKeys::add);
-        }
+              .forEachOrdered(
+                  rdep -> {
+                    if (validRdepUniquifier.unique(rdep)) {
+                      filteredUniqueKeys.add(rdep);
+                    }
+                  });
+          }
       } finally {
         packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
       }
 
       // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
       // recursive visitation.
-      Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys);
+      Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps =
+          env.graph.getReverseDeps(filteredUniqueKeys);
 
-      ImmutableList.Builder<Pair<SkyKey, SkyKey>> builder = ImmutableList.builder();
+      ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder();
       for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
         for (SkyKey rdep : rdeps.getValue()) {
           Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
           if (label != null) {
-            builder.add(Pair.of(rdeps.getKey(), rdep));
+            builder.add(new DepAndRdep(rdeps.getKey(), rdep));
           }
         }
       }
 
-      return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ builder.build());
+      return new Visit(
+          /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build());
     }
 
     @Override
@@ -284,19 +353,20 @@
     }
 
     @Override
-    protected Iterable<Pair<SkyKey, SkyKey>> preprocessInitialVisit(Iterable<SkyKey> keys) {
-      return Iterables.transform(keys, key -> Pair.of(null, key));
+    protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
+      return Iterables.transform(keys, key -> new DepAndRdep(null, key));
     }
 
     @Override
-    protected Iterable<Task> getVisitTasks(Collection<Pair<SkyKey, SkyKey>> pendingKeysToVisit) {
-      // Group pending visits by package.
-      ListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> visitsByPackage =
+    protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) {
+      // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the
+      // rdep during the visitation.
+      ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage =
           ArrayListMultimap.create();
-      for (Pair<SkyKey, SkyKey> visit : pendingKeysToVisit) {
-        Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(visit.second);
+      for (DepAndRdep depAndRdep : pendingKeysToVisit) {
+        Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep);
         if (label != null) {
-          visitsByPackage.put(label.getPackageIdentifier(), visit);
+          visitsByPackage.put(label.getPackageIdentifier(), depAndRdep);
         }
       }
 
@@ -307,13 +377,32 @@
       //      want.
       // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
       //      accidentally retaining the entire ArrayListMultimap object.
-      for (Iterable<Pair<SkyKey, SkyKey>> keysToVisitBatch :
+      for (Iterable<DepAndRdep> depAndRdepBatch :
           Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
-        builder.add(new VisitTask(keysToVisitBatch));
+        builder.add(new VisitTask(depAndRdepBatch));
       }
 
       return builder.build();
     }
+
+    @Override
+    protected ImmutableList<DepAndRdep> getUniqueValues(Iterable<DepAndRdep> depAndRdeps) {
+      // See the javadoc for 'validRdepUniquifier'.
+      //
+      // N.B. - Except for the visitation roots, 'depAndRdepUniquifier' is actually completely
+      // unneeded in practice for ensuring literal unique {@link DepAndRdep} visitations. Valid rdep
+      // visitations are deduped in 'getVisitResult' using 'validRdepUniquifier', so there's
+      // actually no way the same DepAndRdep visitation can ever be returned from 'getVisitResult'.
+      // Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as
+      // to not be depending on implementation details of 'ParallelVisitor'.
+      //
+      // Even so, there's value in not visiting a rdep if it's already been visiting *validly*
+      // before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this.
+      return depAndRdepUniquifier.unique(
+          Iterables.filter(
+              depAndRdeps,
+              depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep)));
+    }
   }
 }