Modify SkyQueryEnvironment to work in stream mode. This streaming system is pretty simple: It aggregates up to 10k elements and then it notifies the parent.

--
MOS_MIGRATED_REVID=108144202
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 21e999b..5cb7a32 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -52,7 +52,6 @@
 import com.google.devtools.build.lib.query2.engine.QueryException;
 import com.google.devtools.build.lib.query2.engine.QueryExpression;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.skyframe.FileValue;
 import com.google.devtools.build.lib.skyframe.GraphBackedRecursivePackageProvider;
@@ -74,6 +73,7 @@
 import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
@@ -305,9 +305,11 @@
   @Override
   public void eval(QueryExpression expr, Callback<Target> callback)
       throws QueryException, InterruptedException {
-    AggregateAllCallback<Target> aggregator = new AggregateAllCallback<>();
+    // 10k is likely a good balance between using batch efficiently and not blowing up memory.
+    BatchStreamedCallback aggregator = new BatchStreamedCallback(callback, 10000,
+        createUniquifier());
     expr.eval(this, aggregator);
-    callback.process(aggregator.getResult());
+    aggregator.processLastPending();
   }
 
   @Override
@@ -691,4 +693,37 @@
         .add(new RBuildFilesFunction())
         .build();
   }
+
+  private static class BatchStreamedCallback implements Callback<Target> {
+
+    private final Callback<Target> callback;
+    private final Uniquifier<Target> uniquifier;
+    private List<Target> pending = new ArrayList<>();
+    private int batchThreshold;
+
+    private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
+        Uniquifier<Target> uniquifier) {
+      this.callback = callback;
+      this.batchThreshold = batchThreshold;
+      this.uniquifier = uniquifier;
+    }
+
+    @Override
+    public void process(Iterable<Target> partialResult)
+        throws QueryException, InterruptedException {
+      Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+      pending.addAll(uniquifier.unique(partialResult));
+      if (pending.size() >= batchThreshold) {
+        callback.process(pending);
+        pending = new ArrayList<>();
+      }
+    }
+
+    private void processLastPending() throws QueryException, InterruptedException {
+      if (!pending.isEmpty()) {
+        callback.process(pending);
+        pending = null;
+      }
+    }
+  }
 }