blob: 1e1ff15eac72263f80cf898ac2eb2e009fcba52a [file] [log] [blame]
Nathan Harmata593dc522016-09-28 23:35:46 +00001// Copyright 2016 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package com.google.devtools.build.lib.query2;
15
laurentlb3d2a68c2017-06-30 00:32:04 +020016import static com.google.common.collect.ImmutableSet.toImmutableSet;
17
Googler2b503882016-11-28 21:54:43 +000018import com.google.common.annotations.VisibleForTesting;
nharmata398e6dab2018-04-12 15:31:26 -070019import com.google.common.base.Function;
nharmata7bcdceb2018-04-13 11:55:00 -070020import com.google.common.base.Preconditions;
nharmata398e6dab2018-04-12 15:31:26 -070021import com.google.common.base.Predicate;
22import com.google.common.base.Predicates;
Nathan Harmataf44211c2016-10-10 16:31:18 +000023import com.google.common.collect.ArrayListMultimap;
Nathan Harmata593dc522016-09-28 23:35:46 +000024import com.google.common.collect.ImmutableList;
nharmata398e6dab2018-04-12 15:31:26 -070025import com.google.common.collect.ImmutableSet;
Nathan Harmata593dc522016-09-28 23:35:46 +000026import com.google.common.collect.Iterables;
Googler13dc56a2016-12-07 15:49:20 +000027import com.google.common.collect.ListMultimap;
Nathan Harmata41b54172016-11-10 18:54:09 +000028import com.google.common.collect.Multimap;
nharmata398e6dab2018-04-12 15:31:26 -070029import com.google.common.collect.Streams;
Nathan Harmata593dc522016-09-28 23:35:46 +000030import com.google.devtools.build.lib.cmdline.Label;
Nathan Harmataf44211c2016-10-10 16:31:18 +000031import com.google.devtools.build.lib.cmdline.PackageIdentifier;
philwo3bcb9f62017-09-06 12:52:21 +020032import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet;
Nathan Harmata41b54172016-11-10 18:54:09 +000033import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
nharmata398e6dab2018-04-12 15:31:26 -070034import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
Nathan Harmata593dc522016-09-28 23:35:46 +000035import com.google.devtools.build.lib.packages.Target;
36import com.google.devtools.build.lib.query2.engine.Callback;
nharmata398e6dab2018-04-12 15:31:26 -070037import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
Nathan Harmata7a5a2362017-03-08 22:42:01 +000038import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
nharmata398e6dab2018-04-12 15:31:26 -070039import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet;
Nathan Harmata593dc522016-09-28 23:35:46 +000040import com.google.devtools.build.lib.query2.engine.QueryException;
41import com.google.devtools.build.lib.query2.engine.QueryExpression;
nharmata398e6dab2018-04-12 15:31:26 -070042import com.google.devtools.build.lib.query2.engine.QueryUtil;
43import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
nharmata2399df02018-04-10 12:30:03 -070044import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
Nathan Harmata7a5a2362017-03-08 22:42:01 +000045import com.google.devtools.build.lib.query2.engine.Uniquifier;
Nathan Harmata593dc522016-09-28 23:35:46 +000046import com.google.devtools.build.lib.query2.engine.VariableContext;
47import com.google.devtools.build.lib.skyframe.PackageValue;
48import com.google.devtools.build.lib.skyframe.SkyFunctions;
49import com.google.devtools.build.lib.vfs.PathFragment;
50import com.google.devtools.build.skyframe.SkyKey;
Googlerb3610d52016-10-24 19:18:36 +000051import java.util.ArrayList;
Nathan Harmata593dc522016-09-28 23:35:46 +000052import java.util.Collection;
nharmata398e6dab2018-04-12 15:31:26 -070053import java.util.Collections;
54import java.util.HashMap;
Googlerb3610d52016-10-24 19:18:36 +000055import java.util.Map;
nharmata2399df02018-04-10 12:30:03 -070056import java.util.Objects;
Nathan Harmata593dc522016-09-28 23:35:46 +000057import java.util.Set;
Googler45b59532018-05-03 11:10:20 -070058import java.util.concurrent.ConcurrentHashMap;
nharmata2399df02018-04-10 12:30:03 -070059import javax.annotation.Nullable;
Nathan Harmata593dc522016-09-28 23:35:46 +000060
Nathan Harmata593dc522016-09-28 23:35:46 +000061/**
62 * Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
63 *
64 * <p>Special attention is given to memory usage. Naive parallel implementations of query
65 * functionality would lead to memory blowup. Instead of dealing with {@link Target}s, we try to
 * deal with {@link SkyKey}s as much as possible to reduce the number of {@link
 * com.google.devtools.build.lib.packages.Package}s forcibly in memory at any given time.
68 */
69// TODO(bazel-team): Be more deliberate about bounding memory usage here.
nharmata2399df02018-04-10 12:30:03 -070070public class ParallelSkyQueryUtils {
Googler2b503882016-11-28 21:54:43 +000071
/** Upper bound on how many keys are visited in a single batch. */
@VisibleForTesting static final int VISIT_BATCH_SIZE = 10_000;

/** Private so that this holder of static helpers is never instantiated from outside. */
private ParallelSkyQueryUtils() {}
77
Nathan Harmata7a5a2362017-03-08 22:42:01 +000078 static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
Nathan Harmata593dc522016-09-28 23:35:46 +000079 SkyQueryEnvironment env,
80 QueryExpression expression,
81 VariableContext<Target> context,
Nathan Harmata7a5a2362017-03-08 22:42:01 +000082 Callback<Target> callback,
83 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
84 return env.eval(
Nathan Harmata593dc522016-09-28 23:35:46 +000085 expression,
86 context,
Googler7184b6f2017-05-16 05:25:49 +020087 ParallelVisitor.createParallelVisitorCallback(
nharmata398e6dab2018-04-12 15:31:26 -070088 new RdepsUnboundedVisitor.Factory(
89 env,
90 /*universe=*/ Predicates.alwaysTrue(),
91 callback,
92 packageSemaphore)));
93 }
94
95 static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
96 SkyQueryEnvironment env,
97 QueryExpression expression,
98 int depth,
99 VariableContext<Target> context,
100 Callback<Target> callback,
101 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
102 return env.eval(
103 expression,
104 context,
105 ParallelVisitor.createParallelVisitorCallback(
106 new RdepsBoundedVisitor.Factory(
107 env,
108 depth,
109 /*universe=*/ Predicates.alwaysTrue(),
110 callback,
111 packageSemaphore)));
112 }
113
114 static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
115 SkyQueryEnvironment env,
116 QueryExpression expression,
117 Predicate<SkyKey> universe,
118 VariableContext<Target> context,
119 Callback<Target> callback,
120 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
121 return env.eval(
122 expression,
123 context,
124 ParallelVisitor.createParallelVisitorCallback(
125 new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore)));
126 }
127
128 static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
129 SkyQueryEnvironment env,
130 QueryExpression expression,
131 VariableContext<Target> context,
132 int processResultsBatchSize,
133 int concurrencyLevel) {
134 QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
135 QueryUtil.evalAll(env, context, expression);
136
137 Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
138 getTransitiveClosureAsyncFunction =
139 universeValue -> {
140 ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
141 new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
142 return env.executeAsync(
143 () -> {
144 Callback<Target> visitorCallback =
145 ParallelVisitor.createParallelVisitorCallback(
nharmata7bcdceb2018-04-13 11:55:00 -0700146 new TransitiveTraversalValueDTCVisitor.Factory(
nharmata398e6dab2018-04-12 15:31:26 -0700147 env,
148 env.createSkyKeyUniquifier(),
149 processResultsBatchSize,
150 aggregateAllCallback));
151 visitorCallback.process(universeValue);
152 return Predicates.in(aggregateAllCallback.getResult());
153 });
154 };
155
156 return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
157 }
158
159 static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
160 SkyQueryEnvironment env,
161 QueryExpression expression,
162 int depth,
163 Predicate<SkyKey> universe,
164 VariableContext<Target> context,
165 Callback<Target> callback,
166 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
167 return env.eval(
168 expression,
169 context,
170 ParallelVisitor.createParallelVisitorCallback(
171 new RdepsBoundedVisitor.Factory(
172 env,
173 depth,
174 universe,
175 callback,
176 packageSemaphore)));
Nathan Harmata593dc522016-09-28 23:35:46 +0000177 }
178
179 /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
180 static void getRBuildFilesParallel(
181 SkyQueryEnvironment env,
182 Collection<PathFragment> fileIdentifiers,
nharmata1bd4aaf2017-10-31 11:23:04 -0400183 Callback<Target> callback) throws QueryException, InterruptedException {
Nathan Harmata7a5a2362017-03-08 22:42:01 +0000184 Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
Nathan Harmata41b54172016-11-10 18:54:09 +0000185 RBuildFilesVisitor visitor =
nharmata1bd4aaf2017-10-31 11:23:04 -0400186 new RBuildFilesVisitor(env, keyUniquifier, callback);
nharmatafa9b01e2017-11-27 08:16:38 -0800187 visitor.visitAndWaitForCompletion(env.getFileStateKeysForFileFragments(fileIdentifiers));
Nathan Harmata593dc522016-09-28 23:35:46 +0000188 }
189
nharmata2399df02018-04-10 12:30:03 -0700190 /** A {@link ParallelVisitor} whose visitations occur on {@link SkyKey}s. */
191 public abstract static class AbstractSkyKeyParallelVisitor<T> extends ParallelVisitor<SkyKey, T> {
192 private final Uniquifier<SkyKey> uniquifier;
193
194 protected AbstractSkyKeyParallelVisitor(
195 Uniquifier<SkyKey> uniquifier,
196 Callback<T> callback,
197 int visitBatchSize,
198 int processResultsBatchSize) {
199 super(callback, visitBatchSize, processResultsBatchSize);
200 this.uniquifier = uniquifier;
201 }
202
203 @Override
204 protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
205 return uniquifier.unique(values);
206 }
207 }
208
Nathan Harmata593dc522016-09-28 23:35:46 +0000209 /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
nharmata2399df02018-04-10 12:30:03 -0700210 private static class RBuildFilesVisitor extends AbstractSkyKeyParallelVisitor<Target> {
nharmata1bd4aaf2017-10-31 11:23:04 -0400211 // Each target in the full output of 'rbuildfiles' corresponds to BUILD file InputFile of a
212 // unique package. So the processResultsBatchSize we choose to pass to the ParallelVisitor ctor
213 // influences how many packages each leaf task doing processPartialResults will have to
214 // deal with at once. A value of 100 was chosen experimentally.
215 private static final int PROCESS_RESULTS_BATCH_SIZE = 100;
Googler7184b6f2017-05-16 05:25:49 +0200216 private final SkyQueryEnvironment env;
Nathan Harmata593dc522016-09-28 23:35:46 +0000217
218 private RBuildFilesVisitor(
219 SkyQueryEnvironment env,
Nathan Harmata7a5a2362017-03-08 22:42:01 +0000220 Uniquifier<SkyKey> uniquifier,
nharmata1bd4aaf2017-10-31 11:23:04 -0400221 Callback<Target> callback) {
222 super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
Googler7184b6f2017-05-16 05:25:49 +0200223 this.env = env;
Nathan Harmata593dc522016-09-28 23:35:46 +0000224 }
225
226 @Override
227 protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException {
228 Collection<Iterable<SkyKey>> reverseDeps = env.graph.getReverseDeps(values).values();
229 Set<SkyKey> keysToUseForResult = CompactHashSet.create();
230 Set<SkyKey> keysToVisitNext = CompactHashSet.create();
231 for (SkyKey rdep : Iterables.concat(reverseDeps)) {
232 if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
233 keysToUseForResult.add(rdep);
234 // Every package has a dep on the external package, so we need to include those edges too.
235 if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
236 keysToVisitNext.add(rdep);
237 }
shreyax64632352017-10-13 18:28:55 +0200238 } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)
239 && !rdep.functionName().equals(SkyFunctions.GLOB)) {
Nathan Harmata593dc522016-09-28 23:35:46 +0000240 // Packages may depend on the existence of subpackages, but these edges aren't relevant to
shreyax64632352017-10-13 18:28:55 +0200241 // rbuildfiles. They may also depend on files transitively through globs, but these cannot
242 // be included in load statements and so we don't traverse through these either.
Nathan Harmata593dc522016-09-28 23:35:46 +0000243 keysToVisitNext.add(rdep);
244 }
245 }
246 return new Visit(keysToUseForResult, keysToVisitNext);
247 }
248
249 @Override
Googler96f95cc2017-09-02 00:54:18 +0200250 protected void processPartialResults(
Nathan Harmata41b54172016-11-10 18:54:09 +0000251 Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
Googler96f95cc2017-09-02 00:54:18 +0200252 throws QueryException, InterruptedException {
nharmata1bd4aaf2017-10-31 11:23:04 -0400253 env.getBuildFileTargetsForPackageKeysAndProcessViaCallback(keysToUseForResult, callback);
Nathan Harmata593dc522016-09-28 23:35:46 +0000254 }
Googlerb3610d52016-10-24 19:18:36 +0000255
256 @Override
257 protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
258 return keys;
259 }
Nathan Harmata593dc522016-09-28 23:35:46 +0000260 }
261
nharmata2399df02018-04-10 12:30:03 -0700262 private static class DepAndRdep {
263 @Nullable
264 private final SkyKey dep;
265 private final SkyKey rdep;
266
267 private DepAndRdep(@Nullable SkyKey dep, SkyKey rdep) {
268 this.dep = dep;
269 this.rdep = rdep;
270 }
271
272 @Override
273 public boolean equals(Object obj) {
274 if (!(obj instanceof DepAndRdep)) {
275 return false;
276 }
277 DepAndRdep other = (DepAndRdep) obj;
278 return Objects.equals(dep, other.dep) && rdep.equals(other.rdep);
279 }
280
281 @Override
282 public int hashCode() {
283 // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
284 // Objects#hash). Depending on the structure of the graph being traversed, this method can
285 // be very hot.
286 return 31 * Objects.hashCode(dep) + rdep.hashCode();
287 }
288 }
289
nharmata398e6dab2018-04-12 15:31:26 -0700290 private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> {
nharmata1bd4aaf2017-10-31 11:23:04 -0400291 private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
Nathan Harmata593dc522016-09-28 23:35:46 +0000292
nharmata398e6dab2018-04-12 15:31:26 -0700293 protected final SkyQueryEnvironment env;
294 protected final MultisetSemaphore<PackageIdentifier> packageSemaphore;
295
296 protected AbstractRdepsVisitor(
Nathan Harmata593dc522016-09-28 23:35:46 +0000297 SkyQueryEnvironment env,
Nathan Harmata7a5a2362017-03-08 22:42:01 +0000298 Callback<Target> callback,
Nathan Harmata41b54172016-11-10 18:54:09 +0000299 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
nharmata2399df02018-04-10 12:30:03 -0700300 super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
Googler7184b6f2017-05-16 05:25:49 +0200301 this.env = env;
Nathan Harmata41b54172016-11-10 18:54:09 +0000302 this.packageSemaphore = packageSemaphore;
Nathan Harmata593dc522016-09-28 23:35:46 +0000303 }
304
Nathan Harmata593dc522016-09-28 23:35:46 +0000305 @Override
Googler96f95cc2017-09-02 00:54:18 +0200306 protected void processPartialResults(
Nathan Harmata41b54172016-11-10 18:54:09 +0000307 Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
Googler96f95cc2017-09-02 00:54:18 +0200308 throws QueryException, InterruptedException {
Nathan Harmata41b54172016-11-10 18:54:09 +0000309 Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
310 env.makePackageKeyToTargetKeyMap(keysToUseForResult);
311 Set<PackageIdentifier> pkgIdsNeededForResult =
laurentlb3d2a68c2017-06-30 00:32:04 +0200312 packageKeyToTargetKeyMap
313 .keySet()
314 .stream()
315 .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
316 .collect(toImmutableSet());
Nathan Harmata41b54172016-11-10 18:54:09 +0000317 packageSemaphore.acquireAll(pkgIdsNeededForResult);
318 try {
319 callback.process(
320 env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
321 } finally {
322 packageSemaphore.releaseAll(pkgIdsNeededForResult);
323 }
Nathan Harmata593dc522016-09-28 23:35:46 +0000324 }
Googlerb3610d52016-10-24 19:18:36 +0000325
nharmata398e6dab2018-04-12 15:31:26 -0700326 protected abstract SkyKey getRdepOfVisit(T visit);
Googler2b503882016-11-28 21:54:43 +0000327
328 @Override
nharmata398e6dab2018-04-12 15:31:26 -0700329 protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) {
330 // Group pending visitation by the package of the rdep, since we'll be targetfying the
nharmata2399df02018-04-10 12:30:03 -0700331 // rdep during the visitation.
nharmata398e6dab2018-04-12 15:31:26 -0700332 ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create();
333 for (T visit : pendingVisits) {
334 Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit));
Googler2b503882016-11-28 21:54:43 +0000335 if (label != null) {
nharmata398e6dab2018-04-12 15:31:26 -0700336 visitsByPackage.put(label.getPackageIdentifier(), visit);
Googler2b503882016-11-28 21:54:43 +0000337 }
338 }
339
340 ImmutableList.Builder<Task> builder = ImmutableList.builder();
341
342 // A couple notes here:
343 // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
344 // want.
345 // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
346 // accidentally retaining the entire ArrayListMultimap object.
nharmata398e6dab2018-04-12 15:31:26 -0700347 for (Iterable<T> visitBatch :
Googler2b503882016-11-28 21:54:43 +0000348 Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
nharmata398e6dab2018-04-12 15:31:26 -0700349 builder.add(new VisitTask(visitBatch));
Googler2b503882016-11-28 21:54:43 +0000350 }
351
352 return builder.build();
353 }
nharmata398e6dab2018-04-12 15:31:26 -0700354 }
355
356 /**
357 * A helper class that computes unbounded 'allrdeps(<expr>)' or
358 * 'rdeps(<precomputed-universe>, <expr>)' via BFS.
359 *
360 * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
361 * targetification of reverse deps until they are needed. The rdep node itself is needed to filter
362 * out disallowed deps later. Compared against the approach using a single SkyKey, it consumes 16
363 * more bytes in a 64-bit environment for each edge. However it defers the need to load all the
364 * packages which have at least a target as a rdep of the current batch, thus greatly reduces the
365 * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
366 * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
367 */
368 private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> {
369 /**
370 * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
371 * actually isn't that useful. See the method javadoc.
372 */
373 private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
374 /**
375 * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
376 * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
377 * visitation may actually be invalid. Because the same rdep can be reached through more than
378 * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep.
379 */
380 private final Uniquifier<SkyKey> validRdepUniquifier;
381 private final Predicate<SkyKey> universe;
382
383 private RdepsUnboundedVisitor(
384 SkyQueryEnvironment env,
385 Uniquifier<DepAndRdep> depAndRdepUniquifier,
386 Uniquifier<SkyKey> validRdepUniquifier,
387 Predicate<SkyKey> universe,
388 Callback<Target> callback,
389 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
390 super(env, callback, packageSemaphore);
391 this.depAndRdepUniquifier = depAndRdepUniquifier;
392 this.validRdepUniquifier = validRdepUniquifier;
393 this.universe = universe;
394 }
395
396 /**
397 * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used
398 * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
399 * single {@link Callback#process} call. Note that all the created instances share the same
400 * {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
401 */
402 private static class Factory implements ParallelVisitor.Factory {
403 private final SkyQueryEnvironment env;
404 private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
405 private final Uniquifier<SkyKey> validRdepUniquifier;
406 private final Predicate<SkyKey> universe;
407 private final Callback<Target> callback;
408 private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
409
410 private Factory(
411 SkyQueryEnvironment env,
412 Predicate<SkyKey> universe,
413 Callback<Target> callback,
414 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
415 this.env = env;
416 this.universe = universe;
417 this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
418 this.validRdepUniquifier = env.createSkyKeyUniquifier();
419 this.callback = callback;
420 this.packageSemaphore = packageSemaphore;
421 }
422
423 @Override
424 public ParallelVisitor<DepAndRdep, Target> create() {
425 return new RdepsUnboundedVisitor(
426 env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore);
427 }
428 }
429
430 @Override
431 protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
432 Collection<SkyKey> validRdeps = new ArrayList<>();
433
434 // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
435 // disallowed deps.
436 Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
437 for (DepAndRdep depAndRdep : depAndRdeps) {
438 // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
439 if (depAndRdep.dep == null) {
440 validRdeps.add(depAndRdep.rdep);
441 } else {
442 reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep);
443 }
444 }
445
446 Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
447 env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
448 Set<PackageIdentifier> pkgIdsNeededForTargetification =
449 packageKeyToTargetKeyMap
450 .keySet()
451 .stream()
452 .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
453 .collect(toImmutableSet());
454 packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
455
456 try {
457 // Filter out disallowed deps. We cannot defer the targetification any further as we do not
458 // want to retrieve the rdeps of unwanted nodes (targets).
459 if (!reverseDepMultimap.isEmpty()) {
460 Collection<Target> filteredTargets =
461 env.filterRawReverseDepsOfTransitiveTraversalKeys(
462 reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
463 filteredTargets
464 .stream()
465 .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
466 .forEachOrdered(validRdeps::add);
467 }
468 } finally {
469 packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
470 }
471
472 ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
473 .filter(validRdepUniquifier::unique)
474 .collect(ImmutableList.toImmutableList());
475
476 // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
477 // recursive visitation.
478 ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder();
479 env.graph.getReverseDeps(uniqueValidRdeps).entrySet()
480 .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll(
481 Iterables.transform(
482 Iterables.filter(
483 reverseDepsEntry.getValue(),
484 Predicates.and(SkyQueryEnvironment.IS_TTV, universe)),
485 rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep))));
486
487 return new Visit(
488 /*keysToUseForResult=*/ uniqueValidRdeps,
489 /*keysToVisit=*/ depAndRdepsToVisitBuilder.build());
490 }
491
492 @Override
493 protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
494 return Iterables.transform(
495 Iterables.filter(keys, k -> universe.apply(k)),
496 key -> new DepAndRdep(null, key));
497 }
498
499 @Override
500 protected SkyKey getRdepOfVisit(DepAndRdep visit) {
501 return visit.rdep;
502 }
nharmata2399df02018-04-10 12:30:03 -0700503
504 @Override
505 protected ImmutableList<DepAndRdep> getUniqueValues(Iterable<DepAndRdep> depAndRdeps) {
506 // See the javadoc for 'validRdepUniquifier'.
507 //
508 // N.B. - Except for the visitation roots, 'depAndRdepUniquifier' is actually completely
509 // unneeded in practice for ensuring literal unique {@link DepAndRdep} visitations. Valid rdep
510 // visitations are deduped in 'getVisitResult' using 'validRdepUniquifier', so there's
511 // actually no way the same DepAndRdep visitation can ever be returned from 'getVisitResult'.
512 // Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as
513 // to not be depending on implementation details of 'ParallelVisitor'.
514 //
nharmata398e6dab2018-04-12 15:31:26 -0700515 // Even so, there's value in not visiting a rdep if it's already been visited *validly*
nharmata2399df02018-04-10 12:30:03 -0700516 // before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this.
517 return depAndRdepUniquifier.unique(
518 Iterables.filter(
519 depAndRdeps,
520 depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep)));
521 }
Nathan Harmata593dc522016-09-28 23:35:46 +0000522 }
nharmata398e6dab2018-04-12 15:31:26 -0700523
524 private static class DepAndRdepAtDepth {
525 private final DepAndRdep depAndRdep;
526 private final int rdepDepth;
527
528 private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
529 this.depAndRdep = depAndRdep;
530 this.rdepDepth = rdepDepth;
531 }
532 }
533
534 /**
535 * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or
536 * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS.
537 *
538 * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here
539 * but there are additional subtle concerns about the correctness of the bounded traversal: just
540 * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}.
541 */
542 private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> {
543 private final int depth;
544 private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
545 private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
546 private final Predicate<SkyKey> universe;
547
548 private RdepsBoundedVisitor(
549 SkyQueryEnvironment env,
550 int depth,
551 Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier,
552 MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier,
553 Predicate<SkyKey> universe,
554 Callback<Target> callback,
555 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
556 super(env, callback, packageSemaphore);
557 this.depth = depth;
558 this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier;
559 this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier;
560 this.universe = universe;
561 }
562
563 private static class Factory implements ParallelVisitor.Factory {
564 private final SkyQueryEnvironment env;
565 private final int depth;
566 private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
567 private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
568 private final Predicate<SkyKey> universe;
569 private final Callback<Target> callback;
570 private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
571
572 private Factory(
573 SkyQueryEnvironment env,
574 int depth,
575 Predicate<SkyKey> universe,
576 Callback<Target> callback,
577 MultisetSemaphore<PackageIdentifier> packageSemaphore) {
578 this.env = env;
579 this.depth = depth;
580 this.universe = universe;
581 this.depAndRdepAtDepthUniquifier =
582 new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth);
583 this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier();
584 this.callback = callback;
585 this.packageSemaphore = packageSemaphore;
586 }
587
588 @Override
589 public ParallelVisitor<DepAndRdepAtDepth, Target> create() {
590 return new RdepsBoundedVisitor(
591 env,
592 depth,
593 depAndRdepAtDepthUniquifier,
594 validRdepMinDepthUniquifier,
595 universe,
596 callback,
597 packageSemaphore);
598 }
599 }
600
601 @Override
602 protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths)
603 throws InterruptedException {
604 Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>();
605 depAndRdepAtDepths.forEach(
606 depAndRdepAtDepth -> shallowestRdepDepthMap.merge(
607 depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min));
608
609 Collection<SkyKey> validRdeps = new ArrayList<>();
610
611 // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
612 // disallowed deps.
613 Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
614 for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) {
615 // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
616 if (depAndRdepAtDepth.depAndRdep.dep == null) {
617 validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep);
618 } else {
619 reverseDepMultimap.put(
620 depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep);
621 }
622 }
623
624 Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
625 env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
626 Set<PackageIdentifier> pkgIdsNeededForTargetification =
627 packageKeyToTargetKeyMap
628 .keySet()
629 .stream()
630 .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
631 .collect(toImmutableSet());
632 packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
633
634 try {
635 // Filter out disallowed deps. We cannot defer the targetification any further as we do not
636 // want to retrieve the rdeps of unwanted nodes (targets).
637 if (!reverseDepMultimap.isEmpty()) {
638 Collection<Target> filteredTargets =
639 env.filterRawReverseDepsOfTransitiveTraversalKeys(
640 reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
641 filteredTargets
642 .stream()
643 .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
644 .forEachOrdered(validRdeps::add);
645 }
646 } finally {
647 packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
648 }
649
650 ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
651 .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(
652 validRdep, shallowestRdepDepthMap.get(validRdep)))
653 .collect(ImmutableList.toImmutableList());
654
655 // Don't bother getting the rdeps of the rdeps that are already at the depth bound.
656 Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter(
657 uniqueValidRdeps,
658 uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth);
659
660 // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
661 // recursive visitation.
662 Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps =
663 env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound);
664
665 ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder =
666 ImmutableList.builder();
667 unfilteredRdepsOfRdeps.entrySet().forEach(entry -> {
668 SkyKey rdep = entry.getKey();
669 int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1;
670 Streams.stream(entry.getValue())
671 .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe))
672 .forEachOrdered(rdepOfRdep -> {
673 depAndRdepAtDepthsToVisitBuilder.add(
674 new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep));
675 });
676 });
677
678 return new Visit(
679 /*keysToUseForResult=*/ uniqueValidRdeps,
680 /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build());
681 }
682
683 @Override
684 protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) {
685 return Iterables.transform(
686 Iterables.filter(keys, k -> universe.apply(k)),
687 key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0));
688 }
689
690 @Override
691 protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) {
692 return visit.depAndRdep.rdep;
693 }
694
695 @Override
696 protected ImmutableList<DepAndRdepAtDepth> getUniqueValues(
697 Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) {
698 // See the comment in RdepsUnboundedVisitor#getUniqueValues.
699 return depAndRdepAtDepthUniquifier.unique(
700 Iterables.filter(
701 depAndRdepAtDepths,
702 depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure(
703 depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth)));
704 }
705 }
706
nharmata7bcdceb2018-04-13 11:55:00 -0700707 /**
708 * Helper class that computes the TTV-only DTC of some given TTV keys, via BFS following all
709 * TTV->TTV dep edges.
710 */
711 private static class TransitiveTraversalValueDTCVisitor extends ParallelVisitor<SkyKey, SkyKey> {
nharmata398e6dab2018-04-12 15:31:26 -0700712 private final SkyQueryEnvironment env;
713 private final Uniquifier<SkyKey> uniquifier;
714
nharmata7bcdceb2018-04-13 11:55:00 -0700715 private TransitiveTraversalValueDTCVisitor(
nharmata398e6dab2018-04-12 15:31:26 -0700716 SkyQueryEnvironment env,
717 Uniquifier<SkyKey> uniquifier,
718 int processResultsBatchSize,
719 AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
720 super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize);
721 this.env = env;
722 this.uniquifier = uniquifier;
723 }
724
725 private static class Factory implements ParallelVisitor.Factory {
726 private final SkyQueryEnvironment env;
727 private final Uniquifier<SkyKey> uniquifier;
728 private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback;
729 private final int processResultsBatchSize;
730
731 private Factory(
732 SkyQueryEnvironment env,
733 Uniquifier<SkyKey> uniquifier,
734 int processResultsBatchSize,
735 AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
736 this.env = env;
737 this.uniquifier = uniquifier;
738 this.processResultsBatchSize = processResultsBatchSize;
739 this.aggregateAllCallback = aggregateAllCallback;
740 }
741
742 @Override
743 public ParallelVisitor<SkyKey, SkyKey> create() {
nharmata7bcdceb2018-04-13 11:55:00 -0700744 return new TransitiveTraversalValueDTCVisitor(
nharmata398e6dab2018-04-12 15:31:26 -0700745 env, uniquifier, processResultsBatchSize, aggregateAllCallback);
746 }
747 }
748
749 @Override
750 protected void processPartialResults(
751 Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback)
752 throws QueryException, InterruptedException {
753 callback.process(keysToUseForResult);
754 }
755
756 @Override
nharmata7bcdceb2018-04-13 11:55:00 -0700757 protected Visit getVisitResult(Iterable<SkyKey> ttvKeys) throws InterruptedException {
758 Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(ttvKeys);
nharmata398e6dab2018-04-12 15:31:26 -0700759 return new Visit(
760 /*keysToUseForResult=*/ deps.keySet(),
nharmata7bcdceb2018-04-13 11:55:00 -0700761 /*keysToVisit=*/ deps.values().stream()
762 .filter(SkyQueryEnvironment.IS_TTV)
763 .collect(ImmutableList.toImmutableList()));
nharmata398e6dab2018-04-12 15:31:26 -0700764 }
765
766 @Override
767 protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
nharmata7bcdceb2018-04-13 11:55:00 -0700768 // ParallelVisitorCallback passes in TTV keys.
769 Preconditions.checkState(Iterables.all(keys, SkyQueryEnvironment.IS_TTV), keys);
nharmata398e6dab2018-04-12 15:31:26 -0700770 return keys;
771 }
772
773 @Override
774 protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
775 return uniquifier.unique(values);
776 }
777 }
778
779 /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
780 @ThreadSafe
781 private static class ThreadSafeAggregateAllSkyKeysCallback
782 implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
783
784 private final Set<SkyKey> results;
785
786 private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
787 this.results =
Googler45b59532018-05-03 11:10:20 -0700788 Collections.newSetFromMap(
789 new ConcurrentHashMap<>(
790 /*initialCapacity=*/ concurrencyLevel, /*loadFactor=*/ 0.75f));
nharmata398e6dab2018-04-12 15:31:26 -0700791 }
792
793 @Override
794 public void process(Iterable<SkyKey> partialResult)
795 throws QueryException, InterruptedException {
796 Iterables.addAll(results, partialResult);
797 }
798
799 @Override
800 public ImmutableSet<SkyKey> getResult() {
801 return ImmutableSet.copyOf(results);
802 }
803 }
Nathan Harmata593dc522016-09-28 23:35:46 +0000804}
805