Have SkyQueryEnvironment use #createUniquifier for the BatchStreamedCallback instance it uses internally.

Also a few minor drive-by cleanups:
-Increase visibility of a few methods/variables needed by subclasses outside the package.
-Use 'queryEvaluationParallelismLevel' in more places.

RELNOTES: None
PiperOrigin-RevId: 212715507
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index c7fa31f..ca2ff3d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -145,7 +145,7 @@
 
   protected final String parserPrefix;
   protected final PathPackageLocator pkgPath;
-  private final int queryEvaluationParallelismLevel;
+  protected final int queryEvaluationParallelismLevel;
 
   // The following fields are set in the #beforeEvaluateQuery method.
   private MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -399,7 +399,10 @@
     //
     // This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
     // case of a race between the original callback and the eventHandler.
-    BatchStreamedCallback batchCallback = new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
+    BatchStreamedCallback batchCallback = new BatchStreamedCallback(
+        callback,
+        BATCH_CALLBACK_SIZE,
+        createUniquifier());
     return super.evaluateQuery(expr, batchCallback);
   }
 
@@ -621,7 +624,7 @@
   @Override
   public ThreadSafeMutableSet<Target> createThreadSafeMutableSet() {
     return new ThreadSafeMutableKeyExtractorBackedSetImpl<>(
-        TargetKeyExtractor.INSTANCE, Target.class, DEFAULT_THREAD_COUNT);
+        TargetKeyExtractor.INSTANCE, Target.class, queryEvaluationParallelismLevel);
   }
 
   @Override
@@ -631,19 +634,21 @@
 
   @ThreadSafe
   @Override
-  public Uniquifier<Target> createUniquifier() {
+  public UniquifierImpl<Target, ?> createUniquifier() {
     return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
   }
 
   @ThreadSafe
   @Override
   public MinDepthUniquifier<Target> createMinDepthUniquifier() {
-    return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+    return new MinDepthUniquifierImpl<>(
+        TargetKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
   }
 
   @ThreadSafe
-  protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
-    return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+  public MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
+    return new MinDepthUniquifierImpl<>(
+        SkyKeyKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
   }
 
   @ThreadSafe
@@ -1163,17 +1168,18 @@
     // memory. We should have a threshold for when to invoke the callback with a batch, and also a
     // separate, larger, bound on the number of targets being processed at the same time.
     private final ThreadSafeOutputFormatterCallback<Target> callback;
-    private final UniquifierImpl<Target, ?> uniquifier =
-        new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
+    private final UniquifierImpl<Target, ?> uniquifier;
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
 
     private BatchStreamedCallback(
         ThreadSafeOutputFormatterCallback<Target> callback,
-        int batchThreshold) {
+        int batchThreshold,
+        UniquifierImpl<Target, ?> uniquifier) {
       this.callback = callback;
       this.batchThreshold = batchThreshold;
+      this.uniquifier = uniquifier;
     }
 
     @Override
@@ -1241,7 +1247,7 @@
         universe,
         context,
         BATCH_CALLBACK_SIZE,
-        DEFAULT_THREAD_COUNT);
+        queryEvaluationParallelismLevel);
   }
 
   @ThreadSafe