Use a `refCount`-based approach for virtual lanes.
... so no [de]registration is needed for `Profiler` to work with virtual threads.
`Profiler` now assumes that a virtual thread's name follows the pattern `prefix%d`, and extracts the prefix into a thread-local the first time that thread needs a virtual lane.
Remove the `synchronized` block since it harms performance.
PiperOrigin-RevId: 627697201
Change-Id: I35b48d1b30b8d95e397b559546c47aa5b60c7c40
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
index 4757485..70a1500 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
@@ -78,7 +78,6 @@
import com.google.devtools.build.lib.cmdline.RepositoryName;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.pkgcache.PackageOptions;
-import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.rules.repository.LocalRepositoryFunction;
import com.google.devtools.build.lib.rules.repository.LocalRepositoryRule;
import com.google.devtools.build.lib.rules.repository.NewLocalRepositoryFunction;
@@ -333,9 +332,7 @@
Executors.class
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(
- null,
- Profiler.instance()
- .profileableVirtualThreadFactory("starlark-repository-")));
+ null, Thread.ofVirtual().name("starlark-repository-", 0).factory()));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java
index c62644d..63fb097 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java
@@ -31,7 +31,6 @@
import com.google.devtools.build.lib.exec.ExecutionPolicy;
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
import com.google.devtools.build.lib.exec.local.LocalExecutionOptions;
-import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.Command;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
@@ -81,7 +80,7 @@
if (buildRequestOptions != null && buildRequestOptions.useAsyncExecution) {
executorService =
Executors.newThreadPerTaskExecutor(
- Profiler.instance().profileableVirtualThreadFactory("dynamic-execution-thread-"));
+ Thread.ofVirtual().name("dynamic-execution-thread-", 0).factory());
} else {
executorService =
Executors.newCachedThreadPool(
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index d5fde8f..4bd8089 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -22,8 +22,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.collect.Extrema;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
@@ -42,18 +40,17 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
/**
* Blaze internal profiler. Provides facility to report various Blaze tasks and store them
@@ -573,6 +570,8 @@
aggregator.clear();
}
}
+
+ multiLaneGenerator.reset();
}
/** Returns true iff profiling is currently enabled. */
@@ -614,37 +613,41 @@
* @param type task type
* @param description task description. May be stored until end of build.
*/
- private void logTask(
- long threadId, long startTimeNanos, long duration, ProfilerTask type, String description) {
- checkNotNull(description);
- checkState(!description.isEmpty(), "No description -> not helpful");
- if (duration < 0) {
- // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
- duration = 0;
- }
+ private void logTask(long startTimeNanos, long duration, ProfilerTask type, String description) {
+ var threadId = borrowLaneAndGetLaneId();
+ try {
+ checkNotNull(description);
+ checkState(!description.isEmpty(), "No description -> not helpful");
+ if (duration < 0) {
+ // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
+ duration = 0;
+ }
- StatRecorder statRecorder = tasksHistograms[type.ordinal()];
- if (collectTaskHistograms && statRecorder != null) {
- statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
- }
+ StatRecorder statRecorder = tasksHistograms[type.ordinal()];
+ if (collectTaskHistograms && statRecorder != null) {
+ statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
+ }
- if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
- // Store instance fields as local variables so they are not nulled out from under us by
- // #clear.
- JsonTraceFileWriter currentWriter = writerRef.get();
- if (wasTaskSlowEnoughToRecord(type, duration)) {
- TaskData data = new TaskData(threadId, startTimeNanos, type, description);
- data.durationNanos = duration;
- if (currentWriter != null) {
- currentWriter.enqueue(data);
- }
+ if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
+ // Store instance fields as local variables so they are not nulled out from under us by
+ // #clear.
+ JsonTraceFileWriter currentWriter = writerRef.get();
+ if (wasTaskSlowEnoughToRecord(type, duration)) {
+ TaskData data = new TaskData(threadId, startTimeNanos, type, description);
+ data.durationNanos = duration;
+ if (currentWriter != null) {
+ currentWriter.enqueue(data);
+ }
- SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
+ SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
- if (aggregator != null) {
- aggregator.add(data);
+ if (aggregator != null) {
+ aggregator.add(data);
+ }
}
}
+ } finally {
+ releaseLane();
}
}
@@ -659,7 +662,7 @@
*/
public void logSimpleTask(long startTimeNanos, ProfilerTask type, String description) {
if (clock != null) {
- logTask(getLaneId(), startTimeNanos, clock.nanoTime() - startTimeNanos, type, description);
+ logTask(startTimeNanos, clock.nanoTime() - startTimeNanos, type, description);
}
}
@@ -677,7 +680,7 @@
*/
public void logSimpleTask(
long startTimeNanos, long stopTimeNanos, ProfilerTask type, String description) {
- logTask(getLaneId(), startTimeNanos, stopTimeNanos - startTimeNanos, type, description);
+ logTask(startTimeNanos, stopTimeNanos - startTimeNanos, type, description);
}
/**
@@ -692,12 +695,12 @@
*/
public void logSimpleTaskDuration(
long startTimeNanos, Duration duration, ProfilerTask type, String description) {
- logTask(getLaneId(), startTimeNanos, duration.toNanos(), type, description);
+ logTask(startTimeNanos, duration.toNanos(), type, description);
}
/** Used to log "events" happening at a specific time - tasks with zero duration. */
public void logEventAtTime(long atTimeNanos, ProfilerTask type, String description) {
- logTask(getLaneId(), atTimeNanos, 0, type, description);
+ logTask(atTimeNanos, 0, type, description);
}
/** Used to log "events" - tasks with zero duration. */
@@ -706,9 +709,16 @@
logEventAtTime(clock.nanoTime(), type, description);
}
- private SilentCloseable reallyProfile(long laneId, ProfilerTask type, String description) {
+ private SilentCloseable reallyProfile(ProfilerTask type, String description) {
final long startTimeNanos = clock.nanoTime();
- return () -> completeTask(laneId, startTimeNanos, type, description);
+ long laneId = borrowLaneAndGetLaneId();
+ return () -> {
+ try {
+ completeTask(laneId, startTimeNanos, type, description);
+ } finally {
+ releaseLane();
+ }
+ };
}
/**
@@ -731,11 +741,7 @@
* @param description task description. May be stored until the end of the build.
*/
public SilentCloseable profile(ProfilerTask type, String description) {
- return profile(getLaneId(), type, description);
- }
-
- private SilentCloseable profile(long laneId, ProfilerTask type, String description) {
- return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description) : NOP;
+ return (isActive() && isProfiling(type)) ? reallyProfile(type, description) : NOP;
}
/**
@@ -743,11 +749,7 @@
* profiling.
*/
public SilentCloseable profile(ProfilerTask type, Supplier<String> description) {
- return profile(getLaneId(), type, description);
- }
-
- private SilentCloseable profile(long laneId, ProfilerTask type, Supplier<String> description) {
- return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description.get()) : NOP;
+ return (isActive() && isProfiling(type)) ? reallyProfile(type, description.get()) : NOP;
}
/**
@@ -787,15 +789,21 @@
checkNotNull(description);
if (isActive() && isProfiling(type)) {
final long startTimeNanos = clock.nanoTime();
- return () ->
+ var laneId = borrowLaneAndGetLaneId();
+ return () -> {
+ try {
completeAction(
- getLaneId(),
+ laneId,
startTimeNanos,
type,
description,
mnemonic,
includePrimaryOutput ? primaryOutput : null,
includeTargetLabel ? targetLabel : null);
+ } finally {
+ releaseLane();
+ }
+ };
} else {
return NOP;
}
@@ -813,7 +821,12 @@
}
public void completeTask(long startTimeNanos, ProfilerTask type, String description) {
- completeTask(getLaneId(), startTimeNanos, type, description);
+ var laneId = borrowLaneAndGetLaneId();
+ try {
+ completeTask(laneId, startTimeNanos, type, description);
+ } finally {
+ releaseLane();
+ }
}
/** Records the end of the task. */
@@ -894,110 +907,6 @@
}
}
- static class ProfilerTaskType {
- private final String format;
-
- public ProfilerTaskType(String format) {
- this.format = format;
- }
-
- public String getName(long index) {
- return String.format(format, index);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ProfilerTaskType that)) {
- return false;
- }
-
- if (this == obj) {
- return true;
- }
-
- return Objects.equals(this.format, that.format);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(format);
- }
- }
-
- public ProfilerTaskType createTaskType(String format) {
- return new ProfilerTaskType(format);
- }
-
- /**
- * A profiler has a specific lane id. All trace events emitted by it are associated with its lane
- * id instead of Thread.currentThread().getId().
- */
- public static class ScopedProfiler {
- private final boolean active;
- private final long laneId;
-
- public ScopedProfiler(boolean active, long laneId) {
- this.active = active;
- this.laneId = laneId;
- }
-
- public SilentCloseable profile(String description) {
- if (!active) {
- return () -> {};
- }
- return Profiler.instance().profile(laneId, ProfilerTask.INFO, description);
- }
- }
-
- /**
- * An interface used to supply a {@link ListenableFuture} with the given {@link ScopedProfiler}.
- */
- public interface FutureSupplier<T> {
- ListenableFuture<T> get(ScopedProfiler profiler);
- }
-
- public <T> ListenableFuture<T> profileAsync(
- String prefix, String description, FutureSupplier<T> futureSupplier) {
- if (!(isActive() && isProfiling(ProfilerTask.INFO))) {
- return futureSupplier.get(new ScopedProfiler(/* active= */ false, 0));
- }
-
- var lane = multiLaneGenerator.acquire(prefix);
- final long startTimeNanos = clock.nanoTime();
- var scopedProfiler = new ScopedProfiler(/* active= */ true, lane.id());
- var future = futureSupplier.get(scopedProfiler);
- future.addListener(
- () -> {
- long endTimeNanos = clock.nanoTime();
- long duration = endTimeNanos - startTimeNanos;
- recordTask(
- new TaskData(lane.id(), startTimeNanos, duration, ProfilerTask.INFO, description));
- multiLaneGenerator.release(prefix, lane);
- },
- MoreExecutors.directExecutor());
- return future;
- }
-
- private final ThreadLocal<String> virtualThreadPrefix = ThreadLocal.withInitial(() -> null);
- private final ThreadLocal<Lane> borrowedLane = ThreadLocal.withInitial(() -> null);
-
- private void registerVirtualThread(String prefix) {
- var thread = Thread.currentThread();
- var threadId = thread.threadId();
- virtualThreadPrefix.set(prefix);
- thread.setName(prefix + threadId);
- }
-
- private void deregisterVirtualThread() {
- var prefix = checkNotNull(virtualThreadPrefix.get());
- virtualThreadPrefix.remove();
- var lane = borrowedLane.get();
- if (lane != null) {
- borrowedLane.remove();
- multiLaneGenerator.release(prefix, lane);
- }
- }
-
private final AtomicLong nextLaneId = new AtomicLong(1_000_000);
private final MultiLaneGenerator multiLaneGenerator = new MultiLaneGenerator();
@@ -1016,9 +925,20 @@
var laneGenerator = checkNotNull(laneGenerators.get(prefix));
laneGenerator.release(lane);
}
+
+ private void reset() {
+ multiLaneGenerator.laneGenerators.clear();
+ }
}
- private record Lane(long id) implements Comparable<Lane> {
+ private static class Lane implements Comparable<Lane> {
+ private final long id;
+ private int refCount;
+
+ private Lane(long id) {
+ this.id = id;
+ }
+
@Override
public int compareTo(Lane o) {
return Long.compare(id, o.id);
@@ -1028,74 +948,90 @@
private class LaneGenerator {
private final String prefix;
- @GuardedBy("this")
- private final PriorityQueue<Lane> availableLanes = new PriorityQueue<>();
+ private final Queue<Lane> availableLanes = new ConcurrentLinkedQueue<>();
- @GuardedBy("this")
- private int count = 0;
+ private final AtomicInteger count = new AtomicInteger(0);
private LaneGenerator(String prefix) {
this.prefix = prefix;
}
public Lane acquire() {
- long newLaneId;
- String newLaneName;
- synchronized (this) {
- if (!availableLanes.isEmpty()) {
- return availableLanes.poll();
+ var lane = availableLanes.poll();
+ // It might create more virtual lanes, but it's fine for our purpose.
+ if (lane == null) {
+ long newLaneId = nextLaneId.getAndIncrement();
+ int newLaneIndex = count.getAndIncrement();
+ String newLaneName = prefix + newLaneIndex + " (Virtual)";
+ var threadMetadata = new ThreadMetadata(newLaneName, newLaneId);
+ var writer = Profiler.this.writerRef.get();
+ if (writer != null) {
+ writer.enqueue(threadMetadata);
}
-
- newLaneId = nextLaneId.getAndIncrement();
- int newLaneIndex = count++;
- newLaneName = prefix + newLaneIndex + " (Virtual)";
+ lane = new Lane(newLaneId);
}
-
- var threadMetadata = new ThreadMetadata(newLaneName, newLaneId);
- var writer = Profiler.this.writerRef.get();
- if (writer != null) {
- writer.enqueue(threadMetadata);
- }
- return new Lane(newLaneId);
+ return lane;
}
- public synchronized void release(Lane lane) {
- availableLanes.add(lane);
+ public void release(Lane lane) {
+ availableLanes.offer(lane);
}
}
- private long getLaneId() {
+ private final ThreadLocal<String> virtualThreadPrefix =
+ ThreadLocal.withInitial(this::guessThreadPrefix);
+ private final ThreadLocal<Lane> borrowedLane =
+ ThreadLocal.withInitial(
+ () -> {
+ var prefix = virtualThreadPrefix.get();
+ var lane = multiLaneGenerator.acquire(prefix);
+ checkState(lane.refCount == 0);
+ return lane;
+ });
+
+ private long borrowLaneAndGetLaneId() {
var currentThread = Thread.currentThread();
var threadId = currentThread.threadId();
- if (!currentThread.isVirtual()) {
+ if (!currentThread.isVirtual() || !isActive()) {
return threadId;
}
var lane = borrowedLane.get();
- if (lane == null) {
- var prefix = virtualThreadPrefix.get();
- checkNotNull(
- prefix,
- "Current virtual thread is not registered. Did you use"
- + " Profiler#profileableVirtualThreadFactor to create a VirtualThread?");
- lane = multiLaneGenerator.acquire(prefix);
- borrowedLane.set(lane);
- }
- return lane.id();
+ lane.refCount += 1;
+ return lane.id;
}
- public ThreadFactory profileableVirtualThreadFactory(String prefix) {
- return r ->
- Thread.ofVirtual()
- .unstarted(
- () -> {
- var profiler = Profiler.instance();
- profiler.registerVirtualThread(prefix);
- try {
- r.run();
- } finally {
- profiler.deregisterVirtualThread();
- }
- });
+ private void releaseLane() {
+ var currentThread = Thread.currentThread();
+ if (!currentThread.isVirtual() || !isActive()) {
+ return;
+ }
+
+ var lane = borrowedLane.get();
+ lane.refCount -= 1;
+ checkState(lane.refCount >= 0);
+ if (lane.refCount == 0) {
+ borrowedLane.remove();
+ var prefix = virtualThreadPrefix.get();
+ multiLaneGenerator.release(prefix, lane);
+ }
+ }
+
+ private String guessThreadPrefix() {
+ var currentThread = Thread.currentThread();
+ checkState(currentThread.isVirtual());
+ var threadName = currentThread.getName();
+
+ // Assume the thread name has format "prefix%d"
+ for (int i = threadName.length() - 1; i > 0; i--) {
+ var ch = threadName.charAt(i);
+ if (ch < '0' || ch > '9') {
+ if (i < threadName.length() - 1) {
+ return threadName.substring(0, i + 1);
+ }
+ }
+ }
+
+ return "Other";
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/QuiescingExecutorsImpl.java b/src/main/java/com/google/devtools/build/lib/runtime/QuiescingExecutorsImpl.java
index 325d1bb..2cafbbb 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/QuiescingExecutorsImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/QuiescingExecutorsImpl.java
@@ -25,7 +25,6 @@
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutors;
import com.google.devtools.build.lib.pkgcache.PackageOptions;
-import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.skyframe.ParallelEvaluatorErrorClassifier;
import com.google.devtools.common.options.OptionsProvider;
import java.util.concurrent.Executors;
@@ -146,8 +145,7 @@
/* parallelism= */ cpuHeavySkyKeysThreadPoolSize, SKYFRAME_EVALUATOR_CPU_HEAVY),
useAsyncExecution
? Executors.newThreadPerTaskExecutor(
- Profiler.instance()
- .profileableVirtualThreadFactory(SKYFRAME_EVALUATOR_EXECUTION + "-"))
+ Thread.ofVirtual().name(SKYFRAME_EVALUATOR_EXECUTION + "-", 0).factory())
: AbstractQueueVisitor.createExecutorService(
/* parallelism= */ executionParallelism, SKYFRAME_EVALUATOR_EXECUTION),
ExceptionHandlingMode.FAIL_FAST,