Multithread BFS in TraversalInfoRootPackageExtractor using a ParallelVisitor.
RELNOTES:
None.
PiperOrigin-RevId: 265063633
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
index 8cf5de8..af2f3a8 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ParallelVisitor.java
@@ -147,6 +147,12 @@
protected abstract Iterable<OutputResultT> outputKeysToOutputValues(
Iterable<OutputKeyT> targetKeys) throws ExceptionT, InterruptedException;
+ /**
+ * Suitable exception type to use with {@link ParallelVisitor} when no checked exception is
+ * appropriate.
+ */
+ public static final class UnusedException extends RuntimeException {}
+
/** An object to hold keys to visit and keys ready for processing. */
protected final class Visit {
private final Iterable<OutputKeyT> keysToUseForResult;
diff --git a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
index a395124..429a159 100644
--- a/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/pkgcache/RecursivePackageProvider.java
@@ -17,6 +17,8 @@
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.NoSuchPackageException;
import com.google.devtools.build.lib.packages.NoSuchTargetException;
@@ -25,7 +27,6 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
import java.util.Map;
-import java.util.function.Consumer;
/**
* Support for resolving {@code package/...} target patterns.
@@ -47,10 +48,10 @@
* @param blacklistedSubdirectories a set of {@link PathFragment}s specifying transitive
* subdirectories that have been blacklisted
* @param excludedSubdirectories a set of {@link PathFragment}s specifying transitive
* subdirectories to exclude
*/
void streamPackagesUnderDirectory(
- Consumer<PackageIdentifier> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
ExtendedEventHandler eventHandler,
RepositoryName repository,
PathFragment directory,
@@ -116,7 +117,7 @@
@Override
public void streamPackagesUnderDirectory(
- Consumer<PackageIdentifier> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
ExtendedEventHandler eventHandler,
RepositoryName repository,
PathFragment directory,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
index 1eb90b8..6be671b 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/CollectPackagesUnderDirectoryValue.java
@@ -21,6 +21,7 @@
import com.google.devtools.build.lib.cmdline.RepositoryName;
import com.google.devtools.build.lib.concurrent.BlazeInterners;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider;
import com.google.devtools.build.lib.skyframe.serialization.autocodec.AutoCodec;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
@@ -34,10 +35,10 @@
* The value computed by {@link CollectPackagesUnderDirectoryFunction}. Contains a mapping for all
* its non-excluded directories to whether there are packages or error messages beneath them.
*
- * <p>This value is used by {@link GraphBackedRecursivePackageProvider#streamPackagesUnderDirectory}
- * to help it traverse the graph and find the set of packages under a directory, recursively by
- * {@link CollectPackagesUnderDirectoryFunction} which computes a value for a directory by
- * aggregating results calculated from its subdirectories, and by {@link
+ * <p>This value is used by {@link RecursivePackageProvider#streamPackagesUnderDirectory} to help it
+ * traverse the graph and find the set of packages under a directory, recursively by {@link
+ * CollectPackagesUnderDirectoryFunction} which computes a value for a directory by aggregating
+ * results calculated from its subdirectories, and by {@link
* PrepareDepsOfTargetsUnderDirectoryFunction} which uses this value to find transitive targets to
* load.
*
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
index 8b06189..f3926b7 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/EnvironmentBackedRecursivePackageProvider.java
@@ -14,12 +14,15 @@
package com.google.devtools.build.lib.skyframe;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.actions.InconsistentFilesystemException;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.BuildFileContainsErrorsException;
@@ -38,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
/**
* A {@link RecursivePackageProvider} backed by an {@link Environment}. Its methods may throw {@link
@@ -60,7 +62,8 @@
/**
* Whether any of the calls to {@link #getPackage}, {@link #getTarget}, {@link #bulkGetPackages},
- * or {@link #streamPackagesUnderDirectory} encountered a package in error.
+ * or {@link RecursivePackageProvider#streamPackagesUnderDirectory} encountered a package in
+ * error.
*
* <p>The client of {@link EnvironmentBackedRecursivePackageProvider} may want to check this. See
* comments in {@link #getPackage} for details.
@@ -139,7 +142,7 @@
@Override
public void streamPackagesUnderDirectory(
- Consumer<PackageIdentifier> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
ExtendedEventHandler eventHandler,
RepositoryName repository,
PathFragment directory,
@@ -203,10 +206,9 @@
// TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform
// is unnecessary.
PathFragment packageNamePathFragment = PathFragment.create(packageName);
- if (!Iterables.any(
- excludedSubdirectories,
- excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) {
- results.accept(PackageIdentifier.create(repository, packageNamePathFragment));
+ if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) {
+ results.process(
+ ImmutableList.of(PackageIdentifier.create(repository, packageNamePathFragment)));
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
index 38d3dba..5867051 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/GraphBackedRecursivePackageProvider.java
@@ -25,6 +25,8 @@
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
import com.google.devtools.build.lib.cmdline.TargetPattern;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -44,7 +46,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Consumer;
import java.util.logging.Logger;
/**
@@ -207,7 +208,7 @@
@Override
public void streamPackagesUnderDirectory(
- Consumer<PackageIdentifier> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
ExtendedEventHandler eventHandler,
RepositoryName repository,
PathFragment directory,
@@ -219,7 +220,7 @@
repository, directory, blacklistedSubdirectories, excludedSubdirectories);
rootPackageExtractor.streamPackagesFromRoots(
- path -> results.accept(PackageIdentifier.create(repository, path)),
+ results,
graph,
roots,
eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
index ecabd39..b731907 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PackageIdentifierBatchingCallback.java
@@ -15,52 +15,66 @@
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider;
-import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
/**
* A callback for {@link RecursivePackageProvider#streamPackagesUnderDirectory} that buffers the
- * PackageIdentifiers it receives into batches that it delivers to a supplied {@code
- * Consumer<ImmutableList<PackageIdentifier>>}.
+ * PackageIdentifiers it receives into fixed-size batches that it delivers to a supplied {@code
+ * ThreadSafeBatchCallback<PackageIdentifier, UnusedException>}.
+ *
+ * <p>The final batch delivered to the delegate callback may be smaller than the fixed size; the
+ * callback must be {@link #close() closed} to deliver this final batch.
*/
-@ThreadCompatible
+@ThreadSafe
public class PackageIdentifierBatchingCallback
- implements Consumer<PackageIdentifier>, AutoCloseable {
+ implements ThreadSafeBatchCallback<PackageIdentifier, UnusedException>, AutoCloseable {
- private final Consumer<ImmutableList<PackageIdentifier>> batchResults;
+ private final ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults;
private final int batchSize;
+
+ @GuardedBy("this")
private ImmutableList.Builder<PackageIdentifier> packageIdentifiers;
+
+ @GuardedBy("this")
private int bufferedPackageIds;
public PackageIdentifierBatchingCallback(
- Consumer<ImmutableList<PackageIdentifier>> batchResults, int batchSize) {
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> batchResults, int batchSize) {
this.batchResults = batchResults;
this.batchSize = batchSize;
reset();
}
@Override
- public void accept(PackageIdentifier path) {
- packageIdentifiers.add(path);
- bufferedPackageIds++;
- if (bufferedPackageIds >= this.batchSize) {
- flush();
+ public synchronized void process(Iterable<PackageIdentifier> partialResult)
+ throws InterruptedException {
+ for (PackageIdentifier path : partialResult) {
+ packageIdentifiers.add(path);
+ bufferedPackageIds++;
+ if (bufferedPackageIds >= this.batchSize) {
+ flush();
+ }
}
}
@Override
- public void close() {
+ public synchronized void close() throws InterruptedException {
flush();
}
- private void flush() {
+ @GuardedBy("this")
+ private void flush() throws InterruptedException {
if (bufferedPackageIds > 0) {
- batchResults.accept(packageIdentifiers.build());
+ batchResults.process(packageIdentifiers.build());
reset();
}
}
+ @GuardedBy("this")
private void reset() {
packageIdentifiers = ImmutableList.builderWithExpectedSize(batchSize);
bufferedPackageIds = 0;
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index f8193fd..d401c57 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -16,7 +16,6 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -33,6 +32,7 @@
import com.google.devtools.build.lib.cmdline.TargetPatternResolver;
import com.google.devtools.build.lib.concurrent.BatchCallback;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
@@ -52,7 +52,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
/**
* A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}.
@@ -62,7 +61,7 @@
extends TargetPatternResolver<Target> {
// TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value?
- private static final int MAX_PACKAGES_BULK_GET = 1000;
+ static final int MAX_PACKAGES_BULK_GET = 1000;
protected final FilteringPolicy policy;
private final RecursivePackageProvider recursivePackageProvider;
@@ -253,18 +252,20 @@
ListeningExecutorService executor) {
FilteringPolicy actualPolicy =
rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy;
- PathFragment pathFragment;
+
ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
- Consumer<ImmutableList<PackageIdentifier>> startGettingTargetsCallback =
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> getPackageTargetsCallback =
(pkgIdBatch) ->
futures.add(
executor.submit(
new GetTargetsInPackagesTask<>(pkgIdBatch, pattern, actualPolicy, callback)));
- try (PackageIdentifierBatchingCallback pkgIdBatchProducer =
- new PackageIdentifierBatchingCallback(startGettingTargetsCallback, MAX_PACKAGES_BULK_GET)) {
+
+ PathFragment pathFragment;
+ try (PackageIdentifierBatchingCallback batchingCallback =
+ new PackageIdentifierBatchingCallback(getPackageTargetsCallback, MAX_PACKAGES_BULK_GET)) {
pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
recursivePackageProvider.streamPackagesUnderDirectory(
- pkgIdBatchProducer,
+ batchingCallback,
eventHandler,
repository,
pathFragment,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
index 18579ae..b5e4821 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePkgValueRootPackageExtractor.java
@@ -14,23 +14,26 @@
package com.google.devtools.build.lib.skyframe;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Root;
import com.google.devtools.build.lib.vfs.RootedPath;
import com.google.devtools.build.skyframe.WalkableGraph;
import java.util.List;
-import java.util.function.Consumer;
/** Looks up {@link RecursivePkgValue}s of given roots in a {@link WalkableGraph}. */
public class RecursivePkgValueRootPackageExtractor implements RootPackageExtractor {
@Override
public void streamPackagesFromRoots(
- Consumer<PathFragment> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
WalkableGraph graph,
List<Root> roots,
ExtendedEventHandler eventHandler,
@@ -61,16 +64,16 @@
"Root %s in repository %s could not be found in the graph.",
root.asPath(),
repository.getName());
+ ImmutableList.Builder<PackageIdentifier> packageIds = ImmutableList.builder();
for (String packageName : lookup.getPackages()) {
// TODO(bazel-team): Make RecursivePkgValue return NestedSet<PathFragment> so this transform
// is unnecessary.
PathFragment packageNamePathFragment = PathFragment.create(packageName);
- if (!Iterables.any(
- excludedSubdirectories,
- excludedSubdirectory -> packageNamePathFragment.startsWith(excludedSubdirectory))) {
- results.accept(packageNamePathFragment);
+ if (!Iterables.any(excludedSubdirectories, packageNamePathFragment::startsWith)) {
+ packageIds.add(PackageIdentifier.create(repository, packageNamePathFragment));
}
}
+ results.process(packageIds.build());
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
index f6eeb6a..e0bba766 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RootPackageExtractor.java
@@ -14,13 +14,15 @@
package com.google.devtools.build.lib.skyframe;
import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Root;
import com.google.devtools.build.skyframe.WalkableGraph;
import java.util.List;
-import java.util.function.Consumer;
/** A streaming interface for recursively searching for all packages under a given set of roots. */
public interface RootPackageExtractor {
@@ -29,7 +31,7 @@
* Recursively search each of the given roots in a repository for packages (while respecting
* blacklists and exclusions), calling the {@code results} callback as each package is discovered.
*
- * @param results callback invoked once for each package as it is discovered under a root
+ * @param results callback invoked with each batch of packages discovered under a root
* @param graph skyframe graph used for retrieving the directories under each root
* @param roots all the filesystem roots to search for packages
* @param eventHandler receives package-loading errors for any packages loaded by graph queries
@@ -43,7 +45,7 @@
* searched exhaustively
*/
void streamPackagesFromRoots(
- Consumer<PathFragment> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
WalkableGraph graph,
List<Root> roots,
ExtendedEventHandler eventHandler,
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
index 8fba861..6a30fa4 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/TraversalInfoRootPackageExtractor.java
@@ -14,12 +14,18 @@
package com.google.devtools.build.lib.skyframe;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.devtools.build.lib.skyframe.RecursivePackageProviderBackedTargetPatternResolver.MAX_PACKAGES_BULK_GET;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor;
+import com.google.devtools.build.lib.concurrent.ParallelVisitor.UnusedException;
+import com.google.devtools.build.lib.concurrent.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.vfs.PathFragment;
@@ -28,11 +34,13 @@
import com.google.devtools.build.skyframe.SkyKey;
import com.google.devtools.build.skyframe.SkyValue;
import com.google.devtools.build.skyframe.WalkableGraph;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
-import java.util.function.Consumer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/** Looks up values under {@link TraversalInfo}s of given roots in a {@link WalkableGraph}. */
public class TraversalInfoRootPackageExtractor implements RootPackageExtractor {
@@ -40,9 +48,12 @@
private static final Comparator<TraversalInfo> TRAVERSAL_INFO_COMPARATOR =
Comparator.comparing(ti -> ti.rootedDir.getRootRelativePath());
+ private static final int PACKAGE_ID_OUTPUT_BATCH_SIZE = 100;
+ private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
+
@Override
public void streamPackagesFromRoots(
- Consumer<PathFragment> results,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> results,
WalkableGraph graph,
List<Root> roots,
ExtendedEventHandler eventHandler,
@@ -51,81 +62,154 @@
ImmutableSet<PathFragment> blacklistedSubdirectories,
ImmutableSet<PathFragment> excludedSubdirectories)
throws InterruptedException {
for (Root root : roots) {
RootedPath rootedDir = RootedPath.toRootedPath(root, directory);
- TraversalInfo info =
- new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories);
- TreeSet<TraversalInfo> dirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
- dirsToCheckForPackages.add(info);
- collectPackagesUnder(results, graph, eventHandler, repository, dirsToCheckForPackages);
+ // Visit each root separately; TRAVERSAL_INFO_COMPARATOR cannot tell roots apart.
+ PackageCollectingParallelVisitor visitor =
+ new PackageCollectingParallelVisitor(
+ results,
+ /*visitBatchSize=*/ MAX_PACKAGES_BULK_GET,
+ /*processResultsBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE,
+ /*minPendingTasks=*/ 3 * DEFAULT_THREAD_COUNT,
+ /*resultBatchSize=*/ PACKAGE_ID_OUTPUT_BATCH_SIZE,
+ eventHandler,
+ repository,
+ graph);
+ visitor.visitAndWaitForCompletion(
+ ImmutableList.of(
+ new TraversalInfo(rootedDir, blacklistedSubdirectories, excludedSubdirectories)));
}
}
- private void collectPackagesUnder(
- Consumer<PathFragment> results,
- WalkableGraph graph,
- ExtendedEventHandler eventHandler,
- final RepositoryName repository,
- TreeSet<TraversalInfo> dirsToCheckForPackages)
- throws InterruptedException {
- // NOTE: Maps.asMap returns a Map<T> view whose entrySet() order matches the underlying Set<T>.
- Map<TraversalInfo, SkyKey> traversalToKeyMap =
- Maps.asMap(
- dirsToCheckForPackages,
- traversalInfo ->
- CollectPackagesUnderDirectoryValue.key(
- repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories));
- Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values());
+ private static final ExecutorService PACKAGE_ID_COLLECTING_EXECUTOR =
+ Executors.newFixedThreadPool(
+ /*numThreads=*/ DEFAULT_THREAD_COUNT,
+ new ThreadFactoryBuilder().setNameFormat("package-id-traversal-%d").build());
- // NOTE: Use a TreeSet to ensure a deterministic (sorted) iteration order when we recurse.
- TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
- for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) {
- TraversalInfo info = entry.getKey();
- SkyKey key = entry.getValue();
- SkyValue val = values.get(key);
- CollectPackagesUnderDirectoryValue collectPackagesValue =
- (CollectPackagesUnderDirectoryValue) val;
- if (collectPackagesValue != null) {
- if (collectPackagesValue.isDirectoryPackage()) {
- results.accept(info.rootedDir.getRootRelativePath());
- }
+ /**
+ * A ParallelVisitor that reports every {@link PackageIdentifier} by querying the WalkableGraph
+ * for a {@link CollectPackagesUnderDirectoryValue} for each {@link TraversalInfo} it visits.
+ */
+ static class PackageCollectingParallelVisitor
+ extends ParallelVisitor<
+ TraversalInfo,
+ TraversalInfo,
+ TraversalInfo,
+ PackageIdentifier,
+ UnusedException,
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException>> {
- if (collectPackagesValue.getErrorMessage() != null) {
- eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage()));
- }
+ private final ExtendedEventHandler eventHandler;
+ private final RepositoryName repository;
+ private final WalkableGraph graph;
- ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages =
- collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors();
- for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) {
- if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) {
- PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath();
- ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory =
- info.blacklistedSubdirectories
- .stream()
- .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
- .collect(toImmutableSet());
- ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory =
- info.excludedSubdirectories
- .stream()
- .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
- .collect(toImmutableSet());
- if (!excludedSubdirectoriesBeneathThisSubdirectory.contains(subdirectoryRelativePath)) {
- subdirsToCheckForPackages.add(
- new TraversalInfo(
- subdirectory,
- blacklistedSubdirectoriesBeneathThisSubdirectory,
- excludedSubdirectoriesBeneathThisSubdirectory));
+ PackageCollectingParallelVisitor(
+ ThreadSafeBatchCallback<PackageIdentifier, UnusedException> callback,
+ int visitBatchSize,
+ int processResultsBatchSize,
+ int minPendingTasks,
+ int resultBatchSize,
+ ExtendedEventHandler eventHandler,
+ RepositoryName repository,
+ WalkableGraph graph) {
+ super(
+ callback,
+ UnusedException.class,
+ visitBatchSize,
+ processResultsBatchSize,
+ minPendingTasks,
+ resultBatchSize,
+ PACKAGE_ID_COLLECTING_EXECUTOR);
+ this.eventHandler = eventHandler;
+ this.repository = repository;
+ this.graph = graph;
+ }
+
+ @Override
+ protected Iterable<PackageIdentifier> outputKeysToOutputValues(
+ Iterable<TraversalInfo> targetKeys) {
+ ImmutableList.Builder<PackageIdentifier> results =
+ ImmutableList.builderWithExpectedSize(resultBatchSize);
+ for (TraversalInfo resultInfo : targetKeys) {
+ results.add(
+ PackageIdentifier.create(repository, resultInfo.rootedDir.getRootRelativePath()));
+ }
+ return results.build();
+ }
+
+ @Override
+ protected Visit getVisitResult(Iterable<TraversalInfo> dirsToCheckForPackages)
+ throws InterruptedException {
+ ImmutableMap.Builder<TraversalInfo, SkyKey> traversalToKeyMapBuilder = ImmutableMap.builder();
+ for (TraversalInfo traversalInfo : dirsToCheckForPackages) {
+ traversalToKeyMapBuilder.put(
+ traversalInfo,
+ CollectPackagesUnderDirectoryValue.key(
+ repository, traversalInfo.rootedDir, traversalInfo.blacklistedSubdirectories));
+ }
+ ImmutableMap<TraversalInfo, SkyKey> traversalToKeyMap = traversalToKeyMapBuilder.build();
+ Map<SkyKey, SkyValue> values = graph.getSuccessfulValues(traversalToKeyMap.values());
+
+ List<TraversalInfo> resultPackageIds = new ArrayList<>();
+ // NOTE: Use a TreeSet so the subdirectories queued for visiting are kept in sorted order.
+ TreeSet<TraversalInfo> subdirsToCheckForPackages = new TreeSet<>(TRAVERSAL_INFO_COMPARATOR);
+ for (Map.Entry<TraversalInfo, SkyKey> entry : traversalToKeyMap.entrySet()) {
+ TraversalInfo info = entry.getKey();
+ SkyKey key = entry.getValue();
+ SkyValue val = values.get(key);
+ CollectPackagesUnderDirectoryValue collectPackagesValue =
+ (CollectPackagesUnderDirectoryValue) val;
+ if (collectPackagesValue != null) {
+ if (collectPackagesValue.isDirectoryPackage()) {
+ resultPackageIds.add(info);
+ }
+
+ if (collectPackagesValue.getErrorMessage() != null) {
+ eventHandler.handle(Event.error(collectPackagesValue.getErrorMessage()));
+ }
+
+ ImmutableMap<RootedPath, Boolean> subdirectoryTransitivelyContainsPackages =
+ collectPackagesValue.getSubdirectoryTransitivelyContainsPackagesOrErrors();
+ for (RootedPath subdirectory : subdirectoryTransitivelyContainsPackages.keySet()) {
+ if (subdirectoryTransitivelyContainsPackages.get(subdirectory)) {
+ PathFragment subdirectoryRelativePath = subdirectory.getRootRelativePath();
+ ImmutableSet<PathFragment> blacklistedSubdirectoriesBeneathThisSubdirectory =
+ info.blacklistedSubdirectories.stream()
+ .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
+ .collect(toImmutableSet());
+ ImmutableSet<PathFragment> excludedSubdirectoriesBeneathThisSubdirectory =
+ info.excludedSubdirectories.stream()
+ .filter(pathFragment -> pathFragment.startsWith(subdirectoryRelativePath))
+ .collect(toImmutableSet());
+ if (!excludedSubdirectoriesBeneathThisSubdirectory.contains(
+ subdirectoryRelativePath)) {
+ subdirsToCheckForPackages.add(
+ new TraversalInfo(
+ subdirectory,
+ blacklistedSubdirectoriesBeneathThisSubdirectory,
+ excludedSubdirectoriesBeneathThisSubdirectory));
+ }
}
}
}
}
+ return new Visit(
+ /*keysToUseForResult=*/ resultPackageIds, /*keysToVisit=*/ subdirsToCheckForPackages);
}
- if (!subdirsToCheckForPackages.isEmpty()) {
- collectPackagesUnder(results, graph, eventHandler, repository, subdirsToCheckForPackages);
+ @Override
+ protected Iterable<TraversalInfo> preprocessInitialVisit(Iterable<TraversalInfo> infos) {
+ return infos;
+ }
+
+ @Override
+ protected Iterable<TraversalInfo> noteAndReturnUniqueVisitationKeys(
+ Iterable<TraversalInfo> prospectiveVisitationKeys) {
+ return prospectiveVisitationKeys;
}
}
+ /** Value type used as visitation and output key for {@link PackageCollectingParallelVisitor}. */
private static final class TraversalInfo {
final RootedPath rootedDir;
// Set of blacklisted directories. The graph is assumed to be prepopulated with