Delete downloadFile in favor of prefetchFiles in AbstractActionInputPrefetcher.

This simplifies the interface and makes it less likely that we get the subtle synchronization logic wrong in the future.

Due to these changes, AbstractActionInputPrefetcher is now too low-level to handle the presentation of a BulkTransferException to the user, so the logic to convert it into an EnvironmentalExecException is lifted into AbstractSpawnStrategy.

PiperOrigin-RevId: 516816172
Change-Id: I6e4b57fb90d9c84821dee36ae29c92cee02a906f
diff --git a/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
index 206f92d..913a323 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
@@ -15,10 +15,12 @@
 package com.google.devtools.build.lib.exec;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ActionContext;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -48,6 +50,7 @@
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.profiler.Profiler;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.server.FailureDetails;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.devtools.build.lib.server.FailureDetails.Spawn.Code;
@@ -246,13 +249,35 @@
     public ListenableFuture<Void> prefetchInputs()
         throws IOException, ForbiddenActionInputException {
       if (Spawns.shouldPrefetchInputsForLocalExecution(spawn)) {
-        return actionExecutionContext
-            .getActionInputPrefetcher()
-            .prefetchFiles(
-                getInputMapping(PathFragment.EMPTY_FRAGMENT, /* willAccessRepeatedly= */ true)
-                    .values(),
-                getMetadataProvider(),
-                Priority.MEDIUM);
+        return Futures.catchingAsync(
+            actionExecutionContext
+                .getActionInputPrefetcher()
+                .prefetchFiles(
+                    getInputMapping(PathFragment.EMPTY_FRAGMENT, /* willAccessRepeatedly= */ true)
+                        .values(),
+                    getMetadataProvider(),
+                    Priority.MEDIUM),
+            BulkTransferException.class,
+            (BulkTransferException e) -> {
+              if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
+                var code =
+                    executionOptions.useNewExitCodeForLostInputs
+                        ? Code.REMOTE_CACHE_EVICTED
+                        : Code.REMOTE_CACHE_FAILED;
+                throw new EnvironmentalExecException(
+                    e,
+                    FailureDetail.newBuilder()
+                        .setMessage(
+                            "Failed to fetch blobs because they do not exist remotely."
+                                + " Build without the Bytes does not work if your remote"
+                                + " cache evicts blobs during builds.")
+                        .setSpawn(FailureDetails.Spawn.newBuilder().setCode(code))
+                        .build());
+              } else {
+                throw e;
+              }
+            },
+            directExecutor());
       }
 
       return immediateVoidFuture();
diff --git a/src/main/java/com/google/devtools/build/lib/exec/BUILD b/src/main/java/com/google/devtools/build/lib/exec/BUILD
index 168ac13..36439c3 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/exec/BUILD
@@ -27,6 +27,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
         "//src/main/java/com/google/devtools/build/lib/events",
         "//src/main/java/com/google/devtools/build/lib/profiler",
+        "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
         "//src/main/java/com/google/devtools/build/lib/util:command",
         "//src/main/java/com/google/devtools/build/lib/util/io",
         "//src/main/java/com/google/devtools/build/lib/vfs",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
index a97c3b05..df11197 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java
@@ -22,7 +22,6 @@
 import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
 import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
 import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
-import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -277,11 +276,6 @@
 
   protected void prefetchVirtualActionInput(VirtualActionInput input) throws IOException {}
 
-  /** Transforms the error encountered during the prefetch . */
-  protected Completable onErrorResumeNext(Throwable error) {
-    return Completable.error(error);
-  }
-
   /**
    * Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under
    * their path in the output base.
@@ -323,8 +317,7 @@
 
     Completable prefetch =
         Completable.using(
-                () -> dirCtx, ctx -> mergeBulkTransfer(transfers), DirectoryContext::close)
-            .onErrorResumeNext(this::onErrorResumeNext);
+            () -> dirCtx, ctx -> mergeBulkTransfer(transfers), DirectoryContext::close);
 
     return toListenableFuture(prefetch);
   }
@@ -427,30 +420,11 @@
     return null;
   }
 
-  /**
-   * Downloads file into the {@code path} with its metadata.
-   *
-   * <p>The file will be written into a temporary file and moved to the final destination after the
-   * download finished.
-   */
-  private Completable downloadFileRx(
-      DirectoryContext dirCtx,
-      Path path,
-      @Nullable Path treeRoot,
-      @Nullable ActionInput actionInput,
-      FileArtifactValue metadata,
-      Priority priority) {
-    if (!canDownloadFile(path, metadata)) {
-      return Completable.complete();
-    }
-    return downloadFileNoCheckRx(dirCtx, path, treeRoot, actionInput, metadata, priority);
-  }
-
   private Completable downloadFileNoCheckRx(
       DirectoryContext dirCtx,
       Path path,
       @Nullable Path treeRoot,
-      @Nullable ActionInput actionInput,
+      ActionInput actionInput,
       FileArtifactValue metadata,
       Priority priority) {
     if (path.isSymbolicLink()) {
@@ -492,7 +466,7 @@
                 /* eager= */ false)
             .doOnError(
                 error -> {
-                  if (error instanceof CacheNotFoundException && actionInput != null) {
+                  if (error instanceof CacheNotFoundException) {
                     missingActionInputs.add(actionInput);
                   }
                 });
@@ -508,37 +482,6 @@
             }));
   }
 
-  /**
-   * Download file to the {@code path} with given metadata. Blocking await for the download to
-   * complete.
-   *
-   * <p>The file will be written into a temporary file and moved to the final destination after the
-   * download finished.
-   */
-  public void downloadFile(Path path, @Nullable ActionInput actionInput, FileArtifactValue metadata)
-      throws IOException, InterruptedException {
-    getFromFuture(downloadFileAsync(path.asFragment(), actionInput, metadata, Priority.CRITICAL));
-  }
-
-  protected ListenableFuture<Void> downloadFileAsync(
-      PathFragment path,
-      @Nullable ActionInput actionInput,
-      FileArtifactValue metadata,
-      Priority priority) {
-    return toListenableFuture(
-        Completable.using(
-            DirectoryContext::new,
-            dirCtx ->
-                downloadFileRx(
-                    dirCtx,
-                    execRoot.getFileSystem().getPath(path),
-                    /* treeRoot= */ null,
-                    actionInput,
-                    metadata,
-                    priority),
-            DirectoryContext::close));
-  }
-
   private void finalizeDownload(
       DirectoryContext dirCtx, @Nullable Path treeRoot, Path tmpPath, Path finalPath)
       throws IOException {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 3033522..3f28643 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -207,6 +207,7 @@
     deps = [
         ":abstract_action_input_prefetcher",
         "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
         "//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
         "//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster",
@@ -220,6 +221,7 @@
         "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
         "//src/main/java/com/google/devtools/build/lib/util",
         "//src/main/java/com/google/devtools/build/lib/vfs",
+        "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/build/skyframe",
         "//src/main/java/com/google/devtools/build/skyframe:skyframe-objects",
         "//third_party:flogger",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionFileSystem.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionFileSystem.java
index 2998356..e2c25cb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionFileSystem.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionFileSystem.java
@@ -20,12 +20,15 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Streams.stream;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
 import com.google.devtools.build.lib.actions.ActionInputMap;
+import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
 import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
@@ -566,10 +569,6 @@
     return null;
   }
 
-  ActionInput getActionInput(PathFragment path) {
-    return getInput(path.relativeTo(execRoot).getPathString());
-  }
-
   @Nullable
   @Override
   public FileArtifactValue getMetadata(ActionInput input) throws IOException {
@@ -605,7 +604,7 @@
   }
 
   @Nullable
-  RemoteFileArtifactValue getRemoteMetadata(PathFragment path) {
+  private RemoteFileArtifactValue getRemoteMetadata(PathFragment path) {
     if (!isOutput(path)) {
       return null;
     }
@@ -631,15 +630,21 @@
   }
 
   private void downloadFileIfRemote(PathFragment path) throws IOException {
-    FileArtifactValue m = getRemoteMetadata(path);
-    if (m != null) {
-      try {
-        inputFetcher.downloadFile(delegateFs.getPath(path), getActionInput(path), m);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(
-            String.format("Received interrupt while fetching file '%s'", path), e);
+    if (!isRemote(path)) {
+      return;
+    }
+    PathFragment execPath = path.relativeTo(execRoot);
+    try {
+      ActionInput input = getInput(execPath.getPathString());
+      if (input == null) {
+        // For undeclared outputs, getInput returns null as there's no artifact associated with the
+        // path. Therefore, we synthesize one here just so we're able to call prefetchFiles.
+        input = ActionInputHelper.fromPath(execPath);
       }
+      getFromFuture(inputFetcher.prefetchFiles(ImmutableList.of(input), this, Priority.CRITICAL));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(String.format("Received interrupt while fetching file '%s'", path), e);
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 28b4ccb..20bf04f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -21,25 +21,19 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.lib.actions.EnvironmentalExecException;
 import com.google.devtools.build.lib.actions.FileArtifactValue;
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.events.ExtendedEventHandler.FetchProgress;
 import com.google.devtools.build.lib.events.Reporter;
 import com.google.devtools.build.lib.exec.SpawnProgressEvent;
 import com.google.devtools.build.lib.remote.RemoteCache.DownloadProgressReporter;
-import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TempPathGenerator;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
-import com.google.devtools.build.lib.server.FailureDetails;
-import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
-import com.google.devtools.build.lib.server.FailureDetails.Spawn.Code;
 import com.google.devtools.build.lib.vfs.OutputPermissions;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
-import io.reactivex.rxjava3.core.Completable;
 import java.io.IOException;
 import java.util.regex.Pattern;
 
@@ -54,7 +48,6 @@
   private final String buildRequestId;
   private final String commandId;
   private final RemoteCache remoteCache;
-  private final boolean useNewExitCodeForLostInputs;
 
   RemoteActionInputFetcher(
       Reporter reporter,
@@ -64,13 +57,11 @@
       Path execRoot,
       TempPathGenerator tempPathGenerator,
       ImmutableList<Pattern> patternsToDownload,
-      OutputPermissions outputPermissions,
-      boolean useNewExitCodeForLostInputs) {
+      OutputPermissions outputPermissions) {
     super(reporter, execRoot, tempPathGenerator, patternsToDownload, outputPermissions);
     this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
     this.commandId = Preconditions.checkNotNull(commandId);
     this.remoteCache = Preconditions.checkNotNull(remoteCache);
-    this.useNewExitCodeForLostInputs = useNewExitCodeForLostInputs;
   }
 
   @Override
@@ -141,25 +132,4 @@
       return progress.finished();
     }
   }
-
-  @Override
-  protected Completable onErrorResumeNext(Throwable error) {
-    if (error instanceof BulkTransferException) {
-      if (((BulkTransferException) error).allCausedByCacheNotFoundException()) {
-        var code =
-            useNewExitCodeForLostInputs ? Code.REMOTE_CACHE_EVICTED : Code.REMOTE_CACHE_FAILED;
-        error =
-            new EnvironmentalExecException(
-                (BulkTransferException) error,
-                FailureDetail.newBuilder()
-                    .setMessage(
-                        "Failed to fetch blobs because they do not exist remotely."
-                            + " Build without the Bytes does not work if your remote"
-                            + " cache evicts blobs during builds")
-                    .setSpawn(FailureDetails.Spawn.newBuilder().setCode(code))
-                    .build());
-      }
-    }
-    return Completable.error(error);
-  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index babbf43..5bffa74 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -34,9 +34,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.actions.ActionAnalysisMetadata;
-import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.Artifact;
-import com.google.devtools.build.lib.actions.FileArtifactValue;
 import com.google.devtools.build.lib.analysis.AnalysisResult;
 import com.google.devtools.build.lib.analysis.BlazeDirectories;
 import com.google.devtools.build.lib.analysis.ConfiguredTarget;
@@ -61,7 +59,6 @@
 import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
 import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
 import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
-import com.google.devtools.build.lib.remote.ToplevelArtifactsDownloader.PathToMetadataConverter;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
 import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
@@ -965,8 +962,6 @@
 
     actionContextProvider.setTempPathGenerator(tempPathGenerator);
 
-    ExecutionOptions executionOptions =
-        Preconditions.checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
     RemoteOptions remoteOptions =
         Preconditions.checkNotNull(
             env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
@@ -988,8 +983,7 @@
               env.getExecRoot(),
               tempPathGenerator,
               patternsToDownload,
-              outputPermissions,
-              executionOptions.useNewExitCodeForLostInputs);
+              outputPermissions);
       env.getEventBus().register(actionInputFetcher);
       builder.setActionInputPrefetcher(actionInputFetcher);
       actionContextProvider.setActionInputFetcher(actionInputFetcher);
@@ -1000,28 +994,13 @@
               remoteOutputsMode.downloadToplevelOutputsOnly(),
               env.getSkyframeExecutor().getEvaluator(),
               actionInputFetcher,
-              new PathToMetadataConverter() {
-                @Nullable
-                @Override
-                public FileArtifactValue getMetadata(Path path) {
-                  FileSystem fileSystem = path.getFileSystem();
-                  if (fileSystem instanceof RemoteActionFileSystem) {
-                    return ((RemoteActionFileSystem) path.getFileSystem())
-                        .getRemoteMetadata(path.asFragment());
-                  }
-                  return null;
+              env.getExecRoot().asFragment(),
+              (path) -> {
+                FileSystem fileSystem = path.getFileSystem();
+                if (fileSystem instanceof RemoteActionFileSystem) {
+                  return (RemoteActionFileSystem) path.getFileSystem();
                 }
-
-                @Nullable
-                @Override
-                public ActionInput getActionInput(Path path) {
-                  FileSystem fileSystem = path.getFileSystem();
-                  if (fileSystem instanceof RemoteActionFileSystem) {
-                    return ((RemoteActionFileSystem) path.getFileSystem())
-                        .getActionInput(path.asFragment());
-                  }
-                  return null;
-                }
+                return null;
               });
       env.getEventBus().register(toplevelArtifactsDownloader);
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ToplevelArtifactsDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/ToplevelArtifactsDownloader.java
index e67a3ee..06ebe98 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ToplevelArtifactsDownloader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ToplevelArtifactsDownloader.java
@@ -19,6 +19,7 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static com.google.devtools.build.lib.packages.TargetUtils.isTestRuleName;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
@@ -26,9 +27,11 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
 import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.FileArtifactValue;
+import com.google.devtools.build.lib.actions.MetadataProvider;
 import com.google.devtools.build.lib.analysis.AspectCompleteEvent;
 import com.google.devtools.build.lib.analysis.ConfiguredTarget;
 import com.google.devtools.build.lib.analysis.ConfiguredTargetValue;
@@ -44,6 +47,7 @@
 import com.google.devtools.build.lib.skyframe.TreeArtifactValue;
 import com.google.devtools.build.lib.util.Pair;
 import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.skyframe.MemoizingEvaluator;
 import com.google.devtools.build.skyframe.SkyValue;
 import javax.annotation.Nullable;
@@ -67,14 +71,16 @@
   private final boolean downloadToplevel;
   private final MemoizingEvaluator memoizingEvaluator;
   private final AbstractActionInputPrefetcher actionInputPrefetcher;
-  private final PathToMetadataConverter pathToMetadataConverter;
+  private final PathFragment execRoot;
+  private final PathToMetadataProvider pathToMetadataProvider;
 
   public ToplevelArtifactsDownloader(
       String commandName,
       boolean downloadToplevel,
       MemoizingEvaluator memoizingEvaluator,
       AbstractActionInputPrefetcher actionInputPrefetcher,
-      PathToMetadataConverter pathToMetadataConverter) {
+      PathFragment execRoot,
+      PathToMetadataProvider pathToMetadataProvider) {
     switch (commandName) {
       case "build":
         this.commandMode = CommandMode.BUILD;
@@ -94,38 +100,38 @@
     this.downloadToplevel = downloadToplevel;
     this.memoizingEvaluator = memoizingEvaluator;
     this.actionInputPrefetcher = actionInputPrefetcher;
-    this.pathToMetadataConverter = pathToMetadataConverter;
+    this.execRoot = execRoot;
+    this.pathToMetadataProvider = pathToMetadataProvider;
   }
 
   /**
-   * Interface that converts {@link Path} to metadata {@link FileArtifactValue}.
+   * Interface that converts a {@link Path} into a {@link MetadataProvider} suitable for retrieving
+   * metadata for that path.
    *
-   * <p>{@link ToplevelArtifactsDownloader} is only used with {@code ActionFileSystem} together. If
-   * we see a {@link Path}, its underlying file system must be {@code ActionFileSystem}. We use this
-   * interface to avoid passing in the actionFs implementation.
+   * <p>{@link ToplevelArtifactsDownloader} may only used in conjunction with filesystems that
+   * implement {@link MetadataProvider}.
    */
-  public interface PathToMetadataConverter {
+  public interface PathToMetadataProvider {
     @Nullable
-    FileArtifactValue getMetadata(Path path);
-
-    @Nullable
-    ActionInput getActionInput(Path path);
+    MetadataProvider getMetadataProvider(Path path);
   }
 
   private void downloadTestOutput(Path path) {
     // Since the event is fired within action execution, the skyframe doesn't know the outputs of
-    // test actions yet, so we can't get their metadata through skyframe. However, the fileSystem
-    // of the path is an ActionFileSystem, we use it to get the metadata for this file.
+    // test actions yet, so we can't get their metadata through skyframe. However, since the path
+    // belongs to a filesystem that implements MetadataProvider, we use it to get the metadata.
     //
     // If the test hit action cache, the filesystem is local filesystem because the actual test
-    // action didn't get the chance to execute. In this case the metadata is null which is fine
-    // because test outputs are already downloaded (otherwise it cannot hit the action cache).
-    FileArtifactValue metadata = pathToMetadataConverter.getMetadata(path);
-    ActionInput actionInput = pathToMetadataConverter.getActionInput(path);
-    if (metadata != null) {
+    // action didn't get the chance to execute. In this case the MetadataProvider is null, which
+    // is fine because test outputs are already downloaded (otherwise the action cache wouldn't
+    // have been hit).
+    MetadataProvider metadataProvider = pathToMetadataProvider.getMetadataProvider(path);
+    if (metadataProvider != null) {
+      // RemoteActionFileSystem#getInput returns null for undeclared test outputs.
+      ActionInput input = ActionInputHelper.fromPath(path.asFragment().relativeTo(execRoot));
       ListenableFuture<Void> future =
-          actionInputPrefetcher.downloadFileAsync(
-              path.asFragment(), actionInput, metadata, Priority.LOW);
+          actionInputPrefetcher.prefetchFiles(
+              ImmutableList.of(input), metadataProvider, Priority.LOW);
       addCallback(
           future,
           new FutureCallback<Void>() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
index 32ae32c..d76c3ce 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java
@@ -18,6 +18,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
 import static com.google.devtools.build.lib.actions.util.ActionsTestUtil.createTreeArtifactWithGeneratingAction;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
@@ -570,22 +571,7 @@
   }
 
   @Test
-  public void downloadFile_downloadRemoteFiles() throws Exception {
-    Map<ActionInput, FileArtifactValue> metadata = new HashMap<>();
-    Map<HashCode, byte[]> cas = new HashMap<>();
-    Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cas);
-    AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);
-
-    prefetcher.downloadFile(a1.getPath(), /* actionInput= */ null, metadata.get(a1));
-
-    assertThat(FileSystemUtils.readContent(a1.getPath(), UTF_8)).isEqualTo("hello world");
-    assertThat(a1.getPath().isExecutable()).isTrue();
-    assertThat(a1.getPath().isReadable()).isTrue();
-    assertThat(a1.getPath().isWritable()).isFalse();
-  }
-
-  @Test
-  public void downloadFile_onInterrupt_deletePartialDownloadedFile() throws Exception {
+  public void prefetchFiles_onInterrupt_deletePartialDownloadedFile() throws Exception {
     Semaphore startSemaphore = new Semaphore(0);
     Semaphore endSemaphore = new Semaphore(0);
     Map<ActionInput, FileArtifactValue> metadata = new HashMap<>();
@@ -605,7 +591,11 @@
         new Thread(
             () -> {
               try {
-                prefetcher.downloadFile(a1.getPath(), /* actionInput= */ null, metadata.get(a1));
+                getFromFuture(
+                    prefetcher.prefetchFiles(
+                        ImmutableList.of(a1),
+                        new StaticMetadataProvider(metadata),
+                        Priority.MEDIUM));
               } catch (IOException ignored) {
                 // Intentionally left empty
               } catch (InterruptedException e) {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionFileSystemTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionFileSystemTest.java
index 8037c55..c8de525 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionFileSystemTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionFileSystemTest.java
@@ -16,6 +16,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth8.assertThat;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -28,7 +29,11 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.hash.HashCode;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
 import com.google.devtools.build.lib.actions.ActionInputMap;
+import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
 import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
@@ -50,6 +55,7 @@
 import com.google.devtools.build.lib.vfs.Symlinks;
 import com.google.devtools.build.lib.vfs.SyscallCache;
 import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.io.IOException;
 import java.util.Map;
 import org.junit.Before;
@@ -113,10 +119,10 @@
     return outputRoot.getRoot().asPath().getRelative(outputRootRelativePath).asFragment();
   }
 
-  private static Answer<Void> mockDownloadFile(Path path, String contents) {
+  private static Answer<ListenableFuture<Void>> mockPrefetchFile(Path path, String contents) {
     return invocationOnMock -> {
       FileSystemUtils.writeContent(path, UTF_8, contents);
-      return null;
+      return immediateVoidFuture();
     };
   }
 
@@ -142,9 +148,9 @@
     ActionInputMap inputs = new ActionInputMap(1);
     Artifact artifact = createRemoteArtifact("remote-file", "remote contents", inputs);
     FileSystem actionFs = createActionFileSystem(inputs);
-    doAnswer(mockDownloadFile(artifact.getPath(), "remote contents"))
+    doAnswer(mockPrefetchFile(artifact.getPath(), "remote contents"))
         .when(inputFetcher)
-        .downloadFile(eq(artifact.getPath()), any(), eq(inputs.getMetadata(artifact)));
+        .prefetchFiles(eq(ImmutableList.of(artifact)), any(), eq(Priority.CRITICAL));
 
     // act
     Path actionFsPath = actionFs.getPath(artifact.getPath().asFragment());
@@ -154,7 +160,7 @@
     assertThat(actionFsPath.getFileSystem()).isSameInstanceAs(actionFs);
     assertThat(contents).isEqualTo("remote contents");
     verify(inputFetcher)
-        .downloadFile(eq(artifact.getPath()), any(), eq(inputs.getMetadata(artifact)));
+        .prefetchFiles(eq(ImmutableList.of(artifact)), any(), eq(Priority.CRITICAL));
     verifyNoMoreInteractions(inputFetcher);
   }
 
@@ -163,11 +169,10 @@
     // arrange
     Artifact artifact = ActionsTestUtil.createArtifact(outputRoot, "out");
     FileSystem actionFs = createActionFileSystem(new ActionInputMap(0), ImmutableList.of(artifact));
-    FileArtifactValue metadata =
-        injectRemoteFile(actionFs, artifact.getPath().asFragment(), "remote contents");
-    doAnswer(mockDownloadFile(artifact.getPath(), "remote contents"))
+    injectRemoteFile(actionFs, artifact.getPath().asFragment(), "remote contents");
+    doAnswer(mockPrefetchFile(artifact.getPath(), "remote contents"))
         .when(inputFetcher)
-        .downloadFile(eq(artifact.getPath()), any(), eq(metadata));
+        .prefetchFiles(eq(ImmutableList.of(artifact)), any(), eq(Priority.CRITICAL));
 
     // act
     Path actionFsPath = actionFs.getPath(artifact.getPath().asFragment());
@@ -176,7 +181,8 @@
     // assert
     assertThat(actionFsPath.getFileSystem()).isSameInstanceAs(actionFs);
     assertThat(contents).isEqualTo("remote contents");
-    verify(inputFetcher).downloadFile(eq(artifact.getPath()), any(), eq(metadata));
+    verify(inputFetcher)
+        .prefetchFiles(eq(ImmutableList.of(artifact)), any(), eq(Priority.CRITICAL));
     verifyNoMoreInteractions(inputFetcher);
   }
 
@@ -184,11 +190,12 @@
   public void testGetInputStream_fromRemoteOutputTree_forUndeclaredOutput() throws Exception {
     // arrange
     Path path = outputRoot.getRoot().getRelative("out");
+    ActionInput input = ActionInputHelper.fromPath(path.relativeTo(execRoot));
     FileSystem actionFs = createActionFileSystem();
-    FileArtifactValue metadata = injectRemoteFile(actionFs, path.asFragment(), "remote contents");
-    doAnswer(mockDownloadFile(path, "remote contents"))
+    injectRemoteFile(actionFs, path.asFragment(), "remote contents");
+    doAnswer(mockPrefetchFile(path, "remote contents"))
         .when(inputFetcher)
-        .downloadFile(eq(path), any(), eq(metadata));
+        .prefetchFiles(eq(ImmutableList.of(input)), any(), eq(Priority.CRITICAL));
 
     // act
     Path actionFsPath = actionFs.getPath(path.asFragment());
@@ -197,7 +204,7 @@
     // assert
     assertThat(actionFsPath.getFileSystem()).isSameInstanceAs(actionFs);
     assertThat(contents).isEqualTo("remote contents");
-    verify(inputFetcher).downloadFile(eq(path), any(), eq(metadata));
+    verify(inputFetcher).prefetchFiles(eq(ImmutableList.of(input)), any(), eq(Priority.CRITICAL));
     verifyNoMoreInteractions(inputFetcher);
   }
 
@@ -564,6 +571,7 @@
   }
 
   @Override
+  @CanIgnoreReturnValue
   protected FileArtifactValue injectRemoteFile(
       FileSystem actionFs, PathFragment path, String content) throws IOException {
     byte[] contentBytes = content.getBytes(UTF_8);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
index 36bbf24..b4d7f51 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
@@ -25,12 +25,12 @@
 import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority;
 import com.google.devtools.build.lib.actions.Artifact;
-import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.FileArtifactValue;
 import com.google.devtools.build.lib.actions.MetadataProvider;
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
 import com.google.devtools.build.lib.events.Reporter;
+import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
@@ -76,8 +76,7 @@
         execRoot,
         tempPathGenerator,
         ImmutableList.of(),
-        OutputPermissions.READONLY,
-        /* useNewExitCodeForLostInputs= */ false);
+        OutputPermissions.READONLY);
   }
 
   @Test
@@ -94,8 +93,7 @@
             execRoot,
             tempPathGenerator,
             ImmutableList.of(),
-            OutputPermissions.READONLY,
-            /* useNewExitCodeForLostInputs= */ false);
+            OutputPermissions.READONLY);
     VirtualActionInput a = ActionsTestUtil.createVirtualActionInput("file1", "hello world");
 
     // act
@@ -123,8 +121,7 @@
             execRoot,
             tempPathGenerator,
             ImmutableList.of(),
-            OutputPermissions.READONLY,
-            /* useNewExitCodeForLostInputs= */ false);
+            OutputPermissions.READONLY);
 
     // act
     wait(
@@ -145,7 +142,7 @@
 
     var error =
         assertThrows(
-            ExecException.class,
+            BulkTransferException.class,
             () ->
                 wait(
                     prefetcher.prefetchFiles(