Cosmetic refactor of some of the helper methods used by ParallelSkyQueryUtils.RBuildFilesVisitor. Also a minor tweak of the batch size for feeding results to the callback.
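
For reference, the batching pattern being tuned feeds accumulated result keys to
the callback in fixed-size chunks via Iterables.partition; a minimal, simplified
sketch (the Callback shape and BATCH_CALLBACK_SIZE value are taken from the diff
below, everything else is stand-in code):

  import com.google.common.collect.Iterables;
  import java.util.List;

  class BatchingSketch {
    // Sketch only: stand-in for the query Callback type.
    interface Callback<T> {
      void process(Iterable<T> partialResult) throws InterruptedException;
    }

    static final int BATCH_CALLBACK_SIZE = 10000;

    // Flush accumulated keys to the callback in BATCH_CALLBACK_SIZE-sized chunks.
    static <T> void flushInBatches(List<T> resultKeys, Callback<T> callback)
        throws InterruptedException {
      for (List<T> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
        callback.process(batch);
      }
      resultKeys.clear();
    }
  }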

Also correctly use the packageSemaphore in SkyQueryEnvironment's non-parallel implementation of rbuildfiles.
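
The correct usage mirrors the acquire/try/finally/release pattern in the new
getBuildFileTargetsForPackageKeysAndProcessViaCallback helper in the diff below;
a minimal sketch of that pattern (the semaphore interface here is a simplified
stand-in, not Bazel's actual type):

  import java.util.Set;

  class SemaphoreSketch {
    // Sketch only: stand-in for the per-package semaphore.
    interface PackageSemaphore<T> {
      void acquireAll(Set<T> keys) throws InterruptedException;
      void releaseAll(Set<T> keys);
    }

    // Hold the semaphore only for the span of the lookup and callback
    // processing, and release it even if processing throws.
    static <T> void processUnderSemaphore(
        PackageSemaphore<T> packageSemaphore, Set<T> pkgIds, Runnable processBatch)
        throws InterruptedException {
      packageSemaphore.acquireAll(pkgIds);
      try {
        processBatch.run();
      } finally {
        packageSemaphore.releaseAll(pkgIds);
      }
    }
  }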

RELNOTES: None
PiperOrigin-RevId: 174039067
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 6a9d2df..88072aa 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2;
 
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +32,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
@@ -137,7 +139,7 @@
     implements StreamableQueryEnvironment<Target> {
   // 10k is likely a good balance between using batch efficiently and not blowing up memory.
   // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
-  static final int BATCH_CALLBACK_SIZE = 10000;
+  protected static final int BATCH_CALLBACK_SIZE = 10000;
   protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
   private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000;
   private static final Logger logger = Logger.getLogger(SkyQueryEnvironment.class.getName());
@@ -1045,7 +1047,26 @@
     }
     return result;
   }
-  static Iterable<Target> getBuildFilesForPackageValues(Iterable<SkyValue> packageValues) {
+
+  void getBuildFileTargetsForPackageKeysAndProcessViaCallback(
+      Iterable<SkyKey> packageKeys, Callback<Target> callback)
+      throws QueryException, InterruptedException {
+    Set<PackageIdentifier> pkgIds =
+        Streams.stream(packageKeys)
+            .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+            .collect(toImmutableSet());
+    packageSemaphore.acquireAll(pkgIds);
+    try {
+      Iterable<SkyValue> packageValues = graph.getSuccessfulValues(packageKeys).values();
+      Iterable<Target> buildFileTargets = getBuildFileTargetsFromPackageValues(packageValues);
+      callback.process(buildFileTargets);
+    } finally {
+      packageSemaphore.releaseAll(pkgIds);
+    }
+  }
+
+  protected Iterable<Target> getBuildFileTargetsFromPackageValues(
+      Iterable<SkyValue> packageValues) {
     // TODO(laurentlb): Use streams?
     return Iterables.transform(
         Iterables.filter(
@@ -1062,7 +1083,7 @@
         safeSubmit(
             () -> {
               ParallelSkyQueryUtils.getRBuildFilesParallel(
-                  SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore);
+                  SkyQueryEnvironment.this, fileIdentifiers, callback);
               return null;
             }));
   }
@@ -1103,14 +1124,12 @@
         }
         if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
           for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
-            callback.process(
-                getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
+            getBuildFileTargetsForPackageKeysAndProcessViaCallback(batch, callback);
           }
           resultKeys.clear();
         }
       }
-      callback.process(
-          getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
+      getBuildFileTargetsForPackageKeysAndProcessViaCallback(resultKeys, callback);
       return immediateSuccessfulFuture(null);
     } catch (QueryException e) {
       return immediateFailedFuture(e);
@@ -1189,6 +1208,9 @@
   // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching
   // strategy probably hurts performance since we can only start formatting results once the entire
   // query is finished.
+  // TODO(nharmata): This batching strategy is also potentially harmful from a memory perspective
+  // since when the Targets being output are backed by Package instances, we're delaying GC of the
+  // Package instances until the output batch size is met.
   private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
       implements Callback<Target> {