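Fix bug with streaming bounded deps/allrdeps/rdeps by uniquifying targets with a min-depth uniquifier (keyed on the depth at which a target is visited) instead of a plain uniquifier.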
--
PiperOrigin-RevId: 149431500
MOS_MIGRATED_REVID=149431500
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 3e1410f..9a5ffd4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -55,17 +55,21 @@
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
+import com.google.devtools.build.lib.query2.engine.KeyExtractor;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -585,18 +589,26 @@
}
@ThreadSafe
+ @Override
+ public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
+ return new ThreadSafeMinDepthUniquifierImpl<>(
+ TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ }
+
+ @ThreadSafe
ThreadSafeUniquifier<Target> createTargetUniquifier() {
- return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+ return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@ThreadSafe
ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
- return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+ return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@ThreadSafe
ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
- return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+ return new ThreadSafeUniquifierImpl<>(
+ ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -1034,7 +1046,8 @@
void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
throws QueryException, InterruptedException {
Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
- Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1);
+ Uniquifier<SkyKey> keyUniquifier =
+ new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
Set<SkyKey> resultKeys = CompactHashSet.create();
while (!current.isEmpty()) {
@@ -1093,42 +1106,31 @@
}
}
- private static class ThreadSafeTargetUniquifier
- extends AbstractThreadSafeUniquifier<Target, Label> {
- protected ThreadSafeTargetUniquifier(int concurrencyLevel) {
- super(concurrencyLevel);
+ private static class SkyKeyKeyExtractor implements KeyExtractor<SkyKey, SkyKey> {
+ private static final SkyKeyKeyExtractor INSTANCE = new SkyKeyKeyExtractor();
+
+ private SkyKeyKeyExtractor() {
}
@Override
- protected Label extractKey(Target element) {
- return element.getLabel();
- }
- }
-
- private static class ThreadSafeSkyKeyUniquifier
- extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> {
- protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) {
- super(concurrencyLevel);
- }
-
- @Override
- protected SkyKey extractKey(SkyKey element) {
+ public SkyKey extractKey(SkyKey element) {
return element;
}
}
/**
- * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second
- * element (reverse dep).
+ * A {@link KeyExtractor} which takes a pair of parent and reverse dep, and uses the second
+ * element (reverse dep) as the key.
*/
- private static class ThreadSafeReverseDepSkyKeyUniquifier
- extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
- protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) {
- super(concurrencyLevel);
+ private static class ReverseDepSkyKeyKeyExtractor
+ implements KeyExtractor<Pair<SkyKey, SkyKey>, SkyKey> {
+ private static final ReverseDepSkyKeyKeyExtractor INSTANCE = new ReverseDepSkyKeyKeyExtractor();
+
+ private ReverseDepSkyKeyKeyExtractor() {
}
@Override
- protected SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
+ public SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
return element.second;
}
}
@@ -1152,7 +1154,7 @@
private final OutputFormatterCallback<Target> callback;
private final ThreadSafeUniquifier<Target> uniquifier =
- new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+ new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
@@ -1238,27 +1240,27 @@
int depth,
int batchSize)
throws QueryException, InterruptedException {
- Uniquifier<Target> uniquifier = createUniquifier();
+ MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
eval(
expression,
context,
- new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize));
+ new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
}
private class BatchAllRdepsCallback implements Callback<Target> {
- private final Uniquifier<Target> uniquifier;
+ private final MinDepthUniquifier<Target> minDepthUniquifier;
private final Predicate<Target> universe;
private final Callback<Target> callback;
private final int depth;
private final int batchSize;
private BatchAllRdepsCallback(
- Uniquifier<Target> uniquifier,
+ MinDepthUniquifier<Target> minDepthUniquifier,
Predicate<Target> universe,
Callback<Target> callback,
int depth,
int batchSize) {
- this.uniquifier = uniquifier;
+ this.minDepthUniquifier = minDepthUniquifier;
this.universe = universe;
this.callback = callback;
this.depth = depth;
@@ -1268,7 +1270,8 @@
@Override
public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
- ImmutableList<Target> uniqueTargets = uniquifier.unique(currentInUniverse);
+ ImmutableList<Target> uniqueTargets =
+ minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
callback.process(uniqueTargets);
// Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
@@ -1283,7 +1286,7 @@
// processed by callback, the targets are dequeued and not referenced any more, making
// them available for garbage collection.
- for (int i = 0; i < depth; i++) {
+ for (int curDepth = 1; curDepth <= depth; curDepth++) {
// The mappings between nodes and their reverse deps must be preserved instead of the
// reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
// check if their allowed deps contain the dependencies.
@@ -1321,7 +1324,8 @@
// separately.
for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
reverseDepsMap.put(entry.getKey(), subList);
- processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ processReverseDepsMap(
+ minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
}
reverseDepsQueue.poll();
@@ -1329,14 +1333,16 @@
} else {
// There are some nodes in the pending process list. Process them first and come
// back to this node later (in next iteration).
- processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ processReverseDepsMap(
+ minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
batch = 0;
}
}
}
if (!reverseDepsMap.isEmpty()) {
- processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ processReverseDepsMap(
+ minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
}
// If the queue is empty after all nodes in the current level are processed, stop
@@ -1352,14 +1358,16 @@
* list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
*/
private void processReverseDepsMap(
- Uniquifier<Target> uniquifier,
+ MinDepthUniquifier<Target> minDepthUniquifier,
Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
Callback<Target> callback,
- Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue)
+ Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
+ int depth)
throws QueryException, InterruptedException {
Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
- ImmutableList<Target> uniqueChildren = uniquifier.unique(currentInUniverse);
+ ImmutableList<Target> uniqueChildren =
+ minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
reverseDepsMap.clear();
if (!uniqueChildren.isEmpty()) {