Resolve inputs that were stored in action cache ahead of time so metadata for them can be retrieved with other inputs, avoiding a restart.

This also allows us to delete the UndeclaredInputsHandler, since we eagerly cache metadata before entering any ActionCacheChecker methods.

This should also allow us to split FileAndMetadataCache into two separate classes, as well as asserting that SingleBuildFileCache only sees non-artifact ActionInputs. To be done in follow-ups.

--
MOS_MIGRATED_REVID=89718712
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ActionCacheChecker.java b/src/main/java/com/google/devtools/build/lib/actions/ActionCacheChecker.java
index e3a8ff8..004f2c3 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/ActionCacheChecker.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/ActionCacheChecker.java
@@ -15,6 +15,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.actions.Action.MiddlemanType;
 import com.google.devtools.build.lib.actions.cache.ActionCache;
@@ -142,8 +143,8 @@
    */
   // Note: the handler should only be used for DEPCHECKER events; there's no
   // guarantee it will be available for other events.
-  public Token getTokenIfNeedToExecute(Action action, EventHandler handler,
-      MetadataHandler metadataHandler, PackageRootResolver resolver) {
+  public Token getTokenIfNeedToExecute(Action action, Iterable<Artifact> resolvedCacheArtifacts,
+      EventHandler handler, MetadataHandler metadataHandler) {
     // TODO(bazel-team): (2010) For RunfilesAction/SymlinkAction and similar actions that
     // produce only symlinks we should not check whether inputs are valid at all - all that matters
     // that inputs and outputs are still exist (and new inputs have not appeared). All other checks
@@ -160,17 +161,15 @@
       return null;
     }
     Iterable<Artifact> actionInputs = action.getInputs();
-    ActionCache.Entry entry = getCacheEntry(action);
     // Resolve action inputs from cache, if necessary.
     boolean inputsKnown = action.inputsKnown();
-    if (entry != null && !entry.isCorrupted() && !inputsKnown) {
-      Preconditions.checkState(action.discoversInputs(), action);
-      actionInputs = resolveCachedActionInputs(action, entry, resolver);
-      if (actionInputs == null) {
-        // Not all inputs could be resolved. Try again next time.
-        return Token.NEED_TO_RERUN;
-      }
+    if (!inputsKnown && resolvedCacheArtifacts != null) {
+      // The action doesn't know its inputs, but the caller has a good idea of what they are.
+      Preconditions.checkState(action.discoversInputs(),
+          "Actions that don't know their inputs must discover them: %s", action);
+      actionInputs = resolvedCacheArtifacts;
     }
+    ActionCache.Entry entry = getCacheEntry(action);
     if (mustExecute(action, entry, handler, metadataHandler, actionInputs)) {
       if (entry != null) {
         removeCacheEntry(action);
@@ -241,10 +240,12 @@
     actionCache.put(key, entry);
   }
 
-  private Iterable<Artifact> resolveCachedActionInputs(Action action, ActionCache.Entry entry,
-      PackageRootResolver resolver) {
-    Preconditions.checkNotNull(entry, action);
-    Preconditions.checkState(!entry.isCorrupted(), action);
+  @Nullable
+  public Iterable<Artifact> getCachedInputs(Action action, PackageRootResolver resolver) {
+    ActionCache.Entry entry = getCacheEntry(action);
+    if (entry == null || entry.isCorrupted()) {
+      return ImmutableList.of();
+    }
 
     List<PathFragment> outputs = new ArrayList<>();
     for (Artifact output : action.getOutputs()) {
@@ -353,7 +354,6 @@
 
   /** Wrapper for all context needed by the ActionCacheChecker to handle a single action. */
   public static final class Token {
-    public static final Token NEED_TO_RERUN = new Token("need to rerun");
     private final String cacheKey;
 
     private Token(String cacheKey) {
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
index 02ef3b3..9861aa1 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/ActionExecutionFunction.java
@@ -15,7 +15,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -30,7 +30,6 @@
 import com.google.devtools.build.lib.actions.NotifyOnActionCacheHit;
 import com.google.devtools.build.lib.actions.PackageRootResolver;
 import com.google.devtools.build.lib.actions.Root;
-import com.google.devtools.build.lib.actions.cache.MetadataHandler;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.packages.PackageIdentifier;
@@ -45,26 +44,32 @@
 import com.google.devtools.build.skyframe.ValueOrException2;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.Nullable;
+
 /**
- * A builder for {@link ActionExecutionValue}s.
+ * A {@link SkyFunction} that creates {@link ActionExecutionValue}s. There are four points where
+ * this function can abort due to missing values in the graph:
+ * <ol>
+ *   <li>For actions that discover inputs, if missing metadata needed to resolve an artifact from a
+ *   string input in the action cache.</li>
+ *   <li>If missing metadata for artifacts in inputs (including the artifacts above).</li>
+ *   <li>For actions that discover inputs, if missing metadata for inputs discovered prior to
+ *   execution.</li>
+ *   <li>For actions that discover inputs, but do so during execution, if missing metadata for
+ *   inputs discovered during execution.</li>
+ * </ol>
  */
 public class ActionExecutionFunction implements SkyFunction, CompletionReceiver {
-
-  private static final Predicate<Artifact> IS_SOURCE_ARTIFACT = new Predicate<Artifact>() {
-    @Override
-    public boolean apply(Artifact input) {
-      return input.isSourceArtifact();
-    }
-  };
-
   private final SkyframeActionExecutor skyframeActionExecutor;
   private final TimestampGranularityMonitor tsgm;
   private ConcurrentMap<Action, ContinuationState> stateMap;
@@ -80,20 +85,6 @@
   public SkyValue compute(SkyKey skyKey, Environment env) throws ActionExecutionFunctionException,
       InterruptedException {
     Action action = (Action) skyKey.argument();
-    Map<Artifact, FileArtifactValue> inputArtifactData = null;
-    Map<Artifact, Collection<Artifact>> expandedMiddlemen = null;
-    boolean alreadyRan = skyframeActionExecutor.probeActionExecution(action);
-    try {
-      Pair<Map<Artifact, FileArtifactValue>, Map<Artifact, Collection<Artifact>>> checkedInputs =
-          checkInputs(env, action, alreadyRan); // Declare deps on known inputs to action.
-
-      if (checkedInputs != null) {
-        inputArtifactData = checkedInputs.first;
-        expandedMiddlemen = checkedInputs.second;
-      }
-    } catch (ActionExecutionException e) {
-      throw new ActionExecutionFunctionException(e);
-    }
     // TODO(bazel-team): Non-volatile NotifyOnActionCacheHit actions perform worse in Skyframe than
     // legacy when they are not at the top of the action graph. In legacy, they are stored
     // separately, so notifying non-dirty actions is cheap. In Skyframe, they depend on the
@@ -103,14 +94,66 @@
       // Depending on the buildID ensure that these actions have a chance to execute.
       PrecomputedValue.BUILD_ID.get(env);
     }
+    // For restarts of this ActionExecutionFunction we use a ContinuationState variable, below, to
+    // avoid redoing work. However, if two actions are shared and the first one executes, when the
+    // second one goes to execute, we should detect that and short-circuit, even without taking
+    // ContinuationState into account.
+    boolean sharedActionAlreadyRan = skyframeActionExecutor.probeActionExecution(action);
+    ContinuationState state;
+    if (action.discoversInputs()) {
+      state = getState(action);
+    } else {
+      // Because this is a new state, all conditionals below about whether state has already done
+      // something will return false, and so we will execute all necessary steps.
+      state = new ContinuationState();
+    }
+    if (!state.hasCollectedInputs()) {
+      state.allInputs = collectInputs(action, env);
+      if (state.allInputs == null) {
+        // Missing deps.
+        return null;
+      }
+    } else if (state.allInputs.keysRequested != null) {
+      // Preserve the invariant that we ask for the same deps each build.
+      env.getValues(state.allInputs.keysRequested);
+      Preconditions.checkState(!env.valuesMissing(), "%s %s", action, state);
+    }
+    Pair<Map<Artifact, FileArtifactValue>, Map<Artifact, Collection<Artifact>>> checkedInputs =
+        null;
+    try {
+      // Declare deps on known inputs to action. We do this unconditionally to maintain our
+      // invariant of asking for the same deps each build.
+      Map<SkyKey, ValueOrException2<MissingInputFileException, ActionExecutionException>> inputDeps
+          = env.getValuesOrThrow(toKeys(state.allInputs.getAllInputs(),
+                  action.discoversInputs() ? action.getMandatoryInputs() : null),
+              MissingInputFileException.class, ActionExecutionException.class);
+
+      if (!sharedActionAlreadyRan && !state.hasArtifactData()) {
+        // Do we actually need to find our metadata?
+        checkedInputs = checkInputs(env, action, inputDeps);
+      }
+    } catch (ActionExecutionException e) {
+      // Remove action from state map in case it's there (won't be unless it discovers inputs).
+      stateMap.remove(action);
+      throw new ActionExecutionFunctionException(e);
+    }
     if (env.valuesMissing()) {
+      // There was missing artifact metadata in the graph. Wait for it to be present.
       return null;
     }
 
+    if (checkedInputs != null) {
+      Preconditions.checkState(!state.hasArtifactData(), "%s %s", state, action);
+      state.inputArtifactData = checkedInputs.first;
+      state.expandedMiddlemen = checkedInputs.second;
+    }
+
     ActionExecutionValue result;
     try {
-      result = checkCacheAndExecuteIfNeeded(action, inputArtifactData, expandedMiddlemen, env);
+      result = checkCacheAndExecuteIfNeeded(action, state, env);
     } catch (ActionExecutionException e) {
+      // Remove action from state map in case it's there (won't be unless it discovers inputs).
+      stateMap.remove(action);
       // In this case we do not report the error to the action reporter because we have already
       // done it in SkyframeExecutor.reportErrorIfNotAbortingMode() method. That method
       // prints the error in the top-level reporter and also dumps the recorded StdErr for the
@@ -119,17 +162,70 @@
     }
 
     if (env.valuesMissing()) {
+      Preconditions.checkState(stateMap.containsKey(action), action);
       return null;
     }
 
+    // Remove action from state map in case it's there (won't be unless it discovers inputs).
+    stateMap.remove(action);
     return result;
   }
-  
+
+  /**
+   * An action's inputs needed for execution. May not just be the result of Action#getInputs(). If
+   * the action cache's view of this action contains additional inputs, it will request metadata for
+   * them, so we consider those inputs as dependencies of this action as well. Returns null if some
+   * dependencies were missing and this ActionExecutionFunction needs to restart.
+   */
+  @Nullable
+  private AllInputs collectInputs(Action action, Environment env) {
+    if (action.inputsKnown()) {
+      return new AllInputs(action.getInputs());
+    }
+    Preconditions.checkState(action.discoversInputs(), action);
+    PackageRootResolverWithEnvironment resolver = new PackageRootResolverWithEnvironment(env);
+    Iterable<Artifact> actionCacheInputs =
+        skyframeActionExecutor.getActionCachedInputs(action, resolver);
+    if (actionCacheInputs == null) {
+      Preconditions.checkState(env.valuesMissing(), action);
+      return null;
+    }
+    return new AllInputs(action.getInputs(), actionCacheInputs, resolver.keysRequested);
+  }
+
+  private static class AllInputs {
+    final Iterable<Artifact> defaultInputs;
+    @Nullable
+    final Iterable<Artifact> actionCacheInputs;
+    @Nullable
+    final List<SkyKey> keysRequested;
+
+    AllInputs(Iterable<Artifact> defaultInputs) {
+      this.defaultInputs = Preconditions.checkNotNull(defaultInputs);
+      this.actionCacheInputs = null;
+      this.keysRequested = null;
+    }
+
+    AllInputs(Iterable<Artifact> defaultInputs, Iterable<Artifact> actionCacheInputs,
+        List<SkyKey> keysRequested) {
+      this.defaultInputs = Preconditions.checkNotNull(defaultInputs);
+      this.actionCacheInputs = Preconditions.checkNotNull(actionCacheInputs);
+      this.keysRequested = keysRequested;
+    }
+
+    Iterable<Artifact> getAllInputs() {
+      return actionCacheInputs == null
+          ? defaultInputs
+          : Iterables.concat(defaultInputs, actionCacheInputs);
+    }
+  }
+
   /**
    * Skyframe implementation of {@link PackageRootResolver}. Should be used only from SkyFunctions,
    * because it uses SkyFunction.Environment for evaluation of ContainingPackageLookupValue.
    */
   private static class PackageRootResolverWithEnvironment implements PackageRootResolver {
+    final List<SkyKey> keysRequested = new ArrayList<>();
     private final Environment env;
 
     public PackageRootResolverWithEnvironment(Environment env) {
@@ -138,11 +234,15 @@
 
     @Override
     public Map<PathFragment, Root> findPackageRoots(Iterable<PathFragment> execPaths) {
-      Map<PathFragment, SkyKey> depKeys = new HashMap<>(); 
+      Preconditions.checkState(keysRequested.isEmpty(),
+          "resolver should only be called once: %s %s", keysRequested, execPaths);
+      Map<PathFragment, SkyKey> depKeys = new HashMap<>();
       // Create SkyKeys list based on execPaths.
       for (PathFragment path : execPaths) {
-        depKeys.put(path,
-            ContainingPackageLookupValue.key(PackageIdentifier.createInDefaultRepo(path)));
+        SkyKey depKey =
+            ContainingPackageLookupValue.key(PackageIdentifier.createInDefaultRepo(path));
+        depKeys.put(path, depKey);
+        keysRequested.add(depKey);
       }
       Map<SkyKey, SkyValue> values = env.getValues(depKeys.values());
       if (env.valuesMissing()) {
@@ -170,54 +270,28 @@
 
   private ActionExecutionValue checkCacheAndExecuteIfNeeded(
       Action action,
-      Map<Artifact, FileArtifactValue> inputArtifactData,
-      Map<Artifact, Collection<Artifact>> expandedMiddlemen,
+      ContinuationState state,
       Environment env) throws ActionExecutionException, InterruptedException {
-    // If this is the second time we are here (because the action discovers inputs, and we had
-    // to restart the value builder after declaring our dependence on newly discovered inputs),
-    // the result returned here is the already-computed result from the first run.
-    // Similarly, if this is a shared action and the other action is the one that executed, we
-    // must use that other action's value, provided here, since it is populated with metadata
-    // for the outputs.
-    if (inputArtifactData == null) {
+    // If this is a shared action and the other action is the one that executed, we must use that
+    // other action's value, provided here, since it is populated with metadata for the outputs.
+    if (!state.hasArtifactData()) {
       return skyframeActionExecutor.executeAction(action, null, -1, null);
     }
-    ContinuationState state;
-    if (action.discoversInputs()) {
-      state = getState(action);
-    } else {
-      state = new ContinuationState();
-    }
     // This may be recreated if we discover inputs.
     FileAndMetadataCache fileAndMetadataCache = new FileAndMetadataCache(
-          inputArtifactData,
-          expandedMiddlemen,
+          state.inputArtifactData,
+          state.expandedMiddlemen,
           skyframeActionExecutor.getExecRoot(),
           action.getOutputs(),
-          // Only give the metadata cache the ability to look up Skyframe values if the action
-          // might have undeclared inputs. If those undeclared inputs are generated, they are
-          // present in Skyframe, so we can save a stat by looking them up directly.
-          action.discoversInputs() ? env : null,
-          tsgm);
-    MetadataHandler metadataHandler =
-          skyframeActionExecutor.constructMetadataHandler(fileAndMetadataCache);
+        tsgm);
     long actionStartTime = System.nanoTime();
     // We only need to check the action cache if we haven't done it on a previous run.
-    if (!state.hasDiscoveredInputs()) {
-      Token token = skyframeActionExecutor.checkActionCache(action, metadataHandler,
-          new PackageRootResolverWithEnvironment(env), actionStartTime);
-      if (token == Token.NEED_TO_RERUN) {
-        // Sadly, there is no state that we can preserve here across this restart.
-        return null;
-      }
-      state.token = token;
+    if (!state.hasCheckedActionCache()) {
+      state.token = skyframeActionExecutor.checkActionCache(action, fileAndMetadataCache,
+          actionStartTime, state.allInputs.actionCacheInputs);
     }
 
     if (state.token == null) {
-      if (action.discoversInputs()) {
-        // Action may have had its inputs updated. Keep track of those new inputs.
-        declareAdditionalDependencies(env, action);
-      }
       // We got a hit from the action cache -- no need to execute.
       return new ActionExecutionValue(
           fileAndMetadataCache.getOutputData(),
@@ -226,11 +300,9 @@
 
     // This may be recreated if we discover inputs.
     ActionExecutionContext actionExecutionContext =
-        skyframeActionExecutor.constructActionExecutionContext(fileAndMetadataCache,
-            metadataHandler);
+        skyframeActionExecutor.constructActionExecutionContext(fileAndMetadataCache);
     boolean inputsDiscoveredDuringActionExecution = false;
-    ActionExecutionValue result;
-    Token token;
+    Map<Artifact, FileArtifactValue> metadataFoundDuringActionExecution = null;
     try {
       if (action.discoversInputs()) {
         if (!state.hasDiscoveredInputs()) {
@@ -242,35 +314,31 @@
             inputsDiscoveredDuringActionExecution = true;
           }
         }
+        // state.discoveredInputs can be null even after include scanning if action discovers them
+        // during execution.
         if (state.discoveredInputs != null
-            && !inputArtifactData.keySet().containsAll(state.discoveredInputs)) {
-          inputArtifactData = addDiscoveredInputs(inputArtifactData, state.discoveredInputs,
-              env);
+            && !state.inputArtifactData.keySet().containsAll(state.discoveredInputs)) {
+          Map<Artifact, FileArtifactValue> inputArtifactData =
+              addDiscoveredInputs(state.inputArtifactData, state.discoveredInputs, env);
           if (env.valuesMissing()) {
-            // This is the only place that we actually preserve meaningful state across restarts.
             return null;
           }
+          state.inputArtifactData = inputArtifactData;
           fileAndMetadataCache = new FileAndMetadataCache(
-              inputArtifactData,
-              expandedMiddlemen,
+              state.inputArtifactData,
+              state.expandedMiddlemen,
               skyframeActionExecutor.getExecRoot(),
               action.getOutputs(),
-              null,
               tsgm
           );
-          actionExecutionContext = skyframeActionExecutor.constructActionExecutionContext(
-              fileAndMetadataCache, fileAndMetadataCache);
+          actionExecutionContext =
+              skyframeActionExecutor.constructActionExecutionContext(fileAndMetadataCache);
         }
       }
-      // Clear state before actual execution of action. It will never be needed again because
-      // skyframeActionExecutor is guaranteed to have a result after this.
-      token = state.token;
-      if (action.discoversInputs()) {
-        removeState(action);
+      if (!state.hasExecutedAction()) {
+        state.value = skyframeActionExecutor.executeAction(action,
+            fileAndMetadataCache, actionStartTime, actionExecutionContext);
       }
-      state = null;
-      result = skyframeActionExecutor.executeAction(action,
-          fileAndMetadataCache, actionStartTime, actionExecutionContext);
     } finally {
       try {
         actionExecutionContext.getFileOutErr().close();
@@ -278,11 +346,32 @@
         // Nothing we can do here.
       }
       if (inputsDiscoveredDuringActionExecution) {
-        declareAdditionalDependencies(env, action);
+        metadataFoundDuringActionExecution =
+            declareAdditionalDependencies(env, action, state.inputArtifactData.keySet());
+        state.discoveredInputs = metadataFoundDuringActionExecution.keySet();
       }
     }
-    skyframeActionExecutor.afterExecution(action, fileAndMetadataCache, token);
-    return result;
+    if (env.valuesMissing()) {
+      return null;
+    }
+    if (inputsDiscoveredDuringActionExecution && !metadataFoundDuringActionExecution.isEmpty()) {
+      // We are in the interesting case of an action that discovered its inputs during execution,
+      // and found some new ones, but the new ones were already present in the graph. We must
+      // therefore cache the metadata for those new ones.
+      Map<Artifact, FileArtifactValue> inputArtifactData = new HashMap<>();
+      inputArtifactData.putAll(state.inputArtifactData);
+      inputArtifactData.putAll(metadataFoundDuringActionExecution);
+      state.inputArtifactData = inputArtifactData;
+      fileAndMetadataCache = new FileAndMetadataCache(
+          state.inputArtifactData,
+          state.expandedMiddlemen,
+          skyframeActionExecutor.getExecRoot(),
+          action.getOutputs(),
+          tsgm
+      );
+    }
+    skyframeActionExecutor.afterExecution(action, fileAndMetadataCache, state.token);
+    return state.value;
   }
 
   private static Map<Artifact, FileArtifactValue> addDiscoveredInputs(
@@ -307,11 +396,7 @@
     if (env.valuesMissing()) {
       return null;
     }
-    for (Map.Entry<SkyKey, SkyValue> depsEntry : data.entrySet()) {
-      Artifact input = ArtifactValue.artifact(depsEntry.getKey());
-      result.put(input,
-          Preconditions.checkNotNull((FileArtifactValue) depsEntry.getValue(), input));
-    }
+    result.putAll(transformArtifactMetadata(data));
     return result;
   }
 
@@ -341,18 +426,9 @@
    * missing. Some inputs may not yet be in the graph, in which case the builder should abort.
    */
   private Pair<Map<Artifact, FileArtifactValue>, Map<Artifact, Collection<Artifact>>> checkInputs(
-      Environment env, Action action, boolean alreadyRan) throws ActionExecutionException {
-    Map<SkyKey, ValueOrException2<MissingInputFileException, ActionExecutionException>> inputDeps =
-        env.getValuesOrThrow(toKeys(action.getInputs(), action.discoversInputs()
-            ? action.getMandatoryInputs() : null), MissingInputFileException.class,
-            ActionExecutionException.class);
-
-    // If the action was already run, then break out early. This avoids the cost of constructing the
-    // input map and expanded middlemen if they're not going to be used.
-    if (alreadyRan) {
-      return null;
-    }
-
+      Environment env, Action action,
+      Map<SkyKey, ValueOrException2<MissingInputFileException, ActionExecutionException>> inputDeps)
+      throws ActionExecutionException {
     int missingCount = 0;
     int actionFailures = 0;
     boolean catastrophe = false;
@@ -426,12 +502,27 @@
         Collections.unmodifiableMap(expandedMiddlemen));
   }
 
-  private static void declareAdditionalDependencies(Environment env, Action action) {
-    if (action.discoversInputs()) {
-      // TODO(bazel-team): Should this be all inputs, or just source files?
-      env.getValues(toKeys(Iterables.filter(action.getInputs(), IS_SOURCE_ARTIFACT),
-          action.getMandatoryInputs()));
+  /**
+   * Returns a map of artifact to artifact metadata for any of {@code action}s inputs that are not
+   * already in {@code knownInputs}. If some metadata was not available yet, the artifact is still
+   * present in the map but with null metadata.
+   */
+  private static Map<Artifact, FileArtifactValue> declareAdditionalDependencies(Environment env,
+      Action action, Set<Artifact> knownInputs) {
+    Preconditions.checkState(action.discoversInputs(), action);
+    Iterable<Artifact> newArtifacts =
+        Iterables.filter(action.getInputs(), Predicates.not(Predicates.in(knownInputs)));
+    return transformArtifactMetadata(
+        env.getValues(toKeys(newArtifacts, action.getMandatoryInputs())));
+  }
+
+  private static Map<Artifact, FileArtifactValue> transformArtifactMetadata(
+      Map<SkyKey, SkyValue> map) {
+    Map<Artifact, FileArtifactValue> result = new HashMap<>(); // May contain null values.
+    for (Map.Entry<SkyKey, SkyValue> entry : map.entrySet()) {
+      result.put(ArtifactValue.artifact(entry.getKey()), (FileArtifactValue) entry.getValue());
     }
+    return result;
   }
 
   /**
@@ -462,29 +553,61 @@
     return state;
   }
 
-  private void removeState(Action action) {
-    Preconditions.checkNotNull(stateMap.remove(action), action);
-  }
-
   /**
-   * State to save work across restarts of ActionExecutionFunction due to missing discovered inputs.
+   * State to save work across restarts of ActionExecutionFunction due to missing values in the
+   * graph for actions that discover inputs. There are three places where we save work, all for
+   * actions that discover inputs:
+   * <ol>
+   *   <li>If not all known input metadata (coming from Action#getInputs) is available yet, then the
+   *   calculated set of inputs (including the inputs resolved from the action cache) is saved.</li>
+   *   <li>If not all discovered inputs' metadata is available yet, then the known input metadata
+   *   together with the set of discovered inputs is saved, as well as the Token used to identify
+   *   this action to the action cache.</li>
+   *   <li>If, after execution, new inputs are discovered whose metadata is not yet available, then
+   *   the same data as in the previous case is saved, along with the actual result of execution.
+   *   </li>
+   * </ol>
    */
   private static class ContinuationState {
+    AllInputs allInputs;
+    Map<Artifact, FileArtifactValue> inputArtifactData = null;
+    Map<Artifact, Collection<Artifact>> expandedMiddlemen = null;
     Token token = null;
     Collection<Artifact> discoveredInputs = null;
+    ActionExecutionValue value = null;
+
+    boolean hasCollectedInputs() {
+      return allInputs != null;
+    }
+
+    boolean hasArtifactData() {
+      boolean result = inputArtifactData != null;
+      Preconditions.checkState(result == (expandedMiddlemen != null), this);
+      return result;
+    }
 
     // This will always be false for actions that don't discover their inputs, but we never restart
     // those actions in any case. For actions that do discover their inputs, they either discover
     // them before execution, in which case discoveredInputs will be non-null if that has already
-    // happened, or after execution, in which case they returned null when Action#discoverInputs()
-    // was called, and won't restart due to missing dependencies before execution.
+    // happened, or after execution, in which case we set discoveredInputs then.
     boolean hasDiscoveredInputs() {
       return discoveredInputs != null;
     }
 
+    boolean hasCheckedActionCache() {
+      // If token is null because there was an action cache hit, this method is never called again
+      // because we return immediately.
+      return token != null;
+    }
+
+    boolean hasExecutedAction() {
+      return value != null;
+    }
+
     @Override
     public String toString() {
-      return token + ", " + discoveredInputs;
+      return token + ", " + value + ", " + allInputs + ", " + inputArtifactData + ", "
+          + discoveredInputs;
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/FileAndMetadataCache.java b/src/main/java/com/google/devtools/build/lib/skyframe/FileAndMetadataCache.java
index 74a5796..b9b1284 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/FileAndMetadataCache.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/FileAndMetadataCache.java
@@ -35,7 +35,6 @@
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.RootedPath;
 import com.google.devtools.build.lib.vfs.Symlinks;
-import com.google.devtools.build.skyframe.SkyFunction;
 import com.google.protobuf.ByteString;
 
 import java.io.File;
@@ -82,20 +81,19 @@
       new ConcurrentHashMap<>();
   private final Set<Artifact> injectedArtifacts = Sets.newConcurrentHashSet();
   private final ImmutableSet<Artifact> outputs;
-  @Nullable private final SkyFunction.Environment env;
   private final TimestampGranularityMonitor tsgm;
 
   private static final Interner<ByteString> BYTE_INTERNER = Interners.newWeakInterner();
 
+  @VisibleForTesting
   public FileAndMetadataCache(Map<Artifact, FileArtifactValue> inputArtifactData,
       Map<Artifact, Collection<Artifact>> expandedInputMiddlemen, File execRoot,
-      Iterable<Artifact> outputs, @Nullable SkyFunction.Environment env,
+      Iterable<Artifact> outputs,
       TimestampGranularityMonitor tsgm) {
     this.inputArtifactData = Preconditions.checkNotNull(inputArtifactData);
     this.expandedInputMiddlemen = Preconditions.checkNotNull(expandedInputMiddlemen);
     this.execRoot = Preconditions.checkNotNull(execRoot);
     this.outputs = ImmutableSet.copyOf(outputs);
-    this.env = env;
     this.tsgm = tsgm;
   }
 
@@ -131,47 +129,10 @@
 
   @Nullable
   private FileArtifactValue getInputFileArtifactValue(ActionInput input) {
-    FileArtifactValue value = inputArtifactData.get(input);
-    if (value != null) {
-      return value;
-    }
-    if (outputs.contains(input)) {
-      // When this method is called to calculate the metadata of an artifact, the artifact may be an
-      // output artifact. Don't try to do anything then.
+    if (outputs.contains(input) || !(input instanceof Artifact)) {
       return null;
     }
-    if (!(input instanceof Artifact)) {
-      // Maybe we're being asked for some strange constructed ActionInput coming from runfiles or
-      // similar. We have no information about such things.
-      return null;
-    }
-    // TODO(bazel-team): Remove this codepath once Skyframe has native input discovery, so all
-    // inputs will already have metadata known.
-    // ActionExecutionFunction may have passed in null environment if this action does not
-    // discover inputs. In which case we should not have gotten here.
-    Preconditions.checkNotNull(env, input);
-    Artifact artifact = (Artifact) input;
-    if (artifact.isSourceArtifact()) {
-      // We might have no artifact data for discovered source inputs, and it's not worth storing
-      // it in this cache, because it won't be reused across actions -- while we could request an
-      // artifact from the graph, we would have to be tolerant to it not yet being present in the
-      // graph yet, which adds complexity. Instead, we let the undeclared inputs handler stat it, so
-      // it can be reused.
-      return null;
-    } else {
-      // This getValue call is not expected to return null, because if the artifact is a
-      // transitive dependency of this action (as it should be), it will already have been built,
-      // so this call will return a value.
-      // This getValue call is theoretically less efficient for a subsequent incremental build
-      // than it would be to do a bulk getValues call after action execution, as is done for
-      // source inputs. However, since almost all nodes requested here are transitive deps of an
-      // already-declared dependency, change pruning will request these values serially, but they
-      // will already have been built. So the only penalty is restarting ParallelEvaluator#run, as
-      // opposed to traversing the entire downward transitive closure on a single thread.
-      value = (FileArtifactValue) env.getValue(
-          FileArtifactValue.key(artifact, /*argument ignored for derived artifacts*/false));
-      return value;
-    }
+    return Preconditions.checkNotNull(inputArtifactData.get(input), input);
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
index b0a2cfb..4dc1478 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java
@@ -50,9 +50,6 @@
 import com.google.devtools.build.lib.actions.ResourceManager;
 import com.google.devtools.build.lib.actions.ResourceSet;
 import com.google.devtools.build.lib.actions.TargetOutOfDateException;
-import com.google.devtools.build.lib.actions.cache.Digest;
-import com.google.devtools.build.lib.actions.cache.DigestUtils;
-import com.google.devtools.build.lib.actions.cache.Metadata;
 import com.google.devtools.build.lib.actions.cache.MetadataHandler;
 import com.google.devtools.build.lib.concurrent.ExecutorShutdownUtil;
 import com.google.devtools.build.lib.concurrent.Sharder;
@@ -65,7 +62,6 @@
 import com.google.devtools.build.lib.util.Pair;
 import com.google.devtools.build.lib.util.io.FileOutErr;
 import com.google.devtools.build.lib.util.io.OutErr;
-import com.google.devtools.build.lib.vfs.FileStatus;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.Symlinks;
@@ -104,7 +100,6 @@
   private Executor executorEngine;
   private ActionLogBufferPathGenerator actionLogBufferPathGenerator;
   private ActionCacheChecker actionCacheChecker;
-  private ConcurrentMap<Artifact, Metadata> undeclaredInputsMetadata = new ConcurrentHashMap<>();
   private final Profiler profiler = Profiler.instance();
   private boolean explain;
 
@@ -186,102 +181,6 @@
   }
 
   /**
-   * Basic implementation of {@link MetadataHandler} that delegates to Skyframe for metadata and
-   * caches missing source artifacts (which must be undeclared inputs: discovered headers) to avoid
-   * excessive filesystem access. The discovered-header cache is available across actions.
-   */
-  // TODO(bazel-team): remove when include scanning is skyframe-native.
-  private static class UndeclaredInputHandler implements MetadataHandler {
-    private final ConcurrentMap<Artifact, Metadata> undeclaredInputsMetadata;
-    private final MetadataHandler perActionHandler;
-
-    UndeclaredInputHandler(MetadataHandler perActionHandler,
-        ConcurrentMap<Artifact, Metadata> undeclaredInputsMetadata) {
-      // Shared across all UndeclaredInputHandlers in this build.
-      this.undeclaredInputsMetadata = undeclaredInputsMetadata;
-      this.perActionHandler = perActionHandler;
-    }
-
-    @Override
-    public Metadata getMetadataMaybe(Artifact artifact) {
-      try {
-        return getMetadata(artifact);
-      } catch (IOException e) {
-        return null;
-      }
-    }
-
-    @Override
-    public Metadata getMetadata(Artifact artifact) throws IOException {
-      Metadata metadata = perActionHandler.getMetadata(artifact);
-      if (metadata != null) {
-        return metadata;
-      }
-      // Skyframe stats all generated artifacts, because either they are outputs of the action being
-      // executed or they are generated files already present in the graph.
-      Preconditions.checkState(artifact.isSourceArtifact(), artifact);
-      metadata = undeclaredInputsMetadata.get(artifact);
-      if (metadata != null) {
-        return metadata;
-      }
-      FileStatus stat = artifact.getPath().stat();
-      if (DigestUtils.useFileDigest(artifact, stat.isFile(), stat.getSize())) {
-        metadata = new Metadata(Preconditions.checkNotNull(
-            DigestUtils.getDigestOrFail(artifact.getPath(), stat.getSize()), artifact));
-      } else {
-        metadata = new Metadata(stat.getLastModifiedTime());
-      }
-      // Cache for other actions that may also include without declaring.
-      Metadata oldMetadata = undeclaredInputsMetadata.put(artifact, metadata);
-      FileAndMetadataCache.checkInconsistentData(artifact, oldMetadata, metadata);
-      return metadata;
-    }
-
-    @Override
-    public void setDigestForVirtualArtifact(Artifact artifact, Digest digest) {
-      perActionHandler.setDigestForVirtualArtifact(artifact, digest);
-    }
-
-    @Override
-    public void injectDigest(ActionInput output, FileStatus statNoFollow, byte[] digest) {
-      perActionHandler.injectDigest(output, statNoFollow, digest);
-    }
-
-    @Override
-    public boolean artifactExists(Artifact artifact) {
-      return perActionHandler.artifactExists(artifact);
-    }
-
-    @Override
-    public boolean isRegularFile(Artifact artifact) {
-      return perActionHandler.isRegularFile(artifact);
-    }
-
-    @Override
-    public boolean isInjected(Artifact artifact) throws IOException {
-      return perActionHandler.isInjected(artifact);
-    }
-
-    @Override
-    public void discardMetadata(Collection<Artifact> artifactList) {
-      // This input handler only caches undeclared inputs, which never need to be discarded
-      // intra-build.
-      perActionHandler.discardMetadata(artifactList);
-    }
-
-    @Override
-    public void markOmitted(ActionInput output) {
-      perActionHandler.markOmitted(output);
-    }
-
-    @Override
-    public boolean artifactOmitted(Artifact artifact) {
-      return perActionHandler.artifactOmitted(artifact);
-    }
-
-  }
-
-  /**
    * Find conflicts between generated artifacts. There are two ways to have conflicts. First, if
    * two (unshareable) actions generate the same output artifact, this will result in an {@link
    * ActionConflictException}. Second, if one action generates an artifact whose path is a prefix of
@@ -440,7 +339,6 @@
     this.hadExecutionError = false;
     this.actionCacheChecker = Preconditions.checkNotNull(actionCacheChecker);
     // Don't cache possibly stale data from the last build.
-    undeclaredInputsMetadata = new ConcurrentHashMap<>();
     this.explain = explain;
   }
 
@@ -525,14 +423,14 @@
    * pass the returned context to {@link #executeAction}, and any other method that needs to execute
    * tasks related to that action.
    */
-  ActionExecutionContext constructActionExecutionContext(final FileAndMetadataCache graphFileCache,
-      MetadataHandler metadataHandler) {
+  ActionExecutionContext constructActionExecutionContext(
+      final FileAndMetadataCache graphFileCache) {
     // TODO(bazel-team): this should be closed explicitly somewhere.
     FileOutErr fileOutErr = actionLogBufferPathGenerator.generate();
     return new ActionExecutionContext(
         executorEngine,
         new DelegatingPairFileCache(graphFileCache, perBuildFileCache),
-        metadataHandler,
+        graphFileCache,
         fileOutErr,
         new MiddlemanExpander() {
           @Override
@@ -550,24 +448,16 @@
   }
 
   /**
-   * Returns a MetadataHandler for use when executing a particular action. The caller can pass the
-   * returned handler in whenever a MetadataHandler is needed in the course of executing the action.
-   */
-  MetadataHandler constructMetadataHandler(MetadataHandler graphFileCache) {
-    return new UndeclaredInputHandler(graphFileCache, undeclaredInputsMetadata);
-  }
-
-  /**
    * Checks the action cache to see if {@code action} needs to be executed, or is up to date.
    * Returns a token with the semantics of {@link ActionCacheChecker#getTokenIfNeedToExecute}: null
    * if the action is up to date, and non-null if it needs to be executed, in which case that token
    * should be provided to the ActionCacheChecker after execution.
    */
   Token checkActionCache(Action action, MetadataHandler metadataHandler,
-      PackageRootResolver resolver, long actionStartTime) {
+      long actionStartTime, Iterable<Artifact> resolvedCacheArtifacts) {
     profiler.startTask(ProfilerTask.ACTION_CHECK, action);
     Token token = actionCacheChecker.getTokenIfNeedToExecute(
-        action, explain ? reporter : null, metadataHandler, resolver);
+        action, resolvedCacheArtifacts, explain ? reporter : null, metadataHandler);
     profiler.completeTask(ProfilerTask.ACTION_CHECK);
     if (token == null) {
       boolean eventPosted = false;
@@ -603,6 +493,11 @@
     }
   }
 
+  @Nullable
+  Iterable<Artifact> getActionCachedInputs(Action action, PackageRootResolver resolver) {
+    return actionCacheChecker.getCachedInputs(action, resolver);
+  }
+
   /**
    * Perform dependency discovery for action, which must discover its inputs.
    *