Fix bug with streaming bounded deps/allrdeps/rdeps.

--
PiperOrigin-RevId: 149431500
MOS_MIGRATED_REVID=149431500
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
index 644adcf..307e73c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
@@ -24,6 +24,7 @@
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.packages.DependencyFilter;
 import com.google.devtools.build.lib.packages.Target;
+import com.google.devtools.build.lib.query2.engine.KeyExtractor;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
@@ -294,4 +295,17 @@
   public QueryExpressionEvalListener<T> getEvalListener() {
     return evalListener;
   }
+
+  /** A {@link KeyExtractor} that extracts {@code Label}s out of {@link Target}s. */
+  protected static class TargetKeyExtractor implements KeyExtractor<Target, Label> {
+    protected static final TargetKeyExtractor INSTANCE = new TargetKeyExtractor();
+
+    private TargetKeyExtractor() {
+    }
+
+    @Override
+    public Label extractKey(Target element) {
+      return element.getLabel();
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
index 867ec3d..e9a9dca 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
@@ -37,14 +37,16 @@
 import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryUtil;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -302,12 +304,13 @@
 
   @Override
   public Uniquifier<Target> createUniquifier() {
-    return new AbstractUniquifier<Target, Label>() {
-      @Override
-      protected Label extractKey(Target target) {
-        return target.getLabel();
-      }
-    };
+    return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
+  }
+
+  @Override
+  public MinDepthUniquifier<Target> createMinDepthUniquifier() {
+    return new ThreadSafeMinDepthUniquifierImpl<>(
+        TargetKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
   }
 
   private void preloadTransitiveClosure(Set<Target> targets, int maxDepth)
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 3e1410f..9a5ffd4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -55,17 +55,21 @@
 import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.FunctionExpression;
+import com.google.devtools.build.lib.query2.engine.KeyExtractor;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -585,18 +589,26 @@
   }
 
   @ThreadSafe
+  @Override
+  public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
+    return new ThreadSafeMinDepthUniquifierImpl<>(
+        TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  }
+
+  @ThreadSafe
   ThreadSafeUniquifier<Target> createTargetUniquifier() {
-    return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
   ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
-    return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
   ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
-    return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+    return new ThreadSafeUniquifierImpl<>(
+        ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
   }
 
   private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -1034,7 +1046,8 @@
   void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
       throws QueryException, InterruptedException {
     Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
-    Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1);
+    Uniquifier<SkyKey> keyUniquifier =
+        new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
     Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
     Set<SkyKey> resultKeys = CompactHashSet.create();
     while (!current.isEmpty()) {
@@ -1093,42 +1106,31 @@
     }
   }
 
-  private static class ThreadSafeTargetUniquifier
-      extends AbstractThreadSafeUniquifier<Target, Label> {
-    protected ThreadSafeTargetUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
+  private static class SkyKeyKeyExtractor implements KeyExtractor<SkyKey, SkyKey> {
+    private static final SkyKeyKeyExtractor INSTANCE = new SkyKeyKeyExtractor();
+
+    private SkyKeyKeyExtractor() {
     }
 
     @Override
-    protected Label extractKey(Target element) {
-      return element.getLabel();
-    }
-  }
-
-  private static class ThreadSafeSkyKeyUniquifier
-      extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> {
-    protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
-    }
-
-    @Override
-    protected SkyKey extractKey(SkyKey element) {
+    public SkyKey extractKey(SkyKey element) {
       return element;
     }
   }
 
   /**
-   * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second
-   * element (reverse dep).
+   * A {@link KeyExtractor} which takes a pair of parent and reverse dep, and uses the second
+   * element (reverse dep) as the key.
    */
-  private static class ThreadSafeReverseDepSkyKeyUniquifier
-      extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
-    protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) {
-      super(concurrencyLevel);
+  private static class ReverseDepSkyKeyKeyExtractor
+      implements KeyExtractor<Pair<SkyKey, SkyKey>, SkyKey> {
+    private static final ReverseDepSkyKeyKeyExtractor INSTANCE = new ReverseDepSkyKeyKeyExtractor();
+
+    private ReverseDepSkyKeyKeyExtractor() {
     }
 
     @Override
-    protected SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
+    public SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
       return element.second;
     }
   }
@@ -1152,7 +1154,7 @@
 
     private final OutputFormatterCallback<Target> callback;
     private final ThreadSafeUniquifier<Target> uniquifier =
-        new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+        new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
@@ -1238,27 +1240,27 @@
       int depth,
       int batchSize)
       throws QueryException, InterruptedException {
-    Uniquifier<Target> uniquifier = createUniquifier();
+    MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
     eval(
         expression,
         context,
-        new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize));
+        new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
   }
 
   private class BatchAllRdepsCallback implements Callback<Target> {
-    private final Uniquifier<Target> uniquifier;
+    private final MinDepthUniquifier<Target> minDepthUniquifier;
     private final Predicate<Target> universe;
     private final Callback<Target> callback;
     private final int depth;
     private final int batchSize;
 
     private BatchAllRdepsCallback(
-        Uniquifier<Target> uniquifier,
+        MinDepthUniquifier<Target> minDepthUniquifier,
         Predicate<Target> universe,
         Callback<Target> callback,
         int depth,
         int batchSize) {
-      this.uniquifier = uniquifier;
+      this.minDepthUniquifier = minDepthUniquifier;
       this.universe = universe;
       this.callback = callback;
       this.depth = depth;
@@ -1268,7 +1270,8 @@
     @Override
     public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
       Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
-      ImmutableList<Target> uniqueTargets = uniquifier.unique(currentInUniverse);
+      ImmutableList<Target> uniqueTargets =
+          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
       callback.process(uniqueTargets);
 
       // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
@@ -1283,7 +1286,7 @@
       // processed by callback, the targets are dequeued and not referenced any more, making
       // them available for garbage collection.
 
-      for (int i = 0; i < depth; i++) {
+      for (int curDepth = 1; curDepth <= depth; curDepth++) {
         // The mappings between nodes and their reverse deps must be preserved instead of the
         // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
         // check if their allowed deps contain the dependencies.
@@ -1321,7 +1324,8 @@
               // separately.
               for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
                 reverseDepsMap.put(entry.getKey(), subList);
-                processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+                processReverseDepsMap(
+                    minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
               }
 
               reverseDepsQueue.poll();
@@ -1329,14 +1333,16 @@
             } else {
               // There are some nodes in the pending process list. Process them first and come
               // back to this node later (in next iteration).
-              processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+              processReverseDepsMap(
+                  minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
               batch = 0;
             }
           }
         }
 
         if (!reverseDepsMap.isEmpty()) {
-          processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+          processReverseDepsMap(
+              minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
         }
 
         // If the queue is empty after all nodes in the current level are processed, stop
@@ -1352,14 +1358,16 @@
      * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
      */
     private void processReverseDepsMap(
-        Uniquifier<Target> uniquifier,
+        MinDepthUniquifier<Target> minDepthUniquifier,
         Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
         Callback<Target> callback,
-        Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue)
+        Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
+        int depth)
         throws QueryException, InterruptedException {
       Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
       Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
-      ImmutableList<Target> uniqueChildren = uniquifier.unique(currentInUniverse);
+      ImmutableList<Target> uniqueChildren =
+          minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
       reverseDepsMap.clear();
 
       if (!uniqueChildren.isEmpty()) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index 518b674..ca3db15 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -74,7 +74,7 @@
       ((StreamableQueryEnvironment<T>) env)
           .getAllRdeps(args.get(0).getExpression(), universe, context, callback, depth);
     } else {
-      final Uniquifier<T> uniquifier = env.createUniquifier();
+      final MinDepthUniquifier<T> minDepthUniquifier = env.createMinDepthUniquifier();
       env.eval(
           args.get(0).getExpression(),
           context,
@@ -91,7 +91,8 @@
                 // Filter already visited nodes: if we see a node in a later round, then we don't
                 // need to visit it again, because the depth at which we see it must be greater
                 // than or equal to the last visit.
-                next.addAll(env.getReverseDeps(uniquifier.unique(currentInUniverse)));
+                next.addAll(env.getReverseDeps(
+                    minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, i)));
                 callback.process(currentInUniverse);
                 if (next.isEmpty()) {
                   // Exit when there are no more nodes to visit.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
index 5eca701..0e618fb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
@@ -60,7 +60,7 @@
       List<Argument> args,
       final Callback<T> callback) throws QueryException, InterruptedException {
     final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
-    final Uniquifier<T> uniquifier = env.createUniquifier();
+    final MinDepthUniquifier<T> minDepthUniquifier = env.createMinDepthUniquifier();
     env.eval(args.get(0).getExpression(), context, new Callback<T>() {
       @Override
       public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
@@ -72,7 +72,8 @@
           // Filter already visited nodes: if we see a node in a later round, then we don't need to
           // visit it again, because the depth at which we see it at must be greater than or equal
           // to the last visit.
-          ImmutableList<T> toProcess = uniquifier.unique(current);
+          ImmutableList<T> toProcess =
+              minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(current, i);
           callback.process(toProcess);
           current = ImmutableList.copyOf(env.getFwdDeps(toProcess));
           if (current.isEmpty()) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/KeyExtractor.java b/src/main/java/com/google/devtools/build/lib/query2/engine/KeyExtractor.java
new file mode 100644
index 0000000..37b9ff8
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/KeyExtractor.java
@@ -0,0 +1,27 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
+/**
+ * Helper for extracting a key of type {@code K} from an element of type {@code T}.
+ *
+ * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
+ */
+@ThreadSafe
+public interface KeyExtractor<T, K> {
+  /** Extracts an unique key that can be used to dedupe the given {@code element}. */
+  K extractKey(T element);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
new file mode 100644
index 0000000..8617004
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
@@ -0,0 +1,29 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A helper for deduping values that have already been seen at certain "depths".
+ *
+ * <p>This is similar to {@link Uniquifier}.
+ */
+public interface MinDepthUniquifier<T> {
+  /**
+   * Returns the subset of {@code newElements} that haven't been seen before at depths less than or
+   * equal to {@code depth}
+   */
+  ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(Iterable<T> newElements, int depth);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index bcee46f..ce0ae83 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -213,12 +213,20 @@
       throws QueryException, InterruptedException;
 
   /**
-   * Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this an
+   * Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this
    * uniquifier should not be used for returning unique results to the parent callback. It should
    * only be used to avoid processing the same elements multiple times within this QueryExpression.
    */
   Uniquifier<T> createUniquifier();
 
+  /**
+   * Creates a {@link MinDepthUniquifier} for use in a {@code QueryExpression}. Note that the usage
+   * of this uniquifier should not be used for returning unique results to the parent callback. It
+   * should only be used to try to avoid processing the same elements multiple times at the same
+   * depth bound within this QueryExpression.
+   */
+  MinDepthUniquifier<T> createMinDepthUniquifier();
+
   void reportBuildFileError(QueryExpression expression, String msg) throws QueryException;
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index 2d7be2e..a558e45 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -13,13 +13,14 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.engine;
 
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapMaker;
 import com.google.devtools.build.lib.collect.CompactHashSet;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Several query utilities to make easier to work with query callbacks and uniquifiers. */
 public final class QueryUtil {
@@ -86,47 +87,6 @@
     return callback.getResult();
   }
 
-  /** A trivial {@link Uniquifier} base class. */
-  public abstract static class AbstractUniquifier<T, K>
-      extends AbstractUniquifierBase<T, K> {
-    private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
-
-    @Override
-    public final boolean unique(T element) {
-      return alreadySeen.add(extractKey(element));
-    }
-
-    /**
-     * Extracts an unique key that can be used to dedupe the given {@code element}.
-     *
-     * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
-     */
-    protected abstract K extractKey(T element);
-  }
-
-  /** A trivial {@link ThreadSafeUniquifier} base class. */
-  public abstract static class AbstractThreadSafeUniquifier<T, K>
-      extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
-    private final Set<K> alreadySeen;
-
-    protected AbstractThreadSafeUniquifier(int concurrencyLevel) {
-      this.alreadySeen = Collections.newSetFromMap(
-          new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
-    }
-
-    @Override
-    public final boolean unique(T element) {
-      return alreadySeen.add(extractKey(element));
-    }
-
-    /**
-     * Extracts an unique key that can be used to dedupe the given {@code element}.
-     *
-     * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
-     */
-    protected abstract K extractKey(T element);
-  }
-
   private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
     @Override
     public final ImmutableList<T> unique(Iterable<T> newElements) {
@@ -139,4 +99,76 @@
       return result.build();
     }
   }
+
+  /** A trivial {@link Uniquifier} implementation. */
+  public static class UniquifierImpl<T, K> extends AbstractUniquifierBase<T, K> {
+    private final KeyExtractor<T, K> extractor;
+    private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
+
+    public UniquifierImpl(KeyExtractor<T, K> extractor) {
+      this.extractor = extractor;
+    }
+
+    @Override
+    public final boolean unique(T element) {
+      return alreadySeen.add(extractor.extractKey(element));
+    }
+  }
+
+  /** A trvial {@link ThreadSafeUniquifier} implementation. */
+  public static class ThreadSafeUniquifierImpl<T, K>
+      extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
+    private final KeyExtractor<T, K> extractor;
+    private final Set<K> alreadySeen;
+
+    public ThreadSafeUniquifierImpl(KeyExtractor<T, K> extractor, int concurrencyLevel) {
+      this.extractor = extractor;
+      this.alreadySeen = Collections.newSetFromMap(
+          new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
+    }
+
+    @Override
+    public final boolean unique(T element) {
+      return alreadySeen.add(extractor.extractKey(element));
+    }
+  }
+
+  /** A trivial {@link ThreadSafeMinDepthUniquifier} implementation. */
+  public static class ThreadSafeMinDepthUniquifierImpl<T, K>
+      implements ThreadSafeMinDepthUniquifier<T> {
+    private final KeyExtractor<T, K> extractor;
+    private final ConcurrentMap<K, AtomicInteger> alreadySeenAtDepth;
+
+    public ThreadSafeMinDepthUniquifierImpl(
+        KeyExtractor<T, K> extractor, int concurrencyLevel) {
+      this.extractor = extractor;
+      this.alreadySeenAtDepth = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap();
+    }
+
+    @Override
+    public final ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(
+        Iterable<T> newElements, int depth) {
+      ImmutableList.Builder<T> result = ImmutableList.builder();
+      for (T element : newElements) {
+        AtomicInteger newDepth = new AtomicInteger(depth);
+        AtomicInteger previousDepth =
+            alreadySeenAtDepth.putIfAbsent(extractor.extractKey(element), newDepth);
+        if (previousDepth != null) {
+          if (depth < previousDepth.get()) {
+            synchronized (previousDepth) {
+              if (depth < previousDepth.get()) {
+                // We've seen the element before, but never at a depth this shallow.
+                previousDepth.set(depth);
+                result.add(element);
+              }
+            }
+          }
+        } else {
+          // We've never seen the element before.
+          result.add(element);
+        }
+      }
+      return result.build();
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java
new file mode 100644
index 0000000..9c2b429
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeMinDepthUniquifier.java
@@ -0,0 +1,26 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
+/** Marker interface for a {@link ThreadSafe} {@link MinDepthUniquifier}. */
+@ThreadSafe
+public interface ThreadSafeMinDepthUniquifier<T> extends MinDepthUniquifier<T> {
+  // There's a natural benign check-then-act race in all concurrent uses of this interface. Thread
+  // T1 may think it's about to be the first one to process an element at a depth no greater than
+  // d1. But before t1 finishes processing the element, Thread T2 may think _it's_ about to be first
+  // one to process an element at a depth no greater than d2. If d2 < d1, then T1's work is probably
+  // wasted.
+}