[7.2.0] Support profiling in a virtual thread. (#22375)
Cherry-pick commits:
- 10777215a95224f8dfd163572d98a301862a277e
- 10e02a15a6f5b915b7a8d25ad7a4be3481210037
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java b/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
index 4206cbe..d9c15f8 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
@@ -85,7 +85,9 @@
}
public void enqueue(TraceData data) {
- if (!metadataPosted.get()) {
+ // Each virtual thread is assigned a virtual lane whose metadata is posted when the lane is
+ // created, so the per-thread metadata is only posted here for non-virtual threads.
+ if (!Thread.currentThread().isVirtual() && !metadataPosted.get()) {
metadataPosted.set(Boolean.TRUE);
queue.add(new ThreadMetadata());
}
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index a0ad097..24bf3b6 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -22,8 +22,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ResourceEstimator;
import com.google.devtools.build.lib.bugreport.BugReporter;
import com.google.devtools.build.lib.clock.Clock;
@@ -45,11 +43,12 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -276,8 +275,7 @@
// @ThreadSafe
void add(TaskData taskData) {
- Extrema<SlowTask> extrema =
- extremaAggregators[(int) (Thread.currentThread().getId() % SHARDS)];
+ Extrema<SlowTask> extrema = extremaAggregators[(int) (taskData.threadId % SHARDS)];
synchronized (extrema) {
extrema.aggregate(new SlowTask(taskData));
}
@@ -583,6 +581,8 @@
aggregator.clear();
}
}
+
+ multiLaneGenerator.reset();
}
/** Returns true iff profiling is currently enabled. */
@@ -624,37 +624,41 @@
* @param type task type
* @param description task description. May be stored until end of build.
*/
- private void logTask(
- long threadId, long startTimeNanos, long duration, ProfilerTask type, String description) {
- checkNotNull(description);
- checkState(!description.isEmpty(), "No description -> not helpful");
- if (duration < 0) {
- // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
- duration = 0;
- }
+ private void logTask(long startTimeNanos, long duration, ProfilerTask type, String description) {
+ var threadId = borrowLaneAndGetLaneId();
+ try {
+ checkNotNull(description);
+ checkState(!description.isEmpty(), "No description -> not helpful");
+ if (duration < 0) {
+ // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
+ duration = 0;
+ }
- StatRecorder statRecorder = tasksHistograms[type.ordinal()];
- if (collectTaskHistograms && statRecorder != null) {
- statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
- }
+ StatRecorder statRecorder = tasksHistograms[type.ordinal()];
+ if (collectTaskHistograms && statRecorder != null) {
+ statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
+ }
- if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
- // Store instance fields as local variables so they are not nulled out from under us by
- // #clear.
- JsonTraceFileWriter currentWriter = writerRef.get();
- if (wasTaskSlowEnoughToRecord(type, duration)) {
- TaskData data = new TaskData(threadId, startTimeNanos, type, description);
- data.durationNanos = duration;
- if (currentWriter != null) {
- currentWriter.enqueue(data);
- }
+ if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
+ // Store instance fields as local variables so they are not nulled out from under us by
+ // #clear.
+ JsonTraceFileWriter currentWriter = writerRef.get();
+ if (wasTaskSlowEnoughToRecord(type, duration)) {
+ TaskData data = new TaskData(threadId, startTimeNanos, type, description);
+ data.durationNanos = duration;
+ if (currentWriter != null) {
+ currentWriter.enqueue(data);
+ }
- SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
+ SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
- if (aggregator != null) {
- aggregator.add(data);
+ if (aggregator != null) {
+ aggregator.add(data);
+ }
}
}
+ } finally {
+ releaseLane();
}
}
@@ -669,12 +673,7 @@
*/
public void logSimpleTask(long startTimeNanos, ProfilerTask type, String description) {
if (clock != null) {
- logTask(
- Thread.currentThread().getId(),
- startTimeNanos,
- clock.nanoTime() - startTimeNanos,
- type,
- description);
+ logTask(startTimeNanos, clock.nanoTime() - startTimeNanos, type, description);
}
}
@@ -692,12 +691,7 @@
*/
public void logSimpleTask(
long startTimeNanos, long stopTimeNanos, ProfilerTask type, String description) {
- logTask(
- Thread.currentThread().getId(),
- startTimeNanos,
- stopTimeNanos - startTimeNanos,
- type,
- description);
+ logTask(startTimeNanos, stopTimeNanos - startTimeNanos, type, description);
}
/**
@@ -712,12 +706,12 @@
*/
public void logSimpleTaskDuration(
long startTimeNanos, Duration duration, ProfilerTask type, String description) {
- logTask(Thread.currentThread().getId(), startTimeNanos, duration.toNanos(), type, description);
+ logTask(startTimeNanos, duration.toNanos(), type, description);
}
/** Used to log "events" happening at a specific time - tasks with zero duration. */
public void logEventAtTime(long atTimeNanos, ProfilerTask type, String description) {
- logTask(Thread.currentThread().getId(), atTimeNanos, 0, type, description);
+ logTask(atTimeNanos, 0, type, description);
}
/** Used to log "events" - tasks with zero duration. */
@@ -726,9 +720,16 @@
logEventAtTime(clock.nanoTime(), type, description);
}
- private SilentCloseable reallyProfile(long laneId, ProfilerTask type, String description) {
+ private SilentCloseable reallyProfile(ProfilerTask type, String description) {
final long startTimeNanos = clock.nanoTime();
- return () -> completeTask(laneId, startTimeNanos, type, description);
+ long laneId = borrowLaneAndGetLaneId();
+ return () -> {
+ try {
+ completeTask(laneId, startTimeNanos, type, description);
+ } finally {
+ releaseLane();
+ }
+ };
}
/**
@@ -751,11 +752,7 @@
* @param description task description. May be stored until the end of the build.
*/
public SilentCloseable profile(ProfilerTask type, String description) {
- return profile(Thread.currentThread().getId(), type, description);
- }
-
- private SilentCloseable profile(long laneId, ProfilerTask type, String description) {
- return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description) : NOP;
+ return (isActive() && isProfiling(type)) ? reallyProfile(type, description) : NOP;
}
/**
@@ -763,11 +760,7 @@
* profiling.
*/
public SilentCloseable profile(ProfilerTask type, Supplier<String> description) {
- return profile(Thread.currentThread().getId(), type, description);
- }
-
- private SilentCloseable profile(long laneId, ProfilerTask type, Supplier<String> description) {
- return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description.get()) : NOP;
+ return (isActive() && isProfiling(type)) ? reallyProfile(type, description.get()) : NOP;
}
/**
@@ -807,15 +800,21 @@
checkNotNull(description);
if (isActive() && isProfiling(type)) {
final long startTimeNanos = clock.nanoTime();
- return () ->
+ var laneId = borrowLaneAndGetLaneId();
+ return () -> {
+ try {
completeAction(
- Thread.currentThread().getId(),
+ laneId,
startTimeNanos,
type,
description,
mnemonic,
includePrimaryOutput ? primaryOutput : null,
includeTargetLabel ? targetLabel : null);
+ } finally {
+ releaseLane();
+ }
+ };
} else {
return NOP;
}
@@ -833,6 +832,15 @@
|| (type == ProfilerTask.INFO && "discoverInputs".equals(taskData.description));
}
+ public void completeTask(long startTimeNanos, ProfilerTask type, String description) {
+ var laneId = borrowLaneAndGetLaneId();
+ try {
+ completeTask(laneId, startTimeNanos, type, description);
+ } finally {
+ releaseLane();
+ }
+ }
+
/** Records the end of the task. */
private void completeTask(
long laneId, long startTimeNanos, ProfilerTask type, String description) {
@@ -905,136 +913,131 @@
}
}
- static class ProfilerTaskType {
- private final String format;
+ private final AtomicLong nextLaneId = new AtomicLong(1_000_000);
+ private final MultiLaneGenerator multiLaneGenerator = new MultiLaneGenerator();
- public ProfilerTaskType(String format) {
- this.format = format;
+ private class MultiLaneGenerator {
+ private final Map<String, LaneGenerator> laneGenerators = Maps.newConcurrentMap();
+
+ private Lane acquire(String prefix) {
+ checkState(isActive());
+ var laneGenerator =
+ laneGenerators.computeIfAbsent(prefix, unused -> new LaneGenerator(prefix));
+ return laneGenerator.acquire();
}
- public String getName(long index) {
- return String.format(format, index);
+ private void release(String prefix, Lane lane) {
+ checkState(isActive());
+ var laneGenerator = checkNotNull(laneGenerators.get(prefix));
+ laneGenerator.release(lane);
+ }
+
+ private void reset() {
+ multiLaneGenerator.laneGenerators.clear();
+ }
+ }
+
+ private static class Lane implements Comparable<Lane> {
+ private final long id;
+ private int refCount;
+
+ private Lane(long id) {
+ this.id = id;
}
@Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ProfilerTaskType)) {
- return false;
+ public int compareTo(Lane o) {
+ return Long.compare(id, o.id);
+ }
+ }
+
+ private class LaneGenerator {
+ private final String prefix;
+
+ private final Queue<Lane> availableLanes = new ConcurrentLinkedQueue<>();
+
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ private LaneGenerator(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public Lane acquire() {
+ var lane = availableLanes.poll();
+ // This may create more virtual lanes than strictly necessary, but that is fine for our purpose.
+ if (lane == null) {
+ long newLaneId = nextLaneId.getAndIncrement();
+ int newLaneIndex = count.getAndIncrement();
+ String newLaneName = prefix + newLaneIndex + " (Virtual)";
+ var threadMetadata = new ThreadMetadata(newLaneName, newLaneId);
+ var writer = Profiler.this.writerRef.get();
+ if (writer != null) {
+ writer.enqueue(threadMetadata);
+ }
+ lane = new Lane(newLaneId);
}
+ return lane;
+ }
- if (this == obj) {
- return true;
+ public void release(Lane lane) {
+ availableLanes.offer(lane);
+ }
+ }
+
+ private final ThreadLocal<String> virtualThreadPrefix =
+ ThreadLocal.withInitial(this::guessThreadPrefix);
+ private final ThreadLocal<Lane> borrowedLane =
+ ThreadLocal.withInitial(
+ () -> {
+ var prefix = virtualThreadPrefix.get();
+ var lane = multiLaneGenerator.acquire(prefix);
+ checkState(lane.refCount == 0);
+ return lane;
+ });
+
+ private long borrowLaneAndGetLaneId() {
+ var currentThread = Thread.currentThread();
+ var threadId = currentThread.threadId();
+ if (!currentThread.isVirtual() || !isActive()) {
+ return threadId;
+ }
+
+ var lane = borrowedLane.get();
+ lane.refCount += 1;
+ return lane.id;
+ }
+
+ private void releaseLane() {
+ var currentThread = Thread.currentThread();
+ if (!currentThread.isVirtual() || !isActive()) {
+ return;
+ }
+
+ var lane = borrowedLane.get();
+ lane.refCount -= 1;
+ checkState(lane.refCount >= 0);
+ if (lane.refCount == 0) {
+ borrowedLane.remove();
+ var prefix = virtualThreadPrefix.get();
+ multiLaneGenerator.release(prefix, lane);
+ }
+ }
+
+ private String guessThreadPrefix() {
+ var currentThread = Thread.currentThread();
+ checkState(currentThread.isVirtual());
+ var threadName = currentThread.getName();
+
+ // Assume the thread name has format "prefix%d"
+ for (int i = threadName.length() - 1; i > 0; i--) {
+ var ch = threadName.charAt(i);
+ if (ch < '0' || ch > '9') {
+ if (i < threadName.length() - 1) {
+ return threadName.substring(0, i + 1);
+ }
}
-
- var that = (ProfilerTaskType) obj;
- return Objects.equals(this.format, that.format);
}
- @Override
- public int hashCode() {
- return Objects.hashCode(format);
- }
- }
-
- public ProfilerTaskType createTaskType(String format) {
- return new ProfilerTaskType(format);
- }
-
- /**
- * A profiler has a specific lane id. All trace events emitted by it are associated with its lane
- * id instead of Thread.currentThread().getId().
- */
- public static class ScopedProfiler {
- private final boolean active;
- private final long laneId;
-
- public ScopedProfiler(boolean active, long laneId) {
- this.active = active;
- this.laneId = laneId;
- }
-
- public SilentCloseable profile(String description) {
- if (!active) {
- return () -> {};
- }
- return Profiler.instance().profile(laneId, ProfilerTask.INFO, description);
- }
- }
-
- /**
- * An interface used to supply a {@link ListenableFuture} with the given {@link ScopedProfiler}.
- */
- public interface FutureSupplier<T> {
- ListenableFuture<T> get(ScopedProfiler profiler);
- }
-
- public <T> ListenableFuture<T> profileAsync(
- ProfilerTaskType type, String description, FutureSupplier<T> futureSupplier) {
- if (!(isActive() && isProfiling(ProfilerTask.INFO))) {
- return futureSupplier.get(new ScopedProfiler(/* active= */ false, 0));
- }
-
- long laneId = laneIdGenerator.acquire(type);
- final long startTimeNanos = clock.nanoTime();
- var scopedProfiler = new ScopedProfiler(/* active= */ true, laneId);
- var future = futureSupplier.get(scopedProfiler);
- future.addListener(
- () -> {
- long endTimeNanos = clock.nanoTime();
- long duration = endTimeNanos - startTimeNanos;
- recordTask(
- new TaskData(laneId, startTimeNanos, duration, ProfilerTask.INFO, description));
- laneIdGenerator.release(type, laneId);
- },
- MoreExecutors.directExecutor());
- return future;
- }
-
- private static final long LANE_ID_BASE = 1_000_000;
- private final AtomicLong nextLaneId = new AtomicLong(LANE_ID_BASE);
- private final TaskTypeLaneIdGenerator laneIdGenerator = new TaskTypeLaneIdGenerator();
-
- private class TaskTypeLaneIdGenerator {
- private final Map<ProfilerTaskType, LaneIdGenerator> typeToLaneIdGenerator =
- Maps.newConcurrentMap();
-
- public long acquire(ProfilerTaskType type) {
- var laneIdGenerator =
- typeToLaneIdGenerator.computeIfAbsent(type, unused -> new LaneIdGenerator(type));
- return laneIdGenerator.acquire();
- }
-
- public void release(ProfilerTaskType type, long laneId) {
- var laneIdGenerator = checkNotNull(typeToLaneIdGenerator.get(type));
- laneIdGenerator.release(laneId);
- }
- }
-
- private class LaneIdGenerator {
- private final ProfilerTaskType type;
- private final PriorityQueue<Long> availableLaneIds = new PriorityQueue<>();
-
- private int count = 0;
-
- private LaneIdGenerator(ProfilerTaskType type) {
- this.type = type;
- }
-
- public synchronized long acquire() {
- if (!availableLaneIds.isEmpty()) {
- return availableLaneIds.poll();
- }
- var newLaneId = Profiler.this.nextLaneId.getAndIncrement();
- var threadMetadata = new ThreadMetadata(type.getName(count++), newLaneId, LANE_ID_BASE);
- var writer = Profiler.this.writerRef.get();
- if (writer != null) {
- writer.enqueue(threadMetadata);
- }
- return newLaneId;
- }
-
- public synchronized void release(long laneId) {
- availableLaneIds.add(laneId);
- }
+ return "Other";
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java b/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
index 99b1a39..54ca49e 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
@@ -24,6 +24,12 @@
private final long threadId;
private final long sortIndex;
+ public ThreadMetadata(String readableName, long threadId) {
+ this.readableName = readableName;
+ this.threadId = threadId;
+ this.sortIndex = getSortIndex(readableName);
+ }
+
public ThreadMetadata(String readableName, long threadId, long sortIndex) {
this.readableName = readableName;
this.threadId = threadId;