blob: 73e36c089c4be9e28ab98bd8257d0d77a2e70e7a [file] [log] [blame]
// Copyright 2015 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.query2;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.pkgcache.FilteringPolicies.NO_FILTER;
import com.google.common.base.Ascii;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.actions.FileStateValue;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.LabelConstants;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet;
import com.google.devtools.build.lib.concurrent.BlockingStack;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.DelegatingEventHandler;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventKind;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.BuildFileContainsErrorsException;
import com.google.devtools.build.lib.packages.DependencyFilter;
import com.google.devtools.build.lib.packages.NoSuchPackageException;
import com.google.devtools.build.lib.packages.NoSuchTargetException;
import com.google.devtools.build.lib.packages.NoSuchThingException;
import com.google.devtools.build.lib.packages.Package;
import com.google.devtools.build.lib.packages.Rule;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.pkgcache.PathPackageLocator;
import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.KeyExtractor;
import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionContext;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl;
import com.google.devtools.build.lib.query2.engine.QueryUtil.MutableKeyExtractorBackedMapImpl;
import com.google.devtools.build.lib.query2.engine.QueryUtil.NonExceptionalUniquifier;
import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMutableKeyExtractorBackedSetImpl;
import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
import com.google.devtools.build.lib.skyframe.ContainingPackageLookupFunction;
import com.google.devtools.build.lib.skyframe.GraphBackedRecursivePackageProvider;
import com.google.devtools.build.lib.skyframe.PackageLookupValue;
import com.google.devtools.build.lib.skyframe.PackageValue;
import com.google.devtools.build.lib.skyframe.PrepareDepsOfPatternsFunction;
import com.google.devtools.build.lib.skyframe.RecursivePackageProviderBackedTargetPatternResolver;
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey;
import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue;
import com.google.devtools.build.lib.skyframe.TraversalInfoRootPackageExtractor;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
import com.google.devtools.build.skyframe.EvaluationContext;
import com.google.devtools.build.skyframe.EvaluationResult;
import com.google.devtools.build.skyframe.InterruptibleSupplier;
import com.google.devtools.build.skyframe.SkyFunctionName;
import com.google.devtools.build.skyframe.SkyKey;
import com.google.devtools.build.skyframe.SkyValue;
import com.google.devtools.build.skyframe.WalkableGraph;
import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* {@link AbstractBlazeQueryEnvironment} that introspects the Skyframe graph to find forward and
* reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
* particular order. As well, this class eagerly loads the full transitive closure of targets, even
* if the full closure isn't needed.
*
* <p>This class has concurrent implementations of the
* {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the
* asynchronous evaluation model yields parallel query evaluation.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
implements StreamableQueryEnvironment<Target> {
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
protected static final int BATCH_CALLBACK_SIZE = 10000;
protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000;
private static final Logger logger = Logger.getLogger(SkyQueryEnvironment.class.getName());
private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this);
protected final int loadingPhaseThreads;
protected final WalkableGraphFactory graphFactory;
protected final ImmutableList<String> universeScope;
protected boolean blockUniverseEvaluationErrors;
protected ExtendedEventHandler universeEvalEventHandler;
protected final String parserPrefix;
protected final PathPackageLocator pkgPath;
protected final int queryEvaluationParallelismLevel;
// The following fields are set in the #beforeEvaluateQuery method.
private MultisetSemaphore<PackageIdentifier> packageSemaphore;
protected WalkableGraph graph;
private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider;
private ListeningExecutorService executor;
private RecursivePackageProviderBackedTargetPatternResolver resolver;
protected final SkyKey universeKey;
private final ImmutableList<TargetPatternKey> universeTargetPatternKeys;
public SkyQueryEnvironment(
boolean keepGoing,
int loadingPhaseThreads,
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
PathPackageLocator pkgPath,
boolean blockUniverseEvaluationErrors) {
this(
keepGoing,
loadingPhaseThreads,
// SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query
// evaluation is completely CPU-bound.
/*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT,
eventHandler,
settings,
extraFunctions,
parserPrefix,
graphFactory,
universeScope,
pkgPath,
blockUniverseEvaluationErrors);
}
protected SkyQueryEnvironment(
boolean keepGoing,
int loadingPhaseThreads,
int queryEvaluationParallelismLevel,
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
PathPackageLocator pkgPath,
boolean blockUniverseEvaluationErrors) {
super(
keepGoing,
/*strictScope=*/ true,
/*labelFilter=*/ Rule.ALL_LABELS,
eventHandler,
settings,
extraFunctions);
this.loadingPhaseThreads = loadingPhaseThreads;
this.graphFactory = graphFactory;
this.pkgPath = pkgPath;
this.universeScope = ImmutableList.copyOf(Preconditions.checkNotNull(universeScope));
this.parserPrefix = parserPrefix;
Preconditions.checkState(
!universeScope.isEmpty(), "No queries can be performed with an empty universe");
this.queryEvaluationParallelismLevel = queryEvaluationParallelismLevel;
this.universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix);
this.blockUniverseEvaluationErrors = blockUniverseEvaluationErrors;
this.universeEvalEventHandler =
this.blockUniverseEvaluationErrors
? new ErrorBlockingForwardingEventHandler(this.eventHandler)
: this.eventHandler;
this.universeTargetPatternKeys =
PrepareDepsOfPatternsFunction.getTargetPatternKeys(
PrepareDepsOfPatternsFunction.getSkyKeys(universeKey, eventHandler));
}
@Override
public void close() {
if (executor != null) {
executor.shutdownNow();
executor = null;
}
}
/** Gets roots of graph which contains all nodes needed to evaluate {@code expr}. */
protected Set<SkyKey> getGraphRootsFromExpression(QueryExpression expr)
throws QueryException, InterruptedException {
return ImmutableSet.of(universeKey);
}
protected void beforeEvaluateQuery(QueryExpression expr)
throws QueryException, InterruptedException {
Set<SkyKey> roots = getGraphRootsFromExpression(expr);
EvaluationResult<SkyValue> result;
try (AutoProfiler p = AutoProfiler.logged("evaluation and walkable graph", logger)) {
EvaluationContext evaluationContext =
EvaluationContext.newBuilder()
.setNumThreads(loadingPhaseThreads)
.setEventHander(universeEvalEventHandler)
.build();
result = graphFactory.prepareAndGet(roots, configureEvaluationContext(evaluationContext));
}
if (graph == null || graph != result.getWalkableGraph()) {
checkEvaluationResult(roots, result);
packageSemaphore = makeFreshPackageMultisetSemaphore();
graph = result.getWalkableGraph();
blacklistPatternsSupplier = InterruptibleSupplier.Memoize.of(new BlacklistSupplier(graph));
graphBackedRecursivePackageProvider =
new GraphBackedRecursivePackageProvider(
graph, universeTargetPatternKeys, pkgPath, new TraversalInfoRootPackageExtractor());
}
if (executor == null) {
executor = MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(
/*corePoolSize=*/ queryEvaluationParallelismLevel,
/*maximumPoolSize=*/ queryEvaluationParallelismLevel,
/*keepAliveTime=*/ 1,
/*units=*/ TimeUnit.SECONDS,
/*workQueue=*/ new BlockingStack<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build()));
}
resolver =
new RecursivePackageProviderBackedTargetPatternResolver(
graphBackedRecursivePackageProvider,
eventHandler,
TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
packageSemaphore);
}
/**
* Configures the default {@link EvaluationContext} to change the behavior of how evaluations in
* {@link WalkableGraphFactory#prepareAndGet} work.
*/
protected EvaluationContext configureEvaluationContext(EvaluationContext evaluationContext) {
return evaluationContext;
}
protected MultisetSemaphore<PackageIdentifier> makeFreshPackageMultisetSemaphore() {
return MultisetSemaphore.unbounded();
}
@ThreadSafe
public MultisetSemaphore<PackageIdentifier> getPackageMultisetSemaphore() {
return packageSemaphore;
}
protected void checkEvaluationResult(Set<SkyKey> roots, EvaluationResult<SkyValue> result)
throws QueryException {
// If the only root is the universe key, we expect to see either a single successfully evaluated
// value or a cycle in the result.
if (roots.size() == 1 && Iterables.getOnlyElement(roots).equals(universeKey)) {
Collection<SkyValue> values = result.values();
if (!values.isEmpty()) {
Preconditions.checkState(
values.size() == 1,
"Universe query \"%s\" returned multiple values unexpectedly (%s values in result)",
universeScope,
values.size());
Preconditions.checkNotNull(result.get(universeKey), result);
} else {
// No values in the result, so there must be an error. We expect the error to be a cycle.
boolean foundCycle = !Iterables.isEmpty(result.getError().getCycleInfo());
Preconditions.checkState(
foundCycle,
"Universe query \"%s\" failed with non-cycle error: %s",
universeScope,
result.getError());
}
}
}
@Override
public final QueryExpression transformParsedQuery(QueryExpression queryExpression) {
QueryExpressionMapper<Void> mapper = getQueryExpressionMapper();
QueryExpression transformedQueryExpression = queryExpression.accept(mapper);
logger.info(
String.format(
"transformed query [%s] to [%s]",
Ascii.truncate(
queryExpression.toString(), MAX_QUERY_EXPRESSION_LOG_CHARS, "[truncated]"),
Ascii.truncate(
transformedQueryExpression.toString(),
MAX_QUERY_EXPRESSION_LOG_CHARS,
"[truncated]")));
return transformedQueryExpression;
}
protected QueryExpressionMapper<Void> getQueryExpressionMapper() {
if (universeScope.size() != 1) {
return QueryExpressionMapper.identity();
}
TargetPattern.Parser targetPatternParser = new TargetPattern.Parser(parserPrefix);
String universeScopePattern = Iterables.getOnlyElement(universeScope);
return new RdepsToAllRdepsQueryExpressionMapper(targetPatternParser, universeScopePattern);
}
@Override
protected void evalTopLevelInternal(
QueryExpression expr, OutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException {
Throwable throwableToThrow = null;
try {
super.evalTopLevelInternal(expr, callback);
} catch (Throwable throwable) {
throwableToThrow = throwable;
} finally {
if (throwableToThrow != null) {
logger.log(
Level.INFO,
"About to shutdown query threadpool because of throwable",
throwableToThrow);
ListeningExecutorService obsoleteExecutor = executor;
// Signal that executor must be recreated on the next invocation.
executor = null;
// If evaluation failed abruptly (e.g. was interrupted), attempt to terminate all remaining
// tasks and then wait for them all to finish. We don't want to leave any dangling threads
// running tasks.
obsoleteExecutor.shutdownNow();
boolean interrupted = false;
boolean executorTerminated = false;
try {
while (!executorTerminated) {
try {
executorTerminated =
obsoleteExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
handleInterruptedShutdown();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
Throwables.propagateIfPossible(
throwableToThrow, QueryException.class, InterruptedException.class);
}
}
}
/**
* Subclasses may implement special handling when the query threadpool shutdown process is
* interrupted. This isn't likely to happen unless there's a bug in the lifecycle management of
* query tasks.
*/
protected void handleInterruptedShutdown() {}
@Override
public QueryEvalResult evaluateQuery(
QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException, IOException {
beforeEvaluateQuery(expr);
// SkyQueryEnvironment batches callback invocations using a BatchStreamedCallback, created here
// so that there's one per top-level evaluateQuery call. The batch size is large enough that
// per-call costs of calling the original callback are amortized over a good number of targets,
// and small enough that holding a batch of targets in memory doesn't risk an OOM error.
//
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
BatchStreamedCallback batchCallback = new BatchStreamedCallback(
callback,
BATCH_CALLBACK_SIZE,
createUniquifierForOuterBatchStreamedCallback(expr));
return super.evaluateQuery(expr, batchCallback);
}
private Map<SkyKey, Collection<Target>> targetifyValues(
Map<SkyKey, ? extends Iterable<SkyKey>> input) throws InterruptedException {
return targetifyValues(
input,
makePackageKeyToTargetKeyMap(ImmutableSet.copyOf(Iterables.concat(input.values()))));
}
private Map<SkyKey, Collection<Target>> targetifyValues(
Map<SkyKey, ? extends Iterable<SkyKey>> input,
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
ImmutableMap.Builder<SkyKey, Collection<Target>> result = ImmutableMap.builder();
Map<SkyKey, Target> allTargets =
getTargetKeyToTargetMapForPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap);
for (Map.Entry<SkyKey, ? extends Iterable<SkyKey>> entry : input.entrySet()) {
Iterable<SkyKey> skyKeys = entry.getValue();
Set<Target> targets = CompactHashSet.createWithExpectedSize(Iterables.size(skyKeys));
for (SkyKey key : skyKeys) {
Target target = allTargets.get(key);
if (target != null) {
targets.add(target);
}
}
result.put(entry.getKey(), targets);
}
return result.build();
}
private Map<SkyKey, Collection<Target>> getRawReverseDeps(
Iterable<SkyKey> transitiveTraversalKeys) throws InterruptedException {
return targetifyValues(graph.getReverseDeps(transitiveTraversalKeys));
}
private Set<Label> getAllowedDeps(Rule rule) throws InterruptedException {
Set<Label> allowedLabels = new HashSet<>(rule.getTransitions(dependencyFilter).values());
allowedLabels.addAll(rule.getVisibility().getDependencyLabels());
// We should add deps from aspects, otherwise they are going to be filtered out.
allowedLabels.addAll(rule.getAspectLabelsSuperset(dependencyFilter));
return allowedLabels;
}
private Collection<Target> filterFwdDeps(Target target, Collection<Target> rawFwdDeps)
throws InterruptedException {
if (!(target instanceof Rule)) {
return rawFwdDeps;
}
final Set<Label> allowedLabels = getAllowedDeps((Rule) target);
return Collections2.filter(rawFwdDeps,
new Predicate<Target>() {
@Override
public boolean apply(Target target) {
return allowedLabels.contains(target.getLabel());
}
});
}
@Override
public ThreadSafeMutableSet<Target> getFwdDeps(
Iterable<Target> targets, QueryExpressionContext<Target> context)
throws InterruptedException {
Map<SkyKey, Target> targetsByKey = Maps.newHashMapWithExpectedSize(Iterables.size(targets));
for (Target target : targets) {
targetsByKey.put(TARGET_TO_SKY_KEY.apply(target), target);
}
Map<SkyKey, Collection<Target>> directDeps = targetifyValues(
graph.getDirectDeps(targetsByKey.keySet()));
if (targetsByKey.keySet().size() != directDeps.keySet().size()) {
Iterable<Label> missingTargets = Iterables.transform(
Sets.difference(targetsByKey.keySet(), directDeps.keySet()),
SKYKEY_TO_LABEL);
eventHandler.handle(Event.warn("Targets were missing from graph: " + missingTargets));
}
ThreadSafeMutableSet<Target> result = createThreadSafeMutableSet();
for (Map.Entry<SkyKey, Collection<Target>> entry : directDeps.entrySet()) {
result.addAll(filterFwdDeps(targetsByKey.get(entry.getKey()), entry.getValue()));
}
return result;
}
/**
* Returns deps in the form of {@link SkyKey}s.
*
* <p>The implementation of this method does not filter out deps due to disallowed edges,
* therefore callers are responsible for doing the right thing themselves.
*/
public Multimap<SkyKey, SkyKey> getUnfilteredDirectDepsOfSkyKeys(Iterable<SkyKey> keys)
throws InterruptedException {
ImmutableMultimap.Builder<SkyKey, SkyKey> builder = ImmutableMultimap.builder();
graph.getDirectDeps(keys).forEach(builder::putAll);
return builder.build();
}
@Override
public Collection<Target> getReverseDeps(
Iterable<Target> targets, QueryExpressionContext<Target> context)
throws InterruptedException {
return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY));
}
private Collection<Target> getReverseDepsOfTransitiveTraversalKeys(
Iterable<SkyKey> transitiveTraversalKeys) throws InterruptedException {
Map<SkyKey, Collection<Target>> rawReverseDeps = getRawReverseDeps(transitiveTraversalKeys);
return processRawReverseDeps(rawReverseDeps);
}
/** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */
Collection<Target> filterRawReverseDepsOfTransitiveTraversalKeys(
Map<SkyKey, ? extends Iterable<SkyKey>> rawReverseDeps,
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
return processRawReverseDeps(targetifyValues(rawReverseDeps, packageKeyToTargetKeyMap));
}
private Collection<Target> processRawReverseDeps(Map<SkyKey, Collection<Target>> rawReverseDeps)
throws InterruptedException {
Set<Target> result = CompactHashSet.create();
CompactHashSet<Target> visited =
CompactHashSet.createWithExpectedSize(totalSizeOfCollections(rawReverseDeps.values()));
Set<Label> keys = CompactHashSet.create(Collections2.transform(rawReverseDeps.keySet(),
SKYKEY_TO_LABEL));
for (Collection<Target> parentCollection : rawReverseDeps.values()) {
for (Target parent : parentCollection) {
if (visited.add(parent)) {
if (parent instanceof Rule && dependencyFilter != DependencyFilter.ALL_DEPS) {
for (Label label : getAllowedDeps((Rule) parent)) {
if (keys.contains(label)) {
result.add(parent);
}
}
} else {
result.add(parent);
}
}
}
}
return result;
}
private static <T> int totalSizeOfCollections(Iterable<Collection<T>> nestedCollections) {
int totalSize = 0;
for (Collection<T> collection : nestedCollections) {
totalSize += collection.size();
}
return totalSize;
}
@Override
public ThreadSafeMutableSet<Target> getTransitiveClosure(
ThreadSafeMutableSet<Target> targets, QueryExpressionContext<Target> context)
throws InterruptedException {
return SkyQueryUtils.getTransitiveClosure(
targets, targets1 -> getFwdDeps(targets1, context), createThreadSafeMutableSet());
}
@Override
public ImmutableList<Target> getNodesOnPath(
Target from, Target to, QueryExpressionContext<Target> context) throws InterruptedException {
return SkyQueryUtils.getNodesOnPath(
from, to, targets -> getFwdDeps(targets, context), Target::getLabel);
}
protected final <R> ListenableFuture<R> safeSubmit(Callable<R> callable) {
try {
return executor.submit(callable);
} catch (RejectedExecutionException e) {
return Futures.immediateCancelledFuture();
}
}
private <R> ListenableFuture<R> safeSubmitAsync(AsyncCallable<R> callable) {
try {
return Futures.submitAsync(callable, executor);
} catch (RejectedExecutionException e) {
return Futures.immediateCancelledFuture();
}
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> eval(
final QueryExpression expr,
final QueryExpressionContext<Target> context,
final Callback<Target> callback) {
// TODO(bazel-team): As in here, use concurrency for the async #eval of other QueryEnvironment
// implementations.
AsyncCallable<Void> task =
() -> (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback);
return QueryTaskFutureImpl.ofDelegate(safeSubmitAsync(task));
}
@Override
public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable));
}
@Override
public <T1, T2> QueryTaskFuture<T2> transformAsync(
QueryTaskFuture<T1> future,
final Function<T1, QueryTaskFuture<T2>> function) {
return QueryTaskFutureImpl.ofDelegate(
Futures.transformAsync(
(QueryTaskFutureImpl<T1>) future,
input -> (QueryTaskFutureImpl<T2>) function.apply(input),
executor));
}
@Override
public <R> QueryTaskFuture<R> whenAllSucceedCall(
Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
return QueryTaskFutureImpl.ofDelegate(
Futures.whenAllSucceed(cast(futures)).call(callable, executor));
}
@ThreadSafe
@Override
public ThreadSafeMutableSet<Target> createThreadSafeMutableSet() {
return new ThreadSafeMutableKeyExtractorBackedSetImpl<>(
TargetKeyExtractor.INSTANCE, Target.class, queryEvaluationParallelismLevel);
}
@Override
public <V> MutableMap<Target, V> createMutableMap() {
return new MutableKeyExtractorBackedMapImpl<>(TargetKeyExtractor.INSTANCE);
}
@ThreadSafe
protected NonExceptionalUniquifier<Target> createUniquifierForOuterBatchStreamedCallback(
QueryExpression expr) {
return createUniquifier();
}
@ThreadSafe
@Override
public NonExceptionalUniquifier<Target> createUniquifier() {
return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE);
}
@ThreadSafe
@Override
public MinDepthUniquifier<Target> createMinDepthUniquifier() {
return new MinDepthUniquifierImpl<>(
TargetKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
}
@ThreadSafe
public MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
return new MinDepthUniquifierImpl<>(
SkyKeyKeyExtractor.INSTANCE, queryEvaluationParallelismLevel);
}
@ThreadSafe
public Uniquifier<SkyKey> createSkyKeyUniquifier() {
return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE);
}
private ImmutableSet<PathFragment> getBlacklistedExcludes(TargetPatternKey targetPatternKey)
throws InterruptedException {
return targetPatternKey.getAllBlacklistedSubdirectoriesToExclude(blacklistPatternsSupplier);
}
@ThreadSafe
@Override
public Collection<Target> getSiblingTargetsInPackage(Target target) {
return target.getPackage().getTargets().values();
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> getTargetsMatchingPattern(
QueryExpression owner, String pattern, Callback<Target> callback) {
TargetPatternKey targetPatternKey;
try {
targetPatternKey = TargetPatternValue.key(
pattern, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, parserPrefix);
} catch (TargetParsingException tpe) {
try {
reportBuildFileError(owner, tpe.getMessage());
} catch (QueryException qe) {
return immediateFailedFuture(qe);
}
return immediateSuccessfulFuture(null);
}
return evalTargetPatternKey(owner, targetPatternKey, callback);
}
@ThreadSafe
public QueryTaskFuture<Void> evalTargetPatternKey(
QueryExpression owner, TargetPatternKey targetPatternKey, Callback<Target> callback) {
ImmutableSet<PathFragment> blacklistedSubdirectoriesToExclude;
try {
blacklistedSubdirectoriesToExclude = getBlacklistedExcludes(targetPatternKey);
} catch (InterruptedException ie) {
return immediateCancelledFuture();
}
TargetPattern patternToEval = targetPatternKey.getParsedPattern();
ImmutableSet<PathFragment> additionalSubdirectoriesToExclude =
targetPatternKey.getExcludedSubdirectories();
AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction =
exn -> {
reportBuildFileError(owner, exn.getMessage());
return Futures.immediateFuture(null);
};
Callback<Target> filteredCallback = callback;
if (!targetPatternKey.getPolicy().equals(NO_FILTER)) {
filteredCallback =
targets ->
callback.process(
Iterables.filter(
targets,
target ->
targetPatternKey.getPolicy().shouldRetain(target, /*explicit=*/ false)));
}
ListenableFuture<Void> evalFuture =
patternToEval.evalAsync(
resolver,
blacklistedSubdirectoriesToExclude,
additionalSubdirectoriesToExclude,
filteredCallback,
QueryException.class,
executor);
return QueryTaskFutureImpl.ofDelegate(
Futures.catchingAsync(
evalFuture,
TargetParsingException.class,
reportBuildFileErrorAsyncFunction,
directExecutor()));
}
@ThreadSafe
@Override
public ThreadSafeMutableSet<Target> getBuildFiles(
QueryExpression caller,
ThreadSafeMutableSet<Target> nodes,
boolean buildFiles,
boolean loads,
QueryExpressionContext<Target> context)
throws QueryException, InterruptedException {
ThreadSafeMutableSet<Target> dependentFiles = createThreadSafeMutableSet();
Set<PackageIdentifier> seenPackages = new HashSet<>();
// Keep track of seen labels, to avoid adding a fake subinclude label that also exists as a
// real target.
Set<Label> seenLabels = new HashSet<>();
// Adds all the package definition files (BUILD files and build
// extensions) for package "pkg", to "buildfiles".
for (Target x : nodes) {
Package pkg = x.getPackage();
if (seenPackages.add(pkg.getPackageIdentifier())) {
if (buildFiles) {
addIfUniqueLabel(pkg.getBuildFile(), seenLabels, dependentFiles);
}
List<Label> extensions = new ArrayList<>();
if (loads) {
extensions.addAll(pkg.getSkylarkFileDependencies());
}
for (Label extension : extensions) {
Target loadTarget = getLoadTarget(extension, pkg);
addIfUniqueLabel(loadTarget, seenLabels, dependentFiles);
// Also add the BUILD file of the extension.
if (buildFiles) {
Label buildFileLabel = getBuildFileLabel(loadTarget.getLabel().getPackageIdentifier());
addIfUniqueLabel(new FakeLoadTarget(buildFileLabel, pkg), seenLabels, dependentFiles);
}
}
}
}
return dependentFiles;
}
protected Label getBuildFileLabel(PackageIdentifier packageIdentifier) throws QueryException {
// TODO(bazel-team): Try avoid filesystem access here.
Path buildFileForLoad = null;
try {
buildFileForLoad = pkgPath.getPackageBuildFile(packageIdentifier);
} catch (NoSuchPackageException e) {
throw new QueryException(packageIdentifier + " does not exist in graph");
}
return Label.createUnvalidated(packageIdentifier, buildFileForLoad.getBaseName());
}
private static void addIfUniqueLabel(Target node, Set<Label> labels, Set<Target> nodes) {
if (labels.add(node.getLabel())) {
nodes.add(node);
}
}
private Target getLoadTarget(Label label, Package pkg) {
return new FakeLoadTarget(label, pkg);
}
@ThreadSafe
@Override
public TargetAccessor<Target> getAccessor() {
return accessor;
}
@ThreadSafe
private Package getPackage(PackageIdentifier packageIdentifier)
throws InterruptedException, QueryException, NoSuchPackageException {
SkyKey packageKey = PackageValue.key(packageIdentifier);
PackageValue packageValue = (PackageValue) graph.getValue(packageKey);
if (packageValue != null) {
Package pkg = packageValue.getPackage();
if (pkg.containsErrors()) {
throw new BuildFileContainsErrorsException(packageIdentifier);
}
return pkg;
} else {
NoSuchPackageException exception = (NoSuchPackageException) graph.getException(packageKey);
if (exception != null) {
throw exception;
}
if (graph.isCycle(packageKey)) {
throw new NoSuchPackageException(packageIdentifier, "Package depends on a cycle");
} else {
throw new QueryException(packageKey + " does not exist in graph");
}
}
}
@ThreadSafe
@Override
public Target getTarget(Label label)
throws TargetNotFoundException, QueryException, InterruptedException {
try {
Package pkg = getPackage(label.getPackageIdentifier());
return pkg.getTarget(label.getName());
} catch (NoSuchThingException e) {
throw new TargetNotFoundException(e);
}
}
@ThreadSafe
public Map<PackageIdentifier, Package> bulkGetPackages(Iterable<PackageIdentifier> pkgIds)
throws InterruptedException {
Set<SkyKey> pkgKeys = ImmutableSet.copyOf(PackageValue.keys(pkgIds));
ImmutableMap.Builder<PackageIdentifier, Package> pkgResults = ImmutableMap.builder();
Map<SkyKey, SkyValue> packages = graph.getSuccessfulValues(pkgKeys);
for (Map.Entry<SkyKey, SkyValue> pkgEntry : packages.entrySet()) {
PackageIdentifier pkgId = (PackageIdentifier) pkgEntry.getKey().argument();
PackageValue pkgValue = (PackageValue) pkgEntry.getValue();
pkgResults.put(pkgId, Preconditions.checkNotNull(pkgValue.getPackage(), pkgId));
}
return pkgResults.build();
}
@Override
public void buildTransitiveClosure(
QueryExpression caller,
ThreadSafeMutableSet<Target> targets,
int maxDepth) throws QueryException, InterruptedException {
// Everything has already been loaded, so here we just check for errors so that we can
// pre-emptively throw/report if needed.
Iterable<SkyKey> transitiveTraversalKeys = makeTransitiveTraversalKeys(targets);
ImmutableList.Builder<String> errorMessagesBuilder = ImmutableList.builder();
// First, look for errors in the successfully evaluated TransitiveTraversalValues. They may
// have encountered errors that they were able to recover from.
Set<Map.Entry<SkyKey, SkyValue>> successfulEntries =
graph.getSuccessfulValues(transitiveTraversalKeys).entrySet();
ImmutableSet.Builder<SkyKey> successfulKeysBuilder = ImmutableSet.builder();
for (Map.Entry<SkyKey, SkyValue> successfulEntry : successfulEntries) {
successfulKeysBuilder.add(successfulEntry.getKey());
TransitiveTraversalValue value = (TransitiveTraversalValue) successfulEntry.getValue();
String firstErrorMessage = value.getFirstErrorMessage();
if (firstErrorMessage != null) {
errorMessagesBuilder.add(firstErrorMessage);
}
}
ImmutableSet<SkyKey> successfulKeys = successfulKeysBuilder.build();
// Next, look for errors from the unsuccessfully evaluated TransitiveTraversal skyfunctions.
Iterable<SkyKey> unsuccessfulKeys =
Iterables.filter(transitiveTraversalKeys, Predicates.not(Predicates.in(successfulKeys)));
Set<Map.Entry<SkyKey, Exception>> errorEntries =
graph.getMissingAndExceptions(unsuccessfulKeys).entrySet();
for (Map.Entry<SkyKey, Exception> entry : errorEntries) {
if (entry.getValue() == null) {
// Targets may be in the graph because they are not in the universe or depend on cycles.
eventHandler.handle(Event.warn(entry.getKey().argument() + " does not exist in graph"));
} else {
errorMessagesBuilder.add(entry.getValue().getMessage());
}
}
// Lastly, report all found errors.
ImmutableList<String> errorMessages = errorMessagesBuilder.build();
for (String errorMessage : errorMessages) {
reportBuildFileError(caller, errorMessage);
}
}
@Override
protected void preloadOrThrow(QueryExpression caller, Collection<String> patterns)
throws QueryException, TargetParsingException {
// SkyQueryEnvironment directly evaluates target patterns in #getTarget and similar methods
// using its graph, which is prepopulated using the universeScope (see #beforeEvaluateQuery),
// so no preloading of target patterns is necessary.
}
public ExtendedEventHandler getEventHandler() {
return eventHandler;
}
public static final Predicate<SkyKey> IS_TTV =
SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL);
public static final Function<SkyKey, Label> SKYKEY_TO_LABEL =
skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null;
static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER =
skyKey -> (PackageIdentifier) skyKey.argument();
public static Multimap<SkyKey, SkyKey> makePackageKeyToTargetKeyMap(Iterable<SkyKey> keys) {
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = ArrayListMultimap.create();
for (SkyKey key : keys) {
Label label = SKYKEY_TO_LABEL.apply(key);
if (label == null) {
continue;
}
packageKeyToTargetKeyMap.put(PackageValue.key(label.getPackageIdentifier()), key);
}
return packageKeyToTargetKeyMap;
}
public static Set<PackageIdentifier> getPkgIdsNeededForTargetification(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) {
return packageKeyToTargetKeyMap
.keySet()
.stream()
.map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
.collect(toImmutableSet());
}
@ThreadSafe
public Map<SkyKey, Target> getTargetKeyToTargetMapForPackageKeyToTargetKeyMap(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
ImmutableMap.Builder<SkyKey, Target> resultBuilder = ImmutableMap.builder();
getTargetsForPackageKeyToTargetKeyMapHelper(packageKeyToTargetKeyMap, resultBuilder::put);
return resultBuilder.build();
}
@ThreadSafe
public Multimap<PackageIdentifier, Target> getPkgIdToTargetMultimapForPackageKeyToTargetKeyMap(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
Multimap<PackageIdentifier, Target> result = ArrayListMultimap.create();
getTargetsForPackageKeyToTargetKeyMapHelper(
packageKeyToTargetKeyMap,
(k, t) -> result.put(t.getLabel().getPackageIdentifier(), t));
return result;
}
private void getTargetsForPackageKeyToTargetKeyMapHelper(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap,
BiConsumer<SkyKey, Target> targetKeyAndTargetConsumer) throws InterruptedException {
Set<SkyKey> processedTargets = new HashSet<>();
Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet());
for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) {
Package pkg = ((PackageValue) entry.getValue()).getPackage();
for (SkyKey targetKey : packageKeyToTargetKeyMap.get(entry.getKey())) {
if (processedTargets.add(targetKey)) {
try {
Target target = pkg.getTarget(SKYKEY_TO_LABEL.apply(targetKey).getName());
targetKeyAndTargetConsumer.accept(targetKey, target);
} catch (NoSuchTargetException e) {
// Skip missing target.
}
}
}
}
}
static final Function<Target, SkyKey> TARGET_TO_SKY_KEY =
target -> TransitiveTraversalValue.key(target.getLabel());
/** A strict (i.e. non-lazy) variant of {@link #makeTransitiveTraversalKeys}. */
public static Iterable<SkyKey> makeTransitiveTraversalKeysStrict(Iterable<Target> targets) {
return ImmutableList.copyOf(makeTransitiveTraversalKeys(targets));
}
private static Iterable<SkyKey> makeTransitiveTraversalKeys(Iterable<Target> targets) {
return Iterables.transform(targets, TARGET_TO_SKY_KEY);
}
@Override
public Target getOrCreate(Target target) {
return target;
}
/**
* Returns package lookup keys for looking up the package root for which there may be a relevant
* (from the perspective of {@link #getRBuildFiles}) {@link FileStateValue} node in the graph for
* {@code originalFileFragment}, which is assumed to be a file path.
*
* <p>This is a helper function for {@link #getFileStateKeysForFileFragments}.
*/
private static Iterable<SkyKey> getPkgLookupKeysForFile(PathFragment originalFileFragment,
PathFragment currentPathFragment) {
if (originalFileFragment.equals(currentPathFragment)
&& originalFileFragment.equals(LabelConstants.WORKSPACE_FILE_NAME)) {
// TODO(mschaller): this should not be checked at runtime. These are constants!
Preconditions.checkState(
LabelConstants.WORKSPACE_FILE_NAME
.getParentDirectory()
.equals(PathFragment.EMPTY_FRAGMENT),
LabelConstants.WORKSPACE_FILE_NAME);
return ImmutableList.of(
PackageLookupValue.key(LabelConstants.EXTERNAL_PACKAGE_IDENTIFIER),
PackageLookupValue.key(PackageIdentifier.createInMainRepo(PathFragment.EMPTY_FRAGMENT)));
}
PathFragment parentPathFragment = currentPathFragment.getParentDirectory();
return parentPathFragment == null
? ImmutableList.of()
: ImmutableList.of(
PackageLookupValue.key(PackageIdentifier.createInMainRepo(parentPathFragment)));
}
/**
* Returns FileStateValue keys for which there may be relevant (from the perspective of {@link
* #getRBuildFiles}) FileStateValues in the graph corresponding to the given {@code
* pathFragments}, which are assumed to be file paths.
*
* <p>To do this, we emulate the {@link ContainingPackageLookupFunction} logic: for each given
* file path, we look for the nearest ancestor directory (starting with its parent directory), if
* any, that has a package. The {@link PackageLookupValue} for this package tells us the package
* root that we should use for the {@link RootedPath} for the {@link FileStateValue} key.
*
* <p>Note that there may not be nodes in the graph corresponding to the returned SkyKeys.
*/
protected Collection<SkyKey> getFileStateKeysForFileFragments(
Iterable<PathFragment> pathFragments) throws InterruptedException {
Set<SkyKey> result = new HashSet<>();
Multimap<PathFragment, PathFragment> currentToOriginal = ArrayListMultimap.create();
for (PathFragment pathFragment : pathFragments) {
currentToOriginal.put(pathFragment, pathFragment);
}
while (!currentToOriginal.isEmpty()) {
Multimap<SkyKey, PathFragment> packageLookupKeysToOriginal = ArrayListMultimap.create();
Multimap<SkyKey, PathFragment> packageLookupKeysToCurrent = ArrayListMultimap.create();
for (Map.Entry<PathFragment, PathFragment> entry : currentToOriginal.entries()) {
PathFragment current = entry.getKey();
PathFragment original = entry.getValue();
for (SkyKey packageLookupKey : getPkgLookupKeysForFile(original, current)) {
packageLookupKeysToOriginal.put(packageLookupKey, original);
packageLookupKeysToCurrent.put(packageLookupKey, current);
}
}
Map<SkyKey, SkyValue> lookupValues =
graph.getSuccessfulValues(packageLookupKeysToOriginal.keySet());
for (Map.Entry<SkyKey, SkyValue> entry : lookupValues.entrySet()) {
SkyKey packageLookupKey = entry.getKey();
PackageLookupValue packageLookupValue = (PackageLookupValue) entry.getValue();
if (packageLookupValue.packageExists()) {
Collection<PathFragment> originalFiles =
packageLookupKeysToOriginal.get(packageLookupKey);
Preconditions.checkState(!originalFiles.isEmpty(), entry);
for (PathFragment fileName : originalFiles) {
result.add(FileStateValue.key(
RootedPath.toRootedPath(packageLookupValue.getRoot(), fileName)));
}
for (PathFragment current : packageLookupKeysToCurrent.get(packageLookupKey)) {
currentToOriginal.removeAll(current);
}
}
}
Multimap<PathFragment, PathFragment> newCurrentToOriginal = ArrayListMultimap.create();
for (PathFragment pathFragment : currentToOriginal.keySet()) {
PathFragment parent = pathFragment.getParentDirectory();
if (parent != null) {
newCurrentToOriginal.putAll(parent, currentToOriginal.get(pathFragment));
}
}
currentToOriginal = newCurrentToOriginal;
}
return result;
}
protected void getBuildFileTargetsForPackageKeysAndProcessViaCallback(
Iterable<SkyKey> packageKeys,
QueryExpressionContext<Target> context,
Callback<Target> callback)
throws QueryException, InterruptedException {
Set<PackageIdentifier> pkgIds =
Streams.stream(packageKeys)
.map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
.collect(toImmutableSet());
packageSemaphore.acquireAll(pkgIds);
try {
Iterable<Target> buildFileTargets =
Iterables.transform(
graph.getSuccessfulValues(packageKeys).values(),
skyValue -> ((PackageValue) skyValue).getPackage().getBuildFile());
callback.process(buildFileTargets);
} finally {
packageSemaphore.releaseAll(pkgIds);
}
}
/**
* Calculates the set of packages whose evaluation transitively depends on (e.g. via 'load'
* statements) the contents of the specified paths. The emitted {@link Target}s are BUILD file
* targets.
*/
@ThreadSafe
QueryTaskFuture<Void> getRBuildFiles(
Collection<PathFragment> fileIdentifiers,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return QueryTaskFutureImpl.ofDelegate(
safeSubmit(
() -> {
ParallelSkyQueryUtils.getRBuildFilesParallel(
SkyQueryEnvironment.this, fileIdentifiers, context, callback);
return null;
}));
}
@Override
public Iterable<QueryFunction> getFunctions() {
return ImmutableList.<QueryFunction>builder()
.addAll(super.getFunctions())
.add(new AllRdepsFunction())
.add(new RBuildFilesFunction())
.build();
}
private static class BlacklistSupplier
implements InterruptibleSupplier<ImmutableSet<PathFragment>> {
private final WalkableGraph graph;
private BlacklistSupplier(WalkableGraph graph) {
this.graph = graph;
}
@Override
public ImmutableSet<PathFragment> get() throws InterruptedException {
return ((BlacklistedPackagePrefixesValue)
graph.getValue(BlacklistedPackagePrefixesValue.key()))
.getPatterns();
}
}
private static class SkyKeyKeyExtractor implements KeyExtractor<SkyKey, SkyKey> {
private static final SkyKeyKeyExtractor INSTANCE = new SkyKeyKeyExtractor();
private SkyKeyKeyExtractor() {
}
@Override
public SkyKey extractKey(SkyKey element) {
return element;
}
}
/**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
*
* <p>Retains fewer than {@code batchThreshold} {@link Target}s at a time.
*
* <p>After this object's {@link #process} has been called for the last time, {#link
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
*
* <p>This callback may be called from multiple threads concurrently. At most one thread will call
* the wrapped {@code callback} concurrently.
*/
// TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching
// strategy probably hurts performance since we can only start formatting results once the entire
// query is finished.
// TODO(nharmata): This batching strategy is also potentially harmful from a memory perspective
// since when the Targets being output are backed by Package instances, we're delaying GC of the
// Package instances until the output batch size is met.
private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
implements Callback<Target> {
// TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness
// concern that requires the prohibition of concurrent uses of the callback; the only concern is
// memory. We should have a threshold for when to invoke the callback with a batch, and also a
// separate, larger, bound on the number of targets being processed at the same time.
private final ThreadSafeOutputFormatterCallback<Target> callback;
private final NonExceptionalUniquifier<Target> uniquifier;
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
private BatchStreamedCallback(
ThreadSafeOutputFormatterCallback<Target> callback,
int batchThreshold,
NonExceptionalUniquifier<Target> uniquifier) {
this.callback = callback;
this.batchThreshold = batchThreshold;
this.uniquifier = uniquifier;
}
@Override
public void start() throws IOException {
callback.start();
}
@Override
public void processOutput(Iterable<Target> partialResult)
throws IOException, InterruptedException {
ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
Iterable<Target> toProcess = null;
synchronized (pendingLock) {
Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
pending.addAll(uniquifiedTargets);
if (pending.size() >= batchThreshold) {
toProcess = pending;
pending = new ArrayList<>();
}
}
if (toProcess != null) {
callback.processOutput(toProcess);
}
}
@Override
public void close(boolean failFast) throws IOException, InterruptedException {
if (!failFast) {
processLastPending();
}
callback.close(failFast);
}
private void processLastPending() throws IOException, InterruptedException {
synchronized (pendingLock) {
if (!pending.isEmpty()) {
callback.processOutput(pending);
pending = null;
}
}
}
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
QueryExpression expression,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
this, expression, context, callback, packageSemaphore);
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> getAllRdepsBoundedParallel(
QueryExpression expression,
int depth,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return ParallelSkyQueryUtils.getAllRdepsBoundedParallel(
this, expression, depth, context, callback, packageSemaphore);
}
protected QueryTaskFuture<Predicate<SkyKey>> getUnfilteredUniverseDTCSkyKeyPredicateFuture(
QueryExpression universe, QueryExpressionContext<Target> context) {
return ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture(
this,
universe,
context,
BATCH_CALLBACK_SIZE,
queryEvaluationParallelismLevel);
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> getRdepsUnboundedParallel(
QueryExpression expression,
QueryExpression universe,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return transformAsync(
// Even if we need to do edge filtering, it's fine to construct the rdeps universe via an
// unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge
// filtering.
getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context),
unfilteredUniversePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
this, expression, unfilteredUniversePredicate, context, callback, packageSemaphore));
}
@Override
public QueryTaskFuture<Void> getDepsUnboundedParallel(
QueryExpression expression,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return ParallelSkyQueryUtils.getDepsUnboundedParallel(
SkyQueryEnvironment.this,
expression,
context,
callback,
packageSemaphore,
/*depsNeedFiltering=*/ !dependencyFilter.equals(DependencyFilter.ALL_DEPS));
}
@ThreadSafe
@Override
public QueryTaskFuture<Void> getRdepsBoundedParallel(
QueryExpression expression,
int depth,
QueryExpression universe,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return transformAsync(
// Even if we need to do edge filtering, it's fine to construct the rdeps universe via an
// unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge
// filtering.
getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context),
universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
this, expression, depth, universePredicate, context, callback, packageSemaphore));
}
/**
* Query evaluation behavior is specified with respect to errors it emits. (Or at least it should
* be. Tools rely on it.) Notably, errors that occur during evaluation of a query's universe must
* not be emitted during query command evaluation. Consider the case of a simple single target
* query when {@code //...} is the universe: errors in far flung parts of the workspace should not
* be emitted when that query command is evaluated.
*
* <p>Non-error message events are not specified. For instance, it's useful (and expected by some
* unit tests that should know better) for query commands to emit {@link EventKind#PROGRESS}
* events during package loading.
*
* <p>Therefore, this class is used to forward only non-{@link EventKind#ERROR} events during
* universe loading to the {@link SkyQueryEnvironment}'s {@link ExtendedEventHandler}.
*/
protected static class ErrorBlockingForwardingEventHandler extends DelegatingEventHandler {
public ErrorBlockingForwardingEventHandler(ExtendedEventHandler delegate) {
super(delegate);
}
@Override
public void handle(Event e) {
if (!e.getKind().equals(EventKind.ERROR)) {
super.handle(e);
}
}
}
}