Add a mechanism for bounding the number of Packages that SkyQueryEnvironment's expensive parallel operations can operate on at once.

--
MOS_MIGRATED_REVID=138779172
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index ac3e2a1..5f7447b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -18,12 +18,15 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.collect.CompactHashSet;
 import com.google.devtools.build.lib.concurrent.MoreFutures;
+import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.Callback;
@@ -69,13 +72,14 @@
       QueryExpression expression,
       VariableContext<Target> context,
       ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
+      ForkJoinPool forkJoinPool,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore)
           throws QueryException, InterruptedException {
     env.eval(
         expression,
         context,
         new SkyKeyBFSVisitorCallback(
-            new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool)));
+            new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore)));
   }
 
   /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -83,24 +87,29 @@
       SkyQueryEnvironment env,
       Collection<PathFragment> fileIdentifiers,
       ThreadSafeCallback<Target> callback,
-      ForkJoinPool forkJoinPool)
+      ForkJoinPool forkJoinPool,
+      MultisetSemaphore<PackageIdentifier> packageSemaphore)
           throws QueryException, InterruptedException {
     ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
-    RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback);
+    RBuildFilesVisitor visitor =
+        new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore);
     visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
   }
 
   /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
   private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
     private final SkyQueryEnvironment env;
+    private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
     private RBuildFilesVisitor(
         SkyQueryEnvironment env,
         ForkJoinPool forkJoinPool,
         ThreadSafeUniquifier<SkyKey> uniquifier,
-        Callback<Target> callback) {
+        Callback<Target> callback,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
       super(forkJoinPool, uniquifier, callback);
       this.env = env;
+      this.packageSemaphore = packageSemaphore;
     }
 
     @Override
@@ -125,10 +134,21 @@
     }
 
     @Override
-    protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult)
-        throws InterruptedException {
-      return SkyQueryEnvironment.getBuildFilesForPackageValues(
-          env.graph.getSuccessfulValues(keysToUseForResult).values());
+    protected void processResultantTargets(
+        Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+            throws QueryException, InterruptedException {
+      Set<PackageIdentifier> pkgIdsNeededForResult =
+          ImmutableSet.copyOf(
+              Iterables.transform(
+                  keysToUseForResult,
+                  SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+      packageSemaphore.acquireAll(pkgIdsNeededForResult);
+      try {
+        callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues(
+            env.graph.getSuccessfulValues(keysToUseForResult).values()));
+      } finally {
+        packageSemaphore.releaseAll(pkgIdsNeededForResult);
+      }
     }
 
     @Override
@@ -152,14 +172,17 @@
   private static class AllRdepsUnboundedVisitor
       extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
     private final SkyQueryEnvironment env;
+    private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
     private AllRdepsUnboundedVisitor(
         SkyQueryEnvironment env,
         ForkJoinPool forkJoinPool,
         ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
-        ThreadSafeCallback<Target> callback) {
+        ThreadSafeCallback<Target> callback,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
       super(forkJoinPool, uniquifier, callback);
       this.env = env;
+      this.packageSemaphore = packageSemaphore;
     }
 
     /**
@@ -174,20 +197,24 @@
       private final ForkJoinPool forkJoinPool;
       private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
       private final ThreadSafeCallback<Target> callback;
+      private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
       private Factory(
         SkyQueryEnvironment env,
         ThreadSafeCallback<Target> callback,
-        ForkJoinPool forkJoinPool) {
+        ForkJoinPool forkJoinPool,
+        MultisetSemaphore<PackageIdentifier> packageSemaphore) {
         this.env = env;
         this.forkJoinPool = forkJoinPool;
         this.uniquifier = env.createReverseDepSkyKeyUniquifier();
         this.callback = callback;
+        this.packageSemaphore = packageSemaphore;
       }
 
       @Override
       public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
-        return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback);
+        return new AllRdepsUnboundedVisitor(
+            env, forkJoinPool, uniquifier, callback, packageSemaphore);
       }
     }
 
@@ -213,13 +240,27 @@
         reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second);
       }
 
-      // Filter out disallowed deps. We cannot defer the targetification any further as we do not
-      // want to retrieve the rdeps of unwanted nodes (targets).
-      if (!reverseDepsMap.isEmpty()) {
-        Collection<Target> filteredTargets =
-            env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap);
-        filteredKeys.addAll(
-            Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY));
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+          env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values()));
+      Set<PackageIdentifier> pkgIdsNeededForTargetification =
+          ImmutableSet.copyOf(
+              Iterables.transform(
+                  packageKeyToTargetKeyMap.keySet(),
+                  SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+      packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+      try {
+        // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+        // want to retrieve the rdeps of unwanted nodes (targets).
+        if (!reverseDepsMap.isEmpty()) {
+          Collection<Target> filteredTargets =
+              env.filterRawReverseDepsOfTransitiveTraversalKeys(
+                  reverseDepsMap, packageKeyToTargetKeyMap);
+          filteredKeys.addAll(
+              Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY));
+        }
+      } finally {
+        packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
       }
 
       // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
@@ -252,9 +293,23 @@
     }
 
     @Override
-    protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult)
-        throws InterruptedException {
-      return env.makeTargetsFromSkyKeys(keysToUseForResult).values();
+    protected void processResultantTargets(
+        Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+            throws QueryException, InterruptedException {
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+          env.makePackageKeyToTargetKeyMap(keysToUseForResult);
+      Set<PackageIdentifier> pkgIdsNeededForResult =
+          ImmutableSet.copyOf(
+            Iterables.transform(
+                packageKeyToTargetKeyMap.keySet(),
+                SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+      packageSemaphore.acquireAll(pkgIdsNeededForResult);
+      try {
+        callback.process(
+            env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
+      } finally {
+        packageSemaphore.releaseAll(pkgIdsNeededForResult);
+      }
     }
 
     @Override
@@ -294,7 +349,7 @@
 
   /**
    * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
-   * ForkJoinQuiescingExecutor}.
+   * ForkJoinPool}.
    *
    * <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a
    * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a
@@ -310,7 +365,9 @@
     private static final int VISIT_BATCH_SIZE = 10000;
 
     private AbstractSkyKeyBFSVisitor(
-        ForkJoinPool forkJoinPool, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+        ForkJoinPool forkJoinPool,
+        ThreadSafeUniquifier<T> uniquifier,
+        Callback<Target> callback) {
       this.forkJoinPool = forkJoinPool;
       this.uniquifier = uniquifier;
       this.callback = callback;
@@ -402,7 +459,7 @@
 
       @Override
       protected void computeImpl() throws QueryException, InterruptedException {
-        callback.process(getTargetsToAddToResult(keysToUseForResult));
+        processResultantTargets(keysToUseForResult, callback);
       }
     }
 
@@ -425,11 +482,12 @@
     }
 
     /**
-     * Gets the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in the
-     * full visitation.
+     * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s
+     * in the full visitation to the given {@link Callback}.
      */
-    protected abstract Iterable<Target> getTargetsToAddToResult(
-        Iterable<SkyKey> keysToUseForResult) throws InterruptedException;
+    protected abstract void processResultantTargets(
+        Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+            throws QueryException, InterruptedException;
 
     /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
     protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;