Add a mechanism for bounding the number of Packages that SkyQueryEnvironment's expensive parallel operations can operate on at once.

--
MOS_MIGRATED_REVID=138779172
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index c5efe91a..a3f7c6d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -34,6 +34,7 @@
 import com.google.devtools.build.lib.cmdline.TargetParsingException;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
 import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
 import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
@@ -132,6 +133,7 @@
   private final int queryEvaluationParallelismLevel;
 
   // The following fields are set in the #beforeEvaluateQuery method.
+  private MultisetSemaphore<PackageIdentifier> packageSemaphore;
   protected WalkableGraph graph;
   private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
   private ForkJoinPool forkJoinPool;
@@ -205,6 +207,7 @@
     }
     checkEvaluationResult(result);
 
+    packageSemaphore = makeFreshPackageMultisetSemaphore();
     graph = result.getWalkableGraph();
     blacklistPatternsSupplier = InterruptibleSupplier.Memoize.of(new BlacklistSupplier(graph));
 
@@ -220,7 +223,17 @@
             graphBackedRecursivePackageProvider,
             eventHandler,
             TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
-            forkJoinPool);
+            forkJoinPool,
+            packageSemaphore);
+  }
+
+  protected MultisetSemaphore<PackageIdentifier> makeFreshPackageMultisetSemaphore() {
+    return MultisetSemaphore.unbounded();
+  }
+
+  @ThreadSafe
+  public MultisetSemaphore<PackageIdentifier> getPackageMultisetSemaphore() {
+    return packageSemaphore;
   }
 
   /**
@@ -342,10 +355,18 @@
 
   private Map<SkyKey, Collection<Target>> targetifyValues(
       Map<SkyKey, ? extends Iterable<SkyKey>> input) throws InterruptedException {
+    return targetifyValues(
+        input,
+        makePackageKeyToTargetKeyMap(ImmutableSet.copyOf(Iterables.concat(input.values()))));
+  }
+
+  private Map<SkyKey, Collection<Target>> targetifyValues(
+      Map<SkyKey, ? extends Iterable<SkyKey>> input,
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
     ImmutableMap.Builder<SkyKey, Collection<Target>> result = ImmutableMap.builder();
 
     Map<SkyKey, Target> allTargets =
-        makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values())));
+        makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap);
 
     for (Map.Entry<SkyKey, ? extends Iterable<SkyKey>> entry : input.entrySet()) {
       Iterable<SkyKey> skyKeys = entry.getValue();
@@ -446,8 +467,9 @@
 
   /** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */
   Collection<Target> filterRawReverseDepsOfTransitiveTraversalKeys(
-      Map<SkyKey, ? extends Iterable<SkyKey>> rawReverseDeps) throws InterruptedException {
-    return processRawReverseDeps(targetifyValues(rawReverseDeps));
+      Map<SkyKey, ? extends Iterable<SkyKey>> rawReverseDeps,
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
+    return processRawReverseDeps(targetifyValues(rawReverseDeps, packageKeyToTargetKeyMap));
   }
 
   private Collection<Target> processRawReverseDeps(Map<SkyKey, Collection<Target>> rawReverseDeps)
@@ -752,9 +774,16 @@
         }
       };
 
+  static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER =
+      new Function<SkyKey, PackageIdentifier>() {
+        @Override
+        public PackageIdentifier apply(SkyKey skyKey) {
+          return (PackageIdentifier) skyKey.argument();
+        }
+      };
+
   @ThreadSafe
-  public Map<SkyKey, Target> makeTargetsFromSkyKeys(Iterable<SkyKey> keys)
-      throws InterruptedException {
+  Multimap<SkyKey, SkyKey> makePackageKeyToTargetKeyMap(Iterable<SkyKey> keys) {
     Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = ArrayListMultimap.create();
     for (SkyKey key : keys) {
       Label label = SKYKEY_TO_LABEL.apply(key);
@@ -763,6 +792,18 @@
       }
       packageKeyToTargetKeyMap.put(PackageValue.key(label.getPackageIdentifier()), key);
     }
+    return packageKeyToTargetKeyMap;
+  }
+
+  @ThreadSafe
+  public Map<SkyKey, Target> makeTargetsFromSkyKeys(Iterable<SkyKey> keys)
+      throws InterruptedException {
+    return makeTargetsFromPackageKeyToTargetKeyMap(makePackageKeyToTargetKeyMap(keys));
+  }
+
+  @ThreadSafe
+  public Map<SkyKey, Target> makeTargetsFromPackageKeyToTargetKeyMap(
+      Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
     ImmutableMap.Builder<SkyKey, Target> result = ImmutableMap.builder();
     Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet());
     for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) {
@@ -924,7 +965,8 @@
       ThreadSafeCallback<Target> callback,
       ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
-    ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, forkJoinPool);
+    ParallelSkyQueryUtils.getRBuildFilesParallel(
+        this, fileIdentifiers, callback, forkJoinPool, packageSemaphore);
   }
 
   /**
@@ -1107,7 +1149,7 @@
       ForkJoinPool forkJoinPool)
       throws QueryException, InterruptedException {
     ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
-        this, expression, context, callback, forkJoinPool);
+        this, expression, context, callback, forkJoinPool, packageSemaphore);
   }
 
   @ThreadSafe