Multithread BFS in TraversalInfoRootPackageExtractor using a ParallelVisitor.
RELNOTES:
None.
PiperOrigin-RevId: 265063633
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index f8193fd..d401c57 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -16,7 +16,6 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -33,6 +32,7 @@
import com.google.devtools.build.lib.cmdline.TargetPatternResolver;
import com.google.devtools.build.lib.concurrent.BatchCallback;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
@@ -52,7 +52,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
/**
* A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -62,7 +61,7 @@
extends TargetPatternResolver<Target> {
// TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value?
- private static final int MAX_PACKAGES_BULK_GET = 1000;
+ static final int MAX_PACKAGES_BULK_GET = 1000;
protected final FilteringPolicy policy;
private final RecursivePackageProvider recursivePackageProvider;
@@ -253,18 +252,20 @@
ListeningExecutorService executor) {
FilteringPolicy actualPolicy =
rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy;
- PathFragment pathFragment;
+
ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
- Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback =
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> getPackageTargetsCallback =
(pkgIdBatch) ->
futures.add(
executor.submit(
new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback)));
- try (PackageIdentifierBatchingCallback pkgIdBatchProducer =
- new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) {
+
+ PathFragment pathFragment;
+ try (PackageIdentifierBatchingCallback batchingCallback =
+ new PackageIdentifierBatchingCallback(getPackageTargetsCallback, MAX_PACKAGES_BULK_GET)) {
pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
recursivePackageProvider.streamPackagesUnderDirectory(
- pkgIdBatchProducer,
+ batchingCallback,
eventHandler,
repository,
pathFragment,