| // Copyright 2014 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.profiler; |
| |
| import static com.google.devtools.build.lib.profiler.ProfilerTask.TASK_COUNT; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.devtools.build.lib.analysis.BlazeVersionInfo; |
| import com.google.devtools.build.lib.bugreport.BugReporter; |
| import com.google.devtools.build.lib.clock.Clock; |
| import com.google.devtools.build.lib.collect.Extrema; |
| import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; |
| import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; |
| import com.google.devtools.build.lib.profiler.PredicateBasedStatRecorder.RecorderAndPredicate; |
| import com.google.devtools.build.lib.profiler.StatRecorder.VfsHeuristics; |
| import com.google.devtools.build.lib.worker.WorkerMetricsCollector; |
| import com.google.gson.stream.JsonWriter; |
| import com.sun.management.OperatingSystemMXBean; |
| import java.io.BufferedOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.lang.management.ManagementFactory; |
| import java.nio.charset.StandardCharsets; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Supplier; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.zip.GZIPOutputStream; |
| import javax.annotation.Nullable; |
| |
| /** |
| * Blaze internal profiler. Provides a facility to report various Blaze tasks and store them |
| * (asynchronously) in a file for later analysis. |
| * |
| * <p>Implemented as a singleton, so callers should use Profiler.instance() to obtain a reference. |
| * |
| * <p>Internally, completed tasks are handed to a background writer thread through a blocking |
| * queue, where they are serialized in the JSON trace format. |
| * |
| * <p>Also, due to the nature of the provided functionality (instrumentation of all Blaze |
| * components), the build.lib.profiler package is used by almost every other Blaze package, so |
| * special attention should be paid to avoid any dependencies on the rest of the Blaze code, |
| * including build.lib.util and build.lib.vfs. This is important because build.lib.util and |
| * build.lib.vfs contain Profiler invocations and any dependency on those two packages would create |
| * a circular relationship. |
| * |
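| * <p>A typical usage sketch (hypothetical caller code; the task type and description are |
| * illustrative): |
| * |
| * <pre>{@code |
| * try (SilentCloseable c = Profiler.instance().profile(ProfilerTask.INFO, "loading package")) { |
| * // Work attributed to the "loading package" task. |
| * } |
| * }</pre> |
| * |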
| * @see ProfilerTask enum for recognized task types. |
| */ |
| @ThreadSafe |
| public final class Profiler { |
| /** The profiler (a static singleton instance). Inactive by default. */ |
| private static final Profiler instance = new Profiler(); |
| |
| private static final int HISTOGRAM_BUCKETS = 20; |
| |
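| // Sentinel task enqueued by FileWriter#shutdown to tell the writer thread to finish. |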
| private static final TaskData POISON_PILL = new TaskData(0, 0, null, "poison pill"); |
| |
| private static final long ACTION_COUNT_BUCKET_MS = 200; |
| |
| /** File format enum. */ |
| public enum Format { |
| JSON_TRACE_FILE_FORMAT, |
| JSON_TRACE_FILE_COMPRESSED_FORMAT |
| } |
| |
| /** A task that was very slow. */ |
| public static final class SlowTask implements Comparable<SlowTask> { |
| final long durationNanos; |
| final String description; |
| final ProfilerTask type; |
| |
| private SlowTask(TaskData taskData) { |
| this.durationNanos = taskData.duration; |
| this.description = taskData.description; |
| this.type = taskData.type; |
| } |
| |
| @Override |
| public int compareTo(SlowTask other) { |
| // Compare the raw nanosecond durations; Long.compare avoids overflow in the subtraction. |
| return Long.compare(durationNanos, other.durationNanos); |
| } |
| |
| public long getDurationNanos() { |
| return durationNanos; |
| } |
| |
| public String getDescription() { |
| return description; |
| } |
| |
| public ProfilerTask getType() { |
| return type; |
| } |
| } |
| |
| /** |
| * Container for a single task record. |
| * |
| * <p>The class itself is not thread safe, but all access to it from Profiler methods is. |
| */ |
| @ThreadCompatible |
| private static class TaskData { |
| final long threadId; |
| final long startTimeNanos; |
| final int id; |
| final ProfilerTask type; |
| final MnemonicData mnemonic; |
| final String description; |
| |
| long duration; |
| |
| TaskData( |
| int id, |
| long startTimeNanos, |
| ProfilerTask eventType, |
| MnemonicData mnemonic, |
| String description) { |
| this.id = id; |
| this.threadId = Thread.currentThread().getId(); |
| this.startTimeNanos = startTimeNanos; |
| this.type = eventType; |
| this.mnemonic = mnemonic; |
| this.description = Preconditions.checkNotNull(description); |
| } |
| |
| TaskData(int id, long startTimeNanos, ProfilerTask eventType, String description) { |
| this(id, startTimeNanos, eventType, MnemonicData.getEmptyMnemonic(), description); |
| } |
| |
| TaskData(long threadId, long startTimeNanos, long duration, String description) { |
| this.id = -1; |
| this.type = ProfilerTask.UNKNOWN; |
| this.mnemonic = MnemonicData.getEmptyMnemonic(); |
| this.threadId = threadId; |
| this.startTimeNanos = startTimeNanos; |
| this.duration = duration; |
| this.description = description; |
| } |
| |
| @Override |
| public String toString() { |
| return "Thread " + threadId + ", task " + id + ", type " + type + ", " + description; |
| } |
| } |
| |
| private static final class ActionTaskData extends TaskData { |
| final String primaryOutputPath; |
| final String targetLabel; |
| |
| ActionTaskData( |
| int id, |
| long startTimeNanos, |
| ProfilerTask eventType, |
| MnemonicData mnemonic, |
| String description, |
| String primaryOutputPath, |
| String targetLabel) { |
| super(id, startTimeNanos, eventType, mnemonic, description); |
| this.primaryOutputPath = primaryOutputPath; |
| this.targetLabel = targetLabel; |
| } |
| } |
| |
| /** |
| * Aggregator class that keeps track of the slowest tasks of the specified type. |
| * |
| * <p>{@code extremaAggregators} is sharded so that all threads need not compete for the same |
| * lock if they do the same operation at the same time. Access to an individual {@link Extrema} |
| * is synchronized on the {@link Extrema} instance itself. |
| */ |
| private static final class SlowestTaskAggregator { |
| private static final int SHARDS = 16; |
| private static final int SIZE = 30; |
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private final Extrema<SlowTask>[] extremaAggregators = new Extrema[SHARDS]; |
| |
| SlowestTaskAggregator() { |
| for (int i = 0; i < SHARDS; i++) { |
| extremaAggregators[i] = Extrema.max(SIZE); |
| } |
| } |
| |
| // @ThreadSafe |
| void add(TaskData taskData) { |
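| // Shard on the calling thread's id so that concurrent callers usually hit different locks. |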
| Extrema<SlowTask> extrema = |
| extremaAggregators[(int) (Thread.currentThread().getId() % SHARDS)]; |
| synchronized (extrema) { |
| extrema.aggregate(new SlowTask(taskData)); |
| } |
| } |
| |
| // @ThreadSafe |
| void clear() { |
| for (int i = 0; i < SHARDS; i++) { |
| Extrema<SlowTask> extrema = extremaAggregators[i]; |
| synchronized (extrema) { |
| extrema.clear(); |
| } |
| } |
| } |
| |
| // @ThreadSafe |
| Iterable<SlowTask> getSlowestTasks() { |
| // This is slow, but since it only happens during the end of the invocation, it's OK. |
| Extrema<SlowTask> mergedExtrema = Extrema.max(SIZE); |
| for (int i = 0; i < SHARDS; i++) { |
| Extrema<SlowTask> extrema = extremaAggregators[i]; |
| synchronized (extrema) { |
| for (SlowTask task : extrema.getExtremeElements()) { |
| mergedExtrema.aggregate(task); |
| } |
| } |
| } |
| return mergedExtrema.getExtremeElements(); |
| } |
| } |
| |
| private Clock clock; |
| private ImmutableSet<ProfilerTask> profiledTasks; |
| private volatile long profileStartTime; |
| private volatile boolean recordAllDurations = false; |
| private Duration profileCpuStartTime; |
| |
| /** This counter provides a unique id for every task, used to provide a parent/child relation. */ |
| private AtomicInteger taskId = new AtomicInteger(); |
| |
| /** |
| * The reference to the current writer, if any. If the referenced writer is null, disk writes are |
| * disabled. This can happen when the profiler is enabled only for slowest-task recording. |
| */ |
| private AtomicReference<FileWriter> writerRef = new AtomicReference<>(); |
| |
| private final SlowestTaskAggregator[] slowestTasks = |
| new SlowestTaskAggregator[ProfilerTask.values().length]; |
| |
| @VisibleForTesting |
| final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length]; |
| |
| /** Thread that collects local cpu usage data (if enabled). */ |
| private CollectLocalResourceUsage resourceUsageThread; |
| |
| private TimeSeries actionCountTimeSeries; |
| private long actionCountStartTime; |
| private boolean collectTaskHistograms; |
| |
| private Profiler() { |
| initHistograms(); |
| for (ProfilerTask task : ProfilerTask.values()) { |
| if (task.collectsSlowestInstances) { |
| slowestTasks[task.ordinal()] = new SlowestTaskAggregator(); |
| } |
| } |
| } |
| |
| private void initHistograms() { |
| for (ProfilerTask task : ProfilerTask.values()) { |
| if (task.isVfs()) { |
| Map<String, ? extends Predicate<? super String>> vfsHeuristics = |
| VfsHeuristics.vfsTypeHeuristics; |
| List<RecorderAndPredicate> recorders = new ArrayList<>(vfsHeuristics.size()); |
| for (Map.Entry<String, ? extends Predicate<? super String>> e : vfsHeuristics.entrySet()) { |
| recorders.add( |
| new RecorderAndPredicate( |
| new SingleStatRecorder(task + " " + e.getKey(), HISTOGRAM_BUCKETS), |
| e.getValue())); |
| } |
| tasksHistograms[task.ordinal()] = new PredicateBasedStatRecorder(recorders); |
| } else { |
| tasksHistograms[task.ordinal()] = new SingleStatRecorder(task, HISTOGRAM_BUCKETS); |
| } |
| } |
| } |
| |
| /** |
| * Returns task histograms. This must be called between calls to {@link #start} and {@link #stop}; |
| * otherwise the returned recorders are all empty. Note that the returned recorders may still be |
| * modified concurrently, but they are thread-safe. |
| * |
| * <p>The stat recorders are indexed by {@code ProfilerTask#ordinal}. |
| */ |
| // TODO(ulfjack): This returns incomplete data by design. Maybe we should return the histograms on |
| // stop instead? However, this is currently only called from one location in a module, and that |
| // can't call stop itself. What to do? |
| public synchronized ImmutableList<StatRecorder> getTasksHistograms() { |
| return isActive() ? ImmutableList.copyOf(tasksHistograms) : ImmutableList.of(); |
| } |
| |
| public static Profiler instance() { |
| return instance; |
| } |
| |
| /** |
| * Returns the nanoTime of the current profiler instance, or an arbitrary constant (currently -1) |
| * if the profiler is not active. |
| */ |
| public static long nanoTimeMaybe() { |
| if (instance.isActive()) { |
| return instance.clock.nanoTime(); |
| } |
| return -1; |
| } |
| |
| // Returns the elapsed wall clock time since the profile has been started or null if inactive. |
| @Nullable |
| public static Duration elapsedTimeMaybe() { |
| if (instance.isActive()) { |
| return Duration.ofNanos(instance.clock.nanoTime()) |
| .minus(Duration.ofNanos(instance.profileStartTime)); |
| } |
| return null; |
| } |
| |
| private static Duration getProcessCpuTime() { |
| OperatingSystemMXBean bean = |
| (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); |
| return Duration.ofNanos(bean.getProcessCpuTime()); |
| } |
| |
| // Returns the CPU time since the profile has been started or null if inactive. |
| @Nullable |
| public static Duration getProcessCpuTimeMaybe() { |
| if (instance().isActive()) { |
| return getProcessCpuTime().minus(instance().profileCpuStartTime); |
| } |
| return null; |
| } |
| |
| /** |
| * Enable profiling. |
| * |
| * <p>Subsequent calls to beginTask/endTask will be recorded in the provided output stream. Note |
| * that stream performance is extremely important, so a buffered stream should be used. |
| * |
| * @param profiledTasks which {@link ProfilerTask}s to track |
| * @param stream output stream to store profile data. Note: passing an unbuffered stream object |
| * reference may result in significant performance penalties |
| * @param recordAllDurations iff true, record all tasks regardless of their duration; otherwise |
| * some tasks may get aggregated if they finish quickly enough |
| * @param clock a {@code BlazeClock.instance()} |
| * @param execStartTimeNanos execution start time in nanos obtained from {@code clock.nanoTime()} |
| * @param collectLoadAverage if true, collects the system load average (as seen in uptime(1)) |
| */ |
| public synchronized void start( |
| ImmutableSet<ProfilerTask> profiledTasks, |
| OutputStream stream, |
| Format format, |
| String outputBase, |
| UUID buildID, |
| boolean recordAllDurations, |
| Clock clock, |
| long execStartTimeNanos, |
| boolean slimProfile, |
| boolean includePrimaryOutput, |
| boolean includeTargetLabel, |
| boolean collectTaskHistograms, |
| boolean collectWorkerDataInProfiler, |
| boolean collectLoadAverage, |
| boolean collectSystemNetworkUsage, |
| WorkerMetricsCollector workerMetricsCollector, |
| BugReporter bugReporter) |
| throws IOException { |
| Preconditions.checkState(!isActive(), "Profiler already active"); |
| initHistograms(); |
| |
| this.profiledTasks = profiledTasks; |
| this.clock = clock; |
| this.actionCountStartTime = clock.nanoTime(); |
| this.actionCountTimeSeries = |
| new TimeSeries(Duration.ofNanos(actionCountStartTime).toMillis(), ACTION_COUNT_BUCKET_MS); |
| this.collectTaskHistograms = collectTaskHistograms; |
| |
| // Check the current limit on the number of supported task types; we use enum.ordinal() to |
| // store them instead of an EnumSet for performance reasons. |
| Preconditions.checkState( |
| TASK_COUNT < 256, |
| "The profiler implementation supports only up to 255 different ProfilerTask values."); |
| |
| // Reset state for the new profiling session. |
| taskId.set(0); |
| this.recordAllDurations = recordAllDurations; |
| FileWriter writer = null; |
| if (stream != null && format != null) { |
| switch (format) { |
| case JSON_TRACE_FILE_FORMAT: |
| writer = |
| new JsonTraceFileWriter( |
| stream, |
| execStartTimeNanos, |
| slimProfile, |
| outputBase, |
| buildID, |
| includePrimaryOutput, |
| includeTargetLabel); |
| break; |
| case JSON_TRACE_FILE_COMPRESSED_FORMAT: |
| writer = |
| new JsonTraceFileWriter( |
| new GZIPOutputStream(stream), |
| execStartTimeNanos, |
| slimProfile, |
| outputBase, |
| buildID, |
| includePrimaryOutput, |
| includeTargetLabel); |
| } |
| writer.start(); |
| } |
| this.writerRef.set(writer); |
| |
| // Activate profiler. |
| profileStartTime = execStartTimeNanos; |
| profileCpuStartTime = getProcessCpuTime(); |
| |
| // Start collecting Bazel and system-wide CPU metric collection. |
| resourceUsageThread = |
| new CollectLocalResourceUsage( |
| bugReporter, |
| workerMetricsCollector, |
| collectWorkerDataInProfiler, |
| collectLoadAverage, |
| collectSystemNetworkUsage); |
| resourceUsageThread.setDaemon(true); |
| resourceUsageThread.start(); |
| } |
| |
| /** |
| * Returns the slowest tasks recorded so far, across all task types that collect them. This must |
| * be called between calls to {@link #start} and {@link #stop}, or the returned list is empty. |
| */ |
| // TODO(ulfjack): This returns incomplete data by design. Also see getTasksHistograms. |
| public synchronized Iterable<SlowTask> getSlowestTasks() { |
| List<Iterable<SlowTask>> slowestTasksByType = new ArrayList<>(); |
| |
| for (SlowestTaskAggregator aggregator : slowestTasks) { |
| if (aggregator != null) { |
| slowestTasksByType.add(aggregator.getSlowestTasks()); |
| } |
| } |
| |
| return Iterables.concat(slowestTasksByType); |
| } |
| |
| private void collectActionCounts() { |
| if (actionCountTimeSeries != null) { |
| long endTimeMillis = Duration.ofNanos(clock.nanoTime()).toMillis(); |
| long profileStartMillis = Duration.ofNanos(actionCountStartTime).toMillis(); |
| int len = (int) ((endTimeMillis - profileStartMillis) / ACTION_COUNT_BUCKET_MS) + 1; |
| double[] actionCountValues = actionCountTimeSeries.toDoubleArray(len); |
| Profiler profiler = Profiler.instance(); |
| for (int i = 0; i < len; i++) { |
| long timeMillis = profileStartMillis + i * ACTION_COUNT_BUCKET_MS; |
| long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeMillis); |
| profiler.logEventAtTime( |
| timeNanos, ProfilerTask.ACTION_COUNTS, String.valueOf(actionCountValues[i])); |
| } |
| actionCountTimeSeries = null; |
| } |
| } |
| |
| /** |
| * Disable profiling and complete profile file creation. Subsequent calls to beginTask/endTask |
| * will no longer be recorded in the profile. |
| */ |
| public synchronized void stop() throws IOException { |
| if (!isActive()) { |
| return; |
| } |
| |
| collectActionCounts(); |
| |
| if (resourceUsageThread != null) { |
| resourceUsageThread.stopCollecting(); |
| try { |
| resourceUsageThread.join(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| resourceUsageThread.logCollectedData(); |
| resourceUsageThread = null; |
| } |
| |
| // Log a final event to update the duration of ProfilePhase.FINISH. |
| logEvent(ProfilerTask.INFO, "Finishing"); |
| FileWriter writer = writerRef.getAndSet(null); |
| if (writer != null) { |
| writer.shutdown(); |
| writer = null; |
| } |
| Arrays.fill(tasksHistograms, null); |
| profileStartTime = 0L; |
| profileCpuStartTime = null; |
| |
| for (SlowestTaskAggregator aggregator : slowestTasks) { |
| if (aggregator != null) { |
| aggregator.clear(); |
| } |
| } |
| } |
| |
| /** Returns true iff profiling is currently enabled. */ |
| public boolean isActive() { |
| return profileStartTime != 0L; |
| } |
| |
| public boolean isProfiling(ProfilerTask type) { |
| return profiledTasks.contains(type); |
| } |
| |
| /** |
| * Unless --record_full_profiler_data is given, we drop small tasks and add their time to the |
| * parent's duration. |
| */ |
| private boolean wasTaskSlowEnoughToRecord(ProfilerTask type, long duration) { |
| return (recordAllDurations || duration >= type.minDuration); |
| } |
| |
| /** |
| * Adds a task directly to the main queue, bypassing the task stack. Used for simple tasks that |
| * are known not to have any subtasks. |
| * |
| * @param startTimeNanos task start time (obtained through {@link Profiler#nanoTimeMaybe()}) |
| * @param duration task duration |
| * @param type task type |
| * @param description task description. May be stored until the end of the build. |
| */ |
| private void logTask(long startTimeNanos, long duration, ProfilerTask type, String description) { |
| Preconditions.checkNotNull(description); |
| Preconditions.checkState(!"".equals(description), "No description -> not helpful"); |
| if (duration < 0) { |
| // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe. |
| duration = 0; |
| } |
| |
| StatRecorder statRecorder = tasksHistograms[type.ordinal()]; |
| if (collectTaskHistograms && statRecorder != null) { |
| statRecorder.addStat((int) Duration.ofNanos(duration).toMillis(), description); |
| } |
| |
| if (isActive() && startTimeNanos >= 0 && isProfiling(type)) { |
| // Store instance fields as local variables so they are not nulled out from under us by |
| // #stop. |
| FileWriter currentWriter = writerRef.get(); |
| if (wasTaskSlowEnoughToRecord(type, duration)) { |
| TaskData data = new TaskData(taskId.incrementAndGet(), startTimeNanos, type, description); |
| data.duration = duration; |
| if (currentWriter != null) { |
| currentWriter.enqueue(data); |
| } |
| |
| SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()]; |
| |
| if (aggregator != null) { |
| aggregator.add(data); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Used externally to submit a simple task (one that does not have any subtasks). Depending on |
| * the minDuration attribute of the task type, the task may be aggregated into the parent task |
| * rather than stored directly. |
| * |
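| * <p>A hypothetical usage sketch (doSomeWork() and the task type/description are illustrative): |
| * |
| * <pre>{@code |
| * long startTime = Profiler.nanoTimeMaybe(); |
| * doSomeWork(); |
| * Profiler.instance().logSimpleTask(startTime, ProfilerTask.INFO, "some work"); |
| * }</pre> |
| * |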
| * @param startTimeNanos task start time (obtained through {@link Profiler#nanoTimeMaybe()}) |
| * @param type task type |
| * @param description task description. May be stored until the end of the build. |
| */ |
| public void logSimpleTask(long startTimeNanos, ProfilerTask type, String description) { |
| if (clock != null) { |
| logTask(startTimeNanos, clock.nanoTime() - startTimeNanos, type, description); |
| } |
| } |
| |
| /** |
| * Used externally to submit a simple task (one that does not have any subtasks). Depending on |
| * the minDuration attribute of the task type, the task may be aggregated into the parent task |
| * rather than stored directly. |
| * |
| * <p>Note that start and stop time must both be acquired from the same clock instance. |
| * |
| * @param startTimeNanos task start time |
| * @param stopTimeNanos task stop time |
| * @param type task type |
| * @param description task description. May be stored until the end of the build. |
| */ |
| public void logSimpleTask( |
| long startTimeNanos, long stopTimeNanos, ProfilerTask type, String description) { |
| logTask(startTimeNanos, stopTimeNanos - startTimeNanos, type, description); |
| } |
| |
| /** |
| * Used externally to submit a simple task (one that does not have any subtasks). Depending on |
| * the minDuration attribute of the task type, the task may be aggregated into the parent task |
| * rather than stored directly. |
| * |
| * @param startTimeNanos task start time (obtained through {@link Profiler#nanoTimeMaybe()}) |
| * @param duration the duration of the task |
| * @param type task type |
| * @param description task description. May be stored until the end of the build. |
| */ |
| public void logSimpleTaskDuration( |
| long startTimeNanos, Duration duration, ProfilerTask type, String description) { |
| logTask(startTimeNanos, duration.toNanos(), type, description); |
| } |
| |
| /** Used to log "events" happening at a specific time - tasks with zero duration. */ |
| public void logEventAtTime(long atTimeNanos, ProfilerTask type, String description) { |
| logTask(atTimeNanos, 0, type, description); |
| } |
| |
| /** Used to log "events" - tasks with zero duration. */ |
| @VisibleForTesting |
| void logEvent(ProfilerTask type, String description) { |
| logEventAtTime(clock.nanoTime(), type, description); |
| } |
| |
| private SilentCloseable reallyProfile(ProfilerTask type, String description) { |
| // ProfilerInfo.allTasksById is supposed to be an id -> Task map, but it is in fact a List, |
| // which means that we cannot drop tasks to which we had already assigned ids. Therefore, |
| // non-leaf tasks must not have a minimum duration. However, we don't quite consistently |
| // enforce this, and Blaze only works because we happen not to add child tasks to those parent |
| // tasks that have a minimum duration. |
| TaskData taskData = new TaskData(taskId.incrementAndGet(), clock.nanoTime(), type, description); |
| return () -> completeTask(taskData); |
| } |
| |
| /** |
| * Records the beginning of a task as specified, and returns a {@link SilentCloseable} instance |
| * that ends the task. This lets the system do the work of ending the task, with the compiler |
| * giving a warning if the returned instance is not closed. |
| * |
| * <p>Use of this method supports nested task monitoring. For tasks that are known not to have |
| * any subtasks, logSimpleTask() should be used instead. |
| * |
| * <p>Use like this: |
| * |
| * <pre>{@code |
| * try (SilentCloseable c = Profiler.instance().profile(type, "description")) { |
| * // Your code here. |
| * } |
| * }</pre> |
| * |
| * @param type predefined task type - see ProfilerTask for available types. |
| * @param description task description. May be stored until the end of the build. |
| */ |
| public SilentCloseable profile(ProfilerTask type, String description) { |
| Preconditions.checkNotNull(description); |
| return (isActive() && isProfiling(type)) ? reallyProfile(type, description) : NOP; |
| } |
| |
| /** |
| * Version of {@link #profile(ProfilerTask, String)} that avoids creating the description string |
| * unless actually profiling. |
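| * |
| * <p>For example (a sketch; the item variable and the description computation are illustrative): |
| * |
| * <pre>{@code |
| * try (SilentCloseable c = |
| * Profiler.instance().profile(ProfilerTask.INFO, () -> "processing " + item)) { |
| * // Your code here. |
| * } |
| * }</pre> |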
| */ |
| public SilentCloseable profile(ProfilerTask type, Supplier<String> description) { |
| return (isActive() && isProfiling(type)) |
| ? reallyProfile(type, Preconditions.checkNotNull(description.get())) |
| : NOP; |
| } |
| |
| /** |
| * Records the beginning of a task as specified, and returns a {@link SilentCloseable} instance |
| * that ends the task. This lets the system do the work of ending the task, with the compiler |
| * giving a warning if the returned instance is not closed. |
| * |
| * <p>Use of this method supports nested task monitoring. For tasks that are known not to have |
| * any subtasks, logSimpleTask() should be used instead. |
| * |
| * <p>This is a convenience method that uses {@link ProfilerTask#INFO}. |
| * |
| * <p>Use like this: |
| * |
| * <pre>{@code |
| * try (SilentCloseable c = Profiler.instance().profile("description")) { |
| * // Your code here. |
| * } |
| * }</pre> |
| * |
| * @param description task description. May be stored until the end of the build. |
| */ |
| public SilentCloseable profile(String description) { |
| return profile(ProfilerTask.INFO, description); |
| } |
| |
| /** |
| * Similar to {@link #profile}, but specific to action-related events. Takes extra arguments: the |
| * action's mnemonic, primary output path, and target label. |
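| * |
| * <p>A hypothetical usage sketch (all argument values are illustrative): |
| * |
| * <pre>{@code |
| * try (SilentCloseable c = |
| * Profiler.instance() |
| * .profileAction( |
| * ProfilerTask.ACTION, "Javac", "Compiling Foo.java", "out/Foo.class", "//pkg:foo")) { |
| * // Execute the action here. |
| * } |
| * }</pre> |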
| */ |
| public SilentCloseable profileAction( |
| ProfilerTask type, |
| String mnemonic, |
| String description, |
| String primaryOutput, |
| String targetLabel) { |
| Preconditions.checkNotNull(description); |
| if (isActive() && isProfiling(type)) { |
| TaskData taskData = |
| new ActionTaskData( |
| taskId.incrementAndGet(), |
| clock.nanoTime(), |
| type, |
| new MnemonicData(mnemonic), |
| description, |
| primaryOutput, |
| targetLabel); |
| return () -> completeTask(taskData); |
| } else { |
| return NOP; |
| } |
| } |
| |
| public SilentCloseable profileAction( |
| ProfilerTask type, String description, String primaryOutput, String targetLabel) { |
| return profileAction(type, null, description, primaryOutput, targetLabel); |
| } |
| |
| private static final SilentCloseable NOP = () -> {}; |
| |
| private boolean countAction(ProfilerTask type, TaskData taskData) { |
| return type == ProfilerTask.ACTION |
| || (type == ProfilerTask.INFO && "discoverInputs".equals(taskData.description)); |
| } |
| |
| /** Records the end of the task. */ |
| private void completeTask(TaskData data) { |
| if (isActive()) { |
| long endTime = clock.nanoTime(); |
| data.duration = endTime - data.startTimeNanos; |
| boolean shouldRecordTask = wasTaskSlowEnoughToRecord(data.type, data.duration); |
| FileWriter writer = writerRef.get(); |
| if (shouldRecordTask && writer != null) { |
| writer.enqueue(data); |
| } |
| |
| if (shouldRecordTask) { |
| if (actionCountTimeSeries != null && countAction(data.type, data)) { |
| synchronized (this) { |
| actionCountTimeSeries.addRange( |
| Duration.ofNanos(data.startTimeNanos).toMillis(), |
| Duration.ofNanos(endTime).toMillis()); |
| } |
| } |
| SlowestTaskAggregator aggregator = slowestTasks[data.type.ordinal()]; |
| if (aggregator != null) { |
| aggregator.add(data); |
| } |
| } |
| } |
| } |
| |
| /** Convenience method to log phase marker tasks. */ |
| public void markPhase(ProfilePhase phase) throws InterruptedException { |
| MemoryProfiler.instance().markPhase(phase); |
| if (isActive() && isProfiling(ProfilerTask.PHASE)) { |
| logEvent(ProfilerTask.PHASE, phase.description); |
| } |
| } |
| |
| private abstract static class FileWriter implements Runnable { |
| protected final BlockingQueue<TaskData> queue; |
| protected final Thread thread; |
| protected IOException savedException; |
| |
| FileWriter() { |
| this.queue = new LinkedBlockingQueue<>(); |
| this.thread = new Thread(this, "profile-writer-thread"); |
| } |
| |
| public void shutdown() throws IOException { |
| // Add poison pill to queue and then wait for writer thread to shut down. |
| queue.add(POISON_PILL); |
| try { |
| thread.join(); |
| } catch (InterruptedException e) { |
| thread.interrupt(); |
| Thread.currentThread().interrupt(); |
| } |
| if (savedException != null) { |
| throw savedException; |
| } |
| } |
| |
| public void start() { |
| thread.start(); |
| } |
| |
| public void enqueue(TaskData data) { |
| queue.add(data); |
| } |
| } |
| |
| /** Writes the profile in JSON trace file format. */ |
| private static class JsonTraceFileWriter extends FileWriter { |
| private final OutputStream outStream; |
| private final long profileStartTimeNanos; |
| private final ThreadLocal<Boolean> metadataPosted = |
| ThreadLocal.withInitial(() -> Boolean.FALSE); |
| private final boolean slimProfile; |
| private final boolean includePrimaryOutput; |
| private final boolean includeTargetLabel; |
| private final UUID buildID; |
| private final String outputBase; |
| |
| // The JDK never returns 0 as a thread id, so we use that as a fake thread id for the critical |
| // path. |
| private static final long CRITICAL_PATH_THREAD_ID = 0; |
| |
| private static final long SLIM_PROFILE_EVENT_THRESHOLD = 10_000; |
| private static final long SLIM_PROFILE_MAXIMAL_PAUSE_NS = Duration.ofMillis(100).toNanos(); |
| private static final long SLIM_PROFILE_MAXIMAL_DURATION_NS = Duration.ofMillis(250).toNanos(); |
| private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); |
| |
| /** |
| * These constants describe the sort-index ranges assigned to groups of threads. We assume there |
| * are no more than 10_000 threads of each kind; otherwise the profile becomes unreadable anyway. |
| * For example, the sort index of skyframe threads is in the range [10_000..20_000). |
| */ |
| private static final long SKYFRAME_EVALUATOR_SHIFT = 10_000; |
| |
| private static final long DYNAMIC_EXECUTION_SHIFT = 20_000; |
| private static final long INCLUDE_SCANNER_SHIFT = 30_000; |
| |
| private static final long CRITICAL_PATH_SORT_INDEX = 0; |
| private static final long MAIN_THREAD_SORT_INDEX = 1; |
| private static final long GC_THREAD_SORT_INDEX = 2; |
| private static final long MAX_SORT_INDEX = 1_000_000; |
| |
| JsonTraceFileWriter( |
| OutputStream outStream, |
| long profileStartTimeNanos, |
| boolean slimProfile, |
| String outputBase, |
| UUID buildID, |
| boolean includePrimaryOutput, |
| boolean includeTargetLabel) { |
| this.outStream = outStream; |
| this.profileStartTimeNanos = profileStartTimeNanos; |
| this.slimProfile = slimProfile; |
| this.buildID = buildID; |
| this.outputBase = outputBase; |
| this.includePrimaryOutput = includePrimaryOutput; |
| this.includeTargetLabel = includeTargetLabel; |
| } |
| |
| @Override |
| public void enqueue(TaskData data) { |
| if (!metadataPosted.get()) { |
| metadataPosted.set(Boolean.TRUE); |
| // Create a TaskData object that is special-cased below. |
| queue.add( |
| new TaskData( |
| /* id= */ 0, |
| /* startTimeNanos= */ -1, |
| ProfilerTask.THREAD_NAME, |
| Thread.currentThread().getName())); |
| queue.add( |
| new TaskData( |
| /* id= */ 0, |
| /* startTimeNanos= */ -1, |
| ProfilerTask.THREAD_SORT_INDEX, |
| String.valueOf(getSortIndex(Thread.currentThread().getName())))); |
| } |
| queue.add(data); |
| } |
| |
| private static final class MergedEvent { |
| int count = 0; |
| long startTimeNanos; |
| long endTimeNanos; |
| TaskData data; |
| |
| /* |
| * Tries to merge an additional event into the events merged so far, i.e. if the event is close |
| * enough to them. |
| * |
| * Returns null if the event was merged (or skipped because it is nested inside the merged |
| * range). Otherwise returns the TaskData representing the previously merged events and resets |
| * the internal state to start a new merge with the given event. |
| */ |
| @Nullable |
| TaskData maybeMerge(TaskData data) { |
| long startTimeNanos = data.startTimeNanos; |
| long endTimeNanos = startTimeNanos + data.duration; |
| if (count > 0 |
| && startTimeNanos >= this.startTimeNanos |
| && endTimeNanos <= this.endTimeNanos) { |
| // Skips child tasks. |
| return null; |
| } |
| if (count == 0) { |
| this.data = data; |
| this.startTimeNanos = startTimeNanos; |
| this.endTimeNanos = endTimeNanos; |
| count++; |
| return null; |
| } else if (startTimeNanos <= this.endTimeNanos + SLIM_PROFILE_MAXIMAL_PAUSE_NS) { |
| this.endTimeNanos = endTimeNanos; |
| count++; |
| return null; |
| } else { |
| TaskData ret = getAndReset(); |
| this.startTimeNanos = startTimeNanos; |
| this.endTimeNanos = endTimeNanos; |
| this.data = data; |
| count = 1; |
| return ret; |
| } |
| } |
| |
| // Returns a TaskData object representing the merged data and clears internal data structures. |
| TaskData getAndReset() { |
| TaskData ret; |
| if (data == null || count <= 1) { |
| ret = data; |
| } else { |
| ret = |
| new TaskData( |
| data.threadId, |
| this.startTimeNanos, |
| this.endTimeNanos - this.startTimeNanos, |
| "merged " + count + " events"); |
| } |
| count = 0; |
| data = null; |
| return ret; |
| } |
| } |
| |
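| // Writes a single task as a trace event: "i" (instant) when the duration is zero, otherwise |
| // "X" (complete) with a duration. |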
| private void writeTask(JsonWriter writer, TaskData data) throws IOException { |
| Preconditions.checkNotNull(data); |
| String eventType = data.duration == 0 ? "i" : "X"; |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| if (data.type == null) { |
| writer.setIndent(" "); |
| } else { |
| writer.name("cat").value(data.type.description); |
| } |
| writer.name("name").value(data.description); |
| writer.name("ph").value(eventType); |
| writer |
| .name("ts") |
| .value(TimeUnit.NANOSECONDS.toMicros(data.startTimeNanos - profileStartTimeNanos)); |
| if (data.duration != 0) { |
| writer.name("dur").value(TimeUnit.NANOSECONDS.toMicros(data.duration)); |
| } |
| writer.name("pid").value(1); |
| |
| // Primary outputs are non-mergeable, thus incompatible with slim profiles. |
| if (includePrimaryOutput && data instanceof ActionTaskData) { |
| writer.name("out").value(((ActionTaskData) data).primaryOutputPath); |
| } |
| if (includeTargetLabel && data instanceof ActionTaskData) { |
| writer.name("args"); |
| writer.beginObject(); |
| writer.name("target").value(((ActionTaskData) data).targetLabel); |
| if (data.mnemonic.hasBeenSet()) { |
| writer.name("mnemonic").value(data.mnemonic.getValueForJson()); |
| } |
| writer.endObject(); |
| } else if (data.mnemonic.hasBeenSet() && data instanceof ActionTaskData) { |
| writer.name("args"); |
| writer.beginObject(); |
| writer.name("mnemonic").value(data.mnemonic.getValueForJson()); |
| writer.endObject(); |
| } else if (data.type == ProfilerTask.CRITICAL_PATH_COMPONENT) { |
| writer.name("args"); |
| writer.beginObject(); |
| writer.name("tid").value(data.threadId); |
| writer.endObject(); |
| } |
| long threadId = |
| data.type == ProfilerTask.CRITICAL_PATH_COMPONENT |
| ? CRITICAL_PATH_THREAD_ID |
| : data.threadId; |
| writer.name("tid").value(threadId); |
| writer.endObject(); |
| } |
| |
| private static String getReadableName(String threadName) { |
| if (isMainThread(threadName)) { |
| return "Main Thread"; |
| } |
| |
| if (isGCThread(threadName)) { |
| return "Garbage Collector"; |
| } |
| |
| return threadName; |
| } |
| |
| private static long getSortIndex(String threadName) { |
| if (isMainThread(threadName)) { |
| return MAIN_THREAD_SORT_INDEX; |
| } |
| |
| if (isGCThread(threadName)) { |
| return GC_THREAD_SORT_INDEX; |
| } |
| |
| Matcher numberMatcher = NUMBER_PATTERN.matcher(threadName); |
| if (!numberMatcher.find()) { |
| return MAX_SORT_INDEX; |
| } |
| |
| long extractedNumber; |
| try { |
| extractedNumber = Long.parseLong(numberMatcher.group()); |
| } catch (NumberFormatException e) { |
| // If the number cannot be parsed, e.g. is larger than a long, the actual position is not |
| // really relevant. |
| return MAX_SORT_INDEX; |
| } |
| |
| if (threadName.startsWith("skyframe-evaluator")) { |
| return SKYFRAME_EVALUATOR_SHIFT + extractedNumber; |
| } |
| |
| if (threadName.startsWith("dynamic-execution")) { |
| return DYNAMIC_EXECUTION_SHIFT + extractedNumber; |
| } |
| |
| if (threadName.startsWith("Include scanner")) { |
| return INCLUDE_SCANNER_SHIFT + extractedNumber; |
| } |
| |
| return MAX_SORT_INDEX; |
| } |
| |
| private static boolean isMainThread(String threadName) { |
| return threadName.startsWith("grpc-command"); |
| } |
| |
| private static boolean isGCThread(String threadName) { |
| return threadName.equals("Service Thread"); |
| } |
| |
| /** |
| * Saves all gathered information from the task queue to the output stream. This method runs on |
| * the dedicated writer thread and drains the queue until it receives the poison pill enqueued by |
| * shutdown() at the end of the profiling session. |
| */ |
| @Override |
| public void run() { |
| try { |
| boolean receivedPoisonPill = false; |
| try (JsonWriter writer = |
| new JsonWriter( |
| // The buffer size of 262144 bytes was chosen arbitrarily. |
| new OutputStreamWriter( |
| new BufferedOutputStream(outStream, 262144), StandardCharsets.UTF_8))) { |
| var finishDate = Instant.now(); |
| writer.beginObject(); |
| writer.name("otherData"); |
| writer.beginObject(); |
| writer.name("bazel_version").value(BlazeVersionInfo.instance().getReleaseName()); |
| writer.name("build_id").value(buildID.toString()); |
| writer.name("output_base").value(outputBase); |
| writer.name("date").value(finishDate.toString()); |
| writer.name("profile_finish_ts").value(finishDate.getEpochSecond() * 1000); |
| writer.endObject(); |
| writer.name("traceEvents"); |
| writer.beginArray(); |
| TaskData data; |
| |
| // Generate metadata event for the critical path as thread 0 in disguise. |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| writer.name("name").value("thread_name"); |
| writer.name("ph").value("M"); |
| writer.name("pid").value(1); |
| writer.name("tid").value(CRITICAL_PATH_THREAD_ID); |
| writer.name("args"); |
| writer.beginObject(); |
| writer.name("name").value("Critical Path"); |
| writer.endObject(); |
| writer.endObject(); |
| |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| writer.name("name").value("thread_sort_index"); |
| writer.name("ph").value("M"); |
| writer.name("pid").value(1); |
| writer.name("tid").value(CRITICAL_PATH_THREAD_ID); |
| writer.name("args"); |
| writer.beginObject(); |
| writer.name("sort_index").value(String.valueOf(CRITICAL_PATH_SORT_INDEX)); |
| writer.endObject(); |
| writer.endObject(); |
| |
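| // Per-thread accumulators used to merge short events when writing a slim profile. |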
| HashMap<Long, MergedEvent> eventsPerThread = new HashMap<>(); |
| int eventCount = 0; |
| while ((data = queue.take()) != POISON_PILL) { |
| Preconditions.checkNotNull(data); |
| eventCount++; |
| if (data.type == ProfilerTask.THREAD_NAME) { |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| writer.name("name").value("thread_name"); |
| writer.name("ph").value("M"); |
| writer.name("pid").value(1); |
| writer.name("tid").value(data.threadId); |
| writer.name("args"); |
| |
| writer.beginObject(); |
| writer.name("name").value(getReadableName(data.description)); |
| writer.endObject(); |
| |
| writer.endObject(); |
| continue; |
| } |
| |
| if (data.type == ProfilerTask.THREAD_SORT_INDEX) { |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| writer.name("name").value("thread_sort_index"); |
| writer.name("ph").value("M"); |
| writer.name("pid").value(1); |
| writer.name("tid").value(data.threadId); |
| writer.name("args"); |
| |
| writer.beginObject(); |
| writer.name("sort_index").value(data.description); |
| writer.endObject(); |
| |
| writer.endObject(); |
| continue; |
| } |
| |
| if (data.type == ProfilerTask.LOCAL_CPU_USAGE |
| || data.type == ProfilerTask.LOCAL_MEMORY_USAGE |
| || data.type == ProfilerTask.ACTION_COUNTS |
| || data.type == ProfilerTask.SYSTEM_CPU_USAGE |
| || data.type == ProfilerTask.SYSTEM_MEMORY_USAGE |
| || data.type == ProfilerTask.SYSTEM_NETWORK_UP_USAGE |
| || data.type == ProfilerTask.SYSTEM_NETWORK_DOWN_USAGE |
| || data.type == ProfilerTask.WORKERS_MEMORY_USAGE |
| || data.type == ProfilerTask.SYSTEM_LOAD_AVERAGE) { |
| // Skip counts equal to zero. They will show up as a thin line in the profile. |
| if ("0.0".equals(data.description)) { |
| continue; |
| } |
| writer.setIndent(" "); |
| writer.beginObject(); |
| writer.setIndent(""); |
| writer.name("name").value(data.type.description); |
| |
| // Pick acceptable counter colors manually, unfortunately we have to pick from these |
| // weird reserved names from |
| // https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html |
| switch (data.type) { |
| case LOCAL_CPU_USAGE: |
| writer.name("cname").value("good"); |
| break; |
| case LOCAL_MEMORY_USAGE: |
| writer.name("cname").value("olive"); |
| break; |
| case SYSTEM_CPU_USAGE: |
| writer.name("cname").value("rail_load"); |
| break; |
| case SYSTEM_MEMORY_USAGE: |
| writer.name("cname").value("bad"); |
| break; |
| case SYSTEM_NETWORK_UP_USAGE: |
| case SYSTEM_NETWORK_DOWN_USAGE: |
| writer.name("cname").value("rail_response"); |
| break; |
| case WORKERS_MEMORY_USAGE: |
| writer.name("cname").value("rail_animation"); |
| break; |
| case SYSTEM_LOAD_AVERAGE: |
| writer.name("cname").value("generic_work"); |
| break; |
| default: |
| // won't happen |
| } |
| writer.name("ph").value("C"); |
| writer |
| .name("ts") |
| .value( |
| TimeUnit.NANOSECONDS.toMicros(data.startTimeNanos - profileStartTimeNanos)); |
| writer.name("pid").value(1); |
| writer.name("tid").value(data.threadId); |
| writer.name("args"); |
| |
| writer.beginObject(); |
| switch (data.type) { |
| case LOCAL_CPU_USAGE: |
| writer.name("cpu").value(data.description); |
| break; |
| case LOCAL_MEMORY_USAGE: |
| writer.name("memory").value(data.description); |
| break; |
| case ACTION_COUNTS: |
| writer.name("action").value(data.description); |
| break; |
| case SYSTEM_CPU_USAGE: |
| writer.name("system cpu").value(data.description); |
| break; |
| case SYSTEM_MEMORY_USAGE: |
| writer.name("system memory").value(data.description); |
| break; |
| case SYSTEM_NETWORK_UP_USAGE: |
| writer.name("system network up (Mbps)").value(data.description); |
| break; |
| case SYSTEM_NETWORK_DOWN_USAGE: |
| writer.name("system network down (Mbps)").value(data.description); |
| break; |
| case WORKERS_MEMORY_USAGE: |
| writer.name("workers memory").value(data.description); |
| break; |
| case SYSTEM_LOAD_AVERAGE: |
| writer.name("load").value(data.description); |
| break; |
| default: |
| // won't happen |
| } |
| writer.endObject(); |
| |
| writer.endObject(); |
| continue; |
| } |
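| // In slim mode, once enough events have been written, short non-critical-path events are |
| // merged per thread to keep the profile small; everything else is written directly. |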
| if (slimProfile |
| && eventCount > SLIM_PROFILE_EVENT_THRESHOLD |
| && data.duration > 0 |
| && data.duration < SLIM_PROFILE_MAXIMAL_DURATION_NS |
| && data.type != ProfilerTask.CRITICAL_PATH_COMPONENT) { |
| eventsPerThread.putIfAbsent(data.threadId, new MergedEvent()); |
| TaskData taskData = eventsPerThread.get(data.threadId).maybeMerge(data); |
| if (taskData != null) { |
| writeTask(writer, taskData); |
| } |
| } else { |
| writeTask(writer, data); |
| } |
| } |
| for (MergedEvent value : eventsPerThread.values()) { |
| TaskData taskData = value.getAndReset(); |
| if (taskData != null) { |
| writeTask(writer, taskData); |
| } |
| } |
| receivedPoisonPill = true; |
| writer.setIndent(" "); |
| writer.endArray(); |
| writer.endObject(); |
| } catch (IOException e) { |
| this.savedException = e; |
| if (!receivedPoisonPill) { |
| while (queue.take() != POISON_PILL) { |
| // We keep emptying the queue, but we can't write anything. |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| // Exit silently. |
| } |
| } |
| } |
| } |