Have SkyQueryEnvironment use #createUniquifier for the BatchStreamedCallback instance it uses internally.
Also a few minor drive-by cleanups:
- Increase visibility of a few methods/fields needed by subclasses outside the package.
- Use 'queryEvaluationParallelismLevel' instead of DEFAULT_THREAD_COUNT in more places.
RELNOTES: None
PiperOrigin-RevId: 212715507
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index c7fa31f..ca2ff3d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -145,7 +145,7 @@
protected final String parserPrefix;
protected final PathPackageLocator pkgPath;
- private final int queryEvaluationParallelismLevel;
+ protected final int queryEvaluationParallelismLevel;
// The following fields are set in the #beforeEvaluateQuery method.
private MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -399,7 +399,10 @@
//
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
- BatchStreamedCallback batchCallback = new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
+ BatchStreamedCallback batchCallback = new BatchStreamedCallback(
+ callback,
+ BATCH_CALLBACK_SIZE,
+ createUniquifier());
return super.evaluateQuery(expr, batchCallback);
}
@@ -621,7 +624,7 @@
@Override
public ThreadSafeMutableSet<Target> createThreadSafeMutableSet() {
return new ThreadSafeMutableKeyExtractorBackedSetImpl<>(
- TargetKeyExtractor.INSTANCE, Target.class, DEFAULT_THREAD_COUNT);
+ TargetKeyExtractor.INSTANCE, Target.class, queryEvaluationParallelismLevel);
}
@Override
@@ -631,19 +634,21 @@
@ThreadSafe
@Override
- public Uniquifier<Target> createUniquifier() {
+ public UniquifierImpl<Target, ?> createUniquifier() {
return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
}
@ThreadSafe
@Override
public MinDepthUniquifier<Target> createMinDepthUniquifier() {
- return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ return new MinDepthUniquifierImpl<>(
+ TargetKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
}
@ThreadSafe
- protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
- return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ public MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
+ return new MinDepthUniquifierImpl<>(
+ SkyKeyKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
}
@ThreadSafe
@@ -1163,17 +1168,18 @@
// memory. We should have a threshold for when to invoke the callback with a batch, and also a
// separate, larger, bound on the number of targets being processed at the same time.
private final ThreadSafeOutputFormatterCallback<Target> callback;
- private final UniquifierImpl<Target, ?> uniquifier =
- new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
+ private final UniquifierImpl<Target, ?> uniquifier;
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
private BatchStreamedCallback(
ThreadSafeOutputFormatterCallback<Target> callback,
- int batchThreshold) {
+ int batchThreshold,
+ UniquifierImpl<Target, ?> uniquifier) {
this.callback = callback;
this.batchThreshold = batchThreshold;
+ this.uniquifier = uniquifier;
}
@Override
@@ -1241,7 +1247,7 @@
universe,
context,
BATCH_CALLBACK_SIZE,
- DEFAULT_THREAD_COUNT);
+ queryEvaluationParallelismLevel);
}
@ThreadSafe