Multithread BFS in TraversalInfoRootPackageExtractor using a ParallelVisitor. RELNOTES: None. PiperOrigin-RevId: 265063633
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java index 8cf5de8..af2f3a8 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
@@ -147,6 +147,12 @@ protected abstract Iterable<OutputResultT> outputKeysToOutputValues( Iterable<OutputKeyT> targetKeys) throws ExceptionT, InterruptedException; + /** + * Unchecked exception type to use as {@code ExceptionT} for a {@link ParallelVisitor} when no + * checked exception is appropriate. + */ + public static final class UnusedException extends RuntimeException {} + /** An object to hold keys to visit and keys ready for processing. */ protected final class Visit { private final Iterable<OutputKeyT> keysToUseForResult;
diff --git a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java index a395124..429a159 100644 --- a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java +++ b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
@@ -17,6 +17,8 @@ import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.packages.NoSuchPackageException; import com.google.devtools.build.lib.packages.NoSuchTargetException; @@ -25,7 +27,6 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.RootedPath; import java.util.Map; -import java.util.function.Consumer; /** * Support for resolving {@code package/...} target patterns. @@ -47,10 +48,10 @@ * @param blacklistedSubdirectories a set of {@link PathFragment}s specifying transitive * subdirectories that have been blacklisted * @param excludedSubdirectories a set of {@link PathFragment}s specifying transitive * subdirectories to exclude */ void streamPackagesUnderDirectory( - Consumer<PackageIdentifier> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, ExtendedEventHandler eventHandler, RepositoryName repository, PathFragment directory, @@ -116,7 +117,7 @@ @Override public void streamPackagesUnderDirectory( - Consumer<PackageIdentifier> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, ExtendedEventHandler eventHandler, RepositoryName repository, PathFragment directory,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java index 1eb90b8..6be671b 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
@@ -21,6 +21,7 @@ import com.google.devtools.build.lib.cmdline.RepositoryName; import com.google.devtools.build.lib.concurrent.BlazeInterners; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider; import com.google.devtools.build.lib.skyframe.serialization.autocodec.AutoCodec; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.RootedPath; @@ -34,10 +35,10 @@ * The value computed by {@link CollectPackagesUnderDirectoryFunction}. Contains a mapping for all * its non-excluded directories to whether there are packages or error messages beneath them. * - * <p>This value is used by {@link GraphBackedRecursivePackageProvider#streamPackagesUnderDirectory} - * to help it traverse the graph and find the set of packages under a directory, recursively by - * {@link CollectPackagesUnderDirectoryFunction} which computes a value for a directory by - * aggregating results calculated from its subdirectories, and by {@link + * <p>This value is used by {@link RecursivePackageProvider#streamPackagesUnderDirectory} to help it + * traverse the graph and find the set of packages under a directory; recursively by {@link + * CollectPackagesUnderDirectoryFunction}, which computes a value for a directory by aggregating + * results calculated from its subdirectories; and by {@link * PrepareDepsOfTargetsUnderDirectoryFunction} which uses this value to find transitive targets to * load. *
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java index 8b06189..f3926b7 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
@@ -14,12 +14,15 @@ package com.google.devtools.build.lib.skyframe; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.devtools.build.lib.actions.InconsistentFilesystemException; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.packages.BuildFileContainsErrorsException; @@ -38,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; /** * A {@link RecursivePackageProvider} backed by an {@link Environment}. Its methods may throw {@link @@ -60,7 +62,8 @@ /** * Whether any of the calls to {@link #getPackage}, {@link #getTarget}, {@link #bulkGetPackages}, - * or {@link #streamPackagesUnderDirectory} encountered a package in error. + * or this class's {@link RecursivePackageProvider#streamPackagesUnderDirectory} implementation + * encountered a package in error. * * <p>The client of {@link EnvironmentBackedRecursivePackageProvider} may want to check this. See * comments in {@link #getPackage} for details. @@ -139,7 +142,7 @@ @Override public void streamPackagesUnderDirectory( - Consumer<PackageIdentifier> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, ExtendedEventHandler eventHandler, RepositoryName repository, PathFragment directory, @@ -203,10 +206,9 @@ // TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform // is unnecessary.
PathFragment packageNamePathFragment = PathFragment.create(packageName); - if (!Iterables.any( - excludedSubdirectories, - excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) { - results.accept(PackageIdentifier.create(repository, packageNamePathFragment)); + if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) { + results.process( + ImmutableList.of(PackageIdentifier.create(repository, packageNamePathFragment))); } } }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java index 38d3dba..5867051 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
@@ -25,6 +25,8 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; import com.google.devtools.build.lib.cmdline.TargetPattern; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; @@ -44,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import java.util.logging.Logger; /** @@ -207,7 +208,7 @@ @Override public void streamPackagesUnderDirectory( - Consumer<PackageIdentifier> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, ExtendedEventHandler eventHandler, RepositoryName repository, PathFragment directory, @@ -219,7 +220,7 @@ repository, directory, blacklistedSubdirectories, excludedSubdirectories); rootPackageExtractor.streamPackagesFromRoots( - path -> results.accept(PackageIdentifier.create(repository, path)), + results, graph, roots, eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java index ecabd39..b731907 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
@@ -15,52 +15,69 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.cmdline.PackageIdentifier; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider; -import java.util.function.Consumer; +import javax.annotation.concurrent.GuardedBy; /** * A callback for {@link RecursivePackageProvider#streamPackagesUnderDirectory} that buffers the - * PackageIdentifiers it receives into batches that it delivers to a supplied {@code - * Consumer<ImmutableList<PackageIdentifier>>}. + * PackageIdentifiers it receives into fixed-size batches that it delivers to a supplied {@code + * ThreadSafeBatchCallback<PackageIdentifier, UnusedException>}. + * + * <p>The final batch delivered to the delegate callback may be smaller than the fixed size; the + * callback must be {@link #close() closed} to deliver this final batch. + * + * <p>{@link #process} and {@link #close} synchronize on this instance and call the delegate while + * holding that lock, so the delegate is never invoked concurrently by this callback.
*/ -@ThreadCompatible +@ThreadSafe public class PackageIdentifierBatchingCallback - implements Consumer<PackageIdentifier>, AutoCloseable { + implements ThreadSafeBatchCallback<PackageIdentifier, UnusedException>, AutoCloseable { - private final Consumer<ImmutableList<PackageIdentifier>> batchResults; + private final ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults; private final int batchSize; + + @GuardedBy("this") private ImmutableList.Builder<PackageIdentifier> packageIdentifiers; + + @GuardedBy("this") private int bufferedPackageIds; public PackageIdentifierBatchingCallback( - Consumer<ImmutableList<PackageIdentifier>> batchResults, int batchSize) { + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults, int batchSize) { this.batchResults = batchResults; this.batchSize = batchSize; reset(); } @Override - public void accept(PackageIdentifier path) { - packageIdentifiers.add(path); - bufferedPackageIds++; - if (bufferedPackageIds >= this.batchSize) { - flush(); + public synchronized void process(Iterable<PackageIdentifier> partialResult) + throws InterruptedException { + for (PackageIdentifier path : partialResult) { + packageIdentifiers.add(path); + bufferedPackageIds++; + if (bufferedPackageIds >= this.batchSize) { + flush(); + } } } @Override - public void close() { + public synchronized void close() throws InterruptedException { flush(); } - private void flush() { + @GuardedBy("this") + private void flush() throws InterruptedException { if (bufferedPackageIds > 0) { - batchResults.accept(packageIdentifiers.build()); + batchResults.process(packageIdentifiers.build()); reset(); } } + @GuardedBy("this") private void reset() { packageIdentifiers = ImmutableList.builderWithExpectedSize(batchSize); bufferedPackageIds = 0;
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java index f8193fd..d401c57 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -16,7 +16,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -33,6 +32,7 @@ import com.google.devtools.build.lib.cmdline.TargetPatternResolver; import com.google.devtools.build.lib.concurrent.BatchCallback; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; import com.google.devtools.build.lib.events.Event; @@ -52,7 +52,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; /** * A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}. @@ -62,7 +61,7 @@ extends TargetPatternResolver<Target> { // TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value? - private static final int MAX_PACKAGES_BULK_GET = 1000; + static final int MAX_PACKAGES_BULK_GET = 1000; protected final FilteringPolicy policy; private final RecursivePackageProvider recursivePackageProvider; @@ -253,18 +252,22 @@ ListeningExecutorService executor) { FilteringPolicy actualPolicy = rulesOnly ?
FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy; - PathFragment pathFragment; + ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(); - Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback = + // PackageIdentifierBatchingCallback invokes this only while holding its own lock, so the + // non-thread-safe futures list is never mutated concurrently. + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> getPackageTargetsCallback = (pkgIdBatch) -> futures.add( executor.submit( new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback))); - try (PackageIdentifierBatchingCallback pkgIdBatchProducer = - new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) { + + PathFragment pathFragment; + try (PackageIdentifierBatchingCallback batchingCallback = + new PackageIdentifierBatchingCallback(getPackageTargetsCallback, MAX_PACKAGES_BULK_GET)) { pathFragment = TargetPatternResolverUtil.getPathFragment(directory); recursivePackageProvider.streamPackagesUnderDirectory( - pkgIdBatchProducer, + batchingCallback, eventHandler, repository, pathFragment,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java index 18579ae..b5e4821 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
@@ -14,23 +14,26 @@ package com.google.devtools.build.lib.skyframe; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.Root; import com.google.devtools.build.lib.vfs.RootedPath; import com.google.devtools.build.skyframe.WalkableGraph; import java.util.List; -import java.util.function.Consumer; /** Looks up {@link RecursivePkgValue}s of given roots in a {@link WalkableGraph}. */ public class RecursivePkgValueRootPackageExtractor implements RootPackageExtractor { @Override public void streamPackagesFromRoots( - Consumer<PathFragment> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, WalkableGraph graph, List<Root> roots, ExtendedEventHandler eventHandler, @@ -61,16 +64,16 @@ "Root %s in repository %s could not be found in the graph.", root.asPath(), repository.getName()); + ImmutableList.Builder<PackageIdentifier> packageIds = ImmutableList.builder(); for (String packageName : lookup.getPackages()) { // TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform // is unnecessary.
PathFragment packageNamePathFragment = PathFragment.create(packageName); - if (!Iterables.any( - excludedSubdirectories, - excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) { - results.accept(packageNamePathFragment); + if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) { + packageIds.add(PackageIdentifier.create(repository, packageNamePathFragment)); } } + results.process(packageIds.build()); } } }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java index f6eeb6a..e0bba766 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
@@ -14,13 +14,15 @@ package com.google.devtools.build.lib.skyframe; import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.Root; import com.google.devtools.build.skyframe.WalkableGraph; import java.util.List; -import java.util.function.Consumer; /** A streaming interface for recursively searching for all packages under a given set of roots. */ public interface RootPackageExtractor { @@ -29,7 +31,7 @@ * Recursively search each of the given roots in a repository for packages (while respecting - * blacklists and exclusions), calling the {@code results} callback as each package is discovered. + * blacklists and exclusions), passing the packages to the {@code results} callback in batches. * - * @param results callback invoked once for each package as it is discovered under a root + * @param results thread-safe callback that receives batches of packages as they are discovered * @param graph skyframe graph used for retrieving the directories under each root * @param roots all the filesystem roots to search for packages * @param eventHandler receives package-loading errors for any packages loaded by graph queries @@ -43,7 +45,7 @@ * searched exhaustively */ void streamPackagesFromRoots( - Consumer<PathFragment> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, WalkableGraph graph, List<Root> roots, ExtendedEventHandler eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java index 8fba861..6a30fa4 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
@@ -14,12 +14,18 @@ package com.google.devtools.build.lib.skyframe; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.devtools.build.lib.skyframe.RecursivePackageProviderBackedTargetPatternResolver.MAX_PACKAGES_BULK_GET; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; +import com.google.devtools.build.lib.concurrent.ParallelVisitor; +import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException; +import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.vfs.PathFragment; @@ -28,11 +34,13 @@ import com.google.devtools.build.skyframe.SkyKey; import com.google.devtools.build.skyframe.SkyValue; import com.google.devtools.build.skyframe.WalkableGraph; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.TreeSet; -import java.util.function.Consumer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** Looks up values under {@link TraversalInfo}s of given roots in a {@link WalkableGraph}. 
*/ public class TraversalInfoRootPackageExtractor implements RootPackageExtractor { @@ -40,9 +48,15 @@ - private static final Comparator<TraversalInfo> TRAVERSAL_INFO_COMPARATOR = - Comparator.comparing(ti -> ti.rootedDir.getRootRelativePath()); + // Orders by root-relative path, breaking ties by root: the TreeSets built with this comparator + // would otherwise treat the same directory under different roots as duplicates and drop them. + private static final Comparator<TraversalInfo> TRAVERSAL_INFO_COMPARATOR = + Comparator.comparing((TraversalInfo ti) -> ti.rootedDir.getRootRelativePath()) + .thenComparing(ti -> ti.rootedDir.getRoot()); + private static final int PACKAGE_ID_OUTPUT_BATCH_SIZE = 100; + private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); + @Override public void streamPackagesFromRoots( - Consumer<PathFragment> results, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results, WalkableGraph graph, List<Root> roots, ExtendedEventHandler eventHandler, @@ -51,81 +65,154 @@ ImmutableSet<PathFragment> blacklistedSubdirectories, ImmutableSet<PathFragment> excludedSubdirectories) throws InterruptedException { + TreeSet<TraversalInfo> dirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR); for (Root root : roots) { RootedPath rootedDir = RootedPath.toRootedPath(root, directory); - TraversalInfo info = - new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories); - TreeSet<TraversalInfo> dirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR); - dirsToCheckForPackages.add(info); - collectPackagesUnder(results, graph, eventHandler, repository, dirsToCheckForPackages); + dirsToCheckForPackages.add( + new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories)); } + PackageCollectingParallelVisitor visitor = + new PackageCollectingParallelVisitor( + results, + /*visitBatchSize=*/ MAX_PACKAGES_BULK_GET, + /*processResultsBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE, + /*minPendingTasks=*/ 3 * DEFAULT_THREAD_COUNT, + /*resultBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE, + eventHandler, + repository, + graph); + visitor.visitAndWaitForCompletion(dirsToCheckForPackages); } - private void collectPackagesUnder( - Consumer<PathFragment> results, - WalkableGraph graph, - ExtendedEventHandler eventHandler, - final RepositoryName repository, -
TreeSet<TraversalInfo> dirsToCheckForPackages) - throws InterruptedException { - // NOTE: Maps.asMap returns a Map<T> view whose entrySet() order matches the underlying Set<T>. - Map<TraversalInfo, SkyKey> traversalToKeyMap = - Maps.asMap( - dirsToCheckForPackages, - traversalInfo -> - CollectPackagesUnderDirectoryValue.key( - repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories)); - Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values()); + private static final ExecutorService PACKAGE_ID_COLLECTING_EXECUTOR = + Executors.newFixedThreadPool( + /*numThreads=*/ DEFAULT_THREAD_COUNT, + new ThreadFactoryBuilder().setNameFormat("package-id-traversal-%d").build()); - // NOTE: Use a TreeSet to ensure a deterministic (sorted) iteration order when we recurse. - TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR); - for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) { - TraversalInfo info = entry.getKey(); - SkyKey key = entry.getValue(); - SkyValue val = values.get(key); - CollectPackagesUnderDirectoryValue collectPackagesValue = - (CollectPackagesUnderDirectoryValue) val; - if (collectPackagesValue != null) { - if (collectPackagesValue.isDirectoryPackage()) { - results.accept(info.rootedDir.getRootRelativePath()); - } + /** + * A ParallelVisitor that reports every {@link PackageIdentifier} by querying the WalkableGraph + * for a {@link CollectPackagesUnderDirectoryValue} for each {@link TraversalInfo} it visits.
+ */ + static class PackageCollectingParallelVisitor + extends ParallelVisitor< + TraversalInfo, + TraversalInfo, + TraversalInfo, + PackageIdentifier, + UnusedException, + ThreadSafeBatchCallback<PackageIdentifier, UnusedException>> { - if (collectPackagesValue.getErrorMessage() != null) { - eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage())); - } + private final ExtendedEventHandler eventHandler; + private final RepositoryName repository; + private final WalkableGraph graph; - ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages = - collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors(); - for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) { - if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) { - PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath(); - ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory = - info.blacklistedSubdirectories - .stream() - .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath)) - .collect(toImmutableSet()); - ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory = - info.excludedSubdirectories - .stream() - .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath)) - .collect(toImmutableSet()); - if (!excludedSubdirectoriesBeneathThisSubdirectory.contains(subdirectoryRelativePath)) { - subdirsToCheckForPackages.add( - new TraversalInfo( - subdirectory, - blacklistedSubdirectoriesBeneathThisSubdirectory, - excludedSubdirectoriesBeneathThisSubdirectory)); + PackageCollectingParallelVisitor( + ThreadSafeBatchCallback<PackageIdentifier, UnusedException> callback, + int visitBatchSize, + int processResultsBatchSize, + int minPendingTasks, + int resultBatchSize, + ExtendedEventHandler eventHandler, + RepositoryName repository, + WalkableGraph graph) { + super( + callback, + UnusedException.class, + visitBatchSize,
processResultsBatchSize, + minPendingTasks, + resultBatchSize, + PACKAGE_ID_COLLECTING_EXECUTOR); + this.eventHandler = eventHandler; + this.repository = repository; + this.graph = graph; + } + + @Override + protected Iterable<PackageIdentifier> outputKeysToOutputValues( + Iterable<TraversalInfo> targetKeys) { + ImmutableList.Builder<PackageIdentifier> results = + ImmutableList.builderWithExpectedSize(resultBatchSize); + for (TraversalInfo resultInfo : targetKeys) { + results.add( + PackageIdentifier.create(repository, resultInfo.rootedDir.getRootRelativePath())); + } + return results.build(); + } + + @Override + protected Visit getVisitResult(Iterable<TraversalInfo> dirsToCheckForPackages) + throws InterruptedException { + ImmutableMap.Builder<TraversalInfo, SkyKey> traversalToKeyMapBuilder = ImmutableMap.builder(); + for (TraversalInfo traversalInfo : dirsToCheckForPackages) { + traversalToKeyMapBuilder.put( + traversalInfo, + CollectPackagesUnderDirectoryValue.key( + repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories)); + } + ImmutableMap<TraversalInfo, SkyKey> traversalToKeyMap = traversalToKeyMapBuilder.build(); + Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values()); + + // NOTE: subdirsToCheckForPackages is a TreeSet to keep its iteration order deterministic.
+ List<TraversalInfo> resultPackageIds = new ArrayList<>(); + TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR); + for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) { + TraversalInfo info = entry.getKey(); + SkyKey key = entry.getValue(); + SkyValue val = values.get(key); + CollectPackagesUnderDirectoryValue collectPackagesValue = + (CollectPackagesUnderDirectoryValue) val; + if (collectPackagesValue != null) { + if (collectPackagesValue.isDirectoryPackage()) { + resultPackageIds.add(info); + } + + if (collectPackagesValue.getErrorMessage() != null) { + eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage())); + } + + ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages = + collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors(); + for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) { + if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) { + PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath(); + ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory = + info.blacklistedSubdirectories.stream() + .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath)) + .collect(toImmutableSet()); + ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory = + info.excludedSubdirectories.stream() + .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath)) + .collect(toImmutableSet()); + if (!excludedSubdirectoriesBeneathThisSubdirectory.contains( + subdirectoryRelativePath)) { + subdirsToCheckForPackages.add( + new TraversalInfo( + subdirectory, + blacklistedSubdirectoriesBeneathThisSubdirectory, + excludedSubdirectoriesBeneathThisSubdirectory)); + } } } } } + return new Visit( + /*keysToUseForResult=*/ resultPackageIds, /*keysToVisit=*/ subdirsToCheckForPackages); } - if (!subdirsToCheckForPackages.isEmpty()) { -
collectPackagesUnder(results, graph, eventHandler, repository, subdirsToCheckForPackages); + @Override + protected Iterable<TraversalInfo> preprocessInitialVisit(Iterable<TraversalInfo> infos) { + return infos; + } + + @Override + protected Iterable<TraversalInfo> noteAndReturnUniqueVisitationKeys( + Iterable<TraversalInfo> prospectiveVisitationKeys) { + return prospectiveVisitationKeys; } } + /** Value type used as visitation and output key for {@link PackageCollectingParallelVisitor}. */ private static final class TraversalInfo { final RootedPath rootedDir; // Set of blacklisted directories. The graph is assumed to be prepopulated with