Refactor the async execution interface for actions

This makes the interface generic such that it can be used for multiple
steps as well as for multiple concurrently running spawns, effectively
allowing any action subclass to opt-in to the new framework.

The new API is directly equivalent to the old execute method, so we can
subsequently switch to using the new API for all actions, even if
they're still executing synchronous, and then move individual action
types over piecemeal.

Progress on #6394.

PiperOrigin-RevId: 235726176
diff --git a/src/main/java/com/google/devtools/build/lib/actions/Action.java b/src/main/java/com/google/devtools/build/lib/actions/Action.java
index e8a75be..4e56588 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/Action.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/Action.java
@@ -86,34 +86,75 @@
   void prepare(FileSystem fileSystem, Path execRoot) throws IOException;
 
   /**
-   * Executes this action; called by the Builder when all of this Action's inputs have been
-   * successfully created. (Behaviour is undefined if the prerequisites are not up to date.) This
-   * method <i>actually does the work of the Action, unconditionally</i>; in other words, it is
-   * invoked by the Builder only when dependency analysis has deemed it necessary.
+   * Executes this action. This method <i>unconditionally does the work of the Action</i>, although
+   * it may delegate some of that work to {@link ActionContext} instances obtained from the {@link
+   * ActionExecutionContext}, which may in turn perform caching at smaller granularity than an
+   * entire action.
    *
-   * <p>The framework guarantees that the output directory for each file in <code>getOutputs()
-   * </code> has already been created, and will check to ensure that each of those files is indeed
-   * created.
+   * <p>This method may not be invoked if an equivalent action (as determined by the hashes of the
+   * input files, the list of output files, and the action cache key) has been previously executed,
+   * possibly on another machine.
    *
-   * <p>Implementations of this method should try to honour the {@link java.lang.Thread#interrupted}
-   * contract: if an interrupt is delivered to the thread in which execution occurs, the action
-   * should detect this on a best-effort basis and terminate as quickly as possible by throwing an
-   * ActionExecutionException.
+   * <p>The framework guarantees that:
    *
-   * <p>Action execution must be ThreadCompatible in order to be safely used with a concurrent
-   * Builder implementation such as ParallelBuilder.
+   * <ul>
+   *   <li>all declared inputs have already been successfully created,
+   *   <li>the output directory for each file in <code>getOutputs()</code> has already been created,
+   *   <li>this method is only called by at most one thread at a time, but subsequent calls may be
+   *       made from different threads,
+   *   <li>for shared actions, at most one instance is executed per build.
+   * </ul>
+   *
+   * <p>Multiple instances of the same action implementation may be called in parallel.
+   * Implementations must therefore be thread-compatible. Also see the class documentation for
+   * additional invariants.
+   *
+   * <p>Implementations should attempt to detect interrupts, and exit quickly with an {@link
+   * InterruptedException}.
    *
    * @param actionExecutionContext services in the scope of the action, like the output and error
    *     streams to use for messages arising during action execution
    * @return returns an ActionResult containing action execution metadata
    * @throws ActionExecutionException if execution fails for any reason
-   * @throws InterruptedException
+   * @throws InterruptedException if the execution is interrupted
    */
   @ConditionallyThreadCompatible
   ActionResult execute(ActionExecutionContext actionExecutionContext)
       throws ActionExecutionException, InterruptedException;
 
   /**
+   * Actions that want to support async execution can use this interface to do so. While this is
+   * still disabled by default, we want to eventually deprecate the {@link #execute} method in favor
+   * of this new interface.
+   *
+   * <p>If the relevant command-line flag is enabled, Skyframe will call this method rather than
+   * {@link #execute}. As such, actions implementing both should exhibit identical behavior, and all
+   * requirements from the {@link #execute} documentation apply.
+   *
+   * <p>This method allows an action to return a continuation representing future work to be done,
+   * in combination with a listenable future representing concurrent ongoing work in another thread
+   * pool or even on another machine. When the concurrent work finishes, the listenable future must
+   * be completed to notify Skyframe of this fact.
+   *
+   * <p>Once the listenable future is completed, Skyframe will re-execute the corresponding Skyframe
+   * node representing this action, which will eventually call into the continuation returned here.
+   *
+   * <p>Actions implementing this method are not required to run asynchronously, although we expect
+   * the majority of actions to do so eventually. They can block the current thread for any amount
+   * of time as long as they return eventually, and also honor interrupt signals.
+   *
+   * @param actionExecutionContext services in the scope of the action, like the output and error
+   *     streams to use for messages arising during action execution
+   * @return returns an ActionResult containing action execution metadata
+   * @throws ActionExecutionException if execution fails for any reason
+   * @throws InterruptedException if the execution is interrupted
+   */
+  default ActionContinuationOrResult beginExecution(ActionExecutionContext actionExecutionContext)
+      throws ActionExecutionException, InterruptedException {
+    return ActionContinuationOrResult.of(execute(actionExecutionContext));
+  }
+
+  /**
    * Returns true iff action must be executed regardless of its current state.
    * Default implementation can be overridden by some actions that might be
    * executed unconditionally under certain circumstances - e.g., if caching of
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ActionContinuationOrResult.java b/src/main/java/com/google/devtools/build/lib/actions/ActionContinuationOrResult.java
new file mode 100644
index 0000000..54c3cac
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/actions/ActionContinuationOrResult.java
@@ -0,0 +1,79 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.actions;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
+
+/**
+ * Represents either an action continuation or a final result, depending on the return value of the
+ * {@link #isDone} method. Subclasses must implement {@link #getFuture} (but may return {@code null}
+ * and {@link #execute}. Use {@link #of} to construct a final result.
+ *
+ * <p>Any clients <b>must</b> first call {@link #isDone} before calling any other method in this
+ * class.
+ */
+public abstract class ActionContinuationOrResult {
+  public static ActionContinuationOrResult of(ActionResult actionResult) {
+    return new Finished(actionResult);
+  }
+
+  /** Returns true if this is a final result. */
+  public boolean isDone() {
+    return false;
+  }
+
+  /** Returns a future representing any ongoing concurrent work, or {@code null} otherwise. */
+  @Nullable
+  public abstract ListenableFuture<?> getFuture();
+
+  /** Performs the next step in the process of executing the parent action. */
+  public abstract ActionContinuationOrResult execute()
+      throws ActionExecutionException, InterruptedException;
+
+  /** Returns the final result. */
+  public ActionResult get() {
+    throw new IllegalStateException();
+  }
+
+  /** Represents a finished action result. */
+  private static final class Finished extends ActionContinuationOrResult {
+    private final ActionResult actionResult;
+
+    private Finished(ActionResult actionResult) {
+      this.actionResult = actionResult;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+
+    @Override
+    public ListenableFuture<?> getFuture() {
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public ActionContinuationOrResult execute() {
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public ActionResult get() {
+      return actionResult;
+    }
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/actions/SpawnAction.java b/src/main/java/com/google/devtools/build/lib/analysis/actions/SpawnAction.java
index f100886..aa4cb82 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/actions/SpawnAction.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/actions/SpawnAction.java
@@ -22,8 +22,10 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.AbstractAction;
 import com.google.devtools.build.lib.actions.Action;
+import com.google.devtools.build.lib.actions.ActionContinuationOrResult;
 import com.google.devtools.build.lib.actions.ActionEnvironment;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.ActionExecutionException;
@@ -284,52 +286,42 @@
         .exec(spawn, actionExecutionContext);
   }
 
-  public boolean mayExecuteAsync() {
+  protected boolean mayExecuteAsync() {
     return true;
   }
 
-  public FutureSpawn execMaybeAsync(ActionExecutionContext actionExecutionContext)
-      throws ActionExecutionException, ExecException, InterruptedException {
-    Spawn spawn;
+  @Override
+  public ActionContinuationOrResult beginExecution(ActionExecutionContext actionExecutionContext)
+      throws ActionExecutionException, InterruptedException {
+    if (!mayExecuteAsync()) {
+      return super.beginExecution(actionExecutionContext);
+    }
     try {
-      spawn = getSpawn(actionExecutionContext);
+      Spawn spawn = getSpawn(actionExecutionContext);
+      SpawnActionContext context = actionExecutionContext.getContext(SpawnActionContext.class);
+      FutureSpawn futureSpawn = context.execMaybeAsync(spawn, actionExecutionContext);
+      return new ActionContinuationOrResult() {
+        @Override
+        public ListenableFuture<?> getFuture() {
+          return futureSpawn.getFuture();
+        }
+
+        @Override
+        public ActionContinuationOrResult execute()
+            throws ActionExecutionException, InterruptedException {
+          try {
+            return ActionContinuationOrResult.of(
+                ActionResult.create(ImmutableList.of(futureSpawn.get())));
+          } catch (ExecException e) {
+            throw toActionExecutionException(e, actionExecutionContext.getVerboseFailures());
+          }
+        }
+      };
+    } catch (ExecException e) {
+      throw toActionExecutionException(e, actionExecutionContext.getVerboseFailures());
     } catch (CommandLineExpansionException e) {
       throw new ActionExecutionException(e, this, /*catastrophe=*/ false);
     }
-    SpawnActionContext context = actionExecutionContext.getContext(SpawnActionContext.class);
-    return context.execMaybeAsync(spawn, actionExecutionContext);
-  }
-
-  public ActionResult finishSync(FutureSpawn futureSpawn, boolean verboseFailures)
-      throws ActionExecutionException, InterruptedException {
-    try {
-      return ActionResult.create(ImmutableList.of(futureSpawn.get()));
-    } catch (ExecException e) {
-      String failMessage;
-      if (isShellCommand()) {
-        // The possible reasons it could fail are: shell executable not found, shell
-        // exited non-zero, or shell died from signal.  The first is impossible
-        // and the second two aren't very interesting, so in the interests of
-        // keeping the noise-level down, we don't print a reason why, just the
-        // command that failed.
-        //
-        // 0=shell executable, 1=shell command switch, 2=command
-        try {
-          failMessage =
-              "error executing shell command: "
-                  + "'"
-                  + truncate(Joiner.on(" ").join(getArguments()), 200)
-                  + "'";
-        } catch (CommandLineExpansionException commandLineExpansionException) {
-          failMessage =
-              "error executing shell command, and error expanding command line: "
-                  + commandLineExpansionException;
-        }
-      } else {
-        failMessage = getRawProgressMessage();
-      }
-      throw e.toActionExecutionException(failMessage, verboseFailures, this);
-    }
   }
 
   @Override
@@ -338,36 +330,40 @@
     try {
       return ActionResult.create(internalExecute(actionExecutionContext));
     } catch (ExecException e) {
-      String failMessage;
-      if (isShellCommand()) {
-        // The possible reasons it could fail are: shell executable not found, shell
-        // exited non-zero, or shell died from signal.  The first is impossible
-        // and the second two aren't very interesting, so in the interests of
-        // keeping the noise-level down, we don't print a reason why, just the
-        // command that failed.
-        //
-        // 0=shell executable, 1=shell command switch, 2=command
-        try {
-          failMessage =
-              "error executing shell command: "
-                  + "'"
-                  + truncate(Joiner.on(" ").join(getArguments()), 200)
-                  + "'";
-        } catch (CommandLineExpansionException commandLineExpansionException) {
-          failMessage =
-              "error executing shell command, and error expanding command line: "
-                  + commandLineExpansionException;
-        }
-      } else {
-        failMessage = getRawProgressMessage();
-      }
-      throw e.toActionExecutionException(
-          failMessage, actionExecutionContext.getVerboseFailures(), this);
+      throw toActionExecutionException(e, actionExecutionContext.getVerboseFailures());
     } catch (CommandLineExpansionException e) {
       throw new ActionExecutionException(e, this, /*catastrophe=*/ false);
     }
   }
 
+  private ActionExecutionException toActionExecutionException(
+      ExecException e, boolean verboseFailures) {
+    String failMessage;
+    if (isShellCommand()) {
+      // The possible reasons it could fail are: shell executable not found, shell
+      // exited non-zero, or shell died from signal.  The first is impossible
+      // and the second two aren't very interesting, so in the interests of
+      // keeping the noise-level down, we don't print a reason why, just the
+      // command that failed.
+      //
+      // 0=shell executable, 1=shell command switch, 2=command
+      try {
+        failMessage =
+            "error executing shell command: "
+                + "'"
+                + truncate(Joiner.on(" ").join(getArguments()), 200)
+                + "'";
+      } catch (CommandLineExpansionException commandLineExpansionException) {
+        failMessage =
+            "error executing shell command, and error expanding command line: "
+                + commandLineExpansionException;
+      }
+    } else {
+      failMessage = getRawProgressMessage();
+    }
+    return e.toActionExecutionException(failMessage, verboseFailures, this);
+  }
+
   /**
    * Returns s, truncated to no more than maxLen characters, appending an
    * ellipsis if truncation occurred.
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/extra/ExtraAction.java b/src/main/java/com/google/devtools/build/lib/analysis/extra/ExtraAction.java
index 7ea7bbb..cf61932 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/extra/ExtraAction.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/extra/ExtraAction.java
@@ -104,7 +104,7 @@
   }
 
   @Override
-  public boolean mayExecuteAsync() {
+  protected boolean mayExecuteAsync() {
     return false;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/rules/cpp/LtoBackendAction.java b/src/main/java/com/google/devtools/build/lib/rules/cpp/LtoBackendAction.java
index e3f1d8e..5abc94f 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/cpp/LtoBackendAction.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/cpp/LtoBackendAction.java
@@ -104,7 +104,7 @@
   }
 
   @Override
-  public boolean mayExecuteAsync() {
+  protected boolean mayExecuteAsync() {
     return false;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/rules/genrule/GenRuleAction.java b/src/main/java/com/google/devtools/build/lib/rules/genrule/GenRuleAction.java
index d5e70c3..99d112a 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/genrule/GenRuleAction.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/genrule/GenRuleAction.java
@@ -14,7 +14,6 @@
 
 package com.google.devtools.build.lib.rules.genrule;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.actions.AbstractAction;
@@ -78,7 +77,7 @@
     if (!TrackSourceDirectoriesFlag.trackSourceDirectories()) {
       checkInputsForDirectories(reporter, actionExecutionContext.getMetadataProvider());
     }
-    List<SpawnResult> spawnResults = ImmutableList.of();
+    List<SpawnResult> spawnResults;
     try {
       spawnResults = super.internalExecute(actionExecutionContext);
     } catch (CommandLineExpansionException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
index a497c6b..09bbb3d 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.actions.Action;
@@ -27,6 +28,7 @@
 import com.google.devtools.build.lib.actions.ActionCacheChecker.Token;
 import com.google.devtools.build.lib.actions.ActionCompletionEvent;
 import com.google.devtools.build.lib.actions.ActionContext;
+import com.google.devtools.build.lib.actions.ActionContinuationOrResult;
 import com.google.devtools.build.lib.actions.ActionExecutedEvent;
 import com.google.devtools.build.lib.actions.ActionExecutedEvent.ErrorTiming;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -55,11 +57,9 @@
 import com.google.devtools.build.lib.actions.ArtifactPrefixConflictException;
 import com.google.devtools.build.lib.actions.CachedActionEvent;
 import com.google.devtools.build.lib.actions.EnvironmentalExecException;
-import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.Executor;
 import com.google.devtools.build.lib.actions.FileArtifactValue;
 import com.google.devtools.build.lib.actions.FilesetOutputSymlink;
-import com.google.devtools.build.lib.actions.FutureSpawn;
 import com.google.devtools.build.lib.actions.LostInputsExecException.LostInputsActionExecutionException;
 import com.google.devtools.build.lib.actions.MapBasedActionGraph;
 import com.google.devtools.build.lib.actions.MetadataConsumer;
@@ -71,7 +71,6 @@
 import com.google.devtools.build.lib.actions.PackageRootResolver;
 import com.google.devtools.build.lib.actions.TargetOutOfDateException;
 import com.google.devtools.build.lib.actions.cache.MetadataHandler;
-import com.google.devtools.build.lib.analysis.actions.SpawnAction;
 import com.google.devtools.build.lib.buildtool.BuildRequestOptions;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.concurrent.ExecutorUtil;
@@ -788,6 +787,53 @@
         throws InterruptedException;
   }
 
+  private static ActionContinuationOrResult begin(
+      Action action, ActionExecutionContext actionExecutionContext) {
+    return new ActionContinuationOrResult() {
+      @Override
+      public ListenableFuture<?> getFuture() {
+        return null;
+      }
+
+      @Override
+      public ActionContinuationOrResult execute()
+          throws ActionExecutionException, InterruptedException {
+        return action.beginExecution(actionExecutionContext);
+      }
+    };
+  }
+
+  /** Returns a continuation to run the specified action in a profiler task. */
+  private ActionContinuationOrResult runFully(
+      Action action,
+      ActionExecutionContext actionExecutionContext,
+      ExtendedEventHandler eventHandler) {
+    return new ActionContinuationOrResult() {
+      @Override
+      public ListenableFuture<?> getFuture() {
+        return null;
+      }
+
+      @Override
+      public ActionContinuationOrResult execute()
+          throws ActionExecutionException, InterruptedException {
+        // ActionExecutionExceptions that occur as the thread is interrupted are assumed to be a
+        // result of that, so we throw InterruptedException instead.
+        try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION_EXECUTE, action.describe())) {
+          return ActionContinuationOrResult.of(action.execute(actionExecutionContext));
+        } catch (ActionExecutionException e) {
+          throw processAndGetExceptionToThrow(
+              eventHandler,
+              actionExecutionContext.getInputPath(action.getPrimaryOutput()),
+              action,
+              e,
+              actionExecutionContext.getFileOutErr(),
+              ErrorTiming.AFTER_EXECUTION);
+        }
+      }
+    };
+  }
+
   /** Represents an action that needs to be run. */
   private final class ActionRunner extends ActionStep {
     private final Action action;
@@ -881,26 +927,20 @@
         // This is the first iteration of the async action execution framework. It is currently only
         // implemented for SpawnAction (and subclasses), and will need to be extended for all other
         // action types.
-        if (useAsyncExecution
-            && (action instanceof SpawnAction)
-            && ((SpawnAction) action).mayExecuteAsync()) {
-          SpawnAction spawnAction = (SpawnAction) action;
-          try {
-            FutureSpawn futureSpawn;
-            try {
-              futureSpawn = spawnAction.execMaybeAsync(actionExecutionContext);
-            } catch (InterruptedException e) {
-              return ActionStepOrResult.of(e);
-            } catch (ActionExecutionException e) {
-              return ActionStepOrResult.of(e);
-            }
-            return new ActionCompletionStep(futureSpawn, spawnAction);
-          } catch (ExecException e) {
-            return ActionStepOrResult.of(e.toActionExecutionException(spawnAction));
-          }
+        if (useAsyncExecution) {
+          // TODO(ulfjack): This implicitly drops the ACTION_EXECUTE profiler segment that otherwise
+          // wraps the Action.execute call. That one is already excluded from the Json profile, so
+          // maybe we should just remove it.
+          // TODO(ulfjack): This causes problems in that REMOTE_EXECUTION segments now heavily
+          // overlap in the Json profile, which the renderer isn't able to handle. We should move
+          // those to some sort of 'virtual thread' to visualize the work that's happening on other
+          // machines.
+          return continueAction(
+              actionExecutionContext.getEventHandler(), begin(action, actionExecutionContext));
         }
 
-        return completeAction(env.getListener(), () -> executeAction(env.getListener()));
+        return continueAction(
+            env.getListener(), runFully(action, actionExecutionContext, env.getListener()));
       }
     }
 
@@ -920,60 +960,41 @@
       }
     }
 
-    /**
-     * Execute the specified action, in a profiler task. The caller is responsible for having
-     * already checked that we need to execute it and for acquiring/releasing any scheduling locks
-     * needed.
-     *
-     * <p>This is thread-safe so long as you don't try to execute the same action twice at the same
-     * time (or overlapping times). May execute in a worker thread.
-     *
-     * @throws ActionExecutionException if the execution of the specified action failed for any
-     *     reason.
-     * @throws InterruptedException if the thread was interrupted.
-     * @return true if the action output was dumped, false otherwise.
-     */
-    private ActionResult executeAction(ExtendedEventHandler eventHandler)
-        throws ActionExecutionException, InterruptedException {
-      // ActionExecutionExceptions that occur as the thread is interrupted are assumed to be a
-      // result of that, so we throw InterruptedException instead.
-      try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION_EXECUTE, action.describe())) {
-        return action.execute(actionExecutionContext);
-      } catch (ActionExecutionException e) {
-        throw processAndGetExceptionToThrow(
-            eventHandler,
-            actionExecutionContext.getInputPath(action.getPrimaryOutput()),
-            action,
-            e,
-            actionExecutionContext.getFileOutErr(),
-            ErrorTiming.AFTER_EXECUTION);
-      }
-    }
-
-    private ActionStepOrResult completeAction(
-        ExtendedEventHandler eventHandler, ActionClosure actionClosure) {
-      LostInputsActionExecutionException lostInputsActionExecutionException = null;
+    /** Executes the given continuation and returns a new one or a final result. */
+    private ActionStepOrResult continueAction(
+        ExtendedEventHandler eventHandler, ActionContinuationOrResult actionContinuation) {
+      // Every code path that exits this method must call notifyActionCompletion, except for the
+      // one that returns a new ActionContinuationStep. Unfortunately, that requires some code
+      // duplication.
+      ActionContinuationOrResult nextActionContinuationOrResult;
       try {
-        ActionResult actionResult;
-        try {
-          actionResult = actionClosure.execute();
-        } catch (LostInputsActionExecutionException e) {
-          // If inputs are lost, then avoid publishing ActionCompletedEvent. Action rewinding will
-          // rerun this failed action after trying to regenerate the lost inputs. However, enrich
-          // the exception so that, if rewinding fails, an ActionCompletedEvent will be published.
-          e.setActionStartedEventAlreadyEmitted();
-          lostInputsActionExecutionException = e;
-          throw lostInputsActionExecutionException;
-        } catch (InterruptedException e) {
-          return ActionStepOrResult.of(e);
-        }
-        return new ActionCacheWriteStep(actuallyCompleteAction(eventHandler, actionResult));
+        nextActionContinuationOrResult = actionContinuation.execute();
+      } catch (LostInputsActionExecutionException e) {
+        // If inputs are lost, then avoid publishing ActionCompletedEvent. Action rewinding will
+        // rerun this failed action after trying to regenerate the lost inputs. However, enrich
+        // the exception so that, if rewinding fails, an ActionCompletedEvent will be published.
+        e.setActionStartedEventAlreadyEmitted();
+        notifyActionCompletion(eventHandler, /*postActionCompletionEvent=*/ false);
+        return ActionStepOrResult.of(e);
+      } catch (ActionExecutionException e) {
+        notifyActionCompletion(eventHandler, /*postActionCompletionEvent=*/ true);
+        return ActionStepOrResult.of(e);
+      } catch (InterruptedException e) {
+        notifyActionCompletion(eventHandler, /*postActionCompletionEvent=*/ true);
+        return ActionStepOrResult.of(e);
+      }
+
+      if (!nextActionContinuationOrResult.isDone()) {
+        return new ActionContinuationStep(nextActionContinuationOrResult);
+      }
+
+      try {
+        return new ActionCacheWriteStep(
+            actuallyCompleteAction(eventHandler, nextActionContinuationOrResult.get()));
       } catch (ActionExecutionException e) {
         return ActionStepOrResult.of(e);
       } finally {
-        notifyActionCompletion(
-            eventHandler,
-            /*postActionCompletionEvent=*/ lostInputsActionExecutionException == null);
+        notifyActionCompletion(eventHandler, /*postActionCompletionEvent=*/ true);
       }
     }
 
@@ -1063,28 +1084,27 @@
           ActionExecutionFunction.actionDependsOnBuildId(action));
     }
 
-    /** A closure to complete an asynchronously running action. */
-    private class ActionCompletionStep extends ActionStep {
-      private final FutureSpawn futureSpawn;
-      private final SpawnAction spawnAction;
+    /** A closure to continue an asynchronously running action. */
+    private class ActionContinuationStep extends ActionStep {
+      private final ActionContinuationOrResult actionContinuationOrResult;
 
-      public ActionCompletionStep(FutureSpawn futureSpawn, SpawnAction spawnAction) {
-        this.futureSpawn = futureSpawn;
-        this.spawnAction = spawnAction;
+      public ActionContinuationStep(ActionContinuationOrResult actionContinuationOrResult) {
+        Preconditions.checkArgument(!actionContinuationOrResult.isDone());
+        this.actionContinuationOrResult = actionContinuationOrResult;
       }
 
       @Override
       public ActionStepOrResult run(Environment env) {
-        if (!futureSpawn.getFuture().isDone()) {
-          env.dependOnFuture(futureSpawn.getFuture());
+        ListenableFuture<?> future = actionContinuationOrResult.getFuture();
+        if (future != null && !future.isDone()) {
+          env.dependOnFuture(future);
           return this;
         }
-        return completeAction(
-            actionExecutionContext.getEventHandler(),
-            () -> spawnAction.finishSync(futureSpawn, actionExecutionContext.getVerboseFailures()));
+        return continueAction(actionExecutionContext.getEventHandler(), actionContinuationOrResult);
       }
     }
 
+    /** A closure to post-process the action and write the result to the action cache. */
     private class ActionCacheWriteStep extends ActionStep {
       private final ActionExecutionValue value;