Introduce --experimental_parallel_aquery_output.

This flag would enable blaze to process the analysis graph in parallel while
constructing an aquery result, instead of sequentially. The implementation follows a producer-consumer pattern to reduce the impact of the IO bottleneck.

PiperOrigin-RevId: 518804027
Change-Id: I912a0170767c1a3a0184bbd024d48b037ec7de19
diff --git a/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
index ad41422..1afdc5c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
@@ -13,10 +13,12 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.aquery;
 
+import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.actions.CommandLineExpansionException;
 import com.google.devtools.build.lib.analysis.AspectValue;
 import com.google.devtools.build.lib.analysis.ConfiguredTargetValue;
 import com.google.devtools.build.lib.analysis.actions.TemplateExpansionException;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.profiler.Profiler;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
@@ -24,18 +26,29 @@
 import com.google.devtools.build.lib.skyframe.RuleConfiguredTargetValue;
 import com.google.devtools.build.lib.skyframe.SkyframeExecutor;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.ActionGraphDump;
+import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryConsumingOutputHandler;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler.OutputType;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.MonolithicOutputHandler;
+import com.google.devtools.build.lib.skyframe.actiongraph.v2.StreamedConsumingOutputHandler;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.StreamedOutputHandler;
 import com.google.protobuf.CodedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /** Default output callback for aquery, prints proto output. */
 public class ActionGraphProtoOutputFormatterCallback extends AqueryThreadsafeCallback {
+  // TODO(b/274595070): Clean this up after flag flip.
 
+  // Arbitrarily chosen. Large enough for good performance, small enough not to cause OOMs.
+  private static final int BLOCKING_QUEUE_SIZE = Runtime.getRuntime().availableProcessors() * 2;
   private final OutputType outputType;
   private final ActionGraphDump actionGraphDump;
   private final AqueryActionFilter actionFilters;
@@ -58,7 +71,8 @@
     super(eventHandler, options, out, skyframeExecutor, accessor);
     this.outputType = outputType;
     this.actionFilters = actionFilters;
-    this.aqueryOutputHandler = constructAqueryOutputHandler(outputType, out, printStream);
+    this.aqueryOutputHandler =
+        constructAqueryOutputHandler(outputType, out, printStream, options.parallelAqueryOutput);
     this.actionGraphDump =
         new ActionGraphDump(
             options.includeCommandline,
@@ -73,11 +87,22 @@
 
   public static AqueryOutputHandler constructAqueryOutputHandler(
       OutputType outputType, OutputStream out, PrintStream printStream) {
+    return constructAqueryOutputHandler(outputType, out, printStream, /* parallelized= */ false);
+  }
+
+  private static AqueryOutputHandler constructAqueryOutputHandler(
+      OutputType outputType, OutputStream out, PrintStream printStream, boolean parallelized) {
     switch (outputType) {
       case BINARY:
       case TEXT:
-        return new StreamedOutputHandler(
-            outputType, CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE), printStream);
+        return parallelized
+            ? new StreamedConsumingOutputHandler(
+                outputType,
+                CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE),
+                printStream,
+                new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE))
+            : new StreamedOutputHandler(
+                outputType, CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE), printStream);
       case JSON:
         return new MonolithicOutputHandler(printStream);
     }
@@ -91,38 +116,117 @@
   }
 
   @Override
+  public void close(boolean failFast) throws IOException {
+    if (!failFast) {
+      try (SilentCloseable c = Profiler.instance().profile("aqueryOutputHandler.close")) {
+        aqueryOutputHandler.close();
+      }
+    }
+  }
+
+  @Override
   public void processOutput(Iterable<KeyedConfiguredTargetValue> partialResult)
       throws IOException, InterruptedException {
+    if (options.parallelAqueryOutput
+        && aqueryOutputHandler instanceof AqueryConsumingOutputHandler) {
+      processOutputInParallel(partialResult);
+      return;
+    }
+
     try (SilentCloseable c = Profiler.instance().profile("process partial result")) {
       // Enabling includeParamFiles should enable includeCommandline by default.
       options.includeCommandline |= options.includeParamFiles;
 
       for (KeyedConfiguredTargetValue keyedConfiguredTargetValue : partialResult) {
-        ConfiguredTargetValue configuredTargetValue =
-            keyedConfiguredTargetValue.getConfiguredTargetValue();
-        if (!(configuredTargetValue instanceof RuleConfiguredTargetValue)) {
-          // We have to include non-rule values in the graph to visit their dependencies, but they
-          // don't have any actions to print out.
-          continue;
-        }
-        actionGraphDump.dumpConfiguredTarget((RuleConfiguredTargetValue) configuredTargetValue);
-        if (options.useAspects) {
-          for (AspectValue aspectValue : accessor.getAspectValues(keyedConfiguredTargetValue)) {
-            actionGraphDump.dumpAspect(aspectValue, configuredTargetValue);
-          }
-        }
+        processSingleEntry(keyedConfiguredTargetValue);
       }
     } catch (CommandLineExpansionException | TemplateExpansionException e) {
       throw new IOException(e.getMessage());
     }
   }
 
-  @Override
-  public void close(boolean failFast) throws IOException {
-    if (!failFast) {
-      try (SilentCloseable c = Profiler.instance().profile("aqueryOutputHandler.close")) {
-        aqueryOutputHandler.close();
+  private void processSingleEntry(KeyedConfiguredTargetValue keyedConfiguredTargetValue)
+      throws CommandLineExpansionException,
+          InterruptedException,
+          IOException,
+          TemplateExpansionException {
+    ConfiguredTargetValue configuredTargetValue =
+        keyedConfiguredTargetValue.getConfiguredTargetValue();
+    if (!(configuredTargetValue instanceof RuleConfiguredTargetValue)) {
+      // We have to include non-rule values in the graph to visit their dependencies, but they
+      // don't have any actions to print out.
+      return;
+    }
+    actionGraphDump.dumpConfiguredTarget((RuleConfiguredTargetValue) configuredTargetValue);
+    if (options.useAspects) {
+      for (AspectValue aspectValue : accessor.getAspectValues(keyedConfiguredTargetValue)) {
+        actionGraphDump.dumpAspect(aspectValue, configuredTargetValue);
       }
     }
   }
+
+  private void processOutputInParallel(Iterable<KeyedConfiguredTargetValue> partialResult)
+      throws IOException, InterruptedException {
+    AqueryConsumingOutputHandler aqueryConsumingOutputHandler =
+        (AqueryConsumingOutputHandler) aqueryOutputHandler;
+    try (SilentCloseable c = Profiler.instance().profile("process partial result")) {
+      // Enabling includeParamFiles should enable includeCommandline by default.
+      options.includeCommandline |= options.includeParamFiles;
+      aqueryConsumingOutputHandler.startConsumer();
+      ForkJoinPool executor =
+          NamedForkJoinPool.newNamedPool("aquery", Runtime.getRuntime().availableProcessors());
+
+      try {
+        List<Future<Void>> futures = executor.invokeAll(toTasks(partialResult));
+        for (Future<Void> future : futures) {
+          future.get();
+        }
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof CommandLineExpansionException
+            || cause instanceof TemplateExpansionException) {
+          // This is kinda weird, but keeping it in line with the status quo for now.
+          // TODO(b/266179316): Clean this up.
+          throw new IOException(cause.getMessage());
+        }
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        if (cause instanceof InterruptedException) {
+          throw (InterruptedException) cause;
+        }
+        throw new IllegalStateException("Unexpected exception type: ", e);
+      } finally {
+        aqueryConsumingOutputHandler.stopConsumer();
+        executor.shutdown();
+      }
+    }
+  }
+
+  private ImmutableList<AqueryOutputTask> toTasks(Iterable<KeyedConfiguredTargetValue> values) {
+    ImmutableList.Builder<AqueryOutputTask> tasks = ImmutableList.builder();
+    for (KeyedConfiguredTargetValue value : values) {
+      tasks.add(new AqueryOutputTask(value));
+    }
+    return tasks.build();
+  }
+
+  private final class AqueryOutputTask implements Callable<Void> {
+
+    private final KeyedConfiguredTargetValue keyedConfiguredTargetValue;
+
+    AqueryOutputTask(KeyedConfiguredTargetValue keyedConfiguredTargetValue) {
+      this.keyedConfiguredTargetValue = keyedConfiguredTargetValue;
+    }
+
+    @Override
+    public Void call()
+        throws CommandLineExpansionException,
+            TemplateExpansionException,
+            IOException,
+            InterruptedException {
+      processSingleEntry(keyedConfiguredTargetValue);
+      return null;
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/aquery/AqueryOptions.java b/src/main/java/com/google/devtools/build/lib/query2/aquery/AqueryOptions.java
index 0fd4af4..f848580 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/aquery/AqueryOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/aquery/AqueryOptions.java
@@ -89,4 +89,14 @@
               + " output. This does not deduplicate depsets that don't share an immediate parent."
               + " This does not affect the final effective list of input artifacts of the actions.")
   public boolean deduplicateDepsets;
+
+  @Option(
+      name = "experimental_parallel_aquery_output",
+      defaultValue = "false",
+      documentationCategory = OptionDocumentationCategory.QUERY,
+      effectTags = {OptionEffectTag.UNKNOWN},
+      help =
+          "Whether aquery proto/textproto output should be written in parallel. No-op for the "
+              + "other output formats.")
+  public boolean parallelAqueryOutput;
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java
new file mode 100644
index 0000000..44dbfad
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java
@@ -0,0 +1,22 @@
+// Copyright 2023 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.skyframe.actiongraph.v2;
+
+/** AqueryOutputHandler that receives and consumes tasks via a work queue. */
+public interface AqueryConsumingOutputHandler extends AqueryOutputHandler {
+
+  void startConsumer();
+
+  void stopConsumer();
+}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BUILD b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BUILD
index 37515ed..e60c0ca 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BUILD
@@ -35,6 +35,7 @@
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/net/starlark/java/eval",
         "//src/main/protobuf:analysis_v2_java_proto",
+        "//third_party:auto_value",
         "//third_party:guava",
         "//third_party:jsr305",
         "//third_party/protobuf:protobuf_java",
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BaseCache.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BaseCache.java
index e4eff0f..5719ce5 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BaseCache.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/BaseCache.java
@@ -14,14 +14,14 @@
 package com.google.devtools.build.lib.skyframe.actiongraph.v2;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Basic class to abstract action graph cache functionality.
  */
 abstract class BaseCache<K, P> {
-  private final Map<K, Integer> cache = new HashMap<>();
+  private final Map<K, Integer> cache = new ConcurrentHashMap<>();
   protected final AqueryOutputHandler aqueryOutputHandler;
 
   BaseCache(AqueryOutputHandler aqueryOutputHandler) {
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/PrintTask.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/PrintTask.java
new file mode 100644
index 0000000..5f0443f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/PrintTask.java
@@ -0,0 +1,54 @@
+// Copyright 2023 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.skyframe.actiongraph.v2;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.Message;
+import javax.annotation.Nullable;
+
+/**
+ * Represent a task to be consumed by a {@link AqueryConsumingOutputHandler}.
+ *
+ * <p>We have separate Proto/TextProto subclasses to reduce some memory waste: we'll never need both
+ * the fieldNumber and the messageLabel in a PrintTask.
+ */
+@SuppressWarnings("InterfaceWithOnlyStatics")
+public interface PrintTask {
+
+  /** A task for the proto format. */
+  @AutoValue
+  abstract class ProtoPrintTask implements PrintTask {
+    @Nullable
+    abstract Message message();
+
+    abstract int fieldNumber();
+
+    public static ProtoPrintTask create(Message message, int fieldNumber) {
+      return new AutoValue_PrintTask_ProtoPrintTask(message, fieldNumber);
+    }
+  }
+
+  /** A task for the textproto format. */
+  @AutoValue
+  abstract class TextProtoPrintTask implements PrintTask {
+    @Nullable
+    abstract Message message();
+
+    abstract String messageLabel();
+
+    public static TextProtoPrintTask create(Message message, String messageLabel) {
+      return new AutoValue_PrintTask_TextProtoPrintTask(message, messageLabel);
+    }
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java
new file mode 100644
index 0000000..bc89b28
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java
@@ -0,0 +1,170 @@
+// Copyright 2023 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.skyframe.actiongraph.v2;
+
+import static com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler.OutputType.BINARY;
+import static com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler.OutputType.TEXT;
+
+import com.google.common.base.Preconditions;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.Action;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.ActionGraphContainer;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.Artifact;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.AspectDescriptor;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.Configuration;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.DepSetOfFiles;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.PathFragment;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.RuleClass;
+import com.google.devtools.build.lib.analysis.AnalysisProtosV2.Target;
+import com.google.devtools.build.lib.skyframe.actiongraph.v2.PrintTask.ProtoPrintTask;
+import com.google.devtools.build.lib.skyframe.actiongraph.v2.PrintTask.TextProtoPrintTask;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.concurrent.BlockingQueue;
+
+/** Manages the various streamed output channels of aquery. This does not support JSON format. */
+public class StreamedConsumingOutputHandler implements AqueryConsumingOutputHandler {
+
+  public static final PrintTask POISON_PILL = ProtoPrintTask.create(null, 0);
+  private final OutputType outputType;
+  private final CodedOutputStream outputStream;
+  private final PrintStream printStream;
+
+  private final BlockingQueue<PrintTask> queue;
+
+  public StreamedConsumingOutputHandler(
+      OutputType outputType,
+      CodedOutputStream outputStream,
+      PrintStream printStream,
+      BlockingQueue<PrintTask> queue) {
+    this.outputType = outputType;
+    Preconditions.checkArgument(
+        outputType == BINARY || outputType == TEXT,
+        "Only proto and textproto outputs should be streamed.");
+    this.outputStream = outputStream;
+    this.printStream = printStream;
+    this.queue = queue;
+  }
+
+  @Override
+  public void outputArtifact(Artifact message) {
+    addTaskToQueue(message, ActionGraphContainer.ARTIFACTS_FIELD_NUMBER, "artifacts");
+  }
+
+  @Override
+  public void outputAction(Action message) {
+    addTaskToQueue(message, ActionGraphContainer.ACTIONS_FIELD_NUMBER, "actions");
+  }
+
+  @Override
+  public void outputTarget(Target message) {
+    addTaskToQueue(message, ActionGraphContainer.TARGETS_FIELD_NUMBER, "targets");
+  }
+
+  @Override
+  public void outputDepSetOfFiles(DepSetOfFiles message) {
+    addTaskToQueue(message, ActionGraphContainer.DEP_SET_OF_FILES_FIELD_NUMBER, "dep_set_of_files");
+  }
+
+  @Override
+  public void outputConfiguration(Configuration message) {
+    addTaskToQueue(message, ActionGraphContainer.CONFIGURATION_FIELD_NUMBER, "configuration");
+  }
+
+  @Override
+  public void outputAspectDescriptor(AspectDescriptor message) {
+    addTaskToQueue(
+        message, ActionGraphContainer.ASPECT_DESCRIPTORS_FIELD_NUMBER, "aspect_descriptors");
+  }
+
+  @Override
+  public void outputRuleClass(RuleClass message) {
+    addTaskToQueue(message, ActionGraphContainer.RULE_CLASSES_FIELD_NUMBER, "rule_classes");
+  }
+
+  @Override
+  public void outputPathFragment(PathFragment message) {
+    addTaskToQueue(message, ActionGraphContainer.PATH_FRAGMENTS_FIELD_NUMBER, "path_fragments");
+  }
+
+  @Override
+  public void startConsumer() {
+    new Thread(new AqueryOutputTaskConsumer(queue)).start();
+  }
+
+  @Override
+  public void stopConsumer() {
+    queue.add(POISON_PILL);
+  }
+
+  /** Construct the printing task and put it in the queue. */
+  void addTaskToQueue(Message message, int fieldNumber, String messageLabel) {
+    try {
+      queue.put(
+          outputType == BINARY
+              ? ProtoPrintTask.create(message, fieldNumber)
+              : TextProtoPrintTask.create(message, messageLabel));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    outputStream.flush();
+    printStream.flush();
+  }
+
+  private class AqueryOutputTaskConsumer implements Runnable {
+    private final BlockingQueue<PrintTask> queue;
+
+    AqueryOutputTaskConsumer(BlockingQueue<PrintTask> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          PrintTask nextTask = queue.take();
+
+          if (nextTask.equals(POISON_PILL)) {
+            return;
+          }
+          switch (outputType) {
+            case BINARY:
+              ProtoPrintTask protoPrintTask = (ProtoPrintTask) nextTask;
+              outputStream.writeMessage(protoPrintTask.fieldNumber(), protoPrintTask.message());
+              break;
+            case TEXT:
+              TextProtoPrintTask textProtoPrintTask = (TextProtoPrintTask) nextTask;
+              printStream.print(
+                  textProtoPrintTask.messageLabel()
+                      + " {\n"
+                      + textProtoPrintTask.message()
+                      + "}\n");
+              break;
+            default:
+              throw new IllegalStateException("Unknown outputType " + outputType.formatName());
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected exception: ", e);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedOutputHandler.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedOutputHandler.java
index 51c537e..513de4f 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedOutputHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedOutputHandler.java
@@ -31,7 +31,10 @@
 import java.io.IOException;
 import java.io.PrintStream;
 
-/** Manages the various streamed output channels of aquery. This does not support JSON format. */
+/**
+ * Manages the various streamed output channels of aquery. This does not support JSON format.
+ * TODO(b/274595070) Remove this class after the flag flip.
+ */
 public class StreamedOutputHandler implements AqueryOutputHandler {
   private final OutputType outputType;
   private final CodedOutputStream outputStream;