A bunch of small changes to prepare SkyQueryEnvironment for full-parallel evaluation:
-Rename QueryExpression#evalConcurrently to QueryExpression#parEval. (parallelism is not concurrency! See https://existentialtype.wordpress.com/2011/03/17/parallelism-is-not-concurrency/)
-Have SkyQueryEnvironment#eval (used recursively in #evaluateQuery) dynamically call QueryExpression#parEval when appropriate.
-Delete QueryExpression#canEvalConcurrently.
-Add ThreadSafety annotations in a bunch of relevant places in the query codebase.
-A bunch of testing infrastructure to test parallel query evaluation.
-TODOs for implementing parallel evaluation of all QueryExpression nodes.
--
MOS_MIGRATED_REVID=132436340
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 407c4f2..6623ca6 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -58,10 +58,12 @@
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -154,6 +156,7 @@
EventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
+ QueryExpressionEvalListener<Target> evalListener,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
@@ -164,7 +167,8 @@
/*labelFilter=*/ Rule.ALL_LABELS,
eventHandler,
settings,
- extraFunctions);
+ extraFunctions,
+ evalListener);
this.loadingPhaseThreads = loadingPhaseThreads;
this.graphFactory = graphFactory;
this.pkgPath = pkgPath;
@@ -306,8 +310,8 @@
new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
final AtomicBoolean empty = new AtomicBoolean(true);
- Callback<Target> callbackWithEmptyCheck =
- new Callback<Target>() {
+ ThreadSafeCallback<Target> callbackWithEmptyCheck =
+ new ThreadSafeCallback<Target>() {
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
@@ -317,12 +321,7 @@
};
try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
try {
- if (expr.canEvalConcurrently()) {
- expr.evalConcurrently(
- this, VariableContext.<Target>empty(), callbackWithEmptyCheck, threadPool);
- } else {
- expr.eval(this, VariableContext.<Target>empty(), callbackWithEmptyCheck);
- }
+ eval(expr, VariableContext.<Target>empty(), callbackWithEmptyCheck);
} catch (QueryException e) {
throw new QueryException(e, expr);
}
@@ -499,17 +498,26 @@
return null;
}
+ @ThreadSafe
@Override
public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
throws QueryException, InterruptedException {
- expr.eval(this, context, callback);
+ // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
+ // all QueryEnvironment implementations.
+ if (callback instanceof ThreadSafeCallback) {
+ expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, threadPool);
+ } else {
+ expr.eval(this, context, callback);
+ }
}
+ @ThreadSafe
@Override
public Uniquifier<Target> createUniquifier() {
return new ConcurrentUniquifier();
}
+ @ThreadSafe
@Override
public void getTargetsMatchingPattern(
QueryExpression owner, String pattern, Callback<Target> callback)
@@ -530,6 +538,7 @@
}
}
+ @ThreadSafe
@Override
public Set<Target> getBuildFiles(
QueryExpression caller,
@@ -592,11 +601,13 @@
return new FakeSubincludeTarget(label, pkg);
}
+ @ThreadSafe
@Override
public TargetAccessor<Target> getAccessor() {
return accessor;
}
+ @ThreadSafe
@Override
public Target getTarget(Label label)
throws TargetNotFoundException, QueryException, InterruptedException {
@@ -621,6 +632,7 @@
}
}
+ @ThreadSafe
public Map<PackageIdentifier, Package> bulkGetPackages(Iterable<PackageIdentifier> pkgIds)
throws InterruptedException {
Set<SkyKey> pkgKeys = ImmutableSet.copyOf(PackageValue.keys(pkgIds));
@@ -864,6 +876,7 @@
* Calculates the set of {@link Package} objects, represented as source file targets, that depend
* on the given list of BUILD files and subincludes (other files are filtered out).
*/
+ @ThreadSafe
void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
throws QueryException, InterruptedException {
Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
@@ -955,7 +968,7 @@
* call the wrapped {@code callback} concurrently.
*/
@ThreadSafe
- private static class BatchStreamedCallback implements Callback<Target> {
+ private static class BatchStreamedCallback implements ThreadSafeCallback<Target> {
private final Callback<Target> callback;
private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
@@ -992,6 +1005,7 @@
}
}
+ @ThreadSafe
@Override
public void getAllRdeps(
QueryExpression expression,