RecursivePackageProviderBackedTargetPatternResolver uses streamPackagesUnderDirectory.

The RootPackageExtractor and RecursivePackageProvider interfaces now expose only
"streaming" methods, in which callers must provide a callback that is invoked as
each package is identified.

This allows us to begin resolving targets as soon as a single batch of
PackageIdentifiers has been discovered.

RELNOTES:
None.
PiperOrigin-RevId: 260497403
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index d63b8eb..1d18260 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -50,7 +50,9 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 
 /**
  * A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -180,7 +182,7 @@
       implements ThreadSafeBatchCallback<T, E> {
     private final BatchCallback<T, E> delegate;
 
-    public SynchronizedBatchCallback(BatchCallback<T, E> delegate) {
+    SynchronizedBatchCallback(BatchCallback<T, E> delegate) {
       this.delegate = delegate;
     }
 
@@ -241,76 +243,95 @@
   }
 
   private <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsyncImpl(
-      final RepositoryName repository,
-      final String originalPattern,
+      RepositoryName repository,
+      String pattern,
       String directory,
       boolean rulesOnly,
       ImmutableSet<PathFragment> blacklistedSubdirectories,
       ImmutableSet<PathFragment> excludedSubdirectories,
-      final ThreadSafeBatchCallback<Target, E> callback,
+      ThreadSafeBatchCallback<Target, E> callback,
       ListeningExecutorService executor) {
-    final FilteringPolicy actualPolicy = rulesOnly
-        ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy)
-        : policy;
-    final PathFragment pathFragment;
-    Iterable<PathFragment> packagesUnderDirectory;
-    try {
+    FilteringPolicy actualPolicy =
+        rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy;
+    PathFragment pathFragment;
+    ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+    Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback =
+        (pkgIdBatch) ->
+            futures.add(
+                executor.submit(
+                    new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback)));
+    try (PackageIdentifierBatchingCallback pkgIdBatchProducer =
+        new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) {
       pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
-      packagesUnderDirectory =
-          recursivePackageProvider.getPackagesUnderDirectory(
-              eventHandler,
-              repository,
-              pathFragment,
-              blacklistedSubdirectories,
-              excludedSubdirectories);
+      recursivePackageProvider.streamPackagesUnderDirectory(
+          pkgIdBatchProducer,
+          eventHandler,
+          repository,
+          pathFragment,
+          blacklistedSubdirectories,
+          excludedSubdirectories);
     } catch (TargetParsingException e) {
       return Futures.immediateFailedFuture(e);
     } catch (InterruptedException e) {
       return Futures.immediateCancelledFuture();
     }
 
-    if (Iterables.isEmpty(packagesUnderDirectory)) {
+    if (futures.isEmpty()) {
       return Futures.immediateFailedFuture(
           new TargetParsingException("no targets found beneath '" + pathFragment + "'"));
     }
 
-    Iterable<PackageIdentifier> pkgIds =
-        Iterables.transform(
-            packagesUnderDirectory, path -> PackageIdentifier.create(repository, path));
-
-    // For very large sets of packages, we may not want to process all of them at once, so we split
-    // into batches.
-    List<List<PackageIdentifier>> partitions =
-        ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET));
-    ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(partitions.size());
-    for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) {
-      futures.add(
-          executor.submit(
-              () -> {
-                ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch);
-                packageSemaphore.acquireAll(pkgIdBatchSet);
-                try {
-                  Iterable<Collection<Target>> resolvedTargets =
-                      bulkGetTargetsInPackage(originalPattern, pkgIdBatch, actualPolicy).values();
-                  List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
-                  for (Collection<Target> targets : resolvedTargets) {
-                    filteredTargets.addAll(targets);
-                  }
-                  // TODO(bazel-core): Invoking the callback while holding onto the package
-                  // semaphore can lead to deadlocks. Also, if the semaphore has a small count,
-                  // acquireAll can also lead to problems if we don't batch appropriately.
-                  // Although we default to an unbounded semaphore for SkyQuery and this is an
-                  // unreported issue, consider refactoring so that the code is strictly correct.
-                  callback.process(filteredTargets);
-                } finally {
-                  packageSemaphore.releaseAll(pkgIdBatchSet);
-                }
-                return null;
-              }));
-    }
     return Futures.whenAllSucceed(futures).call(() -> null, directExecutor());
   }
 
+  /**
+   * Task to get all matching targets in the given packages, filter them, and pass them to the
+   * target batch callback.
+   */
+  private class GetTargetsInPackagesTask<E extends Exception> implements Callable<Void> {
+
+    private final Iterable<PackageIdentifier> packageIdentifiers;
+    private final String originalPattern;
+    private final FilteringPolicy actualPolicy;
+    private final ThreadSafeBatchCallback<Target, E> callback;
+
+    GetTargetsInPackagesTask(
+        Iterable<PackageIdentifier> packageIdentifiers,
+        String originalPattern,
+        FilteringPolicy actualPolicy,
+        ThreadSafeBatchCallback<Target, E> callback) {
+      this.packageIdentifiers = packageIdentifiers;
+      this.originalPattern = originalPattern;
+      this.actualPolicy = actualPolicy;
+      this.callback = callback;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(packageIdentifiers);
+      packageSemaphore.acquireAll(pkgIdBatchSet);
+      try {
+        Iterable<Collection<Target>> resolvedTargets =
+            RecursivePackageProviderBackedTargetPatternResolver.this
+                .bulkGetTargetsInPackage(originalPattern, packageIdentifiers, actualPolicy)
+                .values();
+        List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
+        for (Collection<Target> targets : resolvedTargets) {
+          filteredTargets.addAll(targets);
+        }
+        // TODO(bazel-core): Invoking the callback while holding onto the package
+        // semaphore can lead to deadlocks. Also, if the semaphore has a small count,
+        // acquireAll can also lead to problems if we don't batch appropriately.
+        // Although we default to an unbounded semaphore for SkyQuery and this is an
+        // unreported issue, consider refactoring so that the code is strictly correct.
+        callback.process(filteredTargets);
+      } finally {
+        packageSemaphore.releaseAll(pkgIdBatchSet);
+      }
+      return null;
+    }
+  }
+
   private static <T> int calculateSize(Iterable<Collection<T>> resolvedTargets) {
     int size = 0;
     for (Collection<T> targets : resolvedTargets) {