[7.2.0] Support profiling in an virtual thread. (#22375)

Cherry-pick commits:
- 10777215a95224f8dfd163572d98a301862a277e
- 10e02a15a6f5b915b7a8d25ad7a4be3481210037
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java b/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
index 4206cbe..d9c15f8 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/JsonTraceFileWriter.java
@@ -85,7 +85,9 @@
   }
 
   public void enqueue(TraceData data) {
-    if (!metadataPosted.get()) {
+    // We assign a virtual lane for virtual thread and the metadata for the virtual lane is posted
+    // at creation time.
+    if (!Thread.currentThread().isVirtual() && !metadataPosted.get()) {
       metadataPosted.set(Boolean.TRUE);
       queue.add(new ThreadMetadata());
     }
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index a0ad097..24bf3b6 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -22,8 +22,6 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.ResourceEstimator;
 import com.google.devtools.build.lib.bugreport.BugReporter;
 import com.google.devtools.build.lib.clock.Clock;
@@ -45,11 +43,12 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -276,8 +275,7 @@
 
     // @ThreadSafe
     void add(TaskData taskData) {
-      Extrema<SlowTask> extrema =
-          extremaAggregators[(int) (Thread.currentThread().getId() % SHARDS)];
+      Extrema<SlowTask> extrema = extremaAggregators[(int) (taskData.threadId % SHARDS)];
       synchronized (extrema) {
         extrema.aggregate(new SlowTask(taskData));
       }
@@ -583,6 +581,8 @@
         aggregator.clear();
       }
     }
+
+    multiLaneGenerator.reset();
   }
 
   /** Returns true iff profiling is currently enabled. */
@@ -624,37 +624,41 @@
    * @param type task type
    * @param description task description. May be stored until end of build.
    */
-  private void logTask(
-      long threadId, long startTimeNanos, long duration, ProfilerTask type, String description) {
-    checkNotNull(description);
-    checkState(!description.isEmpty(), "No description -> not helpful");
-    if (duration < 0) {
-      // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
-      duration = 0;
-    }
+  private void logTask(long startTimeNanos, long duration, ProfilerTask type, String description) {
+    var threadId = borrowLaneAndGetLaneId();
+    try {
+      checkNotNull(description);
+      checkState(!description.isEmpty(), "No description -> not helpful");
+      if (duration < 0) {
+        // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
+        duration = 0;
+      }
 
-    StatRecorder statRecorder = tasksHistograms[type.ordinal()];
-    if (collectTaskHistograms && statRecorder != null) {
-      statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
-    }
+      StatRecorder statRecorder = tasksHistograms[type.ordinal()];
+      if (collectTaskHistograms && statRecorder != null) {
+        statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description);
+      }
 
-    if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
-      // Store instance fields as local variables so they are not nulled out from under us by
-      // #clear.
-      JsonTraceFileWriter currentWriter = writerRef.get();
-      if (wasTaskSlowEnoughToRecord(type, duration)) {
-        TaskData data = new TaskData(threadId, startTimeNanos, type, description);
-        data.durationNanos = duration;
-        if (currentWriter != null) {
-          currentWriter.enqueue(data);
-        }
+      if (isActive() && startTimeNanos >= 0 && isProfiling(type)) {
+        // Store instance fields as local variables so they are not nulled out from under us by
+        // #clear.
+        JsonTraceFileWriter currentWriter = writerRef.get();
+        if (wasTaskSlowEnoughToRecord(type, duration)) {
+          TaskData data = new TaskData(threadId, startTimeNanos, type, description);
+          data.durationNanos = duration;
+          if (currentWriter != null) {
+            currentWriter.enqueue(data);
+          }
 
-        SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
+          SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
 
-        if (aggregator != null) {
-          aggregator.add(data);
+          if (aggregator != null) {
+            aggregator.add(data);
+          }
         }
       }
+    } finally {
+      releaseLane();
     }
   }
 
@@ -669,12 +673,7 @@
    */
   public void logSimpleTask(long startTimeNanos, ProfilerTask type, String description) {
     if (clock != null) {
-      logTask(
-          Thread.currentThread().getId(),
-          startTimeNanos,
-          clock.nanoTime() - startTimeNanos,
-          type,
-          description);
+      logTask(startTimeNanos, clock.nanoTime() - startTimeNanos, type, description);
     }
   }
 
@@ -692,12 +691,7 @@
    */
   public void logSimpleTask(
       long startTimeNanos, long stopTimeNanos, ProfilerTask type, String description) {
-    logTask(
-        Thread.currentThread().getId(),
-        startTimeNanos,
-        stopTimeNanos - startTimeNanos,
-        type,
-        description);
+    logTask(startTimeNanos, stopTimeNanos - startTimeNanos, type, description);
   }
 
   /**
@@ -712,12 +706,12 @@
    */
   public void logSimpleTaskDuration(
       long startTimeNanos, Duration duration, ProfilerTask type, String description) {
-    logTask(Thread.currentThread().getId(), startTimeNanos, duration.toNanos(), type, description);
+    logTask(startTimeNanos, duration.toNanos(), type, description);
   }
 
   /** Used to log "events" happening at a specific time - tasks with zero duration. */
   public void logEventAtTime(long atTimeNanos, ProfilerTask type, String description) {
-    logTask(Thread.currentThread().getId(), atTimeNanos, 0, type, description);
+    logTask(atTimeNanos, 0, type, description);
   }
 
   /** Used to log "events" - tasks with zero duration. */
@@ -726,9 +720,16 @@
     logEventAtTime(clock.nanoTime(), type, description);
   }
 
-  private SilentCloseable reallyProfile(long laneId, ProfilerTask type, String description) {
+  private SilentCloseable reallyProfile(ProfilerTask type, String description) {
     final long startTimeNanos = clock.nanoTime();
-    return () -> completeTask(laneId, startTimeNanos, type, description);
+    long laneId = borrowLaneAndGetLaneId();
+    return () -> {
+      try {
+        completeTask(laneId, startTimeNanos, type, description);
+      } finally {
+        releaseLane();
+      }
+    };
   }
 
   /**
@@ -751,11 +752,7 @@
    * @param description task description. May be stored until the end of the build.
    */
   public SilentCloseable profile(ProfilerTask type, String description) {
-    return profile(Thread.currentThread().getId(), type, description);
-  }
-
-  private SilentCloseable profile(long laneId, ProfilerTask type, String description) {
-    return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description) : NOP;
+    return (isActive() && isProfiling(type)) ? reallyProfile(type, description) : NOP;
   }
 
   /**
@@ -763,11 +760,7 @@
    * profiling.
    */
   public SilentCloseable profile(ProfilerTask type, Supplier<String> description) {
-    return profile(Thread.currentThread().getId(), type, description);
-  }
-
-  private SilentCloseable profile(long laneId, ProfilerTask type, Supplier<String> description) {
-    return (isActive() && isProfiling(type)) ? reallyProfile(laneId, type, description.get()) : NOP;
+    return (isActive() && isProfiling(type)) ? reallyProfile(type, description.get()) : NOP;
   }
 
   /**
@@ -807,15 +800,21 @@
     checkNotNull(description);
     if (isActive() && isProfiling(type)) {
       final long startTimeNanos = clock.nanoTime();
-      return () ->
+      var laneId = borrowLaneAndGetLaneId();
+      return () -> {
+        try {
           completeAction(
-              Thread.currentThread().getId(),
+              laneId,
               startTimeNanos,
               type,
               description,
               mnemonic,
               includePrimaryOutput ? primaryOutput : null,
               includeTargetLabel ? targetLabel : null);
+        } finally {
+          releaseLane();
+        }
+      };
     } else {
       return NOP;
     }
@@ -833,6 +832,15 @@
         || (type == ProfilerTask.INFO && "discoverInputs".equals(taskData.description));
   }
 
+  public void completeTask(long startTimeNanos, ProfilerTask type, String description) {
+    var laneId = borrowLaneAndGetLaneId();
+    try {
+      completeTask(laneId, startTimeNanos, type, description);
+    } finally {
+      releaseLane();
+    }
+  }
+
   /** Records the end of the task. */
   private void completeTask(
       long laneId, long startTimeNanos, ProfilerTask type, String description) {
@@ -905,136 +913,131 @@
     }
   }
 
-  static class ProfilerTaskType {
-    private final String format;
+  private final AtomicLong nextLaneId = new AtomicLong(1_000_000);
+  private final MultiLaneGenerator multiLaneGenerator = new MultiLaneGenerator();
 
-    public ProfilerTaskType(String format) {
-      this.format = format;
+  private class MultiLaneGenerator {
+    private final Map<String, LaneGenerator> laneGenerators = Maps.newConcurrentMap();
+
+    private Lane acquire(String prefix) {
+      checkState(isActive());
+      var laneGenerator =
+          laneGenerators.computeIfAbsent(prefix, unused -> new LaneGenerator(prefix));
+      return laneGenerator.acquire();
     }
 
-    public String getName(long index) {
-      return String.format(format, index);
+    private void release(String prefix, Lane lane) {
+      checkState(isActive());
+      var laneGenerator = checkNotNull(laneGenerators.get(prefix));
+      laneGenerator.release(lane);
+    }
+
+    private void reset() {
+      multiLaneGenerator.laneGenerators.clear();
+    }
+  }
+
+  private static class Lane implements Comparable<Lane> {
+    private final long id;
+    private int refCount;
+
+    private Lane(long id) {
+      this.id = id;
     }
 
     @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof ProfilerTaskType)) {
-        return false;
+    public int compareTo(Lane o) {
+      return Long.compare(id, o.id);
+    }
+  }
+
+  private class LaneGenerator {
+    private final String prefix;
+
+    private final Queue<Lane> availableLanes = new ConcurrentLinkedQueue<>();
+
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    private LaneGenerator(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public Lane acquire() {
+      var lane = availableLanes.poll();
+      // It might create more virtual lanes, but it's fine for our purpose.
+      if (lane == null) {
+        long newLaneId = nextLaneId.getAndIncrement();
+        int newLaneIndex = count.getAndIncrement();
+        String newLaneName = prefix + newLaneIndex + " (Virtual)";
+        var threadMetadata = new ThreadMetadata(newLaneName, newLaneId);
+        var writer = Profiler.this.writerRef.get();
+        if (writer != null) {
+          writer.enqueue(threadMetadata);
+        }
+        lane = new Lane(newLaneId);
       }
+      return lane;
+    }
 
-      if (this == obj) {
-        return true;
+    public void release(Lane lane) {
+      availableLanes.offer(lane);
+    }
+  }
+
+  private final ThreadLocal<String> virtualThreadPrefix =
+      ThreadLocal.withInitial(this::guessThreadPrefix);
+  private final ThreadLocal<Lane> borrowedLane =
+      ThreadLocal.withInitial(
+          () -> {
+            var prefix = virtualThreadPrefix.get();
+            var lane = multiLaneGenerator.acquire(prefix);
+            checkState(lane.refCount == 0);
+            return lane;
+          });
+
+  private long borrowLaneAndGetLaneId() {
+    var currentThread = Thread.currentThread();
+    var threadId = currentThread.threadId();
+    if (!currentThread.isVirtual() || !isActive()) {
+      return threadId;
+    }
+
+    var lane = borrowedLane.get();
+    lane.refCount += 1;
+    return lane.id;
+  }
+
+  private void releaseLane() {
+    var currentThread = Thread.currentThread();
+    if (!currentThread.isVirtual() || !isActive()) {
+      return;
+    }
+
+    var lane = borrowedLane.get();
+    lane.refCount -= 1;
+    checkState(lane.refCount >= 0);
+    if (lane.refCount == 0) {
+      borrowedLane.remove();
+      var prefix = virtualThreadPrefix.get();
+      multiLaneGenerator.release(prefix, lane);
+    }
+  }
+
+  private String guessThreadPrefix() {
+    var currentThread = Thread.currentThread();
+    checkState(currentThread.isVirtual());
+    var threadName = currentThread.getName();
+
+    // Assume the thread name has format "prefix%d"
+    for (int i = threadName.length() - 1; i > 0; i--) {
+      var ch = threadName.charAt(i);
+      if (ch < '0' || ch > '9') {
+        if (i < threadName.length() - 1) {
+          return threadName.substring(0, i + 1);
+        }
       }
-
-      var that = (ProfilerTaskType) obj;
-      return Objects.equals(this.format, that.format);
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(format);
-    }
-  }
-
-  public ProfilerTaskType createTaskType(String format) {
-    return new ProfilerTaskType(format);
-  }
-
-  /**
-   * A profiler has a specific lane id. All trace events emitted by it are associated with its lane
-   * id instead of Thread.currentThread().getId().
-   */
-  public static class ScopedProfiler {
-    private final boolean active;
-    private final long laneId;
-
-    public ScopedProfiler(boolean active, long laneId) {
-      this.active = active;
-      this.laneId = laneId;
-    }
-
-    public SilentCloseable profile(String description) {
-      if (!active) {
-        return () -> {};
-      }
-      return Profiler.instance().profile(laneId, ProfilerTask.INFO, description);
-    }
-  }
-
-  /**
-   * An interface used to supply a {@link ListenableFuture} with the given {@link ScopedProfiler}.
-   */
-  public interface FutureSupplier<T> {
-    ListenableFuture<T> get(ScopedProfiler profiler);
-  }
-
-  public <T> ListenableFuture<T> profileAsync(
-      ProfilerTaskType type, String description, FutureSupplier<T> futureSupplier) {
-    if (!(isActive() && isProfiling(ProfilerTask.INFO))) {
-      return futureSupplier.get(new ScopedProfiler(/* active= */ false, 0));
-    }
-
-    long laneId = laneIdGenerator.acquire(type);
-    final long startTimeNanos = clock.nanoTime();
-    var scopedProfiler = new ScopedProfiler(/* active= */ true, laneId);
-    var future = futureSupplier.get(scopedProfiler);
-    future.addListener(
-        () -> {
-          long endTimeNanos = clock.nanoTime();
-          long duration = endTimeNanos - startTimeNanos;
-          recordTask(
-              new TaskData(laneId, startTimeNanos, duration, ProfilerTask.INFO, description));
-          laneIdGenerator.release(type, laneId);
-        },
-        MoreExecutors.directExecutor());
-    return future;
-  }
-
-  private static final long LANE_ID_BASE = 1_000_000;
-  private final AtomicLong nextLaneId = new AtomicLong(LANE_ID_BASE);
-  private final TaskTypeLaneIdGenerator laneIdGenerator = new TaskTypeLaneIdGenerator();
-
-  private class TaskTypeLaneIdGenerator {
-    private final Map<ProfilerTaskType, LaneIdGenerator> typeToLaneIdGenerator =
-        Maps.newConcurrentMap();
-
-    public long acquire(ProfilerTaskType type) {
-      var laneIdGenerator =
-          typeToLaneIdGenerator.computeIfAbsent(type, unused -> new LaneIdGenerator(type));
-      return laneIdGenerator.acquire();
-    }
-
-    public void release(ProfilerTaskType type, long laneId) {
-      var laneIdGenerator = checkNotNull(typeToLaneIdGenerator.get(type));
-      laneIdGenerator.release(laneId);
-    }
-  }
-
-  private class LaneIdGenerator {
-    private final ProfilerTaskType type;
-    private final PriorityQueue<Long> availableLaneIds = new PriorityQueue<>();
-
-    private int count = 0;
-
-    private LaneIdGenerator(ProfilerTaskType type) {
-      this.type = type;
-    }
-
-    public synchronized long acquire() {
-      if (!availableLaneIds.isEmpty()) {
-        return availableLaneIds.poll();
-      }
-      var newLaneId = Profiler.this.nextLaneId.getAndIncrement();
-      var threadMetadata = new ThreadMetadata(type.getName(count++), newLaneId, LANE_ID_BASE);
-      var writer = Profiler.this.writerRef.get();
-      if (writer != null) {
-        writer.enqueue(threadMetadata);
-      }
-      return newLaneId;
-    }
-
-    public synchronized void release(long laneId) {
-      availableLaneIds.add(laneId);
-    }
+    return "Other";
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java b/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
index 99b1a39..54ca49e 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/ThreadMetadata.java
@@ -24,6 +24,12 @@
   private final long threadId;
   private final long sortIndex;
 
+  public ThreadMetadata(String readableName, long threadId) {
+    this.readableName = readableName;
+    this.threadId = threadId;
+    this.sortIndex = getSortIndex(readableName);
+  }
+
   public ThreadMetadata(String readableName, long threadId, long sortIndex) {
     this.readableName = readableName;
     this.threadId = threadId;