Description redacted.
--
PiperOrigin-RevId: 149585165
MOS_MIGRATED_REVID=149585165
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 9a5ffd4..de5175d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -29,14 +29,20 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
-import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -58,19 +64,18 @@
import com.google.devtools.build.lib.query2.engine.KeyExtractor;
import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMinDepthUniquifierImpl;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeMinDepthUniquifier;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -110,7 +115,9 @@
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -121,6 +128,10 @@
* reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
* particular order. As well, this class eagerly loads the full transitive closure of targets, even
* if the full closure isn't needed.
+ *
+ * <p>This class has concurrent implementations of the
+ * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the
+ * asynchronous evaluation model yields parallel query evaluation.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
implements StreamableQueryEnvironment<Target> {
@@ -144,7 +155,7 @@
protected WalkableGraph graph;
private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider;
- private ForkJoinPool forkJoinPool;
+ private ListeningExecutorService executor;
private RecursivePackageProviderBackedTargetPatternResolver resolver;
private final SkyKey universeKey;
private final ImmutableList<TargetPatternKey> universeTargetPatternKeys;
@@ -155,7 +166,6 @@
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<Target> evalListener,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
@@ -169,7 +179,6 @@
eventHandler,
settings,
extraFunctions,
- evalListener,
parserPrefix,
graphFactory,
universeScope,
@@ -183,7 +192,6 @@
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<Target> evalListener,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
@@ -194,8 +202,7 @@
/*labelFilter=*/ Rule.ALL_LABELS,
eventHandler,
settings,
- extraFunctions,
- evalListener);
+ extraFunctions);
this.loadingPhaseThreads = loadingPhaseThreads;
this.graphFactory = graphFactory;
this.pkgPath = pkgPath;
@@ -228,9 +235,15 @@
graphBackedRecursivePackageProvider =
new GraphBackedRecursivePackageProvider(graph, universeTargetPatternKeys, pkgPath);
}
- if (forkJoinPool == null) {
- forkJoinPool =
- NamedForkJoinPool.newNamedPool("QueryEnvironment", queryEvaluationParallelismLevel);
+ if (executor == null) {
+ executor = MoreExecutors.listeningDecorator(
+ new ThreadPoolExecutor(
+ /*corePoolSize=*/ queryEvaluationParallelismLevel,
+ /*maximumPoolSize=*/ queryEvaluationParallelismLevel,
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*workQueue=*/ new BlockingStack<Runnable>(),
+ new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build()));
}
resolver =
new RecursivePackageProviderBackedTargetPatternResolver(
@@ -340,16 +353,17 @@
} catch (Throwable throwable) {
throwableToThrow = throwable;
} finally {
- if (throwableToThrow != null) {
- LOG.log(Level.INFO, "About to shutdown FJP because of throwable", throwableToThrow);
+ if (throwableToThrow != null) {
+ LOG.log(
+ Level.INFO,
+ "About to shutdown query threadpool because of throwable",
+ throwableToThrow);
// Force termination of remaining tasks if evaluation failed abruptly (e.g. was
// interrupted). We don't want to leave any dangling threads running tasks.
- forkJoinPool.shutdownNow();
- }
- forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (throwableToThrow != null) {
- // Signal that pool must be recreated on the next invocation.
- forkJoinPool = null;
+ executor.shutdownNow();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ // Signal that executor must be recreated on the next invocation.
+ executor = null;
Throwables.propagateIfPossible(
throwableToThrow, QueryException.class, InterruptedException.class);
}
@@ -358,7 +372,7 @@
@Override
public QueryEvalResult evaluateQuery(
- QueryExpression expr, OutputFormatterCallback<Target> callback)
+ QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException, IOException {
// Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
// result is set to have an error iff there were errors emitted during the query, so we reset
@@ -569,46 +583,85 @@
return null;
}
- @ThreadSafe
- @Override
- public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
- throws QueryException, InterruptedException {
- // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
- // all QueryEnvironment implementations.
- if (callback instanceof ThreadSafeCallback) {
- expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
- } else {
- expr.eval(this, context, callback);
+ private <R> ListenableFuture<R> safeSubmit(Callable<R> callable) {
+ try {
+ return executor.submit(callable);
+ } catch (RejectedExecutionException e) {
+ return Futures.immediateCancelledFuture();
}
}
@ThreadSafe
@Override
- public ThreadSafeUniquifier<Target> createUniquifier() {
+ public QueryTaskFuture<Void> eval(
+ final QueryExpression expr,
+ final VariableContext<Target> context,
+ final Callback<Target> callback) {
+ // TODO(bazel-team): Make the async #eval of the other QueryEnvironment implementations
+ // concurrent too, as is done here.
+ Callable<QueryTaskFutureImpl<Void>> task = new Callable<QueryTaskFutureImpl<Void>>() {
+ @Override
+ public QueryTaskFutureImpl<Void> call() {
+ return (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback);
+ }
+ };
+ ListenableFuture<QueryTaskFutureImpl<Void>> futureFuture = safeSubmit(task);
+ return QueryTaskFutureImpl.ofDelegate(Futures.dereference(futureFuture));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+ return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable));
+ }
+
+ @Override
+ public <T1, T2> QueryTaskFuture<T2> transformAsync(
+ QueryTaskFuture<T1> future,
+ final Function<T1, QueryTaskFuture<T2>> function) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.transformAsync(
+ (QueryTaskFutureImpl<T1>) future,
+ new AsyncFunction<T1, T2>() {
+ @Override
+ public ListenableFuture<T2> apply(T1 input) {
+ return (QueryTaskFutureImpl<T2>) function.apply(input);
+ }
+ },
+ executor));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> whenAllSucceedCall(
+ Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.whenAllSucceed(cast(futures)).call(callable, executor));
+ }
+
+ @ThreadSafe
+ @Override
+ public Uniquifier<Target> createUniquifier() {
return createTargetUniquifier();
}
@ThreadSafe
@Override
- public ThreadSafeMinDepthUniquifier<Target> createMinDepthUniquifier() {
- return new ThreadSafeMinDepthUniquifierImpl<>(
- TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ public MinDepthUniquifier<Target> createMinDepthUniquifier() {
+ return new MinDepthUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@ThreadSafe
- ThreadSafeUniquifier<Target> createTargetUniquifier() {
- return new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ Uniquifier<Target> createTargetUniquifier() {
+ return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@ThreadSafe
- ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
- return new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ Uniquifier<SkyKey> createSkyKeyUniquifier() {
+ return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@ThreadSafe
- ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
- return new ThreadSafeUniquifierImpl<>(
- ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ Uniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
+ return new UniquifierImpl<>(ReverseDepSkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -625,41 +678,44 @@
@ThreadSafe
@Override
- public void getTargetsMatchingPattern(
- QueryExpression owner, String pattern, Callback<Target> callback)
- throws QueryException, InterruptedException {
+ public QueryTaskFuture<Void> getTargetsMatchingPattern(
+ final QueryExpression owner, String pattern, Callback<Target> callback) {
// Directly evaluate the target pattern, making use of packages in the graph.
+ Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude;
try {
- Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
- getPatternAndExcludes(pattern);
- TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
- ImmutableSet<PathFragment> subdirectoriesToExclude =
- patternToEvalAndSubdirectoriesToExclude.getSecond();
- patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
- } catch (TargetParsingException e) {
- reportBuildFileError(owner, e.getMessage());
+ patternToEvalAndSubdirectoriesToExclude = getPatternAndExcludes(pattern);
+ } catch (TargetParsingException tpe) {
+ try {
+ reportBuildFileError(owner, tpe.getMessage());
+ } catch (QueryException qe) {
+ return immediateFailedFuture(qe);
+ }
+ return immediateSuccessfulFuture(null);
+ } catch (InterruptedException ie) {
+ return immediateCancelledFuture();
}
- }
-
- @Override
- public void getTargetsMatchingPatternPar(
- QueryExpression owner,
- String pattern,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- // Directly evaluate the target pattern, making use of packages in the graph.
- try {
- Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
- getPatternAndExcludes(pattern);
- TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
- ImmutableSet<PathFragment> subdirectoriesToExclude =
- patternToEvalAndSubdirectoriesToExclude.getSecond();
- patternToEval.parEval(
- resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool);
- } catch (TargetParsingException e) {
- reportBuildFileError(owner, e.getMessage());
- }
+ TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
+ ImmutableSet<PathFragment> subdirectoriesToExclude =
+ patternToEvalAndSubdirectoriesToExclude.getSecond();
+ AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction =
+ new AsyncFunction<TargetParsingException, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(TargetParsingException exn) throws QueryException {
+ reportBuildFileError(owner, exn.getMessage());
+ return Futures.immediateFuture(null);
+ }
+ };
+ ListenableFuture<Void> evalFuture = patternToEval.evalAsync(
+ resolver,
+ subdirectoriesToExclude,
+ callback,
+ QueryException.class,
+ executor);
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.catchingAsync(
+ evalFuture,
+ TargetParsingException.class,
+ reportBuildFileErrorAsyncFunction));
}
@ThreadSafe
@@ -1030,12 +1086,17 @@
}
@ThreadSafe
- void getRBuildFilesParallel(
- Collection<PathFragment> fileIdentifiers,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
+ QueryTaskFuture<Void> getRBuildFilesParallel(
+ final Collection<PathFragment> fileIdentifiers,
+ final Callback<Target> callback) {
+ return QueryTaskFutureImpl.ofDelegate(safeSubmit(new Callable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ ParallelSkyQueryUtils.getRBuildFilesParallel(
+ SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore);
+ return null;
+ }
+ }));
}
/**
@@ -1043,42 +1104,51 @@
* on the given list of BUILD files and subincludes (other files are filtered out).
*/
@ThreadSafe
- void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
- throws QueryException, InterruptedException {
- Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
- Uniquifier<SkyKey> keyUniquifier =
- new ThreadSafeUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
- Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
- Set<SkyKey> resultKeys = CompactHashSet.create();
- while (!current.isEmpty()) {
- Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
- current = new HashSet<>();
- for (SkyKey rdep : Iterables.concat(reverseDeps)) {
- if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
- resultKeys.add(rdep);
- // Every package has a dep on the external package, so we need to include those edges too.
- if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+ QueryTaskFuture<Void> getRBuildFiles(
+ Collection<PathFragment> fileIdentifiers, Callback<Target> callback) {
+ try {
+ Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
+ Uniquifier<SkyKey> keyUniquifier =
+ new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, /*concurrencyLevel=*/ 1);
+ Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
+ Set<SkyKey> resultKeys = CompactHashSet.create();
+ while (!current.isEmpty()) {
+ Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
+ current = new HashSet<>();
+ for (SkyKey rdep : Iterables.concat(reverseDeps)) {
+ if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
+ resultKeys.add(rdep);
+ // Every package has a dep on the external package, so we need to include those edges
+ // too.
+ if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+ if (keyUniquifier.unique(rdep)) {
+ current.add(rdep);
+ }
+ }
+ } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
+ // Packages may depend on the existence of subpackages, but these edges aren't relevant
+ // to rbuildfiles.
if (keyUniquifier.unique(rdep)) {
current.add(rdep);
}
}
- } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
- // Packages may depend on the existence of subpackages, but these edges aren't relevant to
- // rbuildfiles.
- if (keyUniquifier.unique(rdep)) {
- current.add(rdep);
+ }
+ if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
+ for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
+ callback.process(
+ getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
}
+ resultKeys.clear();
}
}
- if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
- for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
- callback.process(
- getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
- }
- resultKeys.clear();
- }
+ callback.process(
+ getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
+ return immediateSuccessfulFuture(null);
+ } catch (QueryException e) {
+ return immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return immediateCancelledFuture();
}
- callback.process(getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
}
@Override
@@ -1148,18 +1218,26 @@
* <p>This callback may be called from multiple threads concurrently. At most one thread will call
* the wrapped {@code callback} concurrently.
*/
- @ThreadSafe
- private static class BatchStreamedCallback extends OutputFormatterCallback<Target>
- implements ThreadSafeCallback<Target> {
+ // TODO(nharmata): For queries with fewer than {@code batchThreshold} results, this batching
+ // strategy probably hurts performance since we can only start formatting results once the entire
+ // query is finished.
+ private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
+ implements Callback<Target> {
- private final OutputFormatterCallback<Target> callback;
- private final ThreadSafeUniquifier<Target> uniquifier =
- new ThreadSafeUniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness
+ // concern that requires the prohibition of concurrent uses of the callback; the only concern is
+ // memory. We should have a threshold for when to invoke the callback with a batch, and also a
+ // separate, larger, bound on the number of targets being processed at the same time.
+ private final ThreadSafeOutputFormatterCallback<Target> callback;
+ private final Uniquifier<Target> uniquifier =
+ new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
+ private BatchStreamedCallback(
+ ThreadSafeOutputFormatterCallback<Target> callback,
+ int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
}
@@ -1203,26 +1281,23 @@
@ThreadSafe
@Override
- public void getAllRdepsUnboundedParallel(
+ public QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
QueryExpression expression,
VariableContext<Target> context,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
+ Callback<Target> callback) {
+ return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
this, expression, context, callback, packageSemaphore);
}
@ThreadSafe
@Override
- public void getAllRdeps(
+ public QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<Target> universe,
VariableContext<Target> context,
Callback<Target> callback,
- int depth)
- throws QueryException, InterruptedException {
- getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+ int depth) {
+ return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
}
/**
@@ -1232,16 +1307,15 @@
* nodes are directly depended on by a large number of other nodes.
*/
@VisibleForTesting
- protected void getAllRdeps(
+ protected QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<Target> universe,
VariableContext<Target> context,
Callback<Target> callback,
int depth,
- int batchSize)
- throws QueryException, InterruptedException {
+ int batchSize) {
MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
- eval(
+ return eval(
expression,
context,
new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));