Have SkyQueryEnvironment#getRBuildFiles not visit Skyframe nodes more than once. This is helpful when there are multiple Skylark reverse 'load' paths from the same popular .bzl file to a single BUILD file. As an implementation detail, introduce a few ThreadSafeUniquifier helper classes (these will also be used in follow-up CLs).

--
MOS_MIGRATED_REVID=132680777
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 6623ca6..826ccdf 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -60,10 +60,12 @@
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier;
 import com.google.devtools.build.lib.query2.engine.RdepsFunction;
 import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.TargetLiteral;
 import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -91,7 +93,6 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -102,7 +103,6 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
@@ -513,8 +513,8 @@
 
   @ThreadSafe
   @Override
-  public Uniquifier<Target> createUniquifier() {
-    return new ConcurrentUniquifier();
+  public ThreadSafeUniquifier<Target> createUniquifier() {
+    return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
   }
 
   @ThreadSafe
@@ -880,7 +880,8 @@
   void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
       throws QueryException, InterruptedException {
     Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
-    Collection<SkyKey> current = graph.getSuccessfulValues(files).keySet();
+    Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1);
+    Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
     Set<SkyKey> resultKeys = CompactHashSet.create();
     while (!current.isEmpty()) {
       Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
@@ -890,12 +891,16 @@
           resultKeys.add(rdep);
           // Every package has a dep on the external package, so we need to include those edges too.
           if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
-            current.add(rdep);
+            if (keyUniquifier.unique(rdep)) {
+              current.add(rdep);
+            }
           }
         } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
           // Packages may depend on the existence of subpackages, but these edges aren't relevant to
           // rbuildfiles.
-          current.add(rdep);
+          if (keyUniquifier.unique(rdep)) {
+            current.add(rdep);
+          }
         }
       }
       if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
@@ -934,23 +939,27 @@
     }
   }
 
-  @ThreadSafe
-  private static class ConcurrentUniquifier implements Uniquifier<Target> {
-
-    // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled.
-    private final Set<Label> seen =
-        Collections.newSetFromMap(
-            new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+  private static class ThreadSafeTargetUniquifier
+      extends AbstractThreadSafeUniquifier<Target, Label> {
+    protected ThreadSafeTargetUniquifier(int concurrencyLevel) {
+      super(concurrencyLevel);
+    }
 
     @Override
-    public ImmutableList<Target> unique(Iterable<Target> newElements) {
-      ImmutableList.Builder<Target> builder = ImmutableList.builder();
-      for (Target newElement : newElements) {
-        if (seen.add(newElement.getLabel())) {
-          builder.add(newElement);
-        }
-      }
-      return builder.build();
+    protected Label extractKey(Target element) {
+      return element.getLabel();
+    }
+  }
+
+  private static class ThreadSafeSkyKeyUniquifier
+      extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> {
+    protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) {
+      super(concurrencyLevel);
+    }
+
+    @Override
+    protected SkyKey extractKey(SkyKey element) {
+      return element;
     }
   }
 
@@ -971,7 +980,8 @@
   private static class BatchStreamedCallback implements ThreadSafeCallback<Target> {
 
     private final Callback<Target> callback;
-    private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+    private final ThreadSafeUniquifier<Target> uniquifier =
+        new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;