Refactor profiler

- move the save method to an inner class
- don't use a timer, use a blocking queue instead
- add a format enum (in anticipation of adding a json output format)
- update the test to use an in memory buffer, and avoid FoundationTestCase

Compared to the original https://github.com/bazelbuild/bazel/commit/15b8c259db111012b4642287172cb4d1d82151f3, it contains these changes:
- Make it so we don't create a queue if we are not going to write any
  data! The queue is now owned by the writer, and if there is no writer, there
  is no queue.

  This was causing a memory regression because slowest task profiling is
  enabled by default, in which case the profiler is started with no output
  file. In that case, there's no thread that is emptying the queue, but the
  queue was still created by default.

- add additional tests for slowest task and histogram handling; these also
  provide coverage for the case where the profiler is started without an output
  stream
- move all the writer thread handling into the inner class
- make writer access thread-safe
- add a bunch of documentation

PiperOrigin-RevId: 200212978
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index 0bf0e41..80f04ce 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -36,12 +36,11 @@
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Logger;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
@@ -130,22 +129,23 @@
   // EOF marker. Must be < 0.
   public static final int EOF_MARKER = -1;
 
-  // Profiler will check for gathered data and persist all of it in the
-  // separate thread every SAVE_DELAY ms.
-  private static final int SAVE_DELAY = 2000; // ms
-
-  /**
-   * The profiler (a static singleton instance). Inactive by default.
-   */
+  /** The profiler (a static singleton instance). Inactive by default. */
   private static final Profiler instance = new Profiler();
 
   private static final int HISTOGRAM_BUCKETS = 20;
 
+  private static final TaskData POISON_PILL = new TaskData(0, 0, null, null, "poison pill");
+
+  /** File format enum. */
+  public static enum Format {
+    BINARY_BAZEL_FORMAT;
+  }
+
   /** A task that was very slow. */
   public static final class SlowTask implements Comparable<SlowTask> {
     final long durationNanos;
     final String description;
-    ProfilerTask type;
+    final ProfilerTask type;
 
     private SlowTask(TaskData taskData) {
       this.durationNanos = taskData.duration;
@@ -186,7 +186,7 @@
    * methods is.
    */
   @ThreadCompatible
-  private final class TaskData {
+  private static final class TaskData {
     final long threadId;
     final long startTimeNanos;
     final int id;
@@ -198,12 +198,11 @@
     int[] counts; // number of invocations per ProfilerTask type
     long[] durations; // time spend in the task per ProfilerTask type
 
-    TaskData(long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) {
-      threadId = Thread.currentThread().getId();
-      counts = null;
-      durations = null;
-      id = taskId.incrementAndGet();
-      parentId = (parent == null  ? 0 : parent.id);
+    TaskData(
+        int id, long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) {
+      this.id = id;
+      this.threadId = Thread.currentThread().getId();
+      this.parentId = (parent == null  ? 0 : parent.id);
       this.startTimeNanos = startTimeNanos;
       this.type = eventType;
       this.description = Preconditions.checkNotNull(description);
@@ -238,7 +237,6 @@
    */
   @ThreadSafe
   private final class TaskStack extends ThreadLocal<List<TaskData>> {
-
     @Override
     public List<TaskData> initialValue() {
       return new ArrayList<>();
@@ -266,7 +264,7 @@
     }
 
     public TaskData create(long startTimeNanos, ProfilerTask eventType, String description) {
-      return new TaskData(startTimeNanos, peek(), eventType, description);
+      return new TaskData(taskId.incrementAndGet(), startTimeNanos, peek(), eventType, description);
     }
 
     @Override
@@ -428,19 +426,29 @@
 
   private Clock clock;
   private ProfiledTaskKinds profiledTaskKinds;
-  private volatile long profileStartTime = 0L;
+  private volatile long profileStartTime;
   private volatile boolean recordAllDurations = false;
+
+  /** This counter provides a unique id for every task, used to provide a parent/child relation. */
   private AtomicInteger taskId = new AtomicInteger();
 
+  /**
+   * The reference to the current writer, if any. If the referenced writer is null, then disk writes
+   * are disabled. This can happen when slowest task recording is enabled.
+   */
+  private AtomicReference<FileWriter> writerRef = new AtomicReference<>();
+
+  /**
+   * This is a per-thread data structure that's used to track the current stack of open tasks, the
+   * purpose of which is to track the parent id of every task. This is also used to ensure that
+   * {@link #profile} and {@link #completeTask} calls always occur in pairs.
+   */
+  // TODO(ulfjack): We can infer the parent/child relationship after the fact instead of tracking it
+  // at runtime. That would allow us to remove this data structure entirely.
   private TaskStack taskStack;
-  private Queue<TaskData> taskQueue;
-  private DataOutputStream out;
-  private Timer timer;
-  private IOException saveException;
-  private ObjectDescriber describer;
-  @SuppressWarnings("unchecked")
+
   private final SlowestTaskAggregator[] slowestTasks =
-  new SlowestTaskAggregator[ProfilerTask.values().length];
+      new SlowestTaskAggregator[ProfilerTask.values().length];
 
   private final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length];
 
@@ -470,6 +478,16 @@
     }
   }
 
+  /**
+   * Returns task histograms. This must be called between calls to {@link #start} and {@link #stop},
+   * or the returned recorders are all empty. Note that the returned recorders may still be modified
+   * concurrently (but at least they are thread-safe, so that's good).
+   *
+   * <p>The stat recorders are indexed by {@code ProfilerTask#ordinal}.
+   */
+  // TODO(ulfjack): This returns incomplete data by design. Maybe we should return the histograms on
+  // stop instead? However, this is currently only called from one location in a module, and that
+  // can't call stop itself. What to do?
   public ImmutableList<StatRecorder> getTasksHistograms() {
     return ImmutableList.copyOf(tasksHistograms);
   }
@@ -508,15 +526,13 @@
   public synchronized void start(
       ProfiledTaskKinds profiledTaskKinds,
       OutputStream stream,
+      Format format,
       String comment,
       boolean recordAllDurations,
       Clock clock,
-      long execStartTimeNanos)
-          throws IOException {
+      long execStartTimeNanos) {
     Preconditions.checkState(!isActive(), "Profiler already active");
-    taskStack = new TaskStack();
-    taskQueue = new ConcurrentLinkedQueue<>();
-    describer = new ObjectDescriber();
+    initHistograms();
 
     this.profiledTaskKinds = profiledTaskKinds;
     this.clock = clock;
@@ -529,38 +545,25 @@
     // reset state for the new profiling session
     taskId.set(0);
     this.recordAllDurations = recordAllDurations;
-    this.saveException = null;
+    this.taskStack = new TaskStack();
+    FileWriter writer = null;
     if (stream != null) {
-      this.timer = new Timer("ProfilerTimer", true);
-      // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by
-      // the save() method. Values for buffer sizes were chosen by running small amount of tests
-      // and identifying point of diminishing returns - but I have not really tried to optimize
-      // them.
-      this.out = new DataOutputStream(new BufferedOutputStream(new DeflaterOutputStream(
-          stream, new Deflater(Deflater.BEST_SPEED, false), 65536), 262144));
-
-      this.out.writeInt(MAGIC); // magic
-      this.out.writeInt(VERSION); // protocol_version
-      this.out.writeUTF(comment);
-      // ProfileTask.values() method sorts enums using their ordinal() value, so
-      // there there is no need to store ordinal() value for each entry.
-      this.out.writeInt(TASK_COUNT);
-      for (ProfilerTask type : ProfilerTask.values()) {
-        this.out.writeUTF(type.toString());
+      if (format == Format.BINARY_BAZEL_FORMAT) {
+        writer = new BinaryFormatWriter(stream, profileStartTime, comment);
+        writer.start();
       }
-
-      // Start save thread
-      timer.schedule(new TimerTask() {
-        @Override public void run() { save(); }
-      }, SAVE_DELAY, SAVE_DELAY);
-    } else {
-      this.out = null;
     }
+    this.writerRef.set(writer);
 
     // activate profiler
     profileStartTime = execStartTimeNanos;
   }
 
+  /**
+   * Returns task histograms. This must be called between calls to {@link #start} and {@link #stop},
+   * or the returned list is empty.
+   */
+  // TODO(ulfjack): This returns incomplete data by design. Also see getTasksHistograms.
   public synchronized Iterable<SlowTask> getSlowestTasks() {
     List<Iterable<SlowTask>> slowestTasksByType = new ArrayList<>();
 
@@ -579,31 +582,25 @@
    * be recorded in the profile.
    */
   public synchronized void stop() throws IOException {
-    if (saveException != null) {
-      throw saveException;
-    }
     if (!isActive()) {
       return;
     }
     // Log a final event to update the duration of ProfilePhase.FINISH.
     logEvent(ProfilerTask.INFO, "Finishing");
-    save();
-    clear();
+    FileWriter writer = writerRef.getAndSet(null);
+    if (writer != null) {
+      writer.shutdown();
+      writer = null;
+    }
+    taskStack = null;
+    initHistograms();
+    profileStartTime = 0L;
 
     for (SlowestTaskAggregator aggregator : slowestTasks) {
       if (aggregator != null) {
         aggregator.clear();
       }
     }
-
-    if (saveException != null) {
-      throw saveException;
-    }
-    if (out != null) {
-      out.writeInt(EOF_MARKER);
-      out.close();
-      out = null;
-    }
   }
 
   /**
@@ -618,81 +615,6 @@
   }
 
   /**
-   * Saves all gathered information from taskQueue queue to the file.
-   * Method is invoked internally by the Timer-based thread and at the end of
-   * profiling session.
-   */
-  private synchronized void save() {
-    if (out == null) {
-      return;
-    }
-    try {
-      // Allocate the sink once to avoid GC
-      ByteBuffer sink = ByteBuffer.allocate(1024);
-      TaskData data;
-      while ((data = taskQueue.poll()) != null) {
-        sink.clear();
-
-        VarInt.putVarLong(data.threadId, sink);
-        VarInt.putVarInt(data.id, sink);
-        VarInt.putVarInt(data.parentId, sink);
-        VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink);
-        VarInt.putVarLong(data.duration, sink);
-
-        // To save space (and improve performance), convert all description
-        // strings to the canonical object and use IdentityHashMap to assign
-        // unique numbers for each string.
-        int descIndex = describer.getDescriptionIndex(data.description);
-        VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values.
-
-        // Save types using their ordinal() value
-        sink.put((byte) data.type.ordinal());
-
-        // Save aggregated data stats.
-        if (data.counts != null) {
-          for (int i = 0; i < TASK_COUNT; i++) {
-            if (data.counts[i] > 0) {
-              sink.put((byte) i); // aggregated type ordinal value
-              VarInt.putVarInt(data.counts[i], sink);
-              VarInt.putVarLong(data.durations[i], sink);
-            }
-          }
-        }
-
-        this.out.writeInt(sink.position());
-        this.out.write(sink.array(), 0, sink.position());
-        if (describer.isUnassigned(descIndex)) {
-          this.out.writeUTF(describer.memoizeDescription(data.description));
-        }
-      }
-      this.out.flush();
-    } catch (IOException e) {
-      saveException = e;
-      clear();
-      try {
-        out.close();
-      } catch (IOException e2) {
-        // ignore it
-      }
-    }
-  }
-
-  private synchronized void clear() {
-    initHistograms();
-    profileStartTime = 0L;
-    if (timer != null) {
-      timer.cancel();
-      timer = null;
-    }
-    taskStack = null;
-    taskQueue = null;
-    describer = null;
-
-    // Note that slowest task aggregator are not cleared here because clearing happens
-    // periodically over the course of a command invocation.
-  }
-
-  /**
    * Unless --record_full_profiler_data is given we drop small tasks and add their time to the
    * parents duration.
    */
@@ -721,8 +643,8 @@
         (int) TimeUnit.NANOSECONDS.toMillis(duration), description);
     // Store instance fields as local variables so they are not nulled out from under us by #clear.
     TaskStack localStack = taskStack;
-    Queue<TaskData> localQueue = taskQueue;
-    if (localStack == null || localQueue == null) {
+    FileWriter currentWriter = writerRef.get();
+    if (localStack == null) {
       // Variables have been nulled out by #clear in between the check the caller made and this
       // point in the code. Probably due to an asynchronous crash.
       logger.severe("Variables null in profiler for " + type + ", probably due to async crash");
@@ -735,8 +657,8 @@
     if (wasTaskSlowEnoughToRecord(type, duration)) {
       TaskData data = localStack.create(startTimeNanos, type, description);
       data.duration = duration;
-      if (out != null) {
-        localQueue.add(data);
+      if (currentWriter != null) {
+        currentWriter.enqueue(data);
       }
 
       SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
@@ -863,13 +785,13 @@
         taskStack.peek().aggregateChild(data.type, data.duration);
       }
       boolean shouldRecordTask = wasTaskSlowEnoughToRecord(type, data.duration);
-      if (out != null && (shouldRecordTask || data.counts != null)) {
-        taskQueue.add(data);
+      FileWriter writer = writerRef.get();
+      if ((shouldRecordTask || data.counts != null) && writer != null) {
+        writer.enqueue(data);
       }
 
       if (shouldRecordTask) {
         SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
-
         if (aggregator != null) {
           aggregator.add(data);
         }
@@ -885,4 +807,146 @@
       logEvent(ProfilerTask.PHASE, phase.description);
     }
   }
+
+  private abstract static class FileWriter implements Runnable {
+    protected final BlockingQueue<TaskData> queue;
+    protected final Thread thread;
+    protected IOException savedException;
+
+    FileWriter() {
+      this.queue = new LinkedBlockingDeque<>();
+      this.thread = new Thread(this);
+    }
+
+    public void shutdown() throws IOException {
+      // Add poison pill to queue and then wait for writer thread to shut down.
+      queue.add(POISON_PILL);
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        thread.interrupt();
+        Thread.currentThread().interrupt();
+      }
+      if (savedException != null) {
+        throw savedException;
+      }
+    }
+
+    public void start() {
+      thread.start();
+    }
+
+    public void enqueue(TaskData data) {
+      queue.add(data);
+    }
+  }
+
+  /** Writes the profile in the binary Bazel profile format. */
+  private static class BinaryFormatWriter extends FileWriter {
+    private final DataOutputStream out;
+    private final long profileStartTime;
+    private final String comment;
+
+    BinaryFormatWriter(
+        OutputStream out,
+        long profileStartTime,
+        String comment) {
+      // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by
+      // the write() method. Values for buffer sizes were chosen by running small amount of tests
+      // and identifying point of diminishing returns - but I have not really tried to optimize
+      // them.
+      this.out =
+          new DataOutputStream(
+              new BufferedOutputStream(
+                  new DeflaterOutputStream(
+                      // the DeflaterOutputStream has its own output buffer of 65k, chosen at random
+                      out, new Deflater(Deflater.BEST_SPEED, false), 65536),
+                  262144)); // buffer size, basically chosen at random
+      this.profileStartTime = profileStartTime;
+      this.comment = comment;
+    }
+
+    private void writeHeader() throws IOException {
+      out.writeInt(MAGIC); // magic
+      out.writeInt(VERSION); // protocol_version
+      out.writeUTF(comment);
+      // ProfileTask.values() method sorts enums using their ordinal() value, so
+      // there there is no need to store ordinal() value for each entry.
+      out.writeInt(TASK_COUNT);
+      for (ProfilerTask type : ProfilerTask.values()) {
+        out.writeUTF(type.toString());
+      }
+    }
+
+    /**
+     * Saves all gathered information from taskQueue queue to the file.
+     * Method is invoked internally by the Timer-based thread and at the end of
+     * profiling session.
+     */
+    @Override
+    public void run() {
+      try {
+        boolean receivedPoisonPill = false;
+        try {
+          writeHeader();
+          // Allocate the sink once to avoid GC
+          ByteBuffer sink = ByteBuffer.allocate(1024);
+          ObjectDescriber describer = new ObjectDescriber();
+          TaskData data;
+          while ((data = queue.take()) != POISON_PILL) {
+            sink.clear();
+
+            VarInt.putVarLong(data.threadId, sink);
+            VarInt.putVarInt(data.id, sink);
+            VarInt.putVarInt(data.parentId, sink);
+            VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink);
+            VarInt.putVarLong(data.duration, sink);
+
+            // To save space (and improve performance), convert all description
+            // strings to the canonical object and use IdentityHashMap to assign
+            // unique numbers for each string.
+            int descIndex = describer.getDescriptionIndex(data.description);
+            VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values.
+
+            // Save types using their ordinal() value
+            sink.put((byte) data.type.ordinal());
+
+            // Save aggregated data stats.
+            if (data.counts != null) {
+              for (int i = 0; i < TASK_COUNT; i++) {
+                if (data.counts[i] > 0) {
+                  sink.put((byte) i); // aggregated type ordinal value
+                  VarInt.putVarInt(data.counts[i], sink);
+                  VarInt.putVarLong(data.durations[i], sink);
+                }
+              }
+            }
+
+            out.writeInt(sink.position());
+            out.write(sink.array(), 0, sink.position());
+            if (describer.isUnassigned(descIndex)) {
+              out.writeUTF(describer.memoizeDescription(data.description));
+            }
+          }
+          receivedPoisonPill = true;
+          out.writeInt(EOF_MARKER);
+          out.close();
+        } catch (IOException e) {
+          this.savedException = e;
+          try {
+            out.close();
+          } catch (IOException e2) {
+            // ignore it
+          }
+          if (!receivedPoisonPill) {
+            while (queue.take() != POISON_PILL) {
+              // We keep emptying the queue, but we can't write anything.
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        // Exit silently.
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/ProfilerTask.java b/src/main/java/com/google/devtools/build/lib/profiler/ProfilerTask.java
index 39c9c86..2666930 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/ProfilerTask.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/ProfilerTask.java
@@ -30,12 +30,12 @@
   ACTION("action processing", 0x666699),
   __ACTION_BUILDER("parallel builder completion queue", 0xCC3399), // unused
   __ACTION_SUBMIT("execution queue submission", 0xCC3399), // unused
-  ACTION_CHECK("action dependency checking", 10000000, 0x999933, 0),
+  ACTION_CHECK("action dependency checking", 10000000, 0x999933, 0, false),
   ACTION_EXECUTE("action execution", 0x99CCFF),
-  ACTION_LOCK("action resource lock", 10000000, 0xCC9933, 0),
-  ACTION_RELEASE("action resource release", 10000000, 0x006666, 0),
+  ACTION_LOCK("action resource lock", 10000000, 0xCC9933, 0, false),
+  ACTION_RELEASE("action resource release", 10000000, 0x006666, 0, false),
   __ACTION_GRAPH("action graph dependency", 0x3399FF), // unused
-  ACTION_UPDATE("update action information", 10000000, 0x993300, 0),
+  ACTION_UPDATE("update action information", 10000000, 0x993300, 0, false),
   ACTION_COMPLETE("complete action execution", 0xCCCC99),
   INFO("general information", 0x000066),
   __EXCEPTION("exception", 0xFFCC66), // unused
@@ -47,12 +47,12 @@
   SCANNER("include scanner", 0x669999),
   // 30 is a good number because the slowest items are stored in a heap, with temporarily
   // one more element, and with 31 items, a heap becomes a complete binary tree
-  LOCAL_PARSE("Local parse to prepare for remote execution", 50000000, 0x6699CC, 30),
-  UPLOAD_TIME("Remote execution upload time", 50000000, 0x6699CC, 0),
-  PROCESS_TIME("Remote execution process wall time", 50000000, 0xF999CC, 0),
-  REMOTE_QUEUE("Remote execution queuing time", 50000000, 0xCC6600, 0),
-  REMOTE_SETUP("Remote execution setup", 50000000, 0xA999CC, 0),
-  FETCH("Remote execution file fetching", 50000000, 0xBB99CC, 0),
+  LOCAL_PARSE("Local parse to prepare for remote execution", 50000000, 0x6699CC, 30, false),
+  UPLOAD_TIME("Remote execution upload time", 50000000, 0x6699CC, 0, false),
+  PROCESS_TIME("Remote execution process wall time", 50000000, 0xF999CC, 0, false),
+  REMOTE_QUEUE("Remote execution queuing time", 50000000, 0xCC6600, 0, false),
+  REMOTE_SETUP("Remote execution setup", 50000000, 0xA999CC, 0, false),
+  FETCH("Remote execution file fetching", 50000000, 0xBB99CC, 0, false),
   VFS_STAT("VFS stat", 10000000, 0x9999FF, 30, true),
   VFS_DIR("VFS readdir", 10000000, 0x0066CC, 30, true),
   VFS_READLINK("VFS readlink", 10000000, 0x99CCCC, 30, true),
@@ -67,7 +67,7 @@
   VFS_VMFS_STAT("VMFS stat", 10000000, 0x9999FF, 0, true),
   VFS_VMFS_DIR("VMFS readdir", 10000000, 0x0066CC, 0, true),
   VFS_VMFS_READ("VMFS read", 10000000, 0x99CC33, 0, true),
-  WAIT("thread wait", 5000000, 0x66CCCC, 0),
+  WAIT("thread wait", 5000000, 0x66CCCC, 0, false),
   __CONFIGURED_TARGET("configured target creation", 0x663300), // unused
   __TRANSITIVE_CLOSURE("transitive closure creation", 0x996600), // unused
   __TEST("for testing only", 0x000000), // unused
@@ -108,15 +108,7 @@
   /** True if the metric records VFS operations */
   private final boolean vfs;
 
-  ProfilerTask(String description, int color) {
-    this(description, /* minDuration= */ -1, color, /* slowestInstanceCount= */ 0, /*vfs=*/ false);
-  }
-
-  ProfilerTask(String description, long minDuration, int color, int slowestInstanceCount) {
-    this(description, minDuration, color, slowestInstanceCount, /*vfs=*/ false);
-  }
-
-  ProfilerTask(String description, long minDuration, int color, int slowestInstanceCount,
+  private ProfilerTask(String description, long minDuration, int color, int slowestInstanceCount,
       boolean vfs) {
     this.description = description;
     this.minDuration = minDuration;
@@ -125,8 +117,17 @@
     this.vfs = vfs;
   }
 
-  ProfilerTask(String description) {
-    this(description, -1, 0x000000, 0);
+  private ProfilerTask(String description, int color) {
+    this(
+        description,
+        /* minDuration= */ -1,
+        color,
+        /* slowestInstanceCount= */ 0,
+        /* vfs= */ false);
+  }
+
+  private ProfilerTask(String description) {
+    this(description, -1, 0x000000, 0, /* vfs= */ false);
   }
 
   /** Whether the Profiler collects the slowest instances of this task. */
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/analysis/ProfileInfo.java b/src/main/java/com/google/devtools/build/lib/profiler/analysis/ProfileInfo.java
index 6104674..5050eb44 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/analysis/ProfileInfo.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/analysis/ProfileInfo.java
@@ -35,6 +35,7 @@
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
@@ -882,23 +883,22 @@
   /**
    * Loads and parses Blaze profile file.
    *
-   * @param profileFile profile file path
+   * @param profileStream profile file path
    *
    * @return ProfileInfo object with some fields populated (call calculateStats()
    *         and analyzeRelationships() to populate the remaining fields)
    * @throws UnsupportedEncodingException if the file format is invalid
    * @throws IOException if the file can't be read
    */
-  public static ProfileInfo loadProfile(Path profileFile)
-      throws IOException {
-    // It is extremely important to wrap InflaterInputStream using
-    // BufferedInputStream because majority of reads would be done using
-    // readInt()/readLong() methods and InflaterInputStream is very inefficient
-    // in handling small read requests (performance difference with 1MB buffer
-    // used below is almost 10x).
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(new InflaterInputStream(
-        profileFile.getInputStream(), new Inflater(false), 65536), 1024 * 1024));
+  public static ProfileInfo loadProfile(InputStream profileStream) throws IOException {
+    // It is extremely important to wrap InflaterInputStream using BufferedInputStream because
+    // the majority of reads would be done using readInt()/readLong() methods and
+    // InflaterInputStream is very inefficient in handling small read requests (performance
+    // difference with 1MB buffer used below is almost 10x).
+    DataInputStream in =
+        new DataInputStream(
+            new BufferedInputStream(
+                new InflaterInputStream(profileStream, new Inflater(false), 65536), 1024 * 1024));
 
     if (in.readInt() != Profiler.MAGIC) {
       in.close();
@@ -995,7 +995,10 @@
   public static ProfileInfo loadProfileVerbosely(Path profileFile, InfoListener reporter)
       throws IOException {
     reporter.info("Loading " + profileFile.getPathString());
-    ProfileInfo profileInfo = ProfileInfo.loadProfile(profileFile);
+    ProfileInfo profileInfo;
+    try (InputStream in = profileFile.getInputStream()) {
+      profileInfo = ProfileInfo.loadProfile(in);
+    }
     if (profileInfo.isCorruptedOrIncomplete()) {
       reporter.warn("Profile file is incomplete or corrupted - not all records were parsed");
     }
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index dcdca11..a7b1a96 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -86,7 +86,6 @@
 import com.google.devtools.common.options.OptionsParsingException;
 import com.google.devtools.common.options.OptionsProvider;
 import com.google.devtools.common.options.TriState;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -277,7 +276,7 @@
         Path profilePath = env.getWorkspace().getRelative(options.profilePath);
 
         recordFullProfilerData = options.recordFullProfilerData;
-        out = new BufferedOutputStream(profilePath.getOutputStream(), 1024 * 1024);
+        out = profilePath.getOutputStream();
         env.getReporter().handle(Event.info("Writing profile data to '" + profilePath + "'"));
         profiledTasks = ProfiledTaskKinds.ALL;
       } else if (options.alwaysProfileSlowOperations) {
@@ -289,8 +288,13 @@
         Profiler.instance().start(
             profiledTasks,
             out,
-            getProductName() + " profile for " + env.getOutputBase() + " at " + new Date()
-            + ", build ID: " + buildID,
+            Profiler.Format.BINARY_BAZEL_FORMAT,
+            String.format(
+                "%s profile for %s at %s, build ID: %s",
+                getProductName(),
+                env.getOutputBase(),
+                new Date(),
+                buildID),
             recordFullProfilerData,
             clock,
             execStartTimeNanos);
diff --git a/src/test/java/com/google/devtools/build/lib/profiler/AutoProfilerBenchmark.java b/src/test/java/com/google/devtools/build/lib/profiler/AutoProfilerBenchmark.java
index c3d9bbe..044ae8c 100644
--- a/src/test/java/com/google/devtools/build/lib/profiler/AutoProfilerBenchmark.java
+++ b/src/test/java/com/google/devtools/build/lib/profiler/AutoProfilerBenchmark.java
@@ -27,9 +27,14 @@
 
   @BeforeExperiment
   void startProfiler() throws Exception {
-    Profiler.instance().start(ProfiledTaskKinds.ALL,
-        new InMemoryFileSystem().getPath("/out.dat").getOutputStream(), "benchmark", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    Profiler.instance().start(
+        ProfiledTaskKinds.ALL,
+        new InMemoryFileSystem().getPath("/out.dat").getOutputStream(),
+        Profiler.Format.BINARY_BAZEL_FORMAT,
+        "benchmark",
+        false,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
   }
 
   @BeforeExperiment
diff --git a/src/test/java/com/google/devtools/build/lib/profiler/ProfilerChartTest.java b/src/test/java/com/google/devtools/build/lib/profiler/ProfilerChartTest.java
index 105afb0..6b3bd51 100644
--- a/src/test/java/com/google/devtools/build/lib/profiler/ProfilerChartTest.java
+++ b/src/test/java/com/google/devtools/build/lib/profiler/ProfilerChartTest.java
@@ -35,6 +35,7 @@
 import com.google.devtools.build.lib.testutil.Suite;
 import com.google.devtools.build.lib.testutil.TestSpec;
 import com.google.devtools.build.lib.vfs.Path;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Locale;
 import org.junit.Test;
@@ -248,8 +249,14 @@
     Path cacheDir = scratch.dir("/tmp");
     Path cacheFile = cacheDir.getRelative("profile1.dat");
     Profiler profiler = Profiler.instance();
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "basic test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    profiler.start(
+        ProfiledTaskKinds.ALL,
+        cacheFile.getOutputStream(),
+        Profiler.Format.BINARY_BAZEL_FORMAT,
+        "basic test",
+        false,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
 
     // Write from multiple threads to generate multiple rows in the chart.
     for (int i = 0; i < noOfRows; i++) {
@@ -259,7 +266,9 @@
     }
 
     profiler.stop();
-    return ProfileInfo.loadProfile(cacheFile);
+    try (InputStream in = cacheFile.getInputStream()) {
+      return ProfileInfo.loadProfile(in);
+    }
   }
 
   private void task(final Profiler profiler, ProfilerTask task, String name) {
diff --git a/src/test/java/com/google/devtools/build/lib/profiler/ProfilerTest.java b/src/test/java/com/google/devtools/build/lib/profiler/ProfilerTest.java
index ae1d572..0fee591 100644
--- a/src/test/java/com/google/devtools/build/lib/profiler/ProfilerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/profiler/ProfilerTest.java
@@ -14,7 +14,9 @@
 package com.google.devtools.build.lib.profiler;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.profiler.Profiler.Format.BINARY_BAZEL_FORMAT;
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import static org.junit.Assert.fail;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteStreams;
@@ -23,15 +25,15 @@
 import com.google.devtools.build.lib.profiler.Profiler.ProfiledTaskKinds;
 import com.google.devtools.build.lib.profiler.Profiler.SlowTask;
 import com.google.devtools.build.lib.profiler.analysis.ProfileInfo;
-import com.google.devtools.build.lib.testutil.FoundationTestCase;
 import com.google.devtools.build.lib.testutil.ManualClock;
 import com.google.devtools.build.lib.testutil.Suite;
 import com.google.devtools.build.lib.testutil.TestSpec;
 import com.google.devtools.build.lib.testutil.TestUtils;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,6 +42,7 @@
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,29 +53,43 @@
  */
 @TestSpec(size = Suite.MEDIUM_TESTS) // testConcurrentProfiling takes ~700ms, testProfiler 100ms.
 @RunWith(JUnit4.class)
-public class ProfilerTest extends FoundationTestCase {
-
-  private Path cacheDir;
+public class ProfilerTest {
   private Profiler profiler = Profiler.instance();
   private ManualClock clock;
 
   @Before
-  public final void createCacheDirectory() throws Exception {
-    cacheDir = scratch.dir("/tmp");
-  }
-
-  @Before
   public final void setManualClock() {
     clock = new ManualClock();
     BlazeClock.setClock(clock);
   }
 
+  @After
+  public void forceStopToAvoidPoisoningTheProfiler() {
+    // If a test does not stop the profiler, e.g., due to a test failure, all subsequent tests fail
+    // because the profiler is still running, so we force-stop the profiler here.
+    try {
+      profiler.stop();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ByteArrayOutputStream start(ProfiledTaskKinds kinds, Profiler.Format format) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    profiler.start(
+        kinds, buffer, format, "test", false, BlazeClock.instance(), BlazeClock.nanoTime());
+    return buffer;
+  }
+
+  private void startUnbuffered(ProfiledTaskKinds kinds) {
+    profiler.start(
+        kinds, null, null, "test", false, BlazeClock.instance(), BlazeClock.nanoTime());
+  }
+
   @Test
   public void testProfilerActivation() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile1.dat");
     assertThat(profiler.isActive()).isFalse();
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "basic test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     assertThat(profiler.isActive()).isTrue();
 
     profiler.stop();
@@ -81,14 +98,12 @@
 
   @Test
   public void testTaskDetails() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile1.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "basic test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION, "action task")) {
       profiler.logEvent(ProfilerTask.INFO, "event");
     }
     profiler.stop();
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
 
     ProfileInfo.Task task = info.allTasksById.get(0);
@@ -104,9 +119,7 @@
 
   @Test
   public void testProfiler() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile1.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "basic test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     profiler.logSimpleTask(BlazeClock.instance().nanoTime(),
                            ProfilerTask.PHASE, "profiler start");
     try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION, "complex task")) {
@@ -125,7 +138,7 @@
     // all other calls to profiler should be ignored
     profiler.logEvent(ProfilerTask.PHASE, "should be ignored");
 
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.allTasksById).hasSize(6); // only 5 tasks + finalization should be recorded
 
@@ -160,16 +173,22 @@
 
   @Test
   public void testProfilerRecordingAllEvents() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile1.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "basic test", true,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    profiler.start(
+        ProfiledTaskKinds.ALL,
+        buffer,
+        BINARY_BAZEL_FORMAT,
+        "basic test",
+        true,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
     try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION, "action task")) {
       // Next task takes less than 10 ms but should be recorded anyway.
       clock.advanceMillis(1);
       profiler.logSimpleTask(BlazeClock.instance().nanoTime(), ProfilerTask.VFS_STAT, "stat1");
     }
     profiler.stop();
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.allTasksById).hasSize(3); // 2 tasks + finalization should be recorded
 
@@ -182,10 +201,16 @@
 
   @Test
   public void testProfilerRecordingOnlySlowestEvents() throws Exception {
-    Path profileData = cacheDir.getRelative("foo");
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
-    profiler.start(ProfiledTaskKinds.SLOWEST, profileData.getOutputStream(), "test", true,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    profiler.start(
+        ProfiledTaskKinds.SLOWEST,
+        buffer,
+        BINARY_BAZEL_FORMAT,
+        "test",
+        true,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
     profiler.logSimpleTask(10000, 20000, ProfilerTask.VFS_STAT, "stat");
     profiler.logSimpleTask(20000, 30000, ProfilerTask.REMOTE_EXECUTION, "remote execution");
 
@@ -194,7 +219,7 @@
 
     profiler.stop();
 
-    ProfileInfo info = ProfileInfo.loadProfile(profileData);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.allTasksById).hasSize(1); // only VFS_STAT task should be recorded
 
@@ -203,14 +228,20 @@
   }
 
   @Test
+  public void testSlowestTasks() throws Exception {
+    startUnbuffered(ProfiledTaskKinds.ALL);
+    profiler.logSimpleTaskDuration(
+        Profiler.nanoTimeMaybe(), Duration.ofSeconds(10), ProfilerTask.LOCAL_PARSE, "foo");
+    Iterable<SlowTask> slowestTasks = profiler.getSlowestTasks();
+    assertThat(slowestTasks).hasSize(1);
+    SlowTask task = slowestTasks.iterator().next();
+    assertThat(task.type).isEqualTo(ProfilerTask.LOCAL_PARSE);
+    profiler.stop();
+  }
+
+  @Test
   public void testGetSlowestTasksCapped() throws Exception {
-    profiler.start(
-        ProfiledTaskKinds.SLOWEST,
-        ByteStreams.nullOutputStream(),
-        "test",
-        /*recordAllDurations=*/ true,
-        BlazeClock.instance(),
-        BlazeClock.instance().nanoTime());
+    startUnbuffered(ProfiledTaskKinds.SLOWEST);
 
     // Add some fast tasks - these shouldn't show up in the slowest.
     for (int i = 0; i < ProfilerTask.VFS_STAT.slowestInstancesCount; i++) {
@@ -279,10 +310,15 @@
 
   @Test
   public void testProfilerRecordsNothing() throws Exception {
-    Path profileData = cacheDir.getRelative("foo");
-
-    profiler.start(ProfiledTaskKinds.NONE, profileData.getOutputStream(), "test", true,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    profiler.start(
+        ProfiledTaskKinds.NONE,
+        buffer,
+        BINARY_BAZEL_FORMAT,
+        "test",
+        true,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
     profiler.logSimpleTask(10000, 20000, ProfilerTask.VFS_STAT, "stat");
 
     assertThat(ProfilerTask.VFS_STAT.collectsSlowestInstances()).isTrue();
@@ -290,16 +326,14 @@
 
     profiler.stop();
 
-    ProfileInfo info = ProfileInfo.loadProfile(profileData);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.allTasksById).isEmpty();
   }
 
   @Test
   public void testConcurrentProfiling() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile3.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "concurrent test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
 
     long id = Thread.currentThread().getId();
     Thread thread1 = new Thread() {
@@ -329,7 +363,7 @@
     }
     profiler.stop();
 
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     info.analyzeRelationships();
     assertThat(info.allTasksById).hasSize(4 + 10000 + 10000); // total number of tasks
@@ -353,9 +387,7 @@
 
   @Test
   public void testPhaseTasks() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile4.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "phase test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     Thread thread1 = new Thread() {
       @Override public void run() {
         for (int i = 0; i < 100; i++) {
@@ -394,7 +426,7 @@
     clock.advanceMillis(1);
     profiler.stop();
 
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     info.analyzeRelationships();
     // number of tasks: INIT(1) + LOAD(1) + Thread1.TEST(100) + ANALYZE(1)
@@ -417,9 +449,7 @@
 
   @Test
   public void testCorruptedFile() throws Exception {
-    Path cacheFile = cacheDir.getRelative("profile5.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(), "phase test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     for (int i = 0; i < 100; i++) {
       try (SilentCloseable c = profiler.profile(ProfilerTask.INFO, "outer task " + i)) {
         clock.advanceMillis(1);
@@ -428,14 +458,12 @@
     }
     profiler.stop();
 
-    ProfileInfo info = ProfileInfo.loadProfile(cacheFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.isCorruptedOrIncomplete()).isFalse();
 
-    Path corruptedFile = cacheDir.getRelative("profile5bad.dat");
-    FileSystemUtils.writeContent(
-        corruptedFile, Arrays.copyOf(FileSystemUtils.readContent(cacheFile), 2000));
-    info = ProfileInfo.loadProfile(corruptedFile);
+    info = ProfileInfo.loadProfile(
+        new ByteArrayInputStream(Arrays.copyOf(buffer.toByteArray(), 2000)));
     info.calculateStats();
     assertThat(info.isCorruptedOrIncomplete()).isTrue();
     // Since root tasks will appear after nested tasks in the profile file and
@@ -447,9 +475,7 @@
 
   @Test
   public void testUnsupportedProfilerRecord() throws Exception {
-    Path dataFile = cacheDir.getRelative("profile5.dat");
-    profiler.start(ProfiledTaskKinds.ALL, dataFile.getOutputStream(), "phase test", false,
-        BlazeClock.instance(), BlazeClock.instance().nanoTime());
+    ByteArrayOutputStream buffer = start(ProfiledTaskKinds.ALL, BINARY_BAZEL_FORMAT);
     try (SilentCloseable c = profiler.profile(ProfilerTask.INFO, "outer task")) {
       profiler.logEvent(ProfilerTask.PHASE, "inner task");
     }
@@ -459,29 +485,28 @@
     profiler.stop();
 
     // Validate our test profile.
-    ProfileInfo info = ProfileInfo.loadProfile(dataFile);
+    ProfileInfo info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.isCorruptedOrIncomplete()).isFalse();
     assertThat(info.getStatsForType(ProfilerTask.INFO, info.rootTasksById).count).isEqualTo(3);
     assertThat(info.getStatsForType(ProfilerTask.UNKNOWN, info.rootTasksById).count).isEqualTo(0);
 
     // Now replace "TEST" type with something unsupported - e.g. "XXXX".
-    InputStream in = new InflaterInputStream(dataFile.getInputStream(), new Inflater(false), 65536);
-    byte[] buffer = new byte[60000];
-    int len = in.read(buffer);
-    in.close();
-    assertThat(len).isLessThan(buffer.length); // Validate that file was completely decoded.
-    String content = new String(buffer, ISO_8859_1);
+    byte[] deflated = ByteStreams.toByteArray(new InflaterInputStream(
+        new ByteArrayInputStream(buffer.toByteArray()), new Inflater(false), 65536));
+    String content = new String(deflated, ISO_8859_1);
     int infoIndex = content.indexOf("INFO");
     assertThat(infoIndex).isGreaterThan(0);
     content = content.substring(0, infoIndex) + "XXXX" + content.substring(infoIndex + 4);
-    OutputStream out = new DeflaterOutputStream(dataFile.getOutputStream(),
-        new Deflater(Deflater.BEST_SPEED, false), 65536);
+
+    buffer = new ByteArrayOutputStream();
+    OutputStream out =
+        new DeflaterOutputStream(buffer, new Deflater(Deflater.BEST_SPEED, false), 65536);
     out.write(content.getBytes(ISO_8859_1));
     out.close();
 
     // Validate that XXXX records were classified as UNKNOWN.
-    info = ProfileInfo.loadProfile(dataFile);
+    info = ProfileInfo.loadProfile(new ByteArrayInputStream(buffer.toByteArray()));
     info.calculateStats();
     assertThat(info.isCorruptedOrIncomplete()).isFalse();
     assertThat(info.getStatsForType(ProfilerTask.INFO, info.rootTasksById).count).isEqualTo(0);
@@ -505,10 +530,71 @@
         return initialNanoTime - numNanoTimeCalls.addAndGet(1);
       }
     };
-    Path cacheFile = cacheDir.getRelative("profile1.dat");
-    profiler.start(ProfiledTaskKinds.ALL, cacheFile.getOutputStream(),
-        "testResilenceToNonDecreasingNanoTimes", false, badClock, initialNanoTime);
+    profiler.start(
+        ProfiledTaskKinds.ALL,
+        new ByteArrayOutputStream(),
+        BINARY_BAZEL_FORMAT,
+        "testResilenceToNonDecreasingNanoTimes",
+        false,
+        badClock,
+        initialNanoTime);
     profiler.logSimpleTask(badClock.nanoTime(), ProfilerTask.INFO, "some task");
     profiler.stop();
   }
+
+  /** Checks that the histograms are cleared in the stop call. */
+  @Test
+  public void testEmptyTaskHistograms() throws Exception {
+    startUnbuffered(ProfiledTaskKinds.ALL);
+    profiler.logSimpleTaskDuration(
+        Profiler.nanoTimeMaybe(), Duration.ofSeconds(10), ProfilerTask.INFO, "foo");
+    profiler.stop();
+    ImmutableList<StatRecorder> histograms = profiler.getTasksHistograms();
+    for (StatRecorder recorder : histograms) {
+      assertThat(recorder.isEmpty()).isTrue();
+    }
+  }
+
+  @Test
+  public void testTaskHistograms() throws Exception {
+    startUnbuffered(ProfiledTaskKinds.ALL);
+    profiler.logSimpleTaskDuration(
+        Profiler.nanoTimeMaybe(), Duration.ofSeconds(10), ProfilerTask.INFO, "foo");
+    ImmutableList<StatRecorder> histograms = profiler.getTasksHistograms();
+    StatRecorder infoStatRecorder = histograms.get(ProfilerTask.INFO.ordinal());
+    assertThat(infoStatRecorder.isEmpty()).isFalse();
+    // This is the only provided API to get the contents of the StatRecorder.
+    assertThat(infoStatRecorder.toString()).contains("'INFO'");
+    assertThat(infoStatRecorder.toString()).contains("Count: 1");
+    assertThat(infoStatRecorder.toString()).contains("[8192..16384 ms]");
+    // The stop() call is here because the histograms are cleared in the stop call. See the
+    // documentation of {@link Profiler#getTasksHistograms}.
+    profiler.stop();
+  }
+
+  @Test
+  public void testIOExceptionInOutputStream() throws Exception {
+    OutputStream failingOutputStream = new OutputStream() {
+      @Override
+      public void write(int b) throws IOException {
+        throw new IOException("Expected failure.");
+      }
+    };
+    profiler.start(
+        ProfiledTaskKinds.ALL,
+        failingOutputStream,
+        BINARY_BAZEL_FORMAT,
+        "basic test",
+        false,
+        BlazeClock.instance(),
+        BlazeClock.instance().nanoTime());
+    profiler.logSimpleTaskDuration(
+        Profiler.nanoTimeMaybe(), Duration.ofSeconds(10), ProfilerTask.INFO, "foo");
+    try {
+      profiler.stop();
+      fail();
+    } catch (IOException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("Expected failure.");
+    }
+  }
 }