Update ParallelSkyQueryUtils to use QuiescingExecutor instead of ForkJoinPool
for concurrent visitations.
During BFS visitation of rdeps and rbuildfiles, ParallelSkyQueryUtils uses a
centralized pool (backed by a LinkedBlockingQueue) to store all pending visits,
and a periodically running scheduler that schedules a task for each pending visit.
--
MOS_MIGRATED_REVID=140398162
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 705738c..87a57af 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -853,17 +853,20 @@
public Map<SkyKey, Target> makeTargetsFromPackageKeyToTargetKeyMap(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
ImmutableMap.Builder<SkyKey, Target> result = ImmutableMap.builder();
+ Set<SkyKey> processedTargets = new HashSet<>();
Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet());
for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) {
for (SkyKey targetKey : packageKeyToTargetKeyMap.get(entry.getKey())) {
- try {
- result.put(
- targetKey,
- ((PackageValue) entry.getValue())
- .getPackage()
- .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
- } catch (NoSuchTargetException e) {
- // Skip missing target.
+ if (processedTargets.add(targetKey)) {
+ try {
+ result.put(
+ targetKey,
+ ((PackageValue) entry.getValue())
+ .getPackage()
+ .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
+ } catch (NoSuchTargetException e) {
+ // Skip missing target.
+ }
}
}
}
@@ -1013,8 +1016,7 @@
ThreadSafeCallback<Target> callback,
ForkJoinPool forkJoinPool)
throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getRBuildFilesParallel(
- this, fileIdentifiers, callback, forkJoinPool, packageSemaphore);
+ ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
}
/**
@@ -1199,7 +1201,7 @@
ForkJoinPool forkJoinPool)
throws QueryException, InterruptedException {
ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
- this, expression, context, callback, forkJoinPool, packageSemaphore);
+ this, expression, context, callback, packageSemaphore);
}
@ThreadSafe