Unify AbstractBlazeQueryEnvironment#evaluateQuery with its subclass overrides. Also, have AbstractBlazeQueryEnvironment#evaluateQuery take an OutputFormatterCallback instance rather than a Callback instance. This is more sensible since the latter is only intended to be used intra-query, while the former is intended for usage in end-to-end query evaluation. This lets us slightly simplify QueryCommand, by shifting the responsibility for managing the OutputFormatterCallback to AbstractBlazeQueryEnvironment#evaluateQuery.
--
MOS_MIGRATED_REVID=134827588
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2913d1e..b920787 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -52,6 +52,7 @@
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
+import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
@@ -87,6 +88,7 @@
import com.google.devtools.build.skyframe.SkyValue;
import com.google.devtools.build.skyframe.WalkableGraph;
import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
+import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -102,7 +104,6 @@
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -309,8 +310,24 @@
}
@Override
- public QueryEvalResult evaluateQuery(QueryExpression expr, Callback<Target> callback)
- throws QueryException, InterruptedException {
+ protected void evalTopLevelInternal(
+ QueryExpression expr, OutputFormatterCallback<Target> callback)
+ throws QueryException, InterruptedException {
+ try {
+ super.evalTopLevelInternal(expr, callback);
+ } finally {
+ // Force termination of remaining tasks - if evaluateQuery was successful there should be
+ // none, if it failed abruptly (e.g. was interrupted) we don't want to leave any dangling
+ // threads running tasks.
+ forkJoinPool.shutdownNow();
+ forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public QueryEvalResult evaluateQuery(
+ QueryExpression expr, OutputFormatterCallback<Target> callback)
+ throws QueryException, InterruptedException, IOException {
// Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
// result is set to have an error iff there were errors emitted during the query, so we reset
// errors here.
@@ -324,47 +341,8 @@
//
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
- final BatchStreamedCallback aggregator =
- new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
-
- final AtomicBoolean empty = new AtomicBoolean(true);
- ThreadSafeCallback<Target> callbackWithEmptyCheck =
- new ThreadSafeCallback<Target>() {
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- empty.compareAndSet(true, Iterables.isEmpty(partialResult));
- aggregator.process(partialResult);
- }
- };
- try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
- try {
- eval(expr, VariableContext.<Target>empty(), callbackWithEmptyCheck);
- } catch (QueryException e) {
- throw new QueryException(e, expr);
- } finally {
- // Force termination of remaining tasks - if evaluateQuery was successful there should be
- // none, if it failed abruptly (e.g. was interrupted) we don't want to leave any dangling
- // threads running tasks.
- forkJoinPool.shutdownNow();
- forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
- aggregator.processLastPending();
- }
-
- if (eventHandler.hasErrors()) {
- if (!keepGoing) {
- // This case represents loading-phase errors reported during evaluation
- // of target patterns that don't cause evaluation to fail per se.
- throw new QueryException(
- "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
- } else {
- eventHandler.handle(
- Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate"));
- }
- }
-
- return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
+ BatchStreamedCallback batchCallback = new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
+ return super.evaluateQuery(expr, batchCallback);
}
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input)
@@ -1025,38 +1003,50 @@
* call the wrapped {@code callback} concurrently.
*/
@ThreadSafe
- private static class BatchStreamedCallback implements ThreadSafeCallback<Target> {
+ private static class BatchStreamedCallback
+ extends OutputFormatterCallback<Target> implements ThreadSafeCallback<Target> {
- private final Callback<Target> callback;
+ private final OutputFormatterCallback<Target> callback;
private final ThreadSafeUniquifier<Target> uniquifier =
new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
+ private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
}
@Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
+ public void start() throws IOException {
+ callback.start();
+ }
+
+ @Override
+ public void processOutput(Iterable<Target> partialResult)
+ throws IOException, InterruptedException {
ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
synchronized (pendingLock) {
Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
pending.addAll(uniquifiedTargets);
if (pending.size() >= batchThreshold) {
- callback.process(pending);
+ callback.processOutput(pending);
pending = new ArrayList<>();
}
}
}
- private void processLastPending() throws QueryException, InterruptedException {
+ @Override
+ public void close() throws IOException, InterruptedException {
+ processLastPending();
+ callback.close();
+ }
+
+ private void processLastPending() throws IOException, InterruptedException {
synchronized (pendingLock) {
if (!pending.isEmpty()) {
- callback.process(pending);
+ callback.processOutput(pending);
pending = null;
}
}