blob: 4206cbe370c4e7918e0eb44426cbb05ba257660c [file] [log] [blame]
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.profiler;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.profiler.Profiler.TaskData;
import com.google.gson.stream.JsonWriter;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
/** Writes the profile in Json Trace file format. */
class JsonTraceFileWriter implements Runnable {
protected final BlockingQueue<TraceData> queue;
protected final Thread thread;
protected IOException savedException;
private final OutputStream outStream;
private final long profileStartTimeNanos;
private final ThreadLocal<Boolean> metadataPosted = ThreadLocal.withInitial(() -> Boolean.FALSE);
private final boolean slimProfile;
private final UUID buildID;
private final String outputBase;
private static final long SLIM_PROFILE_EVENT_THRESHOLD = 10_000;
private static final long SLIM_PROFILE_MAXIMAL_PAUSE_NS = Duration.ofMillis(100).toNanos();
private static final long SLIM_PROFILE_MAXIMAL_DURATION_NS = Duration.ofMillis(250).toNanos();
private static final TaskData POISON_PILL =
new TaskData(
/* threadId= */ 0, /* startTimeNanos= */ 0, /* eventType= */ null, "poison pill");
JsonTraceFileWriter(
OutputStream outStream,
long profileStartTimeNanos,
boolean slimProfile,
String outputBase,
UUID buildID) {
this.queue = new LinkedBlockingQueue<>();
this.thread = new Thread(this, "profile-writer-thread");
this.outStream = outStream;
this.profileStartTimeNanos = profileStartTimeNanos;
this.slimProfile = slimProfile;
this.buildID = buildID;
this.outputBase = outputBase;
}
public void shutdown() throws IOException {
// Add poison pill to queue and then wait for writer thread to shut down.
queue.add(POISON_PILL);
try {
thread.join();
} catch (InterruptedException e) {
thread.interrupt();
Thread.currentThread().interrupt();
}
if (savedException != null) {
throw savedException;
}
}
public void start() {
thread.start();
}
public void enqueue(TraceData data) {
if (!metadataPosted.get()) {
metadataPosted.set(Boolean.TRUE);
queue.add(new ThreadMetadata());
}
queue.add(data);
}
private static final class MergedEvent {
int count = 0;
long startTimeNanos;
long endTimeNanos;
TaskData data;
/*
* Tries to merge an additional event, i.e. if the event is close enough to the already merged
* event.
*
* Returns null, if merging was possible.
* If not mergeable, returns the TaskData of the previously merged events and clears the
* internal data structures.
*/
@Nullable
TaskData maybeMerge(TaskData data) {
long startTimeNanos = data.startTimeNanos;
long endTimeNanos = startTimeNanos + data.durationNanos;
if (count > 0 && startTimeNanos >= this.startTimeNanos && endTimeNanos <= this.endTimeNanos) {
// Skips child tasks.
return null;
}
if (count == 0) {
this.data = data;
this.startTimeNanos = startTimeNanos;
this.endTimeNanos = endTimeNanos;
count++;
return null;
} else if (startTimeNanos <= this.endTimeNanos + SLIM_PROFILE_MAXIMAL_PAUSE_NS) {
this.endTimeNanos = endTimeNanos;
count++;
return null;
} else {
TaskData ret = getAndReset();
this.startTimeNanos = startTimeNanos;
this.endTimeNanos = endTimeNanos;
this.data = data;
count = 1;
return ret;
}
}
// Returns a TaskData object representing the merged data and clears internal data structures.
TaskData getAndReset() {
TaskData ret;
if (data == null || count <= 1) {
ret = data;
} else {
ret =
new TaskData(
data.threadId,
this.startTimeNanos,
this.endTimeNanos - this.startTimeNanos,
"merged " + count + " events");
}
count = 0;
data = null;
return ret;
}
}
private static boolean isCandidateForMerging(TaskData data) {
return data.durationNanos > 0
&& data.durationNanos < SLIM_PROFILE_MAXIMAL_DURATION_NS
&& data.type != ProfilerTask.CRITICAL_PATH_COMPONENT;
}
/**
* Saves all gathered information from taskQueue queue to the file. Method is invoked internally
* by the Timer-based thread and at the end of profiling session.
*/
@Override
public void run() {
try {
boolean receivedPoisonPill = false;
try (JsonWriter writer =
new JsonWriter(
// The buffer size of 262144 is chosen at random.
new OutputStreamWriter(
new BufferedOutputStream(outStream, 262144), StandardCharsets.UTF_8))) {
var finishDate = Instant.now();
writer.beginObject();
writer.name("otherData");
writer.beginObject();
writer.name("bazel_version").value(BlazeVersionInfo.instance().getReleaseName());
writer.name("build_id").value(buildID.toString());
writer.name("output_base").value(outputBase);
writer.name("date").value(finishDate.toString());
writer.name("profile_finish_ts").value(finishDate.getEpochSecond() * 1000);
writer.endObject();
writer.name("traceEvents");
writer.beginArray();
// Generate metadata event for the critical path as thread 0 in disguise.
ThreadMetadata criticalPathMetadata =
ThreadMetadata.createFakeThreadMetadataForCriticalPath();
criticalPathMetadata.writeTraceData(writer, profileStartTimeNanos);
HashMap<Long, MergedEvent> eventsPerThread = new HashMap<>();
int eventCount = 0;
TraceData data;
while ((data = queue.take()) != POISON_PILL) {
Preconditions.checkNotNull(data);
eventCount++;
if (slimProfile
&& eventCount > SLIM_PROFILE_EVENT_THRESHOLD
&& data instanceof TaskData
&& isCandidateForMerging((TaskData) data)) {
TaskData taskData = (TaskData) data;
eventsPerThread.putIfAbsent(taskData.threadId, new MergedEvent());
TaskData mergedTaskData = eventsPerThread.get(taskData.threadId).maybeMerge(taskData);
if (mergedTaskData != null) {
mergedTaskData.writeTraceData(writer, profileStartTimeNanos);
}
} else {
data.writeTraceData(writer, profileStartTimeNanos);
}
}
for (JsonTraceFileWriter.MergedEvent value : eventsPerThread.values()) {
TaskData taskData = value.getAndReset();
if (taskData != null) {
taskData.writeTraceData(writer, profileStartTimeNanos);
}
}
receivedPoisonPill = true;
writer.setIndent(" ");
writer.endArray();
writer.endObject();
} catch (IOException e) {
this.savedException = e;
if (!receivedPoisonPill) {
while (queue.take() != POISON_PILL) {
// We keep emptying the queue, but we can't write anything.
}
}
}
} catch (InterruptedException e) {
// Exit silently.
}
}
}