// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputPrefetcher;
import com.google.devtools.build.lib.actions.ActionOutputDirectoryHelper;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.OutputMetadataStore;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.OutputPermissions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
 * Abstract implementation of {@link ActionInputPrefetcher} which implements the orchestration of
 * prefeching multiple inputs so subclasses can focus on prefetching / downloading single input.
 */
public abstract class AbstractActionInputPrefetcher implements ActionInputPrefetcher {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  private final Reporter reporter;
  private final AsyncTaskCache.NoResult<Path> downloadCache = AsyncTaskCache.NoResult.create();
  private final TempPathGenerator tempPathGenerator;
  private final OutputPermissions outputPermissions;

  protected final Path execRoot;
  protected final RemoteOutputChecker remoteOutputChecker;

  private final Set<ActionInput> missingActionInputs = Sets.newConcurrentHashSet();

  private final ActionOutputDirectoryHelper outputDirectoryHelper;

  /** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */
  enum DirectoryState {
    PERMANENTLY_WRITABLE,
    TEMPORARILY_WRITABLE,
    OUTPUT_PERMISSIONS
  }

  /**
   * Tracks directory permissions to minimize filesystem operations.
   *
   * <p>Throughout the prefetcher, {@link Path#setWritable} and {@link Path#chmod} calls on output
   * directories must go through the methods in this class.
   */
  private final class DirectoryTracker {
    private final ConcurrentHashMap<Path, DirectoryState> directoryStateMap =
        new ConcurrentHashMap<>();

    /**
     * Marks a directory as temporarily writable.
     *
     * <p>A temporarily writable directory may have its output permissions set by a later call to
     * {@link #setOutputPermissions}, unless {@link #setPermanentlyWritable} is called in the
     * interim.
     */
    void setTemporarilyWritable(Path dir) throws IOException {
      setWritable(dir, DirectoryState.TEMPORARILY_WRITABLE);
    }

    /**
     * Marks a directory as permanently writable.
     *
     * <p>A permanently writable directory will never have its output permissions set by a later
     * call to {@link #setOutputPermissions}.
     */
    void setPermanentlyWritable(Path dir) throws IOException {
      setWritable(dir, DirectoryState.PERMANENTLY_WRITABLE);
    }

    private void setWritable(Path dir, DirectoryState newState) throws IOException {
      AtomicReference<IOException> caughtException = new AtomicReference<>();

      directoryStateMap.compute(
          dir,
          (unusedKey, oldState) -> {
            if (oldState == DirectoryState.TEMPORARILY_WRITABLE
                || oldState == DirectoryState.PERMANENTLY_WRITABLE) {
              // Already writable, but must potentially upgrade from temporary to permanent.
              return newState == DirectoryState.PERMANENTLY_WRITABLE ? newState : oldState;
            }
            try {
              outputDirectoryHelper.createOutputDirectory(dir, execRoot);
              dir.setWritable(true);
            } catch (IOException e) {
              caughtException.set(e);
              return oldState;
            }
            return newState;
          });

      if (caughtException.get() != null) {
        throw caughtException.get();
      }
    }

    /**
     * Sets the output permissions on a directory.
     *
     * <p>If {@link #setPermanentlyWritable} has been previously called on this directory, or if no
     * {@link #setTemporarilyWritable} call has intervened since the last call to {@link
     * #setOutputPermissions}, this is a no-op. Otherwise, the output permissions are set.
     */
    void setOutputPermissions(Path dir) throws IOException {
      AtomicReference<IOException> caughtException = new AtomicReference<>();

      directoryStateMap.compute(
          dir,
          (unusedKey, oldState) -> {
            if (oldState == DirectoryState.OUTPUT_PERMISSIONS
                || oldState == DirectoryState.PERMANENTLY_WRITABLE) {
              // Either the output permissions have already been set, or we're not changing the
              // permissions ever again.
              return oldState;
            }
            try {
              dir.chmod(outputPermissions.getPermissionsMode());
            } catch (IOException e) {
              caughtException.set(e);
              return oldState;
            }
            return DirectoryState.OUTPUT_PERMISSIONS;
          });

      if (caughtException.get() != null) {
        throw caughtException.get();
      }
    }
  }

  private final DirectoryTracker directoryTracker = new DirectoryTracker();

  /** A symlink in the output tree. */
  @AutoValue
  abstract static class Symlink {

    abstract PathFragment getLinkExecPath();

    abstract PathFragment getTargetExecPath();

    static Symlink of(PathFragment linkExecPath, PathFragment targetExecPath) {
      checkArgument(!linkExecPath.equals(targetExecPath));
      return new AutoValue_AbstractActionInputPrefetcher_Symlink(linkExecPath, targetExecPath);
    }
  }

  protected AbstractActionInputPrefetcher(
      Reporter reporter,
      Path execRoot,
      TempPathGenerator tempPathGenerator,
      RemoteOutputChecker remoteOutputChecker,
      ActionOutputDirectoryHelper outputDirectoryHelper,
      OutputPermissions outputPermissions) {
    this.reporter = reporter;
    this.execRoot = execRoot;
    this.tempPathGenerator = tempPathGenerator;
    this.remoteOutputChecker = remoteOutputChecker;
    this.outputDirectoryHelper = outputDirectoryHelper;
    this.outputPermissions = outputPermissions;
  }

  private boolean shouldDownloadFile(Path path, FileArtifactValue metadata) {
    if (!path.exists()) {
      return true;
    }

    // In the most cases, skyframe should be able to detect source files modifications and delete
    // staled outputs before action execution. However, there are some cases where outputs are not
    // tracked by skyframe. We compare the digest here to make sure we don't use staled files.
    try {
      byte[] digest = path.getFastDigest();
      if (digest == null) {
        digest = path.getDigest();
      }
      return !Arrays.equals(digest, metadata.getDigest());
    } catch (IOException ignored) {
      return true;
    }
  }

  protected abstract boolean canDownloadFile(Path path, FileArtifactValue metadata);

  /**
   * Downloads file to the given path via its metadata.
   *
   * @param tempPath the temporary path which the input should be written to.
   */
  protected abstract ListenableFuture<Void> doDownloadFile(
      ActionExecutionMetadata action,
      Reporter reporter,
      Path tempPath,
      PathFragment execPath,
      FileArtifactValue metadata,
      Priority priority)
      throws IOException;

  protected void prefetchVirtualActionInput(VirtualActionInput input) throws IOException {}

  /**
   * Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under
   * their path in the output base.
   *
   * <p>The {@code inputs} may not contain any unexpanded directories.
   *
   * <p>This method is safe to be called concurrently from spawn runners before running any local
   * spawn.
   *
   * @return a future that is completed once all downloads have finished.
   */
  @Override
  public ListenableFuture<Void> prefetchFiles(
      ActionExecutionMetadata action,
      Iterable<? extends ActionInput> inputs,
      MetadataSupplier metadataSupplier,
      Priority priority) {
    List<ActionInput> files = new ArrayList<>();

    for (ActionInput input : inputs) {
      // Source artifacts don't need to be fetched.
      if (input instanceof Artifact && ((Artifact) input).isSourceArtifact()) {
        continue;
      }

      // Skip empty tree artifacts (non-empty tree artifacts should have already been expanded).
      if (input.isDirectory()) {
        continue;
      }

      files.add(input);
    }

    if (files.isEmpty()) {
      return immediateVoidFuture();
    }

    // Collect the set of directories whose output permissions must be set at the end of this call.
    // This responsibility cannot lie with the downloading of an individual file, because multiple
    // files may be concurrently downloaded into the same directory within a single call to
    // prefetchFiles, and two concurrent calls to prefetchFiles may prefetch the same file. In the
    // latter case, the second call will have its downloads deduplicated against the first call, but
    // it must still synchronize on the output permissions having been set.
    Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();

    // Using plain futures to avoid RxJava overheads.
    List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
    try (var s = Profiler.instance().profile("compose prefetches")) {
      for (var file : files) {
        transfers.add(
            prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
      }
    }

    ListenableFuture<Void> mergedTransfer;
    try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
      mergedTransfer = mergeBulkTransfer(transfers);
    }

    return Futures.transformAsync(
        mergedTransfer,
        unused -> {
          try {
            // Set output permissions on tree artifact subdirectories, matching the behavior of
            // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
            for (Path dir : dirsWithOutputPermissions) {
              directoryTracker.setOutputPermissions(dir);
            }
          } catch (IOException e) {
            return immediateFailedFuture(e);
          }
          return immediateVoidFuture();
        },
        directExecutor());
  }

  private ListenableFuture<Void> prefetchFile(
      ActionExecutionMetadata action,
      Set<Path> dirsWithOutputPermissions,
      MetadataSupplier metadataSupplier,
      ActionInput input,
      Priority priority) {
    try {
      if (input instanceof VirtualActionInput) {
        prefetchVirtualActionInput((VirtualActionInput) input);
        return immediateVoidFuture();
      }

      PathFragment execPath = input.getExecPath();

      FileArtifactValue metadata = metadataSupplier.getMetadata(input);
      if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
        return immediateVoidFuture();
      }

      @Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);

      if (symlink != null) {
        checkState(execPath.startsWith(symlink.getLinkExecPath()));
        execPath =
            symlink.getTargetExecPath().getRelative(execPath.relativeTo(symlink.getLinkExecPath()));
      }

      @Nullable PathFragment treeRootExecPath = maybeGetTreeRoot(action, input, metadataSupplier);

      Completable result =
          downloadFileNoCheckRx(
              action,
              execRoot.getRelative(execPath),
              treeRootExecPath != null ? execRoot.getRelative(treeRootExecPath) : null,
              dirsWithOutputPermissions,
              input,
              metadata,
              priority);

      if (symlink != null) {
        result = result.andThen(plantSymlink(symlink));
      }

      return toListenableFuture(result);
    } catch (IOException | InterruptedException e) {
      return immediateFailedFuture(e);
    }
  }

  /**
   * For an input belonging to a tree artifact, returns the prefetch exec path of the tree artifact
   * root. Otherwise, returns null.
   *
   * <p>Some artifacts (notably, those created by {@code ctx.actions.symlink}) are materialized in
   * the output tree as a symlink to another artifact, as indicated by the {@link
   * FileArtifactValue#getMaterializationExecPath()} field in their metadata.
   */
  @Nullable
  private PathFragment maybeGetTreeRoot(
      ActionExecutionMetadata action, ActionInput input, MetadataSupplier metadataSupplier)
      throws IOException, InterruptedException {
    if (!(input instanceof TreeFileArtifact)) {
      return null;
    }
    TreeFileArtifact treeFile = (TreeFileArtifact) input;
    SpecialArtifact treeArtifact = treeFile.getParent();
    FileArtifactValue treeMetadata = metadataSupplier.getMetadata(treeArtifact);
    if (treeMetadata == null) {
      if (!treeFile.isChildOfDeclaredDirectory() && action.getOutputs().contains(treeFile)) {
        // If this file is produced by an action template, the full tree artifact metadata might
        // not be available yet. However, we know with certainty that the file is not materialized
        // as a symlink.
        return null;
      }
      throw new IllegalStateException(
          String.format("input %s belongs to a tree artifact whose metadata is missing", treeFile));
    }
    return treeMetadata.getMaterializationExecPath().orElse(treeArtifact.getExecPath());
  }

  /**
   * Returns the symlink to be planted in the output tree for artifacts that are prefetched into a
   * different location.
   *
   * <p>Some artifacts (notably, those created by {@code ctx.actions.symlink}) are materialized in
   * the output tree as a symlink to another artifact, as indicated by the {@link
   * FileArtifactValue#getMaterializationExecPath()} field in their (or their parent tree
   * artifact's) metadata.
   */
  @Nullable
  private Symlink maybeGetSymlink(
      ActionExecutionMetadata action,
      ActionInput input,
      FileArtifactValue metadata,
      MetadataSupplier metadataSupplier)
      throws IOException, InterruptedException {
    if (input instanceof TreeFileArtifact) {
      TreeFileArtifact treeFile = (TreeFileArtifact) input;
      SpecialArtifact treeArtifact = treeFile.getParent();
      FileArtifactValue treeMetadata = metadataSupplier.getMetadata(treeArtifact);
      if (treeMetadata == null) {
        if (!treeFile.isChildOfDeclaredDirectory() && action.getOutputs().contains(treeFile)) {
          // If this file is produced by an action template, the full tree artifact metadata might
          // not be available yet. However, we know with certainty that the file is not materialized
          // as a symlink.
          return null;
        }
        throw new IllegalStateException(
            String.format(
                "input %s belongs to a tree artifact whose metadata is missing", treeFile));
      }
      return maybeGetSymlink(action, treeArtifact, treeMetadata, metadataSupplier);
    }
    PathFragment execPath = input.getExecPath();
    PathFragment materializationExecPath = metadata.getMaterializationExecPath().orElse(execPath);
    if (!materializationExecPath.equals(execPath)) {
      return Symlink.of(execPath, materializationExecPath);
    }
    return null;
  }

  private Path resolveOneSymlink(Path path) throws IOException {
    var targetPathFragment = path.readSymbolicLink();
    if (targetPathFragment.isAbsolute()) {
      return path.getFileSystem().getPath(targetPathFragment);
    } else {
      return checkNotNull(path.getParentDirectory()).getRelative(targetPathFragment);
    }
  }

  private Path maybeResolveSymlink(Path path) throws IOException {
    // Potentially resolves a symlink to its target path. This differs from
    // Path#resolveSymbolicLinks() that:
    //   1. Path#resolveSymbolicLinks() checks each segment of the path, but we assume there is no
    //      intermediate symlink because they should've been already normalized for outputs.
    //   2. In case of dangling symlink, we return the target path instead of throwing
    //      FileNotFoundException because we want to download output to that target path.
    var maxAttempt = 32;
    while (path.isSymbolicLink() && maxAttempt-- > 0) {
      var resolvedPath = resolveOneSymlink(path);
      if (resolvedPath.asFragment().equals(path.asFragment())) {
        throw new FileSymlinkLoopException(path.asFragment());
      }
      path = resolvedPath;
    }
    if (maxAttempt <= 0) {
      throw new FileSymlinkLoopException(path.asFragment());
    }
    return path;
  }

  private Completable downloadFileNoCheckRx(
      ActionExecutionMetadata action,
      Path path,
      @Nullable Path treeRoot,
      Set<Path> dirsWithOutputPermissions,
      ActionInput actionInput,
      FileArtifactValue metadata,
      Priority priority) {
    // If the path to be prefetched is a non-dangling symlink, prefetch its target path instead.
    // Note that this only applies to symlinks created by spawns (or, currently, with the internal
    // version of BwoB); symlinks created in-process through an ActionFileSystem should have already
    // been resolved into their materializationExecPath in maybeGetSymlink.
    try {
      if (treeRoot != null) {
        var treeRootRelativePath = path.relativeTo(treeRoot);
        treeRoot = maybeResolveSymlink(treeRoot);
        path = treeRoot.getRelative(treeRootRelativePath);
      } else {
        path = maybeResolveSymlink(path);
      }
    } catch (IOException e) {
      return Completable.error(e);
    }

    if (actionInput instanceof Artifact && ((Artifact) actionInput).isChildOfDeclaredDirectory()) {
      // Arrange for the output permissions to be set on every directory inside the tree artifact.
      // This must be done at assembly time to ensure that the permissions are set before the
      // prefetchFiles call returns, even when the actual downloads are deduplicated against a
      // concurrent call. See finalizeDownload for why we don't do so in other cases.
      checkNotNull(treeRoot);
      for (Path dir = path.getParentDirectory();
          dir.startsWith(treeRoot);
          dir = dir.getParentDirectory()) {
        if (!dirsWithOutputPermissions.add(dir)) {
          break;
        }
      }
    }

    Path finalPath = path;

    Completable download =
        usingTempPath(
            (tempPath, alreadyDeleted) ->
                toCompletable(
                        () ->
                            doDownloadFile(
                                action,
                                reporter,
                                tempPath,
                                finalPath.relativeTo(execRoot),
                                metadata,
                                priority),
                        directExecutor())
                    .doOnComplete(
                        () -> {
                          finalizeDownload(tempPath, finalPath, dirsWithOutputPermissions);
                          alreadyDeleted.set(true);
                        })
                    .doOnError(
                        error -> {
                          if (error instanceof CacheNotFoundException) {
                            missingActionInputs.add(actionInput);
                          }
                        }));

    return downloadCache.executeIfNot(
        finalPath,
        Completable.defer(
            () -> {
              if (shouldDownloadFile(finalPath, metadata)) {
                return download;
              }
              return Completable.complete();
            }));
  }

  private void finalizeDownload(Path tmpPath, Path finalPath, Set<Path> dirsWithOutputPermissions)
      throws IOException {
    Path parentDir = checkNotNull(finalPath.getParentDirectory());

    // Ensure the parent directory exists and is writable. We cannot rely on this precondition to be
    // have been established by the execution of the owning action in a previous invocation, since
    // the output tree may have been externally modified in between invocations.
    if (dirsWithOutputPermissions.contains(parentDir)) {
      // The file belongs to a tree artifact created by an action that declared an output directory
      // (as opposed to an action template expansion). The output permissions should be set on the
      // parent directory after prefetching.
      directoryTracker.setTemporarilyWritable(parentDir);
    } else {
      // There are three cases:
      //   (1) The file does not belong to a tree artifact.
      //   (2) The file belongs to a tree artifact created by an action template expansion.
      //   (3) The file belongs to a tree artifact but we don't know it. This can occur when the
      //       file belongs to a tree artifact inside a fileset (see b/254844173).
      // In case (1), the parent directory is a package or a subdirectory of a package, and should
      // remain writable. In cases (2) and (3), even though we arguably ought to set the output
      // permissions on the parent directory to match the outcome of a locally executed action, we
      // choose not to do it and avoid the additional implementation complexity required to detect a
      // race condition between concurrent calls touching the same directory.
      directoryTracker.setPermanentlyWritable(parentDir);
    }

    // Set output permissions on files, matching the behavior of SkyframeActionExecutor#checkOutputs
    // for artifacts produced by local actions.
    tmpPath.chmod(outputPermissions.getPermissionsMode());
    FileSystemUtils.moveFile(tmpPath, finalPath);
  }

  private interface TaskWithTempPath {
    Completable run(Path tempPath, AtomicBoolean alreadyDeleted);
  }

  /**
   * Runs a task with a temporary path.
   *
   * <p>The temporary path will be deleted once the task is done. Set {@code alreadyDeleted} to
   * signal that deletion is no longer needed.
   */
  private Completable usingTempPath(TaskWithTempPath task) {
    AtomicBoolean alreadyDeleted = new AtomicBoolean(false);
    return Completable.using(
        tempPathGenerator::generateTempPath,
        (tempPath) -> task.run(tempPath, alreadyDeleted),
        tempPath -> {
          if (!alreadyDeleted.get()) {
            deletePartialDownload(tempPath);
          }
        },
        // Clean up after the upstream is disposed to ensure tempPath won't be touched further.
        /* eager= */ false);
  }

  private void deletePartialDownload(Path path) {
    try {
      path.delete();
    } catch (IOException e) {
      logger.atWarning().withCause(e).log(
          "Failed to delete output file after incomplete download: %s", path);
    }
  }

  private Completable plantSymlink(Symlink symlink) {
    return downloadCache.executeIfNot(
        execRoot.getRelative(symlink.getLinkExecPath()),
        Completable.defer(
            () -> {
              Path link = execRoot.getRelative(symlink.getLinkExecPath());
              Path target = execRoot.getRelative(symlink.getTargetExecPath());
              // Delete the link path if it already exists. This is the case for tree artifacts,
              // whose root directory is created before the action runs.
              link.delete();
              link.createSymbolicLink(target);
              return Completable.complete();
            }));
  }

  public ImmutableSet<Path> downloadedFiles() {
    return downloadCache.getFinishedTasks();
  }

  public ImmutableSet<Path> downloadsInProgress() {
    return downloadCache.getInProgressTasks();
  }

  @VisibleForTesting
  public AsyncTaskCache.NoResult<Path> getDownloadCache() {
    return downloadCache;
  }

  public void shutdown() {
    downloadCache.shutdown();
    while (true) {
      try {
        downloadCache.awaitTermination();
        break;
      } catch (InterruptedException ignored) {
        downloadCache.shutdownNow();
      }
    }
  }

  public void finalizeAction(Action action, OutputMetadataStore outputMetadataStore)
      throws IOException, InterruptedException {
    List<Artifact> outputsToDownload = new ArrayList<>();
    for (Artifact output : action.getOutputs()) {
      if (outputMetadataStore.artifactOmitted(output)) {
        continue;
      }

      var metadata = outputMetadataStore.getOutputMetadata(output);
      if (!metadata.isRemote()) {
        continue;
      }

      if (output.isTreeArtifact()) {
        var children = outputMetadataStore.getTreeArtifactChildren((SpecialArtifact) output);
        for (var file : children) {
          if (remoteOutputChecker.shouldDownloadOutput(file)) {
            outputsToDownload.add(file);
          }
        }
      } else {
        if (remoteOutputChecker.shouldDownloadOutput(output)) {
          outputsToDownload.add(output);
        }
      }
    }

    if (!outputsToDownload.isEmpty()) {
      try (var s = Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "Download outputs")) {
        getFromFuture(
            prefetchFiles(
                action, outputsToDownload, outputMetadataStore::getOutputMetadata, Priority.HIGH));
      }
    }
  }

  public void flushOutputTree() throws InterruptedException {
    downloadCache.awaitInProgressTasks();
  }

  public ImmutableSet<ActionInput> getMissingActionInputs() {
    return ImmutableSet.copyOf(missingActionInputs);
  }

  public RemoteOutputChecker getRemoteOutputChecker() {
    return remoteOutputChecker;
  }
}
