Provide parallel implementations of bounded allrdeps and rdeps.
RELNOTES: None
PiperOrigin-RevId: 192681579
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 8b95172..b13ed47 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -16,7 +16,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -27,6 +26,7 @@
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
@@ -112,11 +112,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
@@ -506,6 +504,21 @@
return result;
}
+ /**
+  * Returns, as {@link SkyKey}s, the direct deps of each key in {@code keys}, grouped by that key.
+  * Deps are not filtered, so this may only be called while {@link
+  * SkyQueryEnvironment#dependencyFilter} is {@link DependencyFilter#ALL_DEPS} (checked below).
+  */
+ Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys)
+     throws InterruptedException {
+   Preconditions.checkState(dependencyFilter == DependencyFilter.ALL_DEPS, dependencyFilter);
+   ImmutableMultimap.Builder<SkyKey, SkyKey> depsByKey = ImmutableMultimap.builder();
+   for (Entry<SkyKey, ? extends Iterable<SkyKey>> entry : graph.getDirectDeps(keys).entrySet()) {
+     depsByKey.putAll(entry.getKey(), entry.getValue());
+   }
+   return depsByKey.build();
+ }
+
@Override
public Collection<Target> getReverseDeps(Iterable<Target> targets) throws InterruptedException {
return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY));
@@ -648,6 +661,11 @@
}
@ThreadSafe
+ protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
+ return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ }
+
+ @ThreadSafe
Uniquifier<Target> createTargetUniquifier() {
return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@@ -894,15 +912,11 @@
// so no preloading of target patterns is necessary.
}
+ static final Predicate<SkyKey> IS_TTV = SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL);
+
static final Function<SkyKey, Label> SKYKEY_TO_LABEL =
- skyKey -> {
- SkyFunctionName functionName = skyKey.functionName();
- if (!functionName.equals(Label.TRANSITIVE_TRAVERSAL)) {
- // Skip non-targets.
- return null;
- }
- return (Label) skyKey.argument();
- };
+ skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null;
+
static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER =
skyKey -> (PackageIdentifier) skyKey.argument();
@@ -1266,176 +1280,53 @@
this, expression, context, callback, packageSemaphore);
}
+ @ThreadSafe
@Override
- public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
+ public QueryTaskFuture<Void> getAllRdepsBoundedParallel(
QueryExpression expression,
+ int depth,
VariableContext<Target> context,
- List<Argument> args,
Callback<Target> callback) {
- return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback);
+ return ParallelSkyQueryUtils.getAllRdepsBoundedParallel(
+ this, expression, depth, context, callback, packageSemaphore);
+ }
+
+ protected QueryTaskFuture<Predicate<SkyKey>> getUniverseDTCSkyKeyPredicateFuture(
+ QueryExpression universe,
+ VariableContext<Target> context) {
+ return ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture(
+ this,
+ universe,
+ context,
+ BATCH_CALLBACK_SIZE,
+ DEFAULT_THREAD_COUNT);
}
@ThreadSafe
@Override
- public QueryTaskFuture<Void> getAllRdeps(
+ public QueryTaskFuture<Void> getRdepsUnboundedParallel(
QueryExpression expression,
- Predicate<Target> universe,
+ QueryExpression universe,
VariableContext<Target> context,
- Callback<Target> callback,
- int depth) {
- return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+ Callback<Target> callback) {
+ return transformAsync(
+ getUniverseDTCSkyKeyPredicateFuture(universe, context),
+ universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
+ this, expression, universePredicate, context, callback, packageSemaphore));
}
- /**
- * Computes and applies the callback to the reverse dependencies of the expression.
- *
- * <p>Batch size is used to only populate at most N targets at one time, because some individual
- * nodes are directly depended on by a large number of other nodes.
- */
- @VisibleForTesting
- protected QueryTaskFuture<Void> getAllRdeps(
+ @ThreadSafe
+ @Override
+ public QueryTaskFuture<Void> getRdepsBoundedParallel(
QueryExpression expression,
- Predicate<Target> universe,
- VariableContext<Target> context,
- Callback<Target> callback,
int depth,
- int batchSize) {
- MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
- return eval(
- expression,
- context,
- new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
- }
-
- private class BatchAllRdepsCallback implements Callback<Target> {
- private final MinDepthUniquifier<Target> minDepthUniquifier;
- private final Predicate<Target> universe;
- private final Callback<Target> callback;
- private final int depth;
- private final int batchSize;
-
- private BatchAllRdepsCallback(
- MinDepthUniquifier<Target> minDepthUniquifier,
- Predicate<Target> universe,
- Callback<Target> callback,
- int depth,
- int batchSize) {
- this.minDepthUniquifier = minDepthUniquifier;
- this.universe = universe;
- this.callback = callback;
- this.depth = depth;
- this.batchSize = batchSize;
- }
-
- @Override
- public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
- Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
- ImmutableList<Target> uniqueTargets =
- minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
- callback.process(uniqueTargets);
-
- // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
- // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets
- // have a reference to their entire Package, which is really memory expensive.
- Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>();
- reverseDepsQueue.addAll(
- graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet());
-
- // In each iteration, we populate a size-limited (no more than batchSize) number of
- // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once
- // processed by callback, the targets are dequeued and not referenced any more, making
- // them available for garbage collection.
-
- for (int curDepth = 1; curDepth <= depth; curDepth++) {
- // The mappings between nodes and their reverse deps must be preserved instead of the
- // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
- // check if their allowed deps contain the dependencies.
- Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap();
- int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap.
- int processed = 0;
- // Save current size as when we are process nodes in the current level, new mappings (for
- // the next level) are added to the queue.
- int size = reverseDepsQueue.size();
- while (processed < size) {
- // We always peek the first element in the queue without polling it, to determine if
- // adding it to the pending list will break the limit of max size. If yes then we process
- // and empty the pending list first, and poll the element in the next iteration.
- Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek();
-
- // The value of the entry is either a CompactHashSet or ImmutableList, which can return
- // the size in O(1) time.
- int rdepsSize = Iterables.size(entry.getValue());
- if (rdepsSize == 0) {
- reverseDepsQueue.poll();
- processed++;
- continue;
- }
-
- if ((rdepsSize + batch <= batchSize)) {
- // If current size is less than batch size, dequeue the node, update the current
- // batch size and map.
- reverseDepsMap.put(entry.getKey(), entry.getValue());
- batch += rdepsSize;
- reverseDepsQueue.poll();
- processed++;
- } else {
- if (batch == 0) {
- // The (single) node has more rdeps than the limit, divide them up to process
- // separately.
- for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
- reverseDepsMap.put(entry.getKey(), subList);
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- }
-
- reverseDepsQueue.poll();
- processed++;
- } else {
- // There are some nodes in the pending process list. Process them first and come
- // back to this node later (in next iteration).
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- batch = 0;
- }
- }
- }
-
- if (!reverseDepsMap.isEmpty()) {
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- }
-
- // If the queue is empty after all nodes in the current level are processed, stop
- // processing as there are no more reverse deps.
- if (reverseDepsQueue.isEmpty()) {
- break;
- }
- }
- }
-
- /**
- * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending
- * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
- */
- private void processReverseDepsMap(
- MinDepthUniquifier<Target> minDepthUniquifier,
- Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
- Callback<Target> callback,
- Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
- int depth)
- throws QueryException, InterruptedException {
- Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
- Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
- ImmutableList<Target> uniqueChildren =
- minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
- reverseDepsMap.clear();
-
- if (!uniqueChildren.isEmpty()) {
- callback.process(uniqueChildren);
- reverseDepsQueue.addAll(
- graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet());
- }
- }
+ QueryExpression universe,
+ VariableContext<Target> context,
+ Callback<Target> callback) {
+ return transformAsync(
+ getUniverseDTCSkyKeyPredicateFuture(universe, context),
+ universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
+ this, expression, depth, universePredicate, context, callback, packageSemaphore));
}
/**
@@ -1465,4 +1356,32 @@
}
}
}
+
+ /** A {@link SkyKey} paired with a depth; handy for driving a {@link MinDepthUniquifier}. */
+ public static class KeyAtDepth {
+   public final SkyKey key;
+   public final int depth;
+
+   public KeyAtDepth(SkyKey key, int depth) {
+     this.key = key;
+     this.depth = depth;
+   }
+
+   @Override
+   public int hashCode() {
+     // N.B. - Deliberately garbage-free (so no Objects#hash): this method is very hot during
+     // the large visitations done by ParallelSkyQueryUtils. Integer#hashCode of an int is the
+     // int itself, hence the depth is added as-is.
+     return key.hashCode() * 31 + depth;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (obj instanceof KeyAtDepth) {
+       KeyAtDepth that = (KeyAtDepth) obj;
+       return key.equals(that.key) && depth == that.depth;
+     }
+     return false;
+   }
+ }
}