[aquery] Also parallelize the output producer in case of --skyframe_state.

The previous change (https://github.com/bazelbuild/bazel/commit/68702a7affc5bdd6bf924726c39a1cd4bae9de8b) left out this part.

Also fix a concurrency bug with the parallel output. This scenario was possible before:

```
    Main thread: queue.put(POISON_PILL) -> outputStream.flush()
Consumer thread:              <consumer printing stuff> -> queue.take(POISON_PILL)
```

What happened there was the outputStream being flushed (via close()) and the eventual closing of the AqueryOutputHandler while the consumer maybe still be going through the tasks in the queue. This would cause unexpected output from blaze.

By forcing #stopConsumer to wait for the consumer to finish consuming the POISON_PILL, we can avoid this race condition. As POISON_PILL is guaranteed to be the last task, we can be sure that nothing's left to be printed in the output streams by the time we're done with #stopConsumer.

PiperOrigin-RevId: 528773712
Change-Id: Iffc16d27223fdfb7c9d3329f9973593bfb4b2283
diff --git a/src/main/java/com/google/devtools/build/lib/buildtool/AqueryProcessor.java b/src/main/java/com/google/devtools/build/lib/buildtool/AqueryProcessor.java
index 5d4c0c1..ab7a01e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildtool/AqueryProcessor.java
+++ b/src/main/java/com/google/devtools/build/lib/buildtool/AqueryProcessor.java
@@ -80,7 +80,8 @@
           ActionGraphProtoOutputFormatterCallback.constructAqueryOutputHandler(
               OutputType.fromString(aqueryOptions.outputFormat),
               queryRuntimeHelper.getOutputStreamForQueryOutput(),
-              printStream)) {
+              printStream,
+              aqueryOptions.parallelAqueryOutput)) {
         ActionGraphDump actionGraphDump =
             new ActionGraphDump(
                 aqueryOptions.includeCommandline,
@@ -125,18 +126,9 @@
       ActionGraphDump actionGraphDump)
       throws CommandLineExpansionException, TemplateExpansionException, IOException {
     if (aqueryOutputHandler instanceof AqueryConsumingOutputHandler) {
-      AqueryConsumingOutputHandler aqueryConsumingOutputHandler =
-          (AqueryConsumingOutputHandler) aqueryOutputHandler;
-      try {
-        aqueryConsumingOutputHandler.startConsumer();
-        ((SequencedSkyframeExecutor) env.getSkyframeExecutor()).dumpSkyframeState(actionGraphDump);
-      } finally {
-        try {
-          aqueryConsumingOutputHandler.stopConsumer();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
+      ((SequencedSkyframeExecutor) env.getSkyframeExecutor())
+          .dumpSkyframeStateInParallel(
+              actionGraphDump, (AqueryConsumingOutputHandler) aqueryOutputHandler);
     } else {
       ((SequencedSkyframeExecutor) env.getSkyframeExecutor()).dumpSkyframeState(actionGraphDump);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
index 1afdc5c..871592e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/aquery/ActionGraphProtoOutputFormatterCallback.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.aquery;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.devtools.build.lib.actions.CommandLineExpansionException;
 import com.google.devtools.build.lib.analysis.AspectValue;
@@ -90,7 +91,7 @@
     return constructAqueryOutputHandler(outputType, out, printStream, /* parallelized= */ false);
   }
 
-  private static AqueryOutputHandler constructAqueryOutputHandler(
+  public static AqueryOutputHandler constructAqueryOutputHandler(
       OutputType outputType, OutputStream out, PrintStream printStream, boolean parallelized) {
     switch (outputType) {
       case BINARY:
@@ -172,32 +173,31 @@
     try (SilentCloseable c = Profiler.instance().profile("process partial result")) {
       // Enabling includeParamFiles should enable includeCommandline by default.
       options.includeCommandline |= options.includeParamFiles;
-      aqueryConsumingOutputHandler.startConsumer();
       ForkJoinPool executor =
           NamedForkJoinPool.newNamedPool("aquery", Runtime.getRuntime().availableProcessors());
 
       try {
+        Future<Void> consumerFuture = executor.submit(aqueryConsumingOutputHandler.startConsumer());
         List<Future<Void>> futures = executor.invokeAll(toTasks(partialResult));
         for (Future<Void> future : futures) {
           future.get();
         }
+        aqueryConsumingOutputHandler.stopConsumer(/* discardRemainingTasks= */ false);
+        // Get any possible exception from the consumer.
+        consumerFuture.get();
       } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
+        aqueryConsumingOutputHandler.stopConsumer(/* discardRemainingTasks= */ true);
+        Throwable cause = Throwables.getRootCause(e);
         if (cause instanceof CommandLineExpansionException
             || cause instanceof TemplateExpansionException) {
           // This is kinda weird, but keeping it in line with the status quo for now.
           // TODO(b/266179316): Clean this up.
           throw new IOException(cause.getMessage());
         }
-        if (cause instanceof IOException) {
-          throw (IOException) cause;
-        }
-        if (cause instanceof InterruptedException) {
-          throw (InterruptedException) cause;
-        }
+        Throwables.propagateIfPossible(cause, IOException.class);
+        Throwables.propagateIfPossible(cause, InterruptedException.class);
         throw new IllegalStateException("Unexpected exception type: ", e);
       } finally {
-        aqueryConsumingOutputHandler.stopConsumer();
         executor.shutdown();
       }
     }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SequencedSkyframeExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SequencedSkyframeExecutor.java
index bba4f92..31f3274 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SequencedSkyframeExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SequencedSkyframeExecutor.java
@@ -17,6 +17,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -38,6 +39,7 @@
 import com.google.devtools.build.lib.bugreport.BugReporter;
 import com.google.devtools.build.lib.buildtool.BuildRequestOptions;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
 import com.google.devtools.build.lib.concurrent.QuiescingExecutors;
 import com.google.devtools.build.lib.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.events.Event;
@@ -60,6 +62,7 @@
 import com.google.devtools.build.lib.skyframe.PackageFunction.ActionOnIOExceptionReadingBuildFile;
 import com.google.devtools.build.lib.skyframe.PackageLookupFunction.CrossRepositoryLabelViolationStrategy;
 import com.google.devtools.build.lib.skyframe.actiongraph.v2.ActionGraphDump;
+import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryConsumingOutputHandler;
 import com.google.devtools.build.lib.skyframe.rewinding.RewindableGraphInconsistencyReceiver;
 import com.google.devtools.build.lib.util.AbruptExitException;
 import com.google.devtools.build.lib.util.ResourceUsage;
@@ -93,6 +96,10 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
@@ -479,6 +486,67 @@
     return new ArrayList<>(ruleStats.values());
   }
 
+  public void dumpSkyframeStateInParallel(
+      ActionGraphDump actionGraphDump, AqueryConsumingOutputHandler aqueryConsumingOutputHandler)
+      throws CommandLineExpansionException, IOException, TemplateExpansionException {
+    ImmutableList.Builder<Callable<Void>> tasks = ImmutableList.builder();
+
+    try {
+      for (Map.Entry<SkyKey, SkyValue> skyKeyAndValue :
+          memoizingEvaluator.getDoneValues().entrySet()) {
+        SkyKey key = skyKeyAndValue.getKey();
+        SkyValue skyValue = skyKeyAndValue.getValue();
+        if (skyValue == null) {
+          // The skyValue may be null in case analysis of the previous build failed.
+          continue;
+        }
+        if (skyValue instanceof RuleConfiguredTargetValue) {
+          tasks.add(
+              () -> {
+                actionGraphDump.dumpConfiguredTarget((RuleConfiguredTargetValue) skyValue);
+                return null;
+              });
+        } else if (key.functionName().equals(SkyFunctions.ASPECT)) {
+          AspectValue aspectValue = (AspectValue) skyValue;
+          AspectKey aspectKey = (AspectKey) key;
+          ConfiguredTargetValue configuredTargetValue =
+              (ConfiguredTargetValue)
+                  memoizingEvaluator.getExistingValue(aspectKey.getBaseConfiguredTargetKey());
+          tasks.add(
+              () -> {
+                actionGraphDump.dumpAspect(aspectValue, configuredTargetValue);
+                return null;
+              });
+        }
+      }
+      ForkJoinPool executor =
+          NamedForkJoinPool.newNamedPool(
+              "action-graph-dump", Runtime.getRuntime().availableProcessors());
+      try {
+        Future<Void> consumerFuture = executor.submit(aqueryConsumingOutputHandler.startConsumer());
+        List<Future<Void>> futures = executor.invokeAll(tasks.build());
+        for (Future<Void> future : futures) {
+          future.get();
+        }
+        aqueryConsumingOutputHandler.stopConsumer(/* discardRemainingTasks= */ false);
+        // Get any possible exception from the consumer.
+        consumerFuture.get();
+      } catch (ExecutionException e) {
+        aqueryConsumingOutputHandler.stopConsumer(/* discardRemainingTasks= */ true);
+        Throwable cause = Throwables.getRootCause(e);
+        Throwables.propagateIfPossible(cause, CommandLineExpansionException.class);
+        Throwables.propagateIfPossible(cause, TemplateExpansionException.class);
+        Throwables.propagateIfPossible(cause, IOException.class);
+        Throwables.propagateIfPossible(cause, InterruptedException.class);
+        throw new IllegalStateException("Unexpected exception type: ", e);
+      } finally {
+        executor.shutdown();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /** Support for aquery output. */
   public void dumpSkyframeState(ActionGraphDump actionGraphDump)
       throws CommandLineExpansionException, IOException, TemplateExpansionException {
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java
index 3d985c3..5c1cbb1 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/AqueryConsumingOutputHandler.java
@@ -13,10 +13,17 @@
 // limitations under the License.
 package com.google.devtools.build.lib.skyframe.actiongraph.v2;
 
+import java.util.concurrent.Callable;
+
 /** AqueryOutputHandler that receives and consumes tasks via a work queue. */
 public interface AqueryConsumingOutputHandler extends AqueryOutputHandler {
 
-  void startConsumer();
+  Callable<Void> startConsumer();
 
-  void stopConsumer() throws InterruptedException;
+  /**
+   * Stops the consumer thread.
+   *
+   * @param discardRemainingTasks true in case an error occurred with the producer
+   */
+  void stopConsumer(boolean discardRemainingTasks) throws InterruptedException;
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java
index 15f2bf7..9612ba3 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/actiongraph/v2/StreamedConsumingOutputHandler.java
@@ -32,7 +32,9 @@
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 
 /** Manages the various streamed output channels of aquery. This does not support JSON format. */
 public class StreamedConsumingOutputHandler implements AqueryConsumingOutputHandler {
@@ -42,6 +44,8 @@
   private final CodedOutputStream outputStream;
   private final PrintStream printStream;
 
+  private final Object exitLock = new Object();
+  private volatile boolean readyToExit = false;
   private final BlockingQueue<PrintTask> queue;
 
   public StreamedConsumingOutputHandler(
@@ -100,17 +104,32 @@
   }
 
   @Override
-  public void startConsumer() {
-    new Thread(new AqueryOutputTaskConsumer(queue)).start();
+  public Callable<Void> startConsumer() {
+    return new AqueryOutputTaskConsumer(queue);
   }
 
   @Override
-  public void stopConsumer() throws InterruptedException {
-    queue.put(POISON_PILL);
+  public void stopConsumer(boolean discardRemainingTasks) throws InterruptedException {
+    if (discardRemainingTasks) {
+      queue.drainTo(new ArrayList<>());
+    }
+    // This lock ensures that the method actually waits until the consumer properly exits,
+    // which prevents a race condition with the #close() method below.
+    synchronized (exitLock) {
+      queue.put(POISON_PILL);
+      while (!readyToExit) {
+        exitLock.wait();
+      }
+    }
   }
 
   /** Construct the printing task and put it in the queue. */
   void addTaskToQueue(Message message, int fieldNumber, String messageLabel) {
+    // This means that there was an exception in the consumer.
+    if (readyToExit) {
+      return;
+    }
+
     try {
       queue.put(
           outputType == BINARY
@@ -127,7 +146,8 @@
     printStream.flush();
   }
 
-  private class AqueryOutputTaskConsumer implements Runnable {
+  // Only runs on 1 single thread.
+  private class AqueryOutputTaskConsumer implements Callable<Void> {
     private final BlockingQueue<PrintTask> queue;
 
     AqueryOutputTaskConsumer(BlockingQueue<PrintTask> queue) {
@@ -135,13 +155,17 @@
     }
 
     @Override
-    public void run() {
+    public Void call() throws InterruptedException, IOException {
       try {
         while (true) {
           PrintTask nextTask = queue.take();
 
           if (nextTask.equals(POISON_PILL)) {
-            return;
+            synchronized (exitLock) {
+              readyToExit = true;
+              exitLock.notify();
+            }
+            return null;
           }
           switch (outputType) {
             case BINARY:
@@ -160,10 +184,9 @@
               throw new IllegalStateException("Unknown outputType " + outputType.formatName());
           }
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected exception: ", e);
+      } finally {
+        // In case of an exception.
+        readyToExit = true;
       }
     }
   }