Use a ForkJoinPool instead of a ThreadPoolExecutor for the globbing. This makes
the multi-thread more efficient, especially when --legacy_globbing_threads is
dialed up on slow file systems.
RELNOTES: None.
PiperOrigin-RevId: 235318679
diff --git a/src/main/java/com/google/devtools/build/lib/packages/GlobCache.java b/src/main/java/com/google/devtools/build/lib/packages/GlobCache.java
index a968c4e..3e3c1af 100644
--- a/src/main/java/com/google/devtools/build/lib/packages/GlobCache.java
+++ b/src/main/java/com/google/devtools/build/lib/packages/GlobCache.java
@@ -35,8 +35,8 @@
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -76,10 +76,9 @@
private AtomicReference<? extends UnixGlob.FilesystemCalls> syscalls;
private final int maxDirectoriesToEagerlyVisit;
- /**
- * The thread pool for glob evaluation.
- */
- private final ThreadPoolExecutor globExecutor;
+ /** The thread pool for glob evaluation. */
+ private final Executor globExecutor;
+
private final AtomicBoolean globalStarted = new AtomicBoolean(false);
/**
@@ -98,7 +97,7 @@
final PackageIdentifier packageId,
final CachingPackageLocator locator,
AtomicReference<? extends UnixGlob.FilesystemCalls> syscalls,
- ThreadPoolExecutor globExecutor,
+ Executor globExecutor,
int maxDirectoriesToEagerlyVisit) {
this.packageDirectory = Preconditions.checkNotNull(packageDirectory);
this.packageId = Preconditions.checkNotNull(packageId);
@@ -206,7 +205,7 @@
.addPattern(pattern)
.setExcludeDirectories(excludeDirs)
.setDirectoryFilter(childDirectoryPredicate)
- .setThreadPool(globExecutor)
+ .setExecutor(globExecutor)
.setFilesystemCalls(syscalls)
.globAsync();
}
diff --git a/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java b/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
index 8f329db..5887300 100644
--- a/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
@@ -20,7 +20,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.skylark.BazelStarlarkContext;
import com.google.devtools.build.lib.analysis.skylark.SymbolGenerator;
import com.google.devtools.build.lib.cmdline.Label;
@@ -29,6 +28,7 @@
import com.google.devtools.build.lib.cmdline.LabelValidator;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.RepositoryName;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable;
@@ -84,10 +84,8 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -341,7 +339,7 @@
private AtomicReference<? extends UnixGlob.FilesystemCalls> syscalls;
- private final ThreadPoolExecutor threadPool;
+ private ForkJoinPool executor;
private int maxDirectoriesToEagerlyVisitInGlobbing;
@@ -396,16 +394,7 @@
this.ruleFactory = new RuleFactory(ruleClassProvider, attributeContainerFactory);
this.ruleFunctions = buildRuleFunctions(ruleFactory);
this.ruleClassProvider = ruleClassProvider;
- threadPool =
- new ThreadPoolExecutor(
- 100,
- Integer.MAX_VALUE,
- 15L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryBuilder().setNameFormat("Legacy globber %d").setDaemon(true).build());
- // Do not consume threads when not in use.
- threadPool.allowCoreThreadTimeOut(true);
+ setGlobbingThreads(100);
this.environmentExtensions = ImmutableList.copyOf(environmentExtensions);
this.packageArguments = createPackageArguments();
this.nativeModule = newNativeModule();
@@ -424,7 +413,9 @@
* Sets the max number of threads to use for globbing.
*/
public void setGlobbingThreads(int globbingThreads) {
- threadPool.setCorePoolSize(globbingThreads);
+ if (executor == null || executor.getParallelism() != globbingThreads) {
+ executor = NamedForkJoinPool.newNamedPool("globbing pool", globbingThreads);
+ }
}
/**
@@ -1424,7 +1415,7 @@
packageId,
locator,
syscalls,
- threadPool,
+ executor,
maxDirectoriesToEagerlyVisitInGlobbing));
}
@@ -1448,7 +1439,7 @@
packageId,
locator,
syscalls,
- threadPool,
+ executor,
maxDirectoriesToEagerlyVisitInGlobbing),
/*sort=*/ false);
}
diff --git a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
index f38e5ae..9efd60d 100644
--- a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
+++ b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
@@ -44,8 +44,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -62,30 +62,39 @@
public final class UnixGlob {
private UnixGlob() {}
- private static List<Path> globInternal(Path base, Collection<String> patterns,
+ private static List<Path> globInternal(
+ Path base,
+ Collection<String> patterns,
boolean excludeDirectories,
Predicate<Path> dirPred,
FilesystemCalls syscalls,
- ThreadPoolExecutor threadPool)
+ Executor executor)
throws IOException, InterruptedException {
- GlobVisitor visitor = new GlobVisitor(threadPool);
+ GlobVisitor visitor = new GlobVisitor(executor);
return visitor.glob(base, patterns, excludeDirectories, dirPred, syscalls);
}
- private static List<Path> globInternalUninterruptible(Path base, Collection<String> patterns,
- boolean excludeDirectories, Predicate<Path> dirPred, FilesystemCalls syscalls,
- ThreadPoolExecutor threadPool) throws IOException {
- GlobVisitor visitor = new GlobVisitor(threadPool);
+ private static List<Path> globInternalUninterruptible(
+ Path base,
+ Collection<String> patterns,
+ boolean excludeDirectories,
+ Predicate<Path> dirPred,
+ FilesystemCalls syscalls,
+ Executor executor)
+ throws IOException {
+ GlobVisitor visitor = new GlobVisitor(executor);
return visitor.globUninterruptible(base, patterns, excludeDirectories, dirPred, syscalls);
}
private static long globInternalAndReturnNumGlobTasksForTesting(
- Path base, Collection<String> patterns,
+ Path base,
+ Collection<String> patterns,
boolean excludeDirectories,
Predicate<Path> dirPred,
FilesystemCalls syscalls,
- ThreadPoolExecutor threadPool) throws IOException, InterruptedException {
- GlobVisitor visitor = new GlobVisitor(threadPool);
+ Executor executor)
+ throws IOException, InterruptedException {
+ GlobVisitor visitor = new GlobVisitor(executor);
visitor.glob(base, patterns, excludeDirectories, dirPred, syscalls);
return visitor.getNumGlobTasksForTesting();
}
@@ -96,9 +105,9 @@
boolean excludeDirectories,
Predicate<Path> dirPred,
FilesystemCalls syscalls,
- ThreadPoolExecutor threadPool) {
- Preconditions.checkNotNull(threadPool, "%s %s", base, patterns);
- return new GlobVisitor(threadPool)
+ Executor executor) {
+ Preconditions.checkNotNull(executor, "%s %s", base, patterns);
+ return new GlobVisitor(executor)
.globAsync(base, patterns, excludeDirectories, dirPred, syscalls);
}
@@ -289,7 +298,7 @@
private List<String> patterns;
private boolean excludeDirectories;
private Predicate<Path> pathFilter;
- private ThreadPoolExecutor threadPool;
+ private Executor executor;
private AtomicReference<? extends FilesystemCalls> syscalls =
new AtomicReference<>(DEFAULT_SYSCALLS);
@@ -351,13 +360,12 @@
return this;
}
-
/**
- * Sets the threadpool to use for parallel glob evaluation.
- * If unset, evaluation is done in-thread.
+ * Sets the executor to use for parallel glob evaluation. If unset, evaluation is done
+ * in-thread.
*/
- public Builder setThreadPool(ThreadPoolExecutor pool) {
- this.threadPool = pool;
+ public Builder setExecutor(Executor pool) {
+ this.executor = pool;
return this;
}
@@ -376,8 +384,8 @@
* Executes the glob.
*/
public List<Path> glob() throws IOException {
- return globInternalUninterruptible(base, patterns, excludeDirectories, pathFilter,
- syscalls.get(), threadPool);
+ return globInternalUninterruptible(
+ base, patterns, excludeDirectories, pathFilter, syscalls.get(), executor);
}
/**
@@ -386,29 +394,23 @@
* @throws InterruptedException if the thread is interrupted.
*/
public List<Path> globInterruptible() throws IOException, InterruptedException {
- return globInternal(base, patterns, excludeDirectories, pathFilter, syscalls.get(),
- threadPool);
+ return globInternal(base, patterns, excludeDirectories, pathFilter, syscalls.get(), executor);
}
@VisibleForTesting
public long globInterruptibleAndReturnNumGlobTasksForTesting()
throws IOException, InterruptedException {
- return globInternalAndReturnNumGlobTasksForTesting(base, patterns, excludeDirectories,
- pathFilter, syscalls.get(), threadPool);
+ return globInternalAndReturnNumGlobTasksForTesting(
+ base, patterns, excludeDirectories, pathFilter, syscalls.get(), executor);
}
/**
- * Executes the glob asynchronously. {@link #setThreadPool} must have been called already with a
+ * Executes the glob asynchronously. {@link #setExecutor} must have been called already with a
* non-null argument.
*/
public Future<List<Path>> globAsync() {
return globAsyncInternal(
- base,
- patterns,
- excludeDirectories,
- pathFilter,
- syscalls.get(),
- threadPool);
+ base, patterns, excludeDirectories, pathFilter, syscalls.get(), executor);
}
}
@@ -458,7 +460,7 @@
private final ConcurrentHashMap<String, Pattern> cache = new ConcurrentHashMap<>();
private final GlobFuture result;
- private final ThreadPoolExecutor executor;
+ private final Executor executor;
private final AtomicLong totalOps = new AtomicLong(0);
private final AtomicLong pendingOps = new AtomicLong(0);
private final AtomicReference<IOException> ioException = new AtomicReference<>();
@@ -466,7 +468,7 @@
private final AtomicReference<Error> error = new AtomicReference<>();
private volatile boolean canceled = false;
- GlobVisitor(ThreadPoolExecutor executor) {
+ GlobVisitor(Executor executor) {
this.executor = executor;
this.result = new GlobFuture(this);
}
diff --git a/src/test/java/com/google/devtools/build/lib/vfs/GlobTest.java b/src/test/java/com/google/devtools/build/lib/vfs/GlobTest.java
index ad62a0f..56130e3 100644
--- a/src/test/java/com/google/devtools/build/lib/vfs/GlobTest.java
+++ b/src/test/java/com/google/devtools/build/lib/vfs/GlobTest.java
@@ -374,7 +374,7 @@
new UnixGlob.Builder(tmpPath)
.addPattern("**")
.setDirectoryFilter(interrupterPredicate)
- .setThreadPool(executor)
+ .setExecutor(executor)
.globAsync();
globResult.get();
fail(); // Should have received InterruptedException
@@ -413,9 +413,12 @@
}
};
- List<Path> result = new UnixGlob.Builder(tmpPath)
- .addPatterns("**", "*")
- .setDirectoryFilter(interrupterPredicate).setThreadPool(executor).glob();
+ List<Path> result =
+ new UnixGlob.Builder(tmpPath)
+ .addPatterns("**", "*")
+ .setDirectoryFilter(interrupterPredicate)
+ .setExecutor(executor)
+ .glob();
// In the non-interruptible case, the interrupt bit should be set, but the
// glob should return the correct set of full results.