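Fix bug with streaming bounded deps/allrdeps/rdeps.

In SkyQueryEnvironment, BatchAllRdepsCallback now deduplicates targets
with MinDepthUniquifier#uniqueAtDepthLessThanOrEqualTo, passing the
depth at which each batch was reached (0 for the initial targets, then
1..depth for each level of reverse deps), instead of the depth-unaware
Uniquifier#unique. The per-type AbstractThreadSafeUniquifier subclasses
are replaced by KeyExtractor implementations handed to
ThreadSafeUniquifierImpl and ThreadSafeMinDepthUniquifierImpl.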

--
PiperOrigin-RevId: 149431500
MOS_MIGRATED_REVID=149431500
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 3e1410f..9a5ffd4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -55,17 +55,21 @@
 import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.FunctionExpression;
+import com.google.devtools.build.lib.query2.engine.KeyExtractor;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -585,18 +589,26 @@
   }
 
   @ThreadSafe
+  @Override
+  public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
+    return new ThreadSafeMinDepthUniquifierImpl<>(
+        TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  }
+
+  @ThreadSafe
   ThreadSafeUniquifier<Target> createTargetUniquifier() {
-    return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
   ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
-    return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
   ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
-    return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(
+        ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -1034,7 +1046,8 @@
   void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
       throws QueryException, InterruptedException {
     Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
-    Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1);
+    Uniquifier<SkyKey> keyUniquifier =
+        new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
     Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
     Set<SkyKey> resultKeys = CompactHashSet.create();
     while (!current.isEmpty()) {
@@ -1093,42 +1106,31 @@
     }
   }
 
-  private static class ThreadSafeTargetUniquifier
-      extends AbstractThreadSafeUniquifier<Target, Label> {
-    protected ThreadSafeTargetUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
+  private static class SkyKeyKeyExtractor implements KeyExtractor<SkyKey, SkyKey> {
+    private static final SkyKeyKeyExtractor INSTANCE = new SkyKeyKeyExtractor();
+
+    private SkyKeyKeyExtractor() {
     }
 
     @Override
-    protected Label extractKey(Target element) {
-      return element.getLabel();
-    }
-  }
-
-  private static class ThreadSafeSkyKeyUniquifier
-      extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> {
-    protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
-    }
-
-    @Override
-    protected SkyKey extractKey(SkyKey element) {
+    public SkyKey extractKey(SkyKey element) {
       return element;
     }
   }
 
   /**
-   * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second
-   * element (reverse dep).
+   * A {@link KeyExtractor} which takes a pair of parent and reverse dep, and uses the second
+   * element (reverse dep) as the key.
    */
-  private static class ThreadSafeReverseDepSkyKeyUniquifier
-      extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
-    protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
+  private static class ReverseDepSkyKeyKeyExtractor
+      implements KeyExtractor<Pair<SkyKey, SkyKey>, SkyKey> {
+    private static final ReverseDepSkyKeyKeyExtractor INSTANCE = new ReverseDepSkyKeyKeyExtractor();
+
+    private ReverseDepSkyKeyKeyExtractor() {
     }
 
     @Override
-    protected SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
+    public SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
       return element.second;
     }
   }
@@ -1152,7 +1154,7 @@
 
     private final OutputFormatterCallback<Target> callback;
     private final ThreadSafeUniquifier<Target> uniquifier =
-        new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+        new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
@@ -1238,27 +1240,27 @@
       int depth,
       int batchSize)
       throws QueryException, InterruptedException {
-    Uniquifier<Target> uniquifier = createUniquifier();
+    MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
     eval(
         expression,
         context,
-        new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize));
+        new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
   }
 
   private class BatchAllRdepsCallback implements Callback<Target> {
-    private final Uniquifier<Target> uniquifier;
+    private final MinDepthUniquifier<Target> minDepthUniquifier;
     private final Predicate<Target> universe;
     private final Callback<Target> callback;
     private final int depth;
     private final int batchSize;
 
     private BatchAllRdepsCallback(
-        Uniquifier<Target> uniquifier,
+        MinDepthUniquifier<Target> minDepthUniquifier,
         Predicate<Target> universe,
         Callback<Target> callback,
         int depth,
         int batchSize) {
-      this.uniquifier = uniquifier;
+      this.minDepthUniquifier = minDepthUniquifier;
       this.universe = universe;
       this.callback = callback;
       this.depth = depth;
@@ -1268,7 +1270,8 @@
     @Override
     public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
       Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
-      ImmutableList<Target> uniqueTargets = uniquifier.unique(currentInUniverse);
+      ImmutableList<Target> uniqueTargets =
+          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
       callback.process(uniqueTargets);
 
       // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
@@ -1283,7 +1286,7 @@
       // processed by callback, the targets are dequeued and not referenced any more, making
       // them available for garbage collection.
 
-      for (int i = 0; i < depth; i++) {
+      for (int curDepth = 1; curDepth <= depth; curDepth++) {
         // The mappings between nodes and their reverse deps must be preserved instead of the
         // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
         // check if their allowed deps contain the dependencies.
@@ -1321,7 +1324,8 @@
               // separately.
               for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
                 reverseDepsMap.put(entry.getKey(), subList);
-                processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+                processReverseDepsMap(
+                    minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
               }
 
               reverseDepsQueue.poll();
@@ -1329,14 +1333,16 @@
             } else {
               // There are some nodes in the pending process list. Process them first and come
               // back to this node later (in next iteration).
-              processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+              processReverseDepsMap(
+                  minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
               batch = 0;
             }
           }
         }
 
         if (!reverseDepsMap.isEmpty()) {
-          processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+          processReverseDepsMap(
+              minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
         }
 
         // If the queue is empty after all nodes in the current level are processed, stop
@@ -1352,14 +1358,16 @@
      * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
      */
     private void processReverseDepsMap(
-        Uniquifier<Target> uniquifier,
+        MinDepthUniquifier<Target> minDepthUniquifier,
         Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
         Callback<Target> callback,
-        Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue)
+        Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
+        int depth)
         throws QueryException, InterruptedException {
       Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
       Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
-      ImmutableList<Target> uniqueChildren = uniquifier.unique(currentInUniverse);
+      ImmutableList<Target> uniqueChildren =
+          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
       reverseDepsMap.clear();
 
       if (!uniqueChildren.isEmpty()) {