remote: add action input fetcher

Add a RemoteActionInputFetcher class which can stage action inputs that
are only available on a remote system. This change only introduces the
class and tests. Will enable it in a follow up CL.

Progress towards #6862

Closes #7866.

PiperOrigin-RevId: 240953164
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
new file mode 100644
index 0000000..8e6470d
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -0,0 +1,161 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputPrefetcher;
+import com.google.devtools.build.lib.actions.FileArtifactValue;
+import com.google.devtools.build.lib.actions.MetadataProvider;
+import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
+import com.google.devtools.build.lib.profiler.Profiler;
+import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.Utils;
+import com.google.devtools.build.lib.vfs.Path;
+import io.grpc.Context;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * Stages output files that are stored remotely to the local filesystem.
+ *
+ * <p>This is necessary for remote caching/execution when {@code
+ * --experimental_remote_fetch_outputs=minimal} is specified.
+ */
+class RemoteActionInputFetcher implements ActionInputPrefetcher {
+
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final Set<Path> downloadedPaths = new HashSet<>();
+
+  @GuardedBy("lock")
+  private final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();
+
+  private final AbstractRemoteActionCache remoteCache;
+  private final Path execRoot;
+  private final Context ctx;
+
+  RemoteActionInputFetcher(AbstractRemoteActionCache remoteCache, Path execRoot, Context ctx) {
+    this.remoteCache = Preconditions.checkNotNull(remoteCache);
+    this.execRoot = Preconditions.checkNotNull(execRoot);
+    this.ctx = Preconditions.checkNotNull(ctx);
+  }
+
+  /**
+   * Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under
+   * their path in the output base.
+   *
+   * <p>This method blocks until all downloads have finished.
+   *
+   * <p>This method is safe to be called concurrently from spawn runners before running any local
+   * spawn.
+   */
+  @Override
+  public void prefetchFiles(
+      Iterable<? extends ActionInput> inputs, MetadataProvider metadataProvider)
+      throws IOException, InterruptedException {
+    try (SilentCloseable c = Profiler.instance().profile("Remote.fetchInputs")) {
+      Map<Path, ListenableFuture<Void>> downloadsToWaitFor = new HashMap<>();
+      for (ActionInput input : inputs) {
+        if (input instanceof VirtualActionInput) {
+          VirtualActionInput paramFileActionInput = (VirtualActionInput) input;
+          Path outputPath = execRoot.getRelative(paramFileActionInput.getExecPath());
+          outputPath.getParentDirectory().createDirectoryAndParents();
+          try (OutputStream out = outputPath.getOutputStream()) {
+            paramFileActionInput.writeTo(out);
+          }
+        } else {
+          FileArtifactValue metadata = metadataProvider.getMetadata(input);
+          if (metadata == null || !metadata.isRemote()) {
+            continue;
+          }
+
+          Path path = execRoot.getRelative(input.getExecPath());
+          synchronized (lock) {
+            if (downloadedPaths.contains(path)) {
+              continue;
+            }
+
+            ListenableFuture<Void> download = downloadsInProgress.get(path);
+            if (download == null) {
+              Context prevCtx = ctx.attach();
+              try {
+                download =
+                    remoteCache.downloadFile(
+                        path, DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize()));
+                downloadsInProgress.put(path, download);
+              } finally {
+                ctx.detach(prevCtx);
+              }
+            }
+            downloadsToWaitFor.putIfAbsent(path, download);
+          }
+        }
+      }
+
+      IOException ioException = null;
+      InterruptedException interruptedException = null;
+      try {
+        for (Map.Entry<Path, ListenableFuture<Void>> entry : downloadsToWaitFor.entrySet()) {
+          try {
+            Utils.getFromFuture(entry.getValue());
+            entry.getKey().setExecutable(true);
+          } catch (IOException e) {
+            if (e instanceof CacheNotFoundException) {
+              e =
+                  new IOException(
+                      String.format(
+                          "Failed to fetch file with hash '%s' because it does not exist remotely."
+                              + " --experimental_remote_fetch_outputs=minimal does not work if"
+                              + " your remote cache evicts files during builds.",
+                          ((CacheNotFoundException) e).getMissingDigest().getHash()));
+            }
+            ioException = ioException == null ? e : ioException;
+          } catch (InterruptedException e) {
+            interruptedException = interruptedException == null ? e : interruptedException;
+          }
+        }
+      } finally {
+        synchronized (lock) {
+          for (Path path : downloadsToWaitFor.keySet()) {
+            downloadsInProgress.remove(path);
+            downloadedPaths.add(path);
+          }
+        }
+      }
+
+      if (interruptedException != null) {
+        throw interruptedException;
+      }
+      if (ioException != null) {
+        throw ioException;
+      }
+    }
+  }
+
+  ImmutableSet<Path> downloadedFiles() {
+    synchronized (lock) {
+      return ImmutableSet.copyOf(downloadedPaths);
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
new file mode 100644
index 0000000..8a10229
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java
@@ -0,0 +1,240 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import build.bazel.remote.execution.v2.Action;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Command;
+import build.bazel.remote.execution.v2.Digest;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.hash.HashCode;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.Artifact;
+import com.google.devtools.build.lib.actions.ArtifactRoot;
+import com.google.devtools.build.lib.actions.FileArtifactValue;
+import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
+import com.google.devtools.build.lib.actions.MetadataProvider;
+import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
+import com.google.devtools.build.lib.clock.JavaClock;
+import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
+import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
+import com.google.devtools.build.lib.remote.util.StringActionInput;
+import com.google.devtools.build.lib.util.io.FileOutErr;
+import com.google.devtools.build.lib.vfs.DigestHashFunction;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.devtools.common.options.Options;
+import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteActionInputFetcher}. */
+@RunWith(JUnit4.class)
+public class RemoteActionInputFetcherTest {
+
+  private static final DigestHashFunction HASH_FUNCTION = DigestHashFunction.SHA256;
+
+  private Path execRoot;
+  private ArtifactRoot artifactRoot;
+  private RemoteOptions options;
+  private DigestUtil digestUtil;
+
+  @Before
+  public void setUp() throws IOException {
+    FileSystem fs = new InMemoryFileSystem(new JavaClock(), HASH_FUNCTION);
+    execRoot = fs.getPath("/exec");
+    execRoot.createDirectoryAndParents();
+    artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, execRoot.getRelative("root"));
+    artifactRoot.getRoot().asPath().createDirectoryAndParents();
+    options = Options.getDefaults(RemoteOptions.class);
+    digestUtil = new DigestUtil(HASH_FUNCTION);
+  }
+
+  @Test
+  public void testFetching() throws Exception {
+    // arrange
+    Map<ActionInput, FileArtifactValue> metadata = new HashMap<>();
+    Map<Digest, ByteString> cacheEntries = new HashMap<>();
+    Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
+    Artifact a2 = createRemoteArtifact("file2", "fizz buzz", metadata, cacheEntries);
+    MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
+    AbstractRemoteActionCache remoteCache =
+        new StaticRemoteActionCache(options, digestUtil, cacheEntries);
+    RemoteActionInputFetcher actionInputFetcher =
+        new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
+
+    // act
+    actionInputFetcher.prefetchFiles(metadata.keySet(), metadataProvider);
+
+    // assert
+    assertThat(FileSystemUtils.readContent(a1.getPath(), StandardCharsets.UTF_8))
+        .isEqualTo("hello world");
+    assertThat(a1.getPath().isExecutable()).isTrue();
+    assertThat(FileSystemUtils.readContent(a2.getPath(), StandardCharsets.UTF_8))
+        .isEqualTo("fizz buzz");
+    assertThat(a2.getPath().isExecutable()).isTrue();
+    assertThat(actionInputFetcher.downloadedFiles()).hasSize(2);
+    assertThat(actionInputFetcher.downloadedFiles()).containsAllOf(a1.getPath(), a2.getPath());
+  }
+
+  @Test
+  public void testStagingVirtualActionInput() throws Exception {
+    // arrange
+    MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
+    AbstractRemoteActionCache remoteCache =
+        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteActionInputFetcher actionInputFetcher =
+        new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
+    VirtualActionInput a = new StringActionInput("hello world", PathFragment.create("file1"));
+
+    // act
+    actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider);
+
+    // assert
+    Path p = execRoot.getRelative(a.getExecPath());
+    assertThat(FileSystemUtils.readContent(p, StandardCharsets.UTF_8)).isEqualTo("hello world");
+    assertThat(p.isExecutable()).isFalse();
+    assertThat(actionInputFetcher.downloadedFiles()).isEmpty();
+  }
+
+  @Test
+  public void testFileNotFound() throws Exception {
+    // Test that we get an exception if an input file is missing
+
+    // arrange
+    Map<ActionInput, FileArtifactValue> metadata = new HashMap<>();
+    Artifact a =
+        createRemoteArtifact("file1", "hello world", metadata, /* cacheEntries= */ new HashMap<>());
+    MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
+    AbstractRemoteActionCache remoteCache =
+        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteActionInputFetcher actionInputFetcher =
+        new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
+
+    // act
+    try {
+      actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider);
+      fail("expected IOException");
+    } catch (IOException e) {
+      // Intentionally left empty
+    }
+
+    // assert
+    assertThat(actionInputFetcher.downloadedFiles()).containsExactly(a.getPath());
+  }
+
+  @Test
+  public void testIgnoreNoneRemoteFiles() throws Exception {
+    // Test that files that are not remote are not downloaded
+
+    // arrange
+    Path p = execRoot.getRelative(artifactRoot.getExecPath()).getRelative("file1");
+    FileSystemUtils.writeContent(p, StandardCharsets.UTF_8, "hello world");
+    Artifact a = new Artifact(p, artifactRoot);
+    FileArtifactValue f = FileArtifactValue.create(a);
+    MetadataProvider metadataProvider = new StaticMetadataProvider(ImmutableMap.of(a, f));
+    AbstractRemoteActionCache remoteCache =
+        new StaticRemoteActionCache(options, digestUtil, new HashMap<>());
+    RemoteActionInputFetcher actionInputFetcher =
+        new RemoteActionInputFetcher(remoteCache, execRoot, Context.current());
+
+    // act
+    actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider);
+
+    // assert
+    assertThat(actionInputFetcher.downloadedFiles()).isEmpty();
+  }
+
+  private Artifact createRemoteArtifact(
+      String pathFragment,
+      String contents,
+      Map<ActionInput, FileArtifactValue> metadata,
+      Map<Digest, ByteString> cacheEntries) {
+    Path p = artifactRoot.getRoot().getRelative(pathFragment);
+    Artifact a = new Artifact(p, artifactRoot);
+    byte[] b = contents.getBytes(StandardCharsets.UTF_8);
+    HashCode h = HASH_FUNCTION.getHashFunction().hashBytes(b);
+    FileArtifactValue f =
+        new RemoteFileArtifactValue(h.asBytes(), b.length, /* locationIndex= */ 1);
+    metadata.put(a, f);
+    cacheEntries.put(DigestUtil.buildDigest(h.asBytes(), b.length), ByteString.copyFrom(b));
+    return a;
+  }
+
+  private static class StaticRemoteActionCache extends AbstractRemoteActionCache {
+
+    private final ImmutableMap<Digest, ByteString> cacheEntries;
+
+    public StaticRemoteActionCache(
+        RemoteOptions options, DigestUtil digestUtil, Map<Digest, ByteString> cacheEntries) {
+      super(options, digestUtil);
+      this.cacheEntries = ImmutableMap.copyOf(cacheEntries);
+    }
+
+    @Override
+    ActionResult getCachedActionResult(ActionKey actionKey) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    void upload(
+        ActionKey actionKey,
+        Action action,
+        Command command,
+        Path execRoot,
+        Collection<Path> files,
+        FileOutErr outErr) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+      ByteString data = cacheEntries.get(digest);
+      if (data == null) {
+        return Futures.immediateFailedFuture(new CacheNotFoundException(digest, digestUtil));
+      }
+      try {
+        data.writeTo(out);
+      } catch (IOException e) {
+        return Futures.immediateFailedFuture(e);
+      }
+      return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public void close() {
+      // Intentionally left empty.
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/InputTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/InputTreeTest.java
index 65db74c..bed2b4b 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/InputTreeTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/InputTreeTest.java
@@ -26,6 +26,7 @@
 import com.google.devtools.build.lib.remote.merkletree.InputTree.FileNode;
 import com.google.devtools.build.lib.remote.merkletree.InputTree.Node;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
 import com.google.devtools.build.lib.remote.util.StringActionInput;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
 import com.google.devtools.build.lib.vfs.FileSystem;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java
index fa57faa..b00ea4f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java
@@ -27,6 +27,7 @@
 import com.google.devtools.build.lib.actions.FileArtifactValue;
 import com.google.devtools.build.lib.clock.JavaClock;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
 import com.google.devtools.build.lib.vfs.DigestHashFunction;
 import com.google.devtools.build.lib.vfs.FileSystem;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/StaticMetadataProvider.java b/src/test/java/com/google/devtools/build/lib/remote/util/StaticMetadataProvider.java
similarity index 91%
rename from src/test/java/com/google/devtools/build/lib/remote/merkletree/StaticMetadataProvider.java
rename to src/test/java/com/google/devtools/build/lib/remote/util/StaticMetadataProvider.java
index cc48cb1..1d12166 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/StaticMetadataProvider.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/StaticMetadataProvider.java
@@ -11,7 +11,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package com.google.devtools.build.lib.remote.merkletree;
+package com.google.devtools.build.lib.remote.util;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.devtools.build.lib.actions.ActionInput;
@@ -21,7 +21,7 @@
 import javax.annotation.Nullable;
 
 /** A {@link MetadataProvider} backed by static data */
-class StaticMetadataProvider implements MetadataProvider {
+public final class StaticMetadataProvider implements MetadataProvider {
 
   private final ImmutableMap<ActionInput, FileArtifactValue> metadata;