Unify AbstractBlazeQueryEnvironment#evaluateQuery with its subclass overrides. Also, have AbstractBlazeQueryEnvironment#evaluateQuery take an OutputFormatterCallback instance rather than a Callback instance. This is more sensible since the latter is only intended to be used intra-query, while the former is intended for usage in end-to-end query evaluation. This lets us slightly simplify QueryCommand, by shifting the responsibility for managing the OutputFormatterCallback to AbstractBlazeQueryEnvironment#evaluateQuery.

--
MOS_MIGRATED_REVID=134827588
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2913d1e..b920787 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -52,6 +52,7 @@
 import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
 import com.google.devtools.build.lib.query2.engine.Callback;
 import com.google.devtools.build.lib.query2.engine.FunctionExpression;
+import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
@@ -87,6 +88,7 @@
 import com.google.devtools.build.skyframe.SkyValue;
 import com.google.devtools.build.skyframe.WalkableGraph;
 import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -102,7 +104,6 @@
 import java.util.Set;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
 
@@ -309,8 +310,24 @@
   }
 
   @Override
-  public QueryEvalResult evaluateQuery(QueryExpression expr, Callback<Target> callback)
-      throws QueryException, InterruptedException {
+  protected void evalTopLevelInternal(
+      QueryExpression expr, OutputFormatterCallback<Target> callback)
+          throws QueryException, InterruptedException {
+    try {
+      super.evalTopLevelInternal(expr, callback);
+    } finally {
+      // Force termination of remaining tasks - if evaluateQuery was successful there should be
+      // none, if it failed abruptly (e.g. was interrupted) we don't want to leave any dangling
+      // threads running tasks.
+      forkJoinPool.shutdownNow();
+      forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  @Override
+  public QueryEvalResult evaluateQuery(
+      QueryExpression expr, OutputFormatterCallback<Target> callback)
+          throws QueryException, InterruptedException, IOException {
     // Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
     // result is set to have an error iff there were errors emitted during the query, so we reset
     // errors here.
@@ -324,47 +341,8 @@
     //
     // This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
     // case of a race between the original callback and the eventHandler.
-    final BatchStreamedCallback aggregator =
-        new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
-
-    final AtomicBoolean empty = new AtomicBoolean(true);
-    ThreadSafeCallback<Target> callbackWithEmptyCheck =
-        new ThreadSafeCallback<Target>() {
-          @Override
-          public void process(Iterable<Target> partialResult)
-              throws QueryException, InterruptedException {
-            empty.compareAndSet(true, Iterables.isEmpty(partialResult));
-            aggregator.process(partialResult);
-          }
-        };
-    try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
-      try {
-        eval(expr, VariableContext.<Target>empty(), callbackWithEmptyCheck);
-      } catch (QueryException e) {
-        throw new QueryException(e, expr);
-      } finally {
-        // Force termination of remaining tasks - if evaluateQuery was successful there should be
-        // none, if it failed abruptly (e.g. was interrupted) we don't want to leave any dangling
-        // threads running tasks.
-        forkJoinPool.shutdownNow();
-        forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-      }
-      aggregator.processLastPending();
-    }
-
-    if (eventHandler.hasErrors()) {
-      if (!keepGoing) {
-        // This case represents loading-phase errors reported during evaluation
-        // of target patterns that don't cause evaluation to fail per se.
-        throw new QueryException(
-            "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
-      } else {
-        eventHandler.handle(
-            Event.warn("--keep_going specified, ignoring errors.  " + "Results may be inaccurate"));
-      }
-    }
-
-    return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
+    BatchStreamedCallback batchCallback = new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
+    return super.evaluateQuery(expr, batchCallback);
   }
 
   private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input)
@@ -1025,38 +1003,50 @@
    * call the wrapped {@code callback} concurrently.
    */
   @ThreadSafe
-  private static class BatchStreamedCallback implements ThreadSafeCallback<Target> {
+  private static class BatchStreamedCallback
+      extends OutputFormatterCallback<Target> implements ThreadSafeCallback<Target> {
 
-    private final Callback<Target> callback;
+    private final OutputFormatterCallback<Target> callback;
     private final ThreadSafeUniquifier<Target> uniquifier =
         new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
     private final Object pendingLock = new Object();
     private List<Target> pending = new ArrayList<>();
     private int batchThreshold;
 
-    private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
+    private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
       this.callback = callback;
       this.batchThreshold = batchThreshold;
     }
 
     @Override
-    public void process(Iterable<Target> partialResult)
-        throws QueryException, InterruptedException {
+    public void start() throws IOException {
+      callback.start();
+    }
+
+    @Override
+    public void processOutput(Iterable<Target> partialResult)
+        throws IOException, InterruptedException {
       ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
       synchronized (pendingLock) {
         Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
         pending.addAll(uniquifiedTargets);
         if (pending.size() >= batchThreshold) {
-          callback.process(pending);
+          callback.processOutput(pending);
           pending = new ArrayList<>();
         }
       }
     }
 
-    private void processLastPending() throws QueryException, InterruptedException {
+    @Override
+    public void close() throws IOException, InterruptedException {
+      processLastPending();
+      callback.close();
+    }
+
+    private void processLastPending() throws IOException, InterruptedException {
       synchronized (pendingLock) {
         if (!pending.isEmpty()) {
-          callback.process(pending);
+          callback.processOutput(pending);
           pending = null;
         }
       }