Unify AbstractBlazeQueryEnvironment#evaluateQuery with its subclass overrides. Also, have AbstractBlazeQueryEnvironment#evaluateQuery take an OutputFormatterCallback instance rather than a Callback instance. This is more sensible since Callback is only intended to be used intra-query, while OutputFormatterCallback is intended for use in end-to-end query evaluation. This lets us slightly simplify QueryCommand by shifting the responsibility for managing the OutputFormatterCallback (calling start() before evaluation and close() afterwards) to AbstractBlazeQueryEnvironment#evaluateQuery.

--
MOS_MIGRATED_REVID=134827588
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
index 1962cdf..76a7f36 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
@@ -25,15 +25,18 @@
 import com.google.devtools.build.lib.packages.DependencyFilter;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.profiler.AutoProfiler;
-import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
 import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
+import com.google.devtools.build.lib.query2.engine.QueryUtil;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.util.Preconditions;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -93,7 +96,20 @@
   }
 
   /**
-   * Evaluate the specified query expression in this environment.
+   * Used by {@link #evaluateQuery} to evaluate the given {@code expr}. The caller,
+   * {@link #evaluateQuery}, not {@link #evalTopLevelInternal}, is responsible for managing
+   * {@code callback}.
+   */
+  protected void evalTopLevelInternal(QueryExpression expr, OutputFormatterCallback<T> callback)
+      throws QueryException, InterruptedException {
+    eval(expr, VariableContext.<T>empty(), callback);
+  }
+
+  /**
+   * Evaluate the specified query expression in this environment, streaming results to the given
+   * {@code callback}. {@code callback.start()} will be called before query evaluation and
+   * {@code callback.close()} will be unconditionally called at the end of query evaluation
+   * (i.e. regardless of whether it was successful).
    *
    * @return a {@link QueryEvalResult} object that contains the resulting set of targets and a bit
    *   to indicate whether errors occurred during evaluation; note that the
@@ -101,12 +117,12 @@
    * @throws QueryException if the evaluation failed and {@code --nokeep_going} was in
    *   effect
    */
-  public QueryEvalResult evaluateQuery(QueryExpression expr, final Callback<T> callback)
-      throws QueryException, InterruptedException {
-
-    final AtomicBoolean empty = new AtomicBoolean(true);
+  public QueryEvalResult evaluateQuery(
+      QueryExpression expr,
+      final OutputFormatterCallback<T> callback)
+          throws QueryException, InterruptedException, IOException {
+    EmptinessSensingCallback<T> emptySensingCallback = createEmptinessSensingCallback(callback);
     try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
-
       // In the --nokeep_going case, errors are reported in the order in which the patterns are
       // specified; using a linked hash set here makes sure that the left-most error is reported.
       Set<String> targetPatternSet = new LinkedHashSet<>();
@@ -117,17 +133,24 @@
         // Unfortunately, by evaluating the patterns in parallel, we lose some location information.
         throw new QueryException(expr, e.getMessage());
       }
+      IOException ioExn = null;
       try {
-        this.eval(expr, VariableContext.<T>empty(), new Callback<T>() {
-          @Override
-          public void process(Iterable<T> partialResult)
-              throws QueryException, InterruptedException {
-            empty.compareAndSet(true, Iterables.isEmpty(partialResult));
-            callback.process(partialResult);
-          }
-        });
+        callback.start();
+        evalTopLevelInternal(expr, emptySensingCallback);
       } catch (QueryException e) {
         throw new QueryException(e, expr);
+      } catch (InterruptedException e) {
+        throw e;
+      } finally {
+        try {
+          callback.close();
+        } catch (IOException e) {
+          // Only throw this IOException if we weren't about to throw a different exception.
+          ioExn = e;
+        }
+      }
+      if (ioExn != null) {
+        throw ioExn;
       }
     }
 
@@ -143,16 +166,62 @@
       }
     }
 
-    return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
+    return new QueryEvalResult(!eventHandler.hasErrors(), emptySensingCallback.isEmpty());
+  }
+
+  private static <T> EmptinessSensingCallback<T> createEmptinessSensingCallback(
+      OutputFormatterCallback<T> callback) {
+    return (callback instanceof ThreadSafeCallback)
+        ? new ThreadSafeEmptinessSensingCallback<>(callback)
+        : new EmptinessSensingCallback<>(callback);
+  }
+
+  private static class EmptinessSensingCallback<T> extends OutputFormatterCallback<T> {
+    private final OutputFormatterCallback<T> callback;
+    private final AtomicBoolean empty = new AtomicBoolean(true);
+
+    private EmptinessSensingCallback(OutputFormatterCallback<T> callback) {
+      this.callback = callback;
+    }
+
+    @Override
+    public void start() throws IOException {
+      callback.start();
+    }
+
+    @Override
+    public void processOutput(Iterable<T> partialResult)
+        throws IOException, InterruptedException {
+      empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+      callback.processOutput(partialResult);
+    }
+
+    @Override
+    public void close() throws InterruptedException, IOException {
+      callback.close();
+    }
+
+    boolean isEmpty() {
+      return empty.get();
+    }
+  }
+
+  private static class ThreadSafeEmptinessSensingCallback<T>
+      extends EmptinessSensingCallback<T> implements ThreadSafeCallback<T> {
+    private ThreadSafeEmptinessSensingCallback(OutputFormatterCallback<T> callback) {
+      super(callback);
+      Preconditions.checkState(callback instanceof ThreadSafeCallback);
+    }
   }
 
   public QueryExpression transformParsedQuery(QueryExpression queryExpression) {
     return queryExpression;
   }
 
-  public QueryEvalResult evaluateQuery(String query, Callback<T> callback)
-      throws QueryException, InterruptedException {
-    return evaluateQuery(QueryExpression.parse(query, this), callback);
+  public QueryEvalResult evaluateQuery(String query, OutputFormatterCallback<T> callback)
+      throws QueryException, InterruptedException, IOException {
+    return evaluateQuery(
+        QueryExpression.parse(query, this), callback);
   }
 
   @Override
@@ -189,7 +258,7 @@
       // Will skip the target and keep going if -k is specified.
       reportBuildFileError(caller, e.getMessage());
     }
-    AggregateAllCallback<T> aggregatingCallback = new AggregateAllCallback<>();
+    AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback();
     getTargetsMatchingPattern(caller, pattern, aggregatingCallback);
     return aggregatingCallback.getResult();
   }