Include intermediate output key as a typed parameter for ParallelVisitor.

This allows stricter type checking and makes it possible to store output keys other than just SkyKeys.

This CL is mostly a no-op but has one subtle change: in AbstractEdgeVisitor (now AbstractTargetOuputtingVisitor), we acquire the semaphore only while fetching the packages, and no longer process the results via the callback while holding it. We already do this in other visitors, so this is more consistent; it is also not a good idea to hold a semaphore lock around an unknown callback that may itself try to acquire keys.

PiperOrigin-RevId: 253080306
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 5a1d7f5..7b944d3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -33,7 +33,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -823,6 +822,10 @@
     }
   }
 
+  protected int getVisitBatchSizeForParallelVisitation() {
+    return ParallelSkyQueryUtils.VISIT_BATCH_SIZE;
+  }
+
   private Target getLoadTarget(Label label, Package pkg) {
     return new FakeLoadTarget(label, pkg);
   }
@@ -880,22 +883,27 @@
     Map<PackageIdentifier, Package> packageIdToPackageMap =
         bulkGetPackages(packageIdToLabelMap.keySet());
     ImmutableMap.Builder<Label, Target> resultBuilder = ImmutableMap.builder();
-    for (PackageIdentifier pkgId : packageIdToLabelMap.keySet()) {
-      Package pkg = packageIdToPackageMap.get(pkgId);
-      if (pkg == null) {
-        continue;
-      }
-      for (Label label : packageIdToLabelMap.get(pkgId)) {
-        Target target;
-        try {
-          target = pkg.getTarget(label.getName());
-        } catch (NoSuchTargetException e) {
+    packageSemaphore.acquireAll(packageIdToLabelMap.keySet());
+    try {
+      for (PackageIdentifier pkgId : packageIdToLabelMap.keySet()) {
+        Package pkg = packageIdToPackageMap.get(pkgId);
+        if (pkg == null) {
           continue;
         }
-        resultBuilder.put(label, target);
+        for (Label label : packageIdToLabelMap.get(pkgId)) {
+          Target target;
+          try {
+            target = pkg.getTarget(label.getName());
+          } catch (NoSuchTargetException e) {
+            continue;
+          }
+          resultBuilder.put(label, target);
+        }
       }
+      return resultBuilder.build();
+    } finally {
+      packageSemaphore.releaseAll(packageIdToLabelMap.keySet());
     }
-    return resultBuilder.build();
   }
 
   @ThreadSafe
@@ -1143,22 +1151,14 @@
     return result;
   }
 
-  protected void getBuildFileTargetsForPackageKeysAndProcessViaCallback(
-      Iterable<SkyKey> packageKeys,
-      QueryExpressionContext<Target> context,
-      Callback<Target> callback)
+  protected Iterable<Target> getBuildFileTargetsForPackageKeys(
+      Set<PackageIdentifier> pkgIds, QueryExpressionContext<Target> context)
       throws QueryException, InterruptedException {
-    Set<PackageIdentifier> pkgIds =
-          Streams.stream(packageKeys)
-              .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
-              .collect(toImmutableSet());
     packageSemaphore.acquireAll(pkgIds);
     try {
-      Iterable<Target> buildFileTargets =
-          Iterables.transform(
-              graph.getSuccessfulValues(packageKeys).values(),
-              skyValue -> ((PackageValue) skyValue).getPackage().getBuildFile());
-      callback.process(buildFileTargets);
+      return Iterables.transform(
+          graph.getSuccessfulValues(PackageValue.keys(pkgIds)).values(),
+          skyValue -> ((PackageValue) skyValue).getPackage().getBuildFile());
     } finally {
       packageSemaphore.releaseAll(pkgIds);
     }
@@ -1308,8 +1308,7 @@
       QueryExpression expression,
       QueryExpressionContext<Target> context,
       Callback<Target> callback) {
-    return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
-        this, expression, context, callback, packageSemaphore);
+    return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(this, expression, context, callback);
   }
 
   @ThreadSafe
@@ -1320,7 +1319,7 @@
       QueryExpressionContext<Target> context,
       Callback<Target> callback) {
     return ParallelSkyQueryUtils.getAllRdepsBoundedParallel(
-        this, expression, depth, context, callback, packageSemaphore);
+        this, expression, depth, context, callback);
   }
 
   protected QueryTaskFuture<Predicate<SkyKey>> getUnfilteredUniverseDTCSkyKeyPredicateFuture(
@@ -1345,8 +1344,9 @@
         // unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge
         // filtering.
         getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context),
-        unfilteredUniversePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
-            this, expression, unfilteredUniversePredicate, context, callback, packageSemaphore));
+        unfilteredUniversePredicate ->
+            ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
+                this, expression, unfilteredUniversePredicate, context, callback));
   }
 
   @Override
@@ -1359,7 +1359,6 @@
         expression,
         context,
         callback,
-        packageSemaphore,
         /*depsNeedFiltering=*/ !dependencyFilter.equals(DependencyFilter.ALL_DEPS));
   }
 
@@ -1376,8 +1375,9 @@
         // unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge
         // filtering.
         getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context),
-        universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
-            this, expression, depth, universePredicate, context, callback, packageSemaphore));
+        universePredicate ->
+            ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
+                this, expression, depth, universePredicate, context, callback));
   }
 
   /**