Stream the result of TargetPattern#eval to a callback instead of returning it directly, and pass a Query callback in when resolving target patterns. This means the targets a pattern resolves to can be processed incrementally.
This is the fifth step in a series of changes that allow query target patterns to process large sets of targets in streaming batches rather than all at once. This should improve SkyQueryEnvironment performance for certain classes of large queries.
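For readers unfamiliar with the callback plumbing, the sketch below (hypothetical names, not the actual Blaze classes) illustrates the pattern this change relies on: the pattern evaluator pushes partial results into an aggregator, which deduplicates them and forwards them to the underlying callback in fixed-size batches, with a final processLastPending() call flushing whatever remains once evaluation finishes.

    // Illustrative sketch only; SimpleBatchCallback and BatchingAggregator are
    // hypothetical stand-ins for Blaze's BatchCallback / BatchStreamedCallback.
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    /** Consumer that receives results in partial batches rather than all at once. */
    interface SimpleBatchCallback<T> {
      void process(Iterable<T> partialResult) throws InterruptedException;
    }

    /** Buffers, deduplicates, and flushes results to a delegate in fixed-size batches. */
    final class BatchingAggregator<T> implements SimpleBatchCallback<T> {
      private final SimpleBatchCallback<T> delegate;
      private final int batchSize;                        // e.g. 10000, as in BATCH_CALLBACK_SIZE
      private final Set<T> seen = new LinkedHashSet<>();  // uniquifier across all batches
      private final List<T> pending = new ArrayList<>();

      BatchingAggregator(SimpleBatchCallback<T> delegate, int batchSize) {
        this.delegate = delegate;
        this.batchSize = batchSize;
      }

      @Override
      public void process(Iterable<T> partialResult) throws InterruptedException {
        for (T item : partialResult) {
          if (seen.add(item)) {  // drop duplicates before buffering
            pending.add(item);
          }
          if (pending.size() >= batchSize) {
            flush();
          }
        }
      }

      /** Must be called once after evaluation completes to emit the final partial batch. */
      void processLastPending() throws InterruptedException {
        if (!pending.isEmpty()) {
          flush();
        }
      }

      private void flush() throws InterruptedException {
        delegate.process(new ArrayList<>(pending));
        pending.clear();
      }
    }

SkyQueryEnvironment's FilteringBatchingUniquifyingCallback in the diff below adds scope validation and graph-membership filtering in front of the same batching step.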
--
MOS_MIGRATED_REVID=111696713
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index b5bf4f8..f6181f2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -61,6 +61,7 @@
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey;
import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue;
+import com.google.devtools.build.lib.util.BatchCallback;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
@@ -77,7 +78,6 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -94,6 +94,9 @@
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+ // 10k is likely a good balance between using batching efficiently and not blowing up memory.
+ // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
+ private static final int BATCH_CALLBACK_SIZE = 10000;
private WalkableGraph graph;
@@ -307,9 +310,8 @@
@Override
public void eval(QueryExpression expr, Callback<Target> callback)
throws QueryException, InterruptedException {
- // 10k is likely a good balance between using batch efficiently and not blowing up memory.
- BatchStreamedCallback aggregator = new BatchStreamedCallback(callback, 10000,
- createUniquifier());
+ BatchStreamedCallback aggregator =
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
expr.eval(this, aggregator);
aggregator.processLastPending();
}
@@ -324,14 +326,50 @@
};
}
+ /**
+ * Wraps a {@link Callback}{@code <Target>} with three additional mechanisms. First, it validates
+ * the scope of the targets it is given before passing them to the delegate Callback. Second, it
+ * removes {@link Target}s not in the graph (outside the universe scope). Third, it wraps the
+ * Callback in a {@link BatchStreamedCallback}, which aggregates results into batches of size
+ * {@link #BATCH_CALLBACK_SIZE} and also deduplicates elements.
+ */
+ private class FilteringBatchingUniquifyingCallback
+ implements BatchCallback<Target, QueryException> {
+ private final BatchStreamedCallback batchStreamedCallback;
+
+ private FilteringBatchingUniquifyingCallback(Callback<Target> callback) {
+ this.batchStreamedCallback =
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+ }
+
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ Set<Target> targets = CompactHashSet.create();
+ for (Target target : partialResult) {
+ if (validateScope(target.getLabel(), strictScope)) {
+ targets.add(target);
+ }
+ }
+ batchStreamedCallback.process(filterTargetsNotInGraph(targets));
+ }
+
+ private void processLastPending() throws QueryException, InterruptedException {
+ batchStreamedCallback.processLastPending();
+ }
+ }
+
@Override
public void getTargetsMatchingPattern(
QueryExpression owner, String pattern, Callback<Target> callback) throws QueryException {
- Set<Target> targets = ImmutableSet.of();
if (precomputedPatterns.containsKey(pattern)) {
Set<Label> labels = precomputedPatterns.get(pattern);
if (labels != null) {
- targets = ImmutableSet.copyOf(makeTargetsFromLabels(labels));
+ try {
+ callback.process(ImmutableSet.copyOf(makeTargetsFromLabels(labels)));
+ } catch (InterruptedException e) {
+ throw new QueryException(owner, e.getMessage());
+ }
} else {
TargetParsingException exception;
try {
@@ -368,27 +406,16 @@
new RecursivePackageProviderBackedTargetPatternResolver(
provider, eventHandler, targetPatternKey.getPolicy());
TargetPattern parsedPattern = targetPatternKey.getParsedPattern();
- targets = parsedPattern.eval(resolver).getTargets();
+ FilteringBatchingUniquifyingCallback wrapper =
+ new FilteringBatchingUniquifyingCallback(callback);
+ parsedPattern.eval(resolver, wrapper);
+ wrapper.processLastPending();
} catch (TargetParsingException e) {
reportBuildFileError(owner, e.getMessage());
} catch (InterruptedException e) {
throw new QueryException(owner, e.getMessage());
}
}
-
- // Sets.filter would be more convenient here, but can't deal with exceptions.
- Iterator<Target> targetIterator = targets.iterator();
- while (targetIterator.hasNext()) {
- Target target = targetIterator.next();
- if (!validateScope(target.getLabel(), strictScope)) {
- targetIterator.remove();
- }
- }
- try {
- callback.process(filterTargetsNotInGraph(targets));
- } catch (InterruptedException e) {
- throw new QueryException(owner, e.getMessage());
- }
}
@Override