Refactor SkyframeActionExecutor

Move all action state tracking to a new subclass, which replaces the
use of Pair+FutureTask, and carefully ensures that we don't hold onto
memory for any longer than necessary.

This is another attempt at preparing for async action execution. A
similar refactoring was previously attempted in https://github.com/bazelbuild/bazel/commit/14f8b109b9f987f1b0c69c8cf399326740749382, but had to
be rolled back as https://github.com/bazelbuild/bazel/commit/68fc46b7ac2a015cbbd4e6602f2310a935783866 due to increased memory consumption.

The new code may seem a bit overengineered given that the current
implementation requires exactly two states:
STARTED -> DONE

However, async actions will have at least three, and possibly more
states, which requires a bit more infrastructure here:
STARTED -> IN_PROGRESS -> DONE

Note that we need to explicitly model the intermediate state such that
parallel threads due to action sharing can see the IN_PROGRESS state,
and register their dependency on the corresponding Future (code in a
subsequent commit).

Progress on #6394.

PiperOrigin-RevId: 234469209
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
index 1737e4e..eb92dd6 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
@@ -57,7 +57,7 @@
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.rules.cpp.IncludeScannable;
 import com.google.devtools.build.lib.skyframe.ActionRewindStrategy.RewindPlan;
-import com.google.devtools.build.lib.util.Pair;
+import com.google.devtools.build.lib.skyframe.SkyframeActionExecutor.ActionExecutionState;
 import com.google.devtools.build.lib.util.io.FileOutErr;
 import com.google.devtools.build.lib.util.io.TimestampGranularityMonitor;
 import com.google.devtools.build.lib.vfs.FileSystem;
@@ -78,7 +78,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntFunction;
 import javax.annotation.Nullable;
@@ -149,8 +148,7 @@
     //
     // Additionally, if an action restarted (in the Skyframe sense) after it executed because it
     // discovered new inputs during execution, we should detect that and short-circuit.
-    Pair<ActionLookupData, FutureTask<ActionExecutionValue>> previousExecution =
-        skyframeActionExecutor.probeActionExecution(action);
+    ActionExecutionState previousExecution = skyframeActionExecutor.probeActionExecution(action);
 
     // If this action was previously completed this build, then this evaluation must be happening
     // because of rewinding. Prevent any ProgressLike events from being published a second time for
@@ -549,21 +547,14 @@
       Environment env,
       Map<String, String> clientEnv,
       ActionLookupData actionLookupData,
-      @Nullable Pair<ActionLookupData, FutureTask<ActionExecutionValue>> previousAction,
+      @Nullable ActionExecutionState previousAction,
       Object skyframeDepsResult,
       long actionStartTime)
       throws ActionExecutionException, InterruptedException {
-    // If this is a shared action and the other action is the one that executed, we must use that
-    // other action's value, provided here, since it is populated with metadata for the outputs.
     if (previousAction != null) {
-      return skyframeActionExecutor.executeAction(
-          env.getListener(),
-          action,
-          /* metadataHandler= */ null,
-          /* actionStartTime= */ -1,
-          /* actionExecutionContext= */ null,
-          actionLookupData,
-          previousAction);
+      // If this is a shared action and the other action is the one that executed, we must use that
+      // other action's value, provided here, since it is populated with metadata for the outputs.
+      return previousAction.getResultOrDependOnFuture(env, actionLookupData, action);
     }
     // The metadataHandler may be recreated if we discover inputs.
     ArtifactPathResolver pathResolver = ArtifactPathResolver.createPathResolver(
@@ -703,13 +694,12 @@
       if (!state.hasExecutedAction()) {
         state.value =
             skyframeActionExecutor.executeAction(
-                env.getListener(),
+                env,
                 action,
                 metadataHandler,
                 actionStartTime,
                 actionExecutionContext,
-                actionLookupData,
-                /*previousAction=*/ null);
+                actionLookupData);
       }
     } catch (IOException e) {
       throw new ActionExecutionException(
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
index 1e8f9ba..f3f5a3d 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
@@ -90,6 +90,7 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.Root;
 import com.google.devtools.build.lib.vfs.Symlinks;
+import com.google.devtools.build.skyframe.SkyFunction;
 import com.google.devtools.build.skyframe.SkyFunction.Environment;
 import com.google.devtools.common.options.OptionsProvider;
 import java.io.FileNotFoundException;
@@ -100,15 +101,12 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Function;
@@ -155,9 +153,7 @@
   // We don't want to execute the action again on the second entry to the SkyFunction.
   // In both cases, we store the already-computed ActionExecutionValue to avoid having to compute it
   // again.
-  private ConcurrentMap<
-          OwnerlessArtifactWrapper, Pair<ActionLookupData, FutureTask<ActionExecutionValue>>>
-      buildActionMap;
+  private ConcurrentMap<OwnerlessArtifactWrapper, ActionExecutionState> buildActionMap;
 
   // We also keep track of actions which were reset this build from a previously-completed state.
   // When re-evaluated, these actions should not emit ProgressLike events, in order to not confuse
@@ -457,8 +453,13 @@
     this.actionCacheChecker = null;
   }
 
+  /**
+   * Due to multi-threading, a null return value from this method does not guarantee that there is
+   * no such action - a concurrent thread may already be executing the same (shared) action. Any
+   * such race is resolved in the subsequent call to {@link #executeAction}.
+   */
   @Nullable
-  Pair<ActionLookupData, FutureTask<ActionExecutionValue>> probeActionExecution(Action action) {
+  ActionExecutionState probeActionExecution(Action action) {
     return buildActionMap.get(new OwnerlessArtifactWrapper(action.getPrimaryOutput()));
   }
 
@@ -492,13 +493,13 @@
     // TODO(b/19539699): This method is used only when the action cache is enabled. It is
     // incompatible with action rewinding, which removes entries from buildActionMap. Action
     // rewinding is used only with a disabled action cache.
-    Pair<ActionLookupData, ?> cachedRun =
+    ActionExecutionState cachedRun =
         Preconditions.checkNotNull(
             buildActionMap.get(new OwnerlessArtifactWrapper(action.getPrimaryOutput())),
             "%s %s",
             action,
             actionLookupData);
-    return actionLookupData.equals(cachedRun.getFirst());
+    return actionLookupData.equals(cachedRun.getActionLookupData());
   }
 
   void noteActionEvaluationStarted(ActionLookupData actionLookupData, Action action) {
@@ -512,65 +513,44 @@
    * <p>For use from {@link ArtifactFunction} only.
    */
   ActionExecutionValue executeAction(
-      ExtendedEventHandler eventHandler,
+      SkyFunction.Environment env,
       Action action,
       ActionMetadataHandler metadataHandler,
       long actionStartTime,
       ActionExecutionContext actionExecutionContext,
-      ActionLookupData actionLookupData,
-      @Nullable Pair<ActionLookupData, FutureTask<ActionExecutionValue>> previousAction)
+      ActionLookupData actionLookupData)
       throws ActionExecutionException, InterruptedException {
+    // ActionExecutionFunction may directly call into ActionExecutionState.getResultOrDependOnFuture
+    // if a shared action already passed these checks.
     Exception exception = badActionMap.get(action);
     if (exception != null) {
       // If action had a conflict with some other action in the graph, report it now.
-      reportError(exception.getMessage(), exception, action, null);
+      throw toActionExecutionException(exception.getMessage(), exception, action, null);
     }
-    FutureTask<ActionExecutionValue> actionTask =
-        new FutureTask<>(
-            new ActionRunner(
-                eventHandler,
-                action,
-                metadataHandler,
-                actionStartTime,
-                actionExecutionContext,
-                actionLookupData));
 
-    // Check one last time to see if another action is already executing/has executed this value.
-    Pair<ActionLookupData, FutureTask<ActionExecutionValue>> oldAction =
-        previousAction != null
-            ? previousAction
-            : buildActionMap.putIfAbsent(
-                new OwnerlessArtifactWrapper(action.getPrimaryOutput()),
-                Pair.of(actionLookupData, actionTask));
-
-    // true if this is a non-shared action or it's shared and to be executed.
-    boolean isPrimaryActionForTheValue = oldAction == null;
-
-    if (isPrimaryActionForTheValue) {
-      actionTask.run();
-    } else {
-      // Wait for other action to finish, so any actions that depend on its outputs can execute.
-      actionTask = oldAction.second;
-    }
-    try {
-      ActionExecutionValue value = actionTask.get();
-      return isPrimaryActionForTheValue
-          ? value
-          : value.transformForSharedAction(action.getOutputs());
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause(),
-          ActionExecutionException.class, InterruptedException.class);
-      throw new IllegalStateException(e);
-    } finally {
-      String message = action.getProgressMessage();
-      if (message != null) {
-        // Tell the receiver that the action has completed *before* telling the reporter.
-        // This way the latter will correctly show the number of completed actions when task
-        // completion messages are enabled (--show_task_finish).
-        completionReceiver.actionCompleted(actionLookupData);
-        reporter.finishTask(null, prependExecPhaseStats(message));
+    if (actionCacheChecker.isActionExecutionProhibited(action)) {
+      // We can't execute an action (e.g. because --check_???_up_to_date option was used). Fail
+      // the build instead.
+      synchronized (reporter) {
+        TargetOutOfDateException e = new TargetOutOfDateException(action);
+        reporter.handle(Event.error(e.getMessage()));
+        recordExecutionError();
+        throw e;
       }
     }
+
+    // Use computeIfAbsent to handle concurrent attempts to execute the same shared action.
+    ActionExecutionState activeAction =
+        buildActionMap.computeIfAbsent(
+            new OwnerlessArtifactWrapper(action.getPrimaryOutput()),
+            (_unused_key) ->
+                new ActionExecutionState(
+                    action,
+                    metadataHandler,
+                    actionStartTime,
+                    actionExecutionContext,
+                    actionLookupData));
+    return activeAction.getResultOrDependOnFuture(env, actionLookupData, action);
   }
 
   /**
@@ -796,88 +776,264 @@
     this.actionInputPrefetcher = actionInputPrefetcher;
   }
 
-  private class ActionRunner implements Callable<ActionExecutionValue> {
-    private final ExtendedEventHandler eventHandler;
-    private final Action action;
-    private final ActionMetadataHandler metadataHandler;
-    private final long actionStartTime;
-    private final ActionExecutionContext actionExecutionContext;
+  /**
+   * A state machine representing the synchronous or asynchronous execution of an action. This is
+   * shared between all instances of the same shared action and must therefore be thread-safe. Note
+   * that only one caller will receive events and output for this action.
+   */
+  final class ActionExecutionState {
     private final ActionLookupData actionLookupData;
+    private ActionStepOrResult state;
 
-    ActionRunner(
-        ExtendedEventHandler eventHandler,
+    ActionExecutionState(
         Action action,
         ActionMetadataHandler metadataHandler,
         long actionStartTime,
         ActionExecutionContext actionExecutionContext,
         ActionLookupData actionLookupData) {
-      this.eventHandler = eventHandler;
+      this.actionLookupData = actionLookupData;
+      this.state =
+          new ActionRunner(
+              action, metadataHandler, actionStartTime, actionExecutionContext, actionLookupData);
+    }
+
+    public ActionLookupData getActionLookupData() {
+      return actionLookupData;
+    }
+
+    public ActionExecutionValue getResultOrDependOnFuture(
+        SkyFunction.Environment env, ActionLookupData actionLookupData, Action action)
+        throws ActionExecutionException, InterruptedException {
+      if (actionLookupData.equals(this.actionLookupData)) {
+        // This execution originally created this object, so we use it to run the state machine.
+        return runStateMachine(env);
+      }
+      // This is a shared action, and the executed action is owned by another Skyframe node. We do
+      // not attempt to make progress, but instead block waiting for the owner to complete the
+      // action. This is the same behavior as before this comment was added.
+      //
+      // When we async action execution we MUST also change this to async execution. Otherwise we
+      // can end up with a deadlock where all Skyframe threads are blocked here, and no thread is
+      // available to make progress on the original action.
+      synchronized (this) {
+        while (!state.isDone()) {
+          this.wait();
+        }
+        try {
+          return state.get().transformForSharedAction(action.getOutputs());
+        } finally {
+          if (action.getProgressMessage() != null) {
+            completionReceiver.actionCompleted(actionLookupData);
+          }
+        }
+      }
+    }
+
+    private synchronized ActionExecutionValue runStateMachine(SkyFunction.Environment env)
+        throws ActionExecutionException, InterruptedException {
+      while (!state.isDone()) {
+        // Run the state machine for one step; isDone returned false, so this is safe.
+        state = state.run(env);
+
+        // This method guarantees that it either blocks until the action is completed, or it
+        // registers a dependency on a ListenableFuture and returns null (it may only return null if
+        // valuesMissing returns true).
+        if (env.valuesMissing()) {
+          return null;
+        }
+      }
+      this.notifyAll();
+      // We're done, return the value to the caller (or throw an exception).
+      return state.get();
+    }
+  }
+
+  /**
+   * A state machine where instances of this interface either represent an intermediate state that
+   * requires more work to be done (possibly waiting for a ListenableFuture to complete) or the
+   * final result of the executed action (either an ActionExecutionValue or an Exception).
+   *
+   * <p>This design allows us to store the current state of the in-progress action execution using a
+   * single object reference.
+   */
+  private interface ActionStepOrResult {
+    /**
+     * Returns true if and only if the underlying action is complete, i.e., it is legal to call
+     * {@link #get}.
+     */
+    default boolean isDone() {
+      return true;
+    }
+
+    /**
+     * Returns the next state of the state machine after performing some work towards the end goal
+     * of executing the action. This must only be called if {@link #isDone} returns false, and must
+     * only be called by one thread at a time for the same instance.
+     */
+    default ActionStepOrResult run(SkyFunction.Environment env) {
+      throw new IllegalStateException();
+    }
+
+    /**
+     * Returns the final value of the action or an exception to indicate that the action failed (or
+     * the process was interrupted). This must only be called if {@link #isDone} returns true.
+     */
+    default ActionExecutionValue get() throws ActionExecutionException, InterruptedException {
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Represents a finished action with a specific value. We specifically avoid anonymous inner
+   * classes to not accidentally retain a reference to the ActionRunner.
+   */
+  private static final class FinishedActionStepOrResult implements ActionStepOrResult {
+    private final ActionExecutionValue value;
+
+    FinishedActionStepOrResult(ActionExecutionValue value) {
+      this.value = value;
+    }
+
+    public ActionExecutionValue get() {
+      return value;
+    }
+  }
+
+  /**
+   * Represents a finished action with an exception. We specifically avoid anonymous inner classes
+   * to not accidentally retain a reference to the ActionRunner.
+   */
+  private static final class ExceptionalActionStepOrResult implements ActionStepOrResult {
+    private final Exception e;
+
+    ExceptionalActionStepOrResult(ActionExecutionException e) {
+      this.e = e;
+    }
+
+    ExceptionalActionStepOrResult(InterruptedException e) {
+      this.e = e;
+    }
+
+    public ActionExecutionValue get() throws ActionExecutionException, InterruptedException {
+      if (e instanceof InterruptedException) {
+        throw (InterruptedException) e;
+      }
+      throw (ActionExecutionException) e;
+    }
+  }
+
+  /** A local interface to unify immediate and deferred action execution code paths. */
+  private interface ActionClosure {
+    ActionResult execute() throws ActionExecutionException, InterruptedException;
+  }
+
+  /** Represents an action that needs to be run. */
+  private final class ActionRunner implements ActionStepOrResult {
+    private final Action action;
+    private final ActionMetadataHandler metadataHandler;
+    private final long actionStartTime;
+    private final ActionExecutionContext actionExecutionContext;
+    private final ActionLookupData actionLookupData;
+    private final ActionExecutionStatusReporter statusReporter;
+
+    ActionRunner(
+        Action action,
+        ActionMetadataHandler metadataHandler,
+        long actionStartTime,
+        ActionExecutionContext actionExecutionContext,
+        ActionLookupData actionLookupData) {
       this.action = action;
       this.metadataHandler = metadataHandler;
       this.actionStartTime = actionStartTime;
       this.actionExecutionContext = actionExecutionContext;
       this.actionLookupData = actionLookupData;
+      this.statusReporter = statusReporterRef.get();
     }
 
     @Override
-    public ActionExecutionValue call() throws ActionExecutionException, InterruptedException {
-      try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION, action.describe())) {
-        if (actionCacheChecker.isActionExecutionProhibited(action)) {
-          // We can't execute an action (e.g. because --check_???_up_to_date option was used). Fail
-          // the build instead.
-          synchronized (reporter) {
-            TargetOutOfDateException e = new TargetOutOfDateException(action);
-            reporter.handle(Event.error(e.getMessage()));
-            recordExecutionError();
-            throw e;
-          }
-        }
+    public boolean isDone() {
+      return false;
+    }
 
+    @Override
+    public ActionStepOrResult run(SkyFunction.Environment env) {
+      // There are three ExtendedEventHandler instances available while this method is running.
+      //   SkyframeActionExecutor.this.reporter
+      //   actionExecutionContext.getEventHandler
+      //   env.getListener
+      // Apparently, one isn't enough.
+      //
+      // At this time, ProgressLike events that are generated in this class should be posted to
+      // env.getListener, while ProgressLike events that are generated in the Action implementation
+      // are posted to actionExecutionContext.getEventHandler. The reason for this seems to be
+      // action rewinding, which suppresses progress on actionExecutionContext.getEventHandler, for
+      // undocumented reasons.
+      //
+      // It is also unclear why we are posting anything directly to reporter. That probably
+      // shouldn't happen.
+      try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION, action.describe())) {
         String message = action.getProgressMessage();
         if (message != null) {
           reporter.startTask(null, prependExecPhaseStats(message));
         }
 
-        ActionExecutionStatusReporter statusReporter = statusReporterRef.get();
-        LostInputsActionExecutionException lostInputsActionExecutionException = null;
         try {
+          // It is vital that updateStatus and remove are called in pairs. Unfortunately, if async
+          // action execution is enabled, we cannot use a simple finally block, but have to manually
+          // ensure that any code path that finishes the state machine also removes the action from
+          // the status reporter.
+          // To complicate things, the ActionCompletionEvent must _not_ be posted when this action
+          // is rewound.
+          // TODO(ulfjack): Change the uses of ActionStartedEvent and ActionCompletionEvent such
+          // that they can be reposted when rewinding and simplify this code path. Maybe also keep
+          // track of the rewind attempt, so that listeners can use that to adjust their behavior.
           statusReporter.updateStatus(ActionStatusMessage.preparingStrategy(action));
-          eventHandler.post(new ActionStartedEvent(action, actionStartTime));
-
+          env.getListener().post(new ActionStartedEvent(action, actionStartTime));
           Preconditions.checkState(
               actionExecutionContext.getMetadataHandler() == metadataHandler,
               "%s %s",
               actionExecutionContext.getMetadataHandler(),
               metadataHandler);
-          // Delete the outputs before executing the action, just to ensure that
-          // the action really does produce the outputs.
-          try {
-            if (!usesActionFileSystem()) {
+          if (!usesActionFileSystem()) {
+            try {
+              // This call generally deletes any files at locations that are declared outputs of the
+              // action, although some actions perform additional work, while others intentionally
+              // keep previous outputs in place.
               action.prepare(
                   actionExecutionContext.getFileSystem(), actionExecutionContext.getExecRoot());
-            } else {
-              setupActionFsFileOutErr(actionExecutionContext.getFileOutErr(), action);
+            } catch (IOException e) {
+              throw toActionExecutionException(
+                  "failed to delete output files before executing action", e, action, null);
             }
-            createOutputDirectories(action, actionExecutionContext);
-          } catch (IOException e) {
-            reportError("failed to delete output files before executing action", e, action, null);
+          } else {
+            // There's nothing to delete when the action file system is used, but we must ensure
+            // that the output directories for stdout and stderr exist.
+            setupActionFsFileOutErr(actionExecutionContext.getFileOutErr(), action);
           }
-
-          ActionResult actionResult = executeAction();
-          return completeAction(eventHandler, actionResult);
-        } catch (LostInputsActionExecutionException e) {
-          // If inputs are lost, then avoid publishing ActionCompletedEvent. Action rewinding will
-          // rerun this failed action after trying to regenerate the lost inputs. However, enrich
-          // the exception so that, if rewinding fails, an ActionCompletedEvent will be published.
-          e.setActionStartedEventAlreadyEmitted();
-          lostInputsActionExecutionException = e;
-          throw lostInputsActionExecutionException;
-        } finally {
-          statusReporter.remove(action);
-          if (lostInputsActionExecutionException == null) {
-            eventHandler.post(new ActionCompletionEvent(actionStartTime, action, actionLookupData));
-          }
+          createOutputDirectories(action, actionExecutionContext);
+        } catch (ActionExecutionException e) {
+          // This try-catch block cannot trigger rewinding, so it is safe to notify the status
+          // reporter and also post the ActionCompletionEvent.
+          notifyActionCompletion(env.getListener(), /*postActionCompletionEvent=*/ true);
+          return new ExceptionalActionStepOrResult(e);
         }
+        return completeAction(env.getListener(), () -> executeAction(env.getListener()));
+      }
+    }
+
+    private void notifyActionCompletion(
+        ExtendedEventHandler eventHandler, boolean postActionCompletionEvent) {
+      statusReporter.remove(action);
+      if (postActionCompletionEvent) {
+        eventHandler.post(new ActionCompletionEvent(actionStartTime, action, actionLookupData));
+      }
+      String message = action.getProgressMessage();
+      if (message != null) {
+        // Tell the receiver that the action has completed *before* telling the reporter.
+        // This way the latter will correctly show the number of completed actions when task
+        // completion messages are enabled (--show_task_finish).
+        completionReceiver.actionCompleted(actionLookupData);
+        reporter.finishTask(null, prependExecPhaseStats(message));
       }
     }
 
@@ -894,7 +1050,8 @@
      * @throws InterruptedException if the thread was interrupted.
      * @return true if the action output was dumped, false otherwise.
      */
-    private ActionResult executeAction() throws ActionExecutionException, InterruptedException {
+    private ActionResult executeAction(ExtendedEventHandler eventHandler)
+        throws ActionExecutionException, InterruptedException {
       // ActionExecutionExceptions that occur as the thread is interrupted are assumed to be a
       // result of that, so we throw InterruptedException instead.
       try (SilentCloseable c = profiler.profile(ProfilerTask.ACTION_EXECUTE, action.describe())) {
@@ -910,7 +1067,34 @@
       }
     }
 
-    private ActionExecutionValue completeAction(
+    private ActionStepOrResult completeAction(
+        ExtendedEventHandler eventHandler, ActionClosure actionClosure) {
+      LostInputsActionExecutionException lostInputsActionExecutionException = null;
+      try {
+        ActionResult actionResult;
+        try {
+          actionResult = actionClosure.execute();
+        } catch (LostInputsActionExecutionException e) {
+          // If inputs are lost, then avoid publishing ActionCompletedEvent. Action rewinding will
+          // rerun this failed action after trying to regenerate the lost inputs. However, enrich
+          // the exception so that, if rewinding fails, an ActionCompletedEvent will be published.
+          e.setActionStartedEventAlreadyEmitted();
+          lostInputsActionExecutionException = e;
+          throw lostInputsActionExecutionException;
+        } catch (InterruptedException e) {
+          return new ExceptionalActionStepOrResult(e);
+        }
+        return new FinishedActionStepOrResult(actuallyCompleteAction(eventHandler, actionResult));
+      } catch (ActionExecutionException e) {
+        return new ExceptionalActionStepOrResult(e);
+      } finally {
+        notifyActionCompletion(
+            eventHandler,
+            /*postActionCompletionEvent=*/ lostInputsActionExecutionException == null);
+      }
+    }
+
+    private ActionExecutionValue actuallyCompleteAction(
         ExtendedEventHandler eventHandler, ActionResult actionResult)
         throws ActionExecutionException {
       boolean outputAlreadyDumped = false;
@@ -939,7 +1123,10 @@
         try (SilentCloseable c =
             profiler.profile(ProfilerTask.ACTION_COMPLETE, action.describe())) {
           if (!checkOutputs(action, metadataHandler)) {
-            reportError("not all outputs were created or valid", null, action,
+            throw toActionExecutionException(
+                "not all outputs were created or valid",
+                null,
+                action,
                 outputAlreadyDumped ? null : fileOutErr);
           }
         }
@@ -948,10 +1135,11 @@
           try {
             outputService.finalizeAction(action, metadataHandler);
           } catch (EnvironmentalExecException | IOException e) {
-            reportError("unable to finalize action", e, action, fileOutErr);
+            throw toActionExecutionException("unable to finalize action", e, action, fileOutErr);
           }
         }
 
+        // Success in execution but failure in completion.
         reportActionExecution(
             eventHandler, primaryOutputPath, action, null, fileOutErr, ErrorTiming.NO_ERROR);
       } catch (ActionExecutionException actionException) {
@@ -1203,21 +1391,19 @@
   }
 
   /**
-   * Convenience function for reporting that the action failed due to a
-   * the exception cause, if there is an additional explanatory message that
-   * clarifies the message of the exception. Combines the user-provided message
-   * and the exceptions' message and reports the combination as error.
-   * Then, throws an ActionExecutionException with the reported error as
-   * message and the provided exception as the cause.
+   * Convenience function for creating an ActionExecutionException reporting that the action failed
+   * due to a the exception cause, if there is an additional explanatory message that clarifies the
+   * message of the exception. Combines the user-provided message and the exceptions' message and
+   * reports the combination as error.
    *
    * @param message A small text that explains why the action failed
    * @param cause The exception that caused the action to fail
    * @param action The action that failed
-   * @param actionOutput The output of the failed Action.
-   *     May be null, if there is no output to display
+   * @param actionOutput The output of the failed Action. May be null, if there is no output to
+   *     display
    */
-  private void reportError(String message, Throwable cause, Action action, FileOutErr actionOutput)
-      throws ActionExecutionException {
+  private ActionExecutionException toActionExecutionException(
+      String message, Throwable cause, Action action, FileOutErr actionOutput) {
     ActionExecutionException ex;
     if (cause == null) {
       ex = new ActionExecutionException(message, action, false);
@@ -1225,7 +1411,7 @@
       ex = new ActionExecutionException(message, cause, action, false);
     }
     printError(ex.getMessage(), action, actionOutput);
-    throw ex;
+    return ex;
   }
 
   /**