blob: 1034dc98884ef590cb16b3bf57f1d847c54ef4cb [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.query2;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionContext;
import com.google.devtools.build.lib.query2.engine.QueryUtil;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.skyframe.SkyKey;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
/**
* Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
*
* <p>Special attention is given to memory usage. Naive parallel implementations of query
* functionality would lead to memory blowup. Instead of dealing with {@link Target}s, we try to
* deal with {@link SkyKey}s as much as possible to reduce the number of {@link Package}s forcibly
* in memory at any given time.
*/
// TODO(bazel-team): Be more deliberate about bounding memory usage here.
public class ParallelSkyQueryUtils {
/** The maximum number of keys to visit at once. */
@VisibleForTesting public static final int VISIT_BATCH_SIZE = 10000;
private ParallelSkyQueryUtils() {
}
static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return env.eval(
expression,
context,
ParallelVisitorUtils.createParallelVisitorCallback(
new RdepsUnboundedVisitor.Factory(
env, /*unfilteredUniverse=*/ Predicates.alwaysTrue(), callback)));
}
static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
int depth,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return env.eval(
expression,
context,
ParallelVisitorUtils.createParallelVisitorCallback(
new RdepsBoundedVisitor.Factory(
env, depth, /*universe=*/ Predicates.alwaysTrue(), callback)));
}
static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
Predicate<SkyKey> unfilteredUniverse,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return env.eval(
expression,
context,
ParallelVisitorUtils.createParallelVisitorCallback(
new RdepsUnboundedVisitor.Factory(env, unfilteredUniverse, callback)));
}
static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
SkyQueryEnvironment env,
QueryExpression expression,
QueryExpressionContext<Target> context,
int processResultsBatchSize,
int concurrencyLevel) {
QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
QueryUtil.evalAll(env, context, expression);
Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
getTransitiveClosureAsyncFunction =
universeValue -> {
ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
return env.execute(
() -> {
UnfilteredSkyKeyLabelDTCVisitor visitor =
new UnfilteredSkyKeyLabelDTCVisitor.Factory(
env,
env.createSkyKeyUniquifier(),
processResultsBatchSize,
aggregateAllCallback)
.create();
visitor.visitAndWaitForCompletion(
SkyQueryEnvironment.makeLabelsStrict(universeValue));
return Predicates.in(aggregateAllCallback.getResult());
});
};
return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
}
static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
int depth,
Predicate<SkyKey> universe,
QueryExpressionContext<Target> context,
Callback<Target> callback) {
return env.eval(
expression,
context,
ParallelVisitorUtils.createParallelVisitorCallback(
new RdepsBoundedVisitor.Factory(env, depth, universe, callback)));
}
/** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
static void getRBuildFilesParallel(
SkyQueryEnvironment env,
Collection<PathFragment> fileIdentifiers,
QueryExpressionContext<Target> context,
Callback<Target> callback) throws QueryException, InterruptedException {
RBuildFilesVisitor visitor =
new RBuildFilesVisitor(
env,
/*visitUniquifier=*/ env.createSkyKeyUniquifier(),
/*resultUniquifier=*/ env.createSkyKeyUniquifier(),
context,
callback);
visitor.visitFileIdentifiersAndWaitForCompletion(env.graph, fileIdentifiers);
}
static QueryTaskFuture<Void> getDepsUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
QueryExpressionContext<Target> context,
Callback<Target> callback,
boolean depsNeedFiltering,
QueryExpression caller) {
return env.eval(
expression,
context,
ParallelVisitorUtils.createParallelVisitorCallback(
new DepsUnboundedVisitor.Factory(env, callback, depsNeedFiltering, context, caller)));
}
static class DepAndRdep {
@Nullable final SkyKey dep;
final SkyKey rdep;
DepAndRdep(@Nullable SkyKey dep, SkyKey rdep) {
this.dep = dep;
this.rdep = rdep;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof DepAndRdep)) {
return false;
}
DepAndRdep other = (DepAndRdep) obj;
return Objects.equals(dep, other.dep) && rdep.equals(other.rdep);
}
@Override
public int hashCode() {
// N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
// Objects#hash). Depending on the structure of the graph being traversed, this method can
// be very hot.
return 31 * Objects.hashCode(dep) + rdep.hashCode();
}
}
static class DepAndRdepAtDepth {
final DepAndRdep depAndRdep;
final int rdepDepth;
DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
this.depAndRdep = depAndRdep;
this.rdepDepth = rdepDepth;
}
}
/** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
@ThreadSafe
private static class ThreadSafeAggregateAllSkyKeysCallback
implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
private final Set<SkyKey> results;
private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
this.results =
Collections.newSetFromMap(
new ConcurrentHashMap<>(
/*initialCapacity=*/ concurrencyLevel, /*loadFactor=*/ 0.75f));
}
@Override
public void process(Iterable<SkyKey> partialResult)
throws QueryException, InterruptedException {
Iterables.addAll(results, partialResult);
}
@Override
public ImmutableSet<SkyKey> getResult() {
return ImmutableSet.copyOf(results);
}
}
}