RecursivePackageProviderBackedTargetPatternResolver uses streamPackagesUnderDirectory.
The RootPackageExtractor and RecursivePackageProvider interfaces now expose only
"streaming" methods, in which callers must provide a callback that is invoked as
each package is identified.
This allows us to begin resolving targets as soon as a single batch of
PackageIdentifiers has been discovered.
RELNOTES:
None.
PiperOrigin-RevId: 260497403
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index d63b8eb..1d18260 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -50,7 +50,9 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
/**
* A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -180,7 +182,7 @@
implements ThreadSafeBatchCallback<T, E> {
private final BatchCallback<T, E> delegate;
- public SynchronizedBatchCallback(BatchCallback<T, E> delegate) {
+ SynchronizedBatchCallback(BatchCallback<T, E> delegate) {
this.delegate = delegate;
}
@@ -241,76 +243,95 @@
}
private <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsyncImpl(
- final RepositoryName repository,
- final String originalPattern,
+ RepositoryName repository,
+ String pattern,
String directory,
boolean rulesOnly,
ImmutableSet<PathFragment> blacklistedSubdirectories,
ImmutableSet<PathFragment> excludedSubdirectories,
- final ThreadSafeBatchCallback<Target, E> callback,
+ ThreadSafeBatchCallback<Target, E> callback,
ListeningExecutorService executor) {
- final FilteringPolicy actualPolicy = rulesOnly
- ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy)
- : policy;
- final PathFragment pathFragment;
- Iterable<PathFragment> packagesUnderDirectory;
- try {
+ FilteringPolicy actualPolicy =
+ rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy;
+ PathFragment pathFragment;
+ ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+ Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback =
+ (pkgIdBatch) ->
+ futures.add(
+ executor.submit(
+ new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback)));
+ try (PackageIdentifierBatchingCallback pkgIdBatchProducer =
+ new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) {
pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
- packagesUnderDirectory =
- recursivePackageProvider.getPackagesUnderDirectory(
- eventHandler,
- repository,
- pathFragment,
- blacklistedSubdirectories,
- excludedSubdirectories);
+ recursivePackageProvider.streamPackagesUnderDirectory(
+ pkgIdBatchProducer,
+ eventHandler,
+ repository,
+ pathFragment,
+ blacklistedSubdirectories,
+ excludedSubdirectories);
} catch (TargetParsingException e) {
return Futures.immediateFailedFuture(e);
} catch (InterruptedException e) {
return Futures.immediateCancelledFuture();
}
- if (Iterables.isEmpty(packagesUnderDirectory)) {
+ if (futures.isEmpty()) {
return Futures.immediateFailedFuture(
new TargetParsingException("no targets found beneath '" + pathFragment + "'"));
}
- Iterable<PackageIdentifier> pkgIds =
- Iterables.transform(
- packagesUnderDirectory, path -> PackageIdentifier.create(repository, path));
-
- // For very large sets of packages, we may not want to process all of them at once, so we split
- // into batches.
- List<List<PackageIdentifier>> partitions =
- ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET));
- ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(partitions.size());
- for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) {
- futures.add(
- executor.submit(
- () -> {
- ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch);
- packageSemaphore.acquireAll(pkgIdBatchSet);
- try {
- Iterable<Collection<Target>> resolvedTargets =
- bulkGetTargetsInPackage(originalPattern, pkgIdBatch, actualPolicy).values();
- List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
- for (Collection<Target> targets : resolvedTargets) {
- filteredTargets.addAll(targets);
- }
- // TODO(bazel-core): Invoking the callback while holding onto the package
- // semaphore can lead to deadlocks. Also, if the semaphore has a small count,
- // acquireAll can also lead to problems if we don't batch appropriately.
- // Although we default to an unbounded semaphore for SkyQuery and this is an
- // unreported issue, consider refactoring so that the code is strictly correct.
- callback.process(filteredTargets);
- } finally {
- packageSemaphore.releaseAll(pkgIdBatchSet);
- }
- return null;
- }));
- }
return Futures.whenAllSucceed(futures).call(() -> null, directExecutor());
}
+ /**
+ * Task to get all matching targets in the given packages, filter them, and pass them to the
+ * target batch callback.
+ */
+ private class GetTargetsInPackagesTask<E extends Exception> implements Callable<Void> {
+
+ private final Iterable<PackageIdentifier> packageIdentifiers;
+ private final String originalPattern;
+ private final FilteringPolicy actualPolicy;
+ private final ThreadSafeBatchCallback<Target, E> callback;
+
+ GetTargetsInPackagesTask(
+ Iterable<PackageIdentifier> packageIdentifiers,
+ String originalPattern,
+ FilteringPolicy actualPolicy,
+ ThreadSafeBatchCallback<Target, E> callback) {
+ this.packageIdentifiers = packageIdentifiers;
+ this.originalPattern = originalPattern;
+ this.actualPolicy = actualPolicy;
+ this.callback = callback;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(packageIdentifiers);
+ packageSemaphore.acquireAll(pkgIdBatchSet);
+ try {
+ Iterable<Collection<Target>> resolvedTargets =
+ RecursivePackageProviderBackedTargetPatternResolver.this
+ .bulkGetTargetsInPackage(originalPattern, packageIdentifiers, actualPolicy)
+ .values();
+ List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets));
+ for (Collection<Target> targets : resolvedTargets) {
+ filteredTargets.addAll(targets);
+ }
+ // TODO(bazel-core): Invoking the callback while holding onto the package
+ // semaphore can lead to deadlocks. Also, if the semaphore has a small count,
+ // acquireAll can also lead to problems if we don't batch appropriately.
+ // Although we default to an unbounded semaphore for SkyQuery and this is an
+ // unreported issue, consider refactoring so that the code is strictly correct.
+ callback.process(filteredTargets);
+ } finally {
+ packageSemaphore.releaseAll(pkgIdBatchSet);
+ }
+ return null;
+ }
+ }
+
private static <T> int calculateSize(Iterable<Collection<T>> resolvedTargets) {
int size = 0;
for (Collection<T> targets : resolvedTargets) {