Multithread BFS in TraversalInfoRootPackageExtractor using a ParallelVisitor.

RELNOTES:
None.
PiperOrigin-RevId: 265063633
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
index 8cf5de8..af2f3a8 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
@@ -147,6 +147,12 @@
   protected abstract Iterable<OutputResultT> outputKeysToOutputValues(
       Iterable<OutputKeyT> targetKeys) throws ExceptionT, InterruptedException;
 
+  /**
+   * Suitable exception type to use with {@link ParallelVisitor} when no checked exception is
+   * appropriate.
+   */
+  public static final class UnusedException extends RuntimeException {}
+
   /** An object to hold keys to visit and keys ready for processing. */
   protected final class Visit {
     private final Iterable<OutputKeyT> keysToUseForResult;
diff --git a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
index a395124..429a159 100644
--- a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
@@ -17,6 +17,8 @@
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.packages.NoSuchPackageException;
 import com.google.devtools.build.lib.packages.NoSuchTargetException;
@@ -25,7 +27,6 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.RootedPath;
 import java.util.Map;
-import java.util.function.Consumer;
 
 /**
  * Support for resolving {@code package/...} target patterns.
@@ -47,10 +48,9 @@
    * @param blacklistedSubdirectories a set of {@link PathFragment}s specifying transitive
    *     subdirectories that have been blacklisted
    * @param excludedSubdirectories a set of {@link PathFragment}s specifying transitive
-   *     subdirectories to exclude
    */
   void streamPackagesUnderDirectory(
-      Consumer<PackageIdentifier> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       ExtendedEventHandler eventHandler,
       RepositoryName repository,
       PathFragment directory,
@@ -116,7 +116,7 @@
 
     @Override
     public void streamPackagesUnderDirectory(
-        Consumer<PackageIdentifier> results,
+        ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
         ExtendedEventHandler eventHandler,
         RepositoryName repository,
         PathFragment directory,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
index 1eb90b8..6be671b 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
@@ -21,6 +21,7 @@
 import com.google.devtools.build.lib.cmdline.RepositoryName;
 import com.google.devtools.build.lib.concurrent.BlazeInterners;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider;
 import com.google.devtools.build.lib.skyframe.serialization.autocodec.AutoCodec;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.RootedPath;
@@ -34,10 +35,10 @@
  * The value computed by {@link CollectPackagesUnderDirectoryFunction}. Contains a mapping for all
  * its non-excluded directories to whether there are packages or error messages beneath them.
  *
- * <p>This value is used by {@link GraphBackedRecursivePackageProvider#streamPackagesUnderDirectory}
- * to help it traverse the graph and find the set of packages under a directory, recursively by
- * {@link CollectPackagesUnderDirectoryFunction} which computes a value for a directory by
- * aggregating results calculated from its subdirectories, and by {@link
+ * <p>This value is used by {@link RecursivePackageProvider#streamPackagesUnderDirectory} to help it
+ * traverse the graph and find the set of packages under a directory, recursively by {@link
+ * CollectPackagesUnderDirectoryFunction} which computes a value for a directory by aggregating
+ * results calculated from its subdirectories, and by {@link
  * PrepareDepsOfTargetsUnderDirectoryFunction} which uses this value to find transitive targets to
  * load.
  *
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
index 8b06189..f3926b7 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
@@ -14,12 +14,15 @@
 package com.google.devtools.build.lib.skyframe;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.actions.InconsistentFilesystemException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.packages.BuildFileContainsErrorsException;
@@ -38,7 +41,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 
 /**
  * A {@link RecursivePackageProvider} backed by an {@link Environment}. Its methods may throw {@link
@@ -60,7 +62,8 @@
 
   /**
    * Whether any of the calls to {@link #getPackage}, {@link #getTarget}, {@link #bulkGetPackages},
-   * or {@link #streamPackagesUnderDirectory} encountered a package in error.
+   * or {@link RecursivePackageProvider#streamPackagesUnderDirectory} encountered a package in
+   * error.
    *
    * <p>The client of {@link EnvironmentBackedRecursivePackageProvider} may want to check this. See
    * comments in {@link #getPackage} for details.
@@ -139,7 +142,7 @@
 
   @Override
   public void streamPackagesUnderDirectory(
-      Consumer<PackageIdentifier> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       ExtendedEventHandler eventHandler,
       RepositoryName repository,
       PathFragment directory,
@@ -203,10 +206,9 @@
         // TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform
         // is unnecessary.
         PathFragment packageNamePathFragment = PathFragment.create(packageName);
-        if (!Iterables.any(
-            excludedSubdirectories,
-            excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) {
-          results.accept(PackageIdentifier.create(repository, packageNamePathFragment));
+        if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) {
+          results.process(
+              ImmutableList.of(PackageIdentifier.create(repository, packageNamePathFragment)));
         }
       }
     }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
index 38d3dba..5867051 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
@@ -25,6 +25,8 @@
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
 import com.google.devtools.build.lib.cmdline.TargetPattern;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -44,7 +46,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.logging.Logger;
 
 /**
@@ -207,7 +208,7 @@
 
   @Override
   public void streamPackagesUnderDirectory(
-      Consumer<PackageIdentifier> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       ExtendedEventHandler eventHandler,
       RepositoryName repository,
       PathFragment directory,
@@ -219,7 +220,7 @@
             repository, directory, blacklistedSubdirectories, excludedSubdirectories);
 
     rootPackageExtractor.streamPackagesFromRoots(
-        path -> results.accept(PackageIdentifier.create(repository, path)),
+        results,
         graph,
         roots,
         eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
index ecabd39..b731907 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
@@ -15,52 +15,66 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider;
-import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A callback for {@link RecursivePackageProvider#streamPackagesUnderDirectory} that buffers the
- * PackageIdentifiers it receives into batches that it delivers to a supplied {@code
- * Consumer<ImmutableList<PackageIdentifier>>}.
+ * PackageIdentifiers it receives into fixed-size batches that it delivers to a supplied {@code
+ * ThreadSafeBatchCallback<PackageIdentifier, RuntimeException>}.
+ *
+ * <p>The final batch delivered to the delegate callback may be smaller than the fixed size; the
+ * callback must be {@link #close() closed} to deliver this final batch.
  */
-@ThreadCompatible
+@ThreadSafe
 public class PackageIdentifierBatchingCallback
-    implements Consumer<PackageIdentifier>, AutoCloseable {
+    implements ThreadSafeBatchCallback<PackageIdentifier, UnusedException>, AutoCloseable {
 
-  private final Consumer<ImmutableList<PackageIdentifier>> batchResults;
+  private final ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults;
   private final int batchSize;
+
+  @GuardedBy("this")
   private ImmutableList.Builder<PackageIdentifier> packageIdentifiers;
+
+  @GuardedBy("this")
   private int bufferedPackageIds;
 
   public PackageIdentifierBatchingCallback(
-      Consumer<ImmutableList<PackageIdentifier>> batchResults, int batchSize) {
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults, int batchSize) {
     this.batchResults = batchResults;
     this.batchSize = batchSize;
     reset();
   }
 
   @Override
-  public void accept(PackageIdentifier path) {
-    packageIdentifiers.add(path);
-    bufferedPackageIds++;
-    if (bufferedPackageIds >= this.batchSize) {
-      flush();
+  public synchronized void process(Iterable<PackageIdentifier> partialResult)
+      throws InterruptedException {
+    for (PackageIdentifier path : partialResult) {
+      packageIdentifiers.add(path);
+      bufferedPackageIds++;
+      if (bufferedPackageIds >= this.batchSize) {
+        flush();
+      }
     }
   }
 
   @Override
-  public void close() {
+  public synchronized void close() throws InterruptedException {
     flush();
   }
 
-  private void flush() {
+  @GuardedBy("this")
+  private void flush() throws InterruptedException {
     if (bufferedPackageIds > 0) {
-      batchResults.accept(packageIdentifiers.build());
+      batchResults.process(packageIdentifiers.build());
       reset();
     }
   }
 
+  @GuardedBy("this")
   private void reset() {
     packageIdentifiers = ImmutableList.builderWithExpectedSize(batchSize);
     bufferedPackageIds = 0;
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index f8193fd..d401c57 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -16,7 +16,6 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -33,6 +32,7 @@
 import com.google.devtools.build.lib.cmdline.TargetPatternResolver;
 import com.google.devtools.build.lib.concurrent.BatchCallback;
 import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
 import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
 import com.google.devtools.build.lib.events.Event;
@@ -52,7 +52,6 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
 
 /**
  * A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -62,7 +61,7 @@
     extends TargetPatternResolver<Target> {
 
   // TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value?
-  private static final int MAX_PACKAGES_BULK_GET = 1000;
+  static final int MAX_PACKAGES_BULK_GET = 1000;
 
   protected final FilteringPolicy policy;
   private final RecursivePackageProvider recursivePackageProvider;
@@ -253,18 +252,20 @@
       ListeningExecutorService executor) {
     FilteringPolicy actualPolicy =
         rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy;
-    PathFragment pathFragment;
+
     ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
-    Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback =
+    ThreadSafeBatchCallback<PackageIdentifier, UnusedException> getPackageTargetsCallback =
         (pkgIdBatch) ->
             futures.add(
                 executor.submit(
                     new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback)));
-    try (PackageIdentifierBatchingCallback pkgIdBatchProducer =
-        new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) {
+
+    PathFragment pathFragment;
+    try (PackageIdentifierBatchingCallback batchingCallback =
+        new PackageIdentifierBatchingCallback(getPackageTargetsCallback, MAX_PACKAGES_BULK_GET)) {
       pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
       recursivePackageProvider.streamPackagesUnderDirectory(
-          pkgIdBatchProducer,
+          batchingCallback,
           eventHandler,
           repository,
           pathFragment,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
index 18579ae..b5e4821 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
@@ -14,23 +14,26 @@
 package com.google.devtools.build.lib.skyframe;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.Root;
 import com.google.devtools.build.lib.vfs.RootedPath;
 import com.google.devtools.build.skyframe.WalkableGraph;
 import java.util.List;
-import java.util.function.Consumer;
 
 /** Looks up {@link RecursivePkgValue}s of given roots in a {@link WalkableGraph}. */
 public class RecursivePkgValueRootPackageExtractor implements RootPackageExtractor {
 
   @Override
   public void streamPackagesFromRoots(
-      Consumer<PathFragment> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       WalkableGraph graph,
       List<Root> roots,
       ExtendedEventHandler eventHandler,
@@ -61,16 +64,16 @@
           "Root %s in repository %s could not be found in the graph.",
           root.asPath(),
           repository.getName());
+      ImmutableList.Builder<PackageIdentifier> packageIds = ImmutableList.builder();
       for (String packageName : lookup.getPackages()) {
         // TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform
         // is unnecessary.
         PathFragment packageNamePathFragment = PathFragment.create(packageName);
-        if (!Iterables.any(
-            excludedSubdirectories,
-            excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) {
-          results.accept(packageNamePathFragment);
+        if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) {
+          packageIds.add(PackageIdentifier.create(repository, packageNamePathFragment));
         }
       }
+      results.process(packageIds.build());
     }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
index f6eeb6a..e0bba766 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
@@ -14,13 +14,15 @@
 package com.google.devtools.build.lib.skyframe;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.Root;
 import com.google.devtools.build.skyframe.WalkableGraph;
 import java.util.List;
-import java.util.function.Consumer;
 
 /** A streaming interface for recursively searching for all packages under a given set of roots. */
 public interface RootPackageExtractor {
@@ -29,7 +31,7 @@
    * Recursively search each of the given roots in a repository for packages (while respecting
    * blacklists and exclusions), calling the {@code results} callback as each package is discovered.
    *
-   * @param results callback invoked once for each package as it is discovered under a root
+   * @param results callback invoked once for groups of packages as they are discovered under a root
    * @param graph skyframe graph used for retrieving the directories under each root
    * @param roots all the filesystem roots to search for packages
    * @param eventHandler receives package-loading errors for any packages loaded by graph queries
@@ -43,7 +45,7 @@
    *     searched exhaustively
    */
   void streamPackagesFromRoots(
-      Consumer<PathFragment> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       WalkableGraph graph,
       List<Root> roots,
       ExtendedEventHandler eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
index 8fba861..6a30fa4 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
@@ -14,12 +14,18 @@
 package com.google.devtools.build.lib.skyframe;
 
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.devtools.build.lib.skyframe.RecursivePackageProviderBackedTargetPatternResolver.MAX_PACKAGES_BULK_GET;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.vfs.PathFragment;
@@ -28,11 +34,13 @@
 import com.google.devtools.build.skyframe.SkyKey;
 import com.google.devtools.build.skyframe.SkyValue;
 import com.google.devtools.build.skyframe.WalkableGraph;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
-import java.util.function.Consumer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /** Looks up values under {@link TraversalInfo}s of given roots in a {@link WalkableGraph}. */
 public class TraversalInfoRootPackageExtractor implements RootPackageExtractor {
@@ -40,9 +48,12 @@
   private static final Comparator<TraversalInfo> TRAVERSAL_INFO_COMPARATOR =
       Comparator.comparing(ti -> ti.rootedDir.getRootRelativePath());
 
+  private static final int PACKAGE_ID_OUTPUT_BATCH_SIZE = 100;
+  private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+
   @Override
   public void streamPackagesFromRoots(
-      Consumer<PathFragment> results,
+      ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
       WalkableGraph graph,
       List<Root> roots,
       ExtendedEventHandler eventHandler,
@@ -51,81 +62,154 @@
       ImmutableSet<PathFragment> blacklistedSubdirectories,
       ImmutableSet<PathFragment> excludedSubdirectories)
       throws InterruptedException {
+    TreeSet<TraversalInfo> dirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
     for (Root root : roots) {
       RootedPath rootedDir = RootedPath.toRootedPath(root, directory);
-      TraversalInfo info =
-          new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories);
-      TreeSet<TraversalInfo> dirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
-      dirsToCheckForPackages.add(info);
-      collectPackagesUnder(results, graph, eventHandler, repository, dirsToCheckForPackages);
+      dirsToCheckForPackages.add(
+          new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories));
     }
+    PackageCollectingParallelVisitor visitor =
+        new PackageCollectingParallelVisitor(
+            results,
+            /*visitBatchSize=*/ MAX_PACKAGES_BULK_GET,
+            /*processResultsBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE,
+            /*minPendingTasks=*/ 3 * DEFAULT_THREAD_COUNT,
+            /*resultBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE,
+            eventHandler,
+            repository,
+            graph);
+    visitor.visitAndWaitForCompletion(dirsToCheckForPackages);
   }
 
-  private void collectPackagesUnder(
-      Consumer<PathFragment> results,
-      WalkableGraph graph,
-      ExtendedEventHandler eventHandler,
-      final RepositoryName repository,
-      TreeSet<TraversalInfo> dirsToCheckForPackages)
-      throws InterruptedException {
-    // NOTE: Maps.asMap returns a Map<T> view whose entrySet() order matches the underlying Set<T>.
-    Map<TraversalInfo, SkyKey> traversalToKeyMap =
-        Maps.asMap(
-            dirsToCheckForPackages,
-            traversalInfo ->
-                CollectPackagesUnderDirectoryValue.key(
-                    repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories));
-    Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values());
+  private static final ExecutorService PACKAGE_ID_COLLECTING_EXECUTOR =
+      Executors.newFixedThreadPool(
+          /*numThreads=*/ DEFAULT_THREAD_COUNT,
+          new ThreadFactoryBuilder().setNameFormat("package-id-traversal-%d").build());
 
-    // NOTE: Use a TreeSet to ensure a deterministic (sorted) iteration order when we recurse.
-    TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
-    for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) {
-      TraversalInfo info = entry.getKey();
-      SkyKey key = entry.getValue();
-      SkyValue val = values.get(key);
-      CollectPackagesUnderDirectoryValue collectPackagesValue =
-          (CollectPackagesUnderDirectoryValue) val;
-      if (collectPackagesValue != null) {
-        if (collectPackagesValue.isDirectoryPackage()) {
-          results.accept(info.rootedDir.getRootRelativePath());
-        }
+  /**
+   * A ParallelVisitor that reports every {@link PackageIdentifier} by querying the WalkableGraph
+   * for a {@link CollectPackagesUnderDirectoryValue} for each {@link TraversalInfo} it visits.
+   */
+  static class PackageCollectingParallelVisitor
+      extends ParallelVisitor<
+          TraversalInfo,
+          TraversalInfo,
+          TraversalInfo,
+          PackageIdentifier,
+          UnusedException,
+          ThreadSafeBatchCallback<PackageIdentifier, UnusedException>> {
 
-        if (collectPackagesValue.getErrorMessage() != null) {
-          eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage()));
-        }
+    private final ExtendedEventHandler eventHandler;
+    private final RepositoryName repository;
+    private final WalkableGraph graph;
 
-        ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages =
-            collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors();
-        for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) {
-          if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) {
-            PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath();
-            ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory =
-                info.blacklistedSubdirectories
-                    .stream()
-                    .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
-                    .collect(toImmutableSet());
-            ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory =
-                info.excludedSubdirectories
-                    .stream()
-                    .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
-                    .collect(toImmutableSet());
-            if (!excludedSubdirectoriesBeneathThisSubdirectory.contains(subdirectoryRelativePath)) {
-              subdirsToCheckForPackages.add(
-                  new TraversalInfo(
-                      subdirectory,
-                      blacklistedSubdirectoriesBeneathThisSubdirectory,
-                      excludedSubdirectoriesBeneathThisSubdirectory));
+    PackageCollectingParallelVisitor(
+        ThreadSafeBatchCallback<PackageIdentifier, UnusedException> callback,
+        int visitBatchSize,
+        int processResultsBatchSize,
+        int minPendingTasks,
+        int resultBatchSize,
+        ExtendedEventHandler eventHandler,
+        RepositoryName repository,
+        WalkableGraph graph) {
+      super(
+          callback,
+          UnusedException.class,
+          visitBatchSize,
+          processResultsBatchSize,
+          minPendingTasks,
+          resultBatchSize,
+          PACKAGE_ID_COLLECTING_EXECUTOR);
+      this.eventHandler = eventHandler;
+      this.repository = repository;
+      this.graph = graph;
+    }
+
+    @Override
+    protected Iterable<PackageIdentifier> outputKeysToOutputValues(
+        Iterable<TraversalInfo> targetKeys) {
+      ImmutableList.Builder<PackageIdentifier> results =
+          ImmutableList.builderWithExpectedSize(resultBatchSize);
+      for (TraversalInfo resultInfo : targetKeys) {
+        results.add(
+            PackageIdentifier.create(repository, resultInfo.rootedDir.getRootRelativePath()));
+      }
+      return results.build();
+    }
+
+    @Override
+    protected Visit getVisitResult(Iterable<TraversalInfo> dirsToCheckForPackages)
+        throws InterruptedException {
+      ImmutableMap.Builder<TraversalInfo, SkyKey> traversalToKeyMapBuilder = ImmutableMap.builder();
+      for (TraversalInfo traversalInfo : dirsToCheckForPackages) {
+        traversalToKeyMapBuilder.put(
+            traversalInfo,
+            CollectPackagesUnderDirectoryValue.key(
+                repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories));
+      }
+      ImmutableMap<TraversalInfo, SkyKey> traversalToKeyMap = traversalToKeyMapBuilder.build();
+      Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values());
+
+      // NOTE: Use a TreeSet to ensure a deterministic (sorted) iteration order when we recurse.
+      List<TraversalInfo> resultPackageIds = new ArrayList<>();
+      TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
+      for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) {
+        TraversalInfo info = entry.getKey();
+        SkyKey key = entry.getValue();
+        SkyValue val = values.get(key);
+        CollectPackagesUnderDirectoryValue collectPackagesValue =
+            (CollectPackagesUnderDirectoryValue) val;
+        if (collectPackagesValue != null) {
+          if (collectPackagesValue.isDirectoryPackage()) {
+            resultPackageIds.add(info);
+          }
+
+          if (collectPackagesValue.getErrorMessage() != null) {
+            eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage()));
+          }
+
+          ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages =
+              collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors();
+          for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) {
+            if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) {
+              PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath();
+              ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory =
+                  info.blacklistedSubdirectories.stream()
+                      .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
+                      .collect(toImmutableSet());
+              ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory =
+                  info.excludedSubdirectories.stream()
+                      .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
+                      .collect(toImmutableSet());
+              if (!excludedSubdirectoriesBeneathThisSubdirectory.contains(
+                  subdirectoryRelativePath)) {
+                subdirsToCheckForPackages.add(
+                    new TraversalInfo(
+                        subdirectory,
+                        blacklistedSubdirectoriesBeneathThisSubdirectory,
+                        excludedSubdirectoriesBeneathThisSubdirectory));
+              }
             }
           }
         }
       }
+      return new Visit(
+          /*keysToUseForResult=*/ resultPackageIds, /*keysToVisit=*/ subdirsToCheckForPackages);
     }
 
-    if (!subdirsToCheckForPackages.isEmpty()) {
-      collectPackagesUnder(results, graph, eventHandler, repository, subdirsToCheckForPackages);
+    @Override
+    protected Iterable<TraversalInfo> preprocessInitialVisit(Iterable<TraversalInfo> infos) {
+      return infos;
+    }
+
+    @Override
+    protected Iterable<TraversalInfo> noteAndReturnUniqueVisitationKeys(
+        Iterable<TraversalInfo> prospectiveVisitationKeys) {
+      return prospectiveVisitationKeys;
     }
   }
 
+  /** Value type used as visitation and output key for {@link PackageCollectingParallelVisitor}. */
   private static final class TraversalInfo {
     final RootedPath rootedDir;
     // Set of blacklisted directories. The graph is assumed to be prepopulated with