| // Copyright 2022 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.profiler; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.devtools.build.lib.analysis.BlazeVersionInfo; |
| import com.google.devtools.build.lib.profiler.Profiler.TaskData; |
| import com.google.gson.stream.JsonWriter; |
| import java.io.BufferedOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.nio.charset.StandardCharsets; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.HashMap; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import javax.annotation.Nullable; |
| |
| /** Writes the profile in Json Trace file format. */ |
| class JsonTraceFileWriter implements Runnable { |
| protected final BlockingQueue<TraceData> queue; |
| protected final Thread thread; |
| protected IOException savedException; |
| |
| private final OutputStream outStream; |
| private final long profileStartTimeNanos; |
| private final ThreadLocal<Boolean> metadataPosted = ThreadLocal.withInitial(() -> Boolean.FALSE); |
| private final boolean slimProfile; |
| private final UUID buildID; |
| private final String outputBase; |
| |
| private static final long SLIM_PROFILE_EVENT_THRESHOLD = 10_000; |
| private static final long SLIM_PROFILE_MAXIMAL_PAUSE_NS = Duration.ofMillis(100).toNanos(); |
| private static final long SLIM_PROFILE_MAXIMAL_DURATION_NS = Duration.ofMillis(250).toNanos(); |
| |
| private static final TaskData POISON_PILL = |
| new TaskData( |
| /* threadId= */ 0, /* startTimeNanos= */ 0, /* eventType= */ null, "poison pill"); |
| |
| JsonTraceFileWriter( |
| OutputStream outStream, |
| long profileStartTimeNanos, |
| boolean slimProfile, |
| String outputBase, |
| UUID buildID) { |
| this.queue = new LinkedBlockingQueue<>(); |
| this.thread = new Thread(this, "profile-writer-thread"); |
| this.outStream = outStream; |
| this.profileStartTimeNanos = profileStartTimeNanos; |
| this.slimProfile = slimProfile; |
| this.buildID = buildID; |
| this.outputBase = outputBase; |
| } |
| |
| public void shutdown() throws IOException { |
| // Add poison pill to queue and then wait for writer thread to shut down. |
| queue.add(POISON_PILL); |
| try { |
| thread.join(); |
| } catch (InterruptedException e) { |
| thread.interrupt(); |
| Thread.currentThread().interrupt(); |
| } |
| if (savedException != null) { |
| throw savedException; |
| } |
| } |
| |
| public void start() { |
| thread.start(); |
| } |
| |
| public void enqueue(TraceData data) { |
| if (!metadataPosted.get()) { |
| metadataPosted.set(Boolean.TRUE); |
| queue.add(new ThreadMetadata()); |
| } |
| queue.add(data); |
| } |
| |
| private static final class MergedEvent { |
| int count = 0; |
| long startTimeNanos; |
| long endTimeNanos; |
| TaskData data; |
| |
| /* |
| * Tries to merge an additional event, i.e. if the event is close enough to the already merged |
| * event. |
| * |
| * Returns null, if merging was possible. |
| * If not mergeable, returns the TaskData of the previously merged events and clears the |
| * internal data structures. |
| */ |
| @Nullable |
| TaskData maybeMerge(TaskData data) { |
| long startTimeNanos = data.startTimeNanos; |
| long endTimeNanos = startTimeNanos + data.durationNanos; |
| if (count > 0 && startTimeNanos >= this.startTimeNanos && endTimeNanos <= this.endTimeNanos) { |
| // Skips child tasks. |
| return null; |
| } |
| if (count == 0) { |
| this.data = data; |
| this.startTimeNanos = startTimeNanos; |
| this.endTimeNanos = endTimeNanos; |
| count++; |
| return null; |
| } else if (startTimeNanos <= this.endTimeNanos + SLIM_PROFILE_MAXIMAL_PAUSE_NS) { |
| this.endTimeNanos = endTimeNanos; |
| count++; |
| return null; |
| } else { |
| TaskData ret = getAndReset(); |
| this.startTimeNanos = startTimeNanos; |
| this.endTimeNanos = endTimeNanos; |
| this.data = data; |
| count = 1; |
| return ret; |
| } |
| } |
| |
| // Returns a TaskData object representing the merged data and clears internal data structures. |
| TaskData getAndReset() { |
| TaskData ret; |
| if (data == null || count <= 1) { |
| ret = data; |
| } else { |
| ret = |
| new TaskData( |
| data.threadId, |
| this.startTimeNanos, |
| this.endTimeNanos - this.startTimeNanos, |
| "merged " + count + " events"); |
| } |
| count = 0; |
| data = null; |
| return ret; |
| } |
| } |
| |
| private static boolean isCandidateForMerging(TaskData data) { |
| return data.durationNanos > 0 |
| && data.durationNanos < SLIM_PROFILE_MAXIMAL_DURATION_NS |
| && data.type != ProfilerTask.CRITICAL_PATH_COMPONENT; |
| } |
| |
| /** |
| * Saves all gathered information from taskQueue queue to the file. Method is invoked internally |
| * by the Timer-based thread and at the end of profiling session. |
| */ |
| @Override |
| public void run() { |
| try { |
| boolean receivedPoisonPill = false; |
| try (JsonWriter writer = |
| new JsonWriter( |
| // The buffer size of 262144 is chosen at random. |
| new OutputStreamWriter( |
| new BufferedOutputStream(outStream, 262144), StandardCharsets.UTF_8))) { |
| var finishDate = Instant.now(); |
| writer.beginObject(); |
| writer.name("otherData"); |
| writer.beginObject(); |
| writer.name("bazel_version").value(BlazeVersionInfo.instance().getReleaseName()); |
| writer.name("build_id").value(buildID.toString()); |
| writer.name("output_base").value(outputBase); |
| writer.name("date").value(finishDate.toString()); |
| writer.name("profile_finish_ts").value(finishDate.getEpochSecond() * 1000); |
| writer.endObject(); |
| writer.name("traceEvents"); |
| writer.beginArray(); |
| |
| // Generate metadata event for the critical path as thread 0 in disguise. |
| ThreadMetadata criticalPathMetadata = |
| ThreadMetadata.createFakeThreadMetadataForCriticalPath(); |
| criticalPathMetadata.writeTraceData(writer, profileStartTimeNanos); |
| |
| HashMap<Long, MergedEvent> eventsPerThread = new HashMap<>(); |
| int eventCount = 0; |
| TraceData data; |
| while ((data = queue.take()) != POISON_PILL) { |
| Preconditions.checkNotNull(data); |
| eventCount++; |
| |
| if (slimProfile |
| && eventCount > SLIM_PROFILE_EVENT_THRESHOLD |
| && data instanceof TaskData |
| && isCandidateForMerging((TaskData) data)) { |
| TaskData taskData = (TaskData) data; |
| eventsPerThread.putIfAbsent(taskData.threadId, new MergedEvent()); |
| TaskData mergedTaskData = eventsPerThread.get(taskData.threadId).maybeMerge(taskData); |
| if (mergedTaskData != null) { |
| mergedTaskData.writeTraceData(writer, profileStartTimeNanos); |
| } |
| } else { |
| data.writeTraceData(writer, profileStartTimeNanos); |
| } |
| } |
| for (JsonTraceFileWriter.MergedEvent value : eventsPerThread.values()) { |
| TaskData taskData = value.getAndReset(); |
| if (taskData != null) { |
| taskData.writeTraceData(writer, profileStartTimeNanos); |
| } |
| } |
| receivedPoisonPill = true; |
| writer.setIndent(" "); |
| writer.endArray(); |
| writer.endObject(); |
| } catch (IOException e) { |
| this.savedException = e; |
| if (!receivedPoisonPill) { |
| while (queue.take() != POISON_PILL) { |
| // We keep emptying the queue, but we can't write anything. |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| // Exit silently. |
| } |
| } |
| } |