Wait for shared actions asynchronously

Instead of blocking the current thread, shared actions are waited for
using the ListenableFuture mechanism in Skyframe.

Also clean up the API a bit.

Progress on #6394.

PiperOrigin-RevId: 234970927
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ActionLookupData.java b/src/main/java/com/google/devtools/build/lib/actions/ActionLookupData.java
index 4ae4e79..d858a09 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/ActionLookupData.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/ActionLookupData.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.actions;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Interner;
 import com.google.devtools.build.lib.actions.ActionLookupValue.ActionLookupKey;
 import com.google.devtools.build.lib.cmdline.Label;
@@ -34,7 +35,7 @@
   private final int actionIndex;
 
   private ActionLookupData(ActionLookupKey actionLookupKey, int actionIndex) {
-    this.actionLookupKey = actionLookupKey;
+    this.actionLookupKey = Preconditions.checkNotNull(actionLookupKey);
     this.actionIndex = actionIndex;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionState.java b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionState.java
index 92d6928..7175049 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionState.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionState.java
@@ -13,11 +13,14 @@
 // limitations under the License.
 package com.google.devtools.build.lib.skyframe;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.Action;
 import com.google.devtools.build.lib.actions.ActionExecutionException;
 import com.google.devtools.build.lib.actions.ActionLookupData;
 import com.google.devtools.build.lib.skyframe.SkyframeActionExecutor.ActionCompletedReceiver;
 import com.google.devtools.build.skyframe.SkyFunction;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 /**
@@ -26,18 +29,45 @@
  * that only one caller will receive events and output for this action.
  */
 final class ActionExecutionState {
+  /** The owner of this object. Only the owner is allowed to continue work on the state machine. */
   private final ActionLookupData actionLookupData;
 
+  // Both state and completionFuture may only be read or set when holding the lock for this. The
+  // state machine for these looks like this:
+  //
+  // !state.isDone,completionFuture=null -----> !state.isDone,completionFuture=<value>
+  //                           |                  |
+  //                           |                  | completionFuture.set()
+  //                           v                  v
+  //                    state.isDone,completionFuture=null
+  //
+  // No other states are legal. In particular, state.isDone,completionFuture=<value> is not a legal
+  // state.
+
   @GuardedBy("this")
   private ActionStepOrResult state;
 
+  /**
+   * A future to represent action completion of the primary action (randomly picked from the set of
+   * shared actions). This is initially {@code null}, and is only set to a non-null value if this
+   * turns out to be a shared action, and the primary action is not finished yet (i.e., {@code
+   * !state.isDone}. It is non-null while the primary action is being executed, at which point the
+   * thread completing the primary action completes the future, and also sets this field to null.
+   *
+   * <p>The reason for this roundabout approach is to avoid memory allocation if this is not a
+   * shared action, and to release the memory once the action is done.
+   */
+  @GuardedBy("this")
+  @Nullable
+  private SettableFuture<Void> completionFuture;
+
   ActionExecutionState(ActionLookupData actionLookupData, ActionStepOrResult state) {
-    this.actionLookupData = actionLookupData;
-    this.state = state;
+    this.actionLookupData = Preconditions.checkNotNull(actionLookupData);
+    this.state = Preconditions.checkNotNull(state);
   }
 
-  public ActionLookupData getActionLookupData() {
-    return actionLookupData;
+  public boolean isOwner(ActionLookupData actionLookupData) {
+    return this.actionLookupData.equals(actionLookupData);
   }
 
   public ActionExecutionValue getResultOrDependOnFuture(
@@ -46,48 +76,66 @@
       Action action,
       ActionCompletedReceiver actionCompletedReceiver)
       throws ActionExecutionException, InterruptedException {
-    if (actionLookupData.equals(this.actionLookupData)) {
+    if (isOwner(actionLookupData)) {
       // This continuation is owned by the Skyframe node executed by the current thread, so we use
       // it to run the state machine.
       return runStateMachine(env);
     }
-    // This is a shared action, and the executed action is owned by another Skyframe node. We do
-    // not attempt to make progress, but instead block waiting for the owner to complete the
-    // action. This is the same behavior as before this comment was added.
-    //
-    // When we async action execution we MUST also change this to async execution. Otherwise we
-    // can end up with a deadlock where all Skyframe threads are blocked here, and no thread is
-    // available to make progress on the original action.
+
+    // This is a shared action, and the executed action is owned by another Skyframe node. If the
+    // other node is done, we simply return the done value. Otherwise we register a dependency on
+    // the completionFuture and return null.
+    ActionExecutionValue result;
     synchronized (this) {
-      while (!state.isDone()) {
-        this.wait();
+      if (!state.isDone()) {
+        if (completionFuture == null) {
+          completionFuture = SettableFuture.create();
+        }
+        env.dependOnFuture(completionFuture);
+        // No other thread can access completionFuture until we exit the synchronized block.
+        Preconditions.checkState(!completionFuture.isDone(), state);
+        Preconditions.checkState(env.valuesMissing(), state);
+        return null;
       }
-      try {
-        return state.get().transformForSharedAction(action.getOutputs());
-      } finally {
-        if (action.getProgressMessage() != null) {
-          actionCompletedReceiver.actionCompleted(actionLookupData);
+      result = state.get();
+    }
+    actionCompletedReceiver.actionCompleted(actionLookupData);
+    return result.transformForSharedAction(action.getOutputs());
+  }
+
+  private ActionExecutionValue runStateMachine(SkyFunction.Environment env)
+      throws ActionExecutionException, InterruptedException {
+    ActionStepOrResult original;
+    synchronized (this) {
+      original = state;
+    }
+    ActionStepOrResult current = original;
+    // We do the work _outside_ a synchronized block to avoid blocking threads working on shared
+    // actions that only want to register with the completionFuture.
+    try {
+      while (!current.isDone()) {
+        // Run the state machine for one step; isDone returned false, so this is safe.
+        current = current.run(env);
+
+        // This method guarantees that it either blocks until the action is completed and returns
+        // a non-null value, or it registers a dependency with Skyframe and returns null; it must
+        // not return null without registering a dependency, i.e., if {@code !env.valuesMissing()}.
+        if (env.valuesMissing()) {
+          return null;
+        }
+      }
+    } finally {
+      synchronized (this) {
+        Preconditions.checkState(state == original, "Another thread modified state");
+        state = current;
+        if (current.isDone() && completionFuture != null) {
+          completionFuture.set(null);
+          completionFuture = null;
         }
       }
     }
-  }
-
-  private synchronized ActionExecutionValue runStateMachine(SkyFunction.Environment env)
-      throws ActionExecutionException, InterruptedException {
-    while (!state.isDone()) {
-      // Run the state machine for one step; isDone returned false, so this is safe.
-      state = state.run(env);
-
-      // This method guarantees that it either blocks until the action is completed, or it
-      // registers a dependency on a ListenableFuture and returns null (it may only return null if
-      // valuesMissing returns true).
-      if (env.valuesMissing()) {
-        return null;
-      }
-    }
-    this.notifyAll();
     // We're done, return the value to the caller (or throw an exception).
-    return state.get();
+    return current.get();
   }
 
   /**
@@ -97,42 +145,61 @@
    *
    * <p>This design allows us to store the current state of the in-progress action execution using a
    * single object reference.
+   *
+   * <p>Do not implement this interface directly! In order to implement an action step, subclass
+   * {@link ActionStep}, and implement {@link #run}. In order to represent a result, use {@link
+   * #of}.
    */
   interface ActionStepOrResult {
     static ActionStepOrResult of(ActionExecutionValue value) {
-      return new FinishedActionStepOrResult(value);
+      return new Finished(value);
     }
 
     static ActionStepOrResult of(ActionExecutionException exception) {
-      return new ExceptionalActionStepOrResult(exception);
+      return new Exceptional(exception);
     }
 
     static ActionStepOrResult of(InterruptedException exception) {
-      return new ExceptionalActionStepOrResult(exception);
+      return new Exceptional(exception);
     }
 
     /**
      * Returns true if and only if the underlying action is complete, i.e., it is legal to call
-     * {@link #get}.
+     * {@link #get}. The return value of a single object must not change over time. Instead, call
+     * {@link ActionStepOrResult#of} to return a final result (or exception).
      */
-    default boolean isDone() {
-      return true;
-    }
+    boolean isDone();
 
     /**
      * Returns the next state of the state machine after performing some work towards the end goal
      * of executing the action. This must only be called if {@link #isDone} returns false, and must
      * only be called by one thread at a time for the same instance.
      */
-    default ActionStepOrResult run(SkyFunction.Environment env) {
-      throw new IllegalStateException();
-    }
+    ActionStepOrResult run(SkyFunction.Environment env);
 
     /**
      * Returns the final value of the action or an exception to indicate that the action failed (or
      * the process was interrupted). This must only be called if {@link #isDone} returns true.
      */
-    default ActionExecutionValue get() throws ActionExecutionException, InterruptedException {
+    ActionExecutionValue get() throws ActionExecutionException, InterruptedException;
+  }
+
+  /**
+   * Abstract implementation of {@link ActionStepOrResult} that declares final implementations for
+   * {@link #isDone} (to return false) and {@link #get} (tho throw {@link IllegalStateException}).
+   *
+   * <p>The framework prevents concurrent calls to {@link #run}, so implementations can keep state
+   * without having to lock. Note that there may be multiple calls to {@link #run} from different
+   * threads, as long as they do not overlap in time.
+   */
+  abstract static class ActionStep implements ActionStepOrResult {
+    @Override
+    public final boolean isDone() {
+      return false;
+    }
+
+    @Override
+    public final ActionExecutionValue get() {
       throw new IllegalStateException();
     }
   }
@@ -141,13 +208,24 @@
    * Represents a finished action with a specific value. We specifically avoid anonymous inner
    * classes to not accidentally retain a reference to the ActionRunner.
    */
-  private static final class FinishedActionStepOrResult implements ActionStepOrResult {
+  private static final class Finished implements ActionStepOrResult {
     private final ActionExecutionValue value;
 
-    FinishedActionStepOrResult(ActionExecutionValue value) {
+    Finished(ActionExecutionValue value) {
       this.value = value;
     }
 
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+
+    @Override
+    public ActionStepOrResult run(SkyFunction.Environment env) {
+      throw new IllegalStateException();
+    }
+
+    @Override
     public ActionExecutionValue get() {
       return value;
     }
@@ -157,17 +235,28 @@
    * Represents a finished action with an exception. We specifically avoid anonymous inner classes
    * to not accidentally retain a reference to the ActionRunner.
    */
-  private static final class ExceptionalActionStepOrResult implements ActionStepOrResult {
+  private static final class Exceptional implements ActionStepOrResult {
     private final Exception e;
 
-    ExceptionalActionStepOrResult(ActionExecutionException e) {
+    Exceptional(ActionExecutionException e) {
       this.e = e;
     }
 
-    ExceptionalActionStepOrResult(InterruptedException e) {
+    Exceptional(InterruptedException e) {
       this.e = e;
     }
 
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+
+    @Override
+    public ActionStepOrResult run(SkyFunction.Environment env) {
+      throw new IllegalStateException();
+    }
+
+    @Override
     public ActionExecutionValue get() throws ActionExecutionException, InterruptedException {
       if (e instanceof InterruptedException) {
         throw (InterruptedException) e;
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
index 65027b1..edd1427 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
@@ -85,6 +85,7 @@
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.rules.cpp.IncludeScannable;
 import com.google.devtools.build.lib.runtime.KeepGoingOption;
+import com.google.devtools.build.lib.skyframe.ActionExecutionState.ActionStep;
 import com.google.devtools.build.lib.skyframe.ActionExecutionState.ActionStepOrResult;
 import com.google.devtools.build.lib.util.Pair;
 import com.google.devtools.build.lib.util.io.FileOutErr;
@@ -510,7 +511,7 @@
             "%s %s",
             action,
             actionLookupData);
-    return actionLookupData.equals(cachedRun.getActionLookupData());
+    return cachedRun.isOwner(actionLookupData);
   }
 
   void noteActionEvaluationStarted(ActionLookupData actionLookupData, Action action) {
@@ -796,7 +797,7 @@
   }
 
   /** Represents an action that needs to be run. */
-  private final class ActionRunner implements ActionStepOrResult {
+  private final class ActionRunner extends ActionStep {
     private final Action action;
     private final ActionMetadataHandler metadataHandler;
     private final long actionStartTime;
@@ -819,11 +820,6 @@
     }
 
     @Override
-    public boolean isDone() {
-      return false;
-    }
-
-    @Override
     public ActionStepOrResult run(SkyFunction.Environment env) {
       // There are three ExtendedEventHandler instances available while this method is running.
       //   SkyframeActionExecutor.this.reporter
@@ -1072,7 +1068,7 @@
     }
 
     /** A closure to complete an asynchronously running action. */
-    private class ActionCompletionStep implements ActionStepOrResult {
+    private class ActionCompletionStep extends ActionStep {
       private final FutureSpawn futureSpawn;
       private final SpawnAction spawnAction;
 
@@ -1082,11 +1078,6 @@
       }
 
       @Override
-      public boolean isDone() {
-        return false;
-      }
-
-      @Override
       public ActionStepOrResult run(Environment env) {
         if (!futureSpawn.getFuture().isDone()) {
           env.dependOnFuture(futureSpawn.getFuture());