remote: add directory support for remote caching and execution

Add support for directory trees as input and output artifacts for remote caching and remote execution. Closes #4011.

PiperOrigin-RevId: 179691001
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
new file mode 100644
index 0000000..9401945
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
@@ -0,0 +1,378 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.actions.EnvironmentalExecException;
+import com.google.devtools.build.lib.actions.ExecException;
+import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
+import com.google.devtools.build.lib.util.io.FileOutErr;
+import com.google.devtools.build.lib.vfs.Dirent;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.DirectoryNode;
+import com.google.devtools.remoteexecution.v1test.FileNode;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
+import com.google.devtools.remoteexecution.v1test.Tree;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** A cache for storing artifacts (input and output) as well as the output of running an action. */
+@ThreadSafety.ThreadSafe
+public abstract class AbstractRemoteActionCache implements AutoCloseable {
+  protected final DigestUtil digestUtil;
+
+  public AbstractRemoteActionCache(DigestUtil digestUtil) {
+    this.digestUtil = digestUtil;
+  }
+
+  /**
+   * Ensures that the tree structure of the inputs, the input files themselves, and the command are
+   * available in the remote cache, such that the tree can be reassembled and executed on another
+   * machine given the root digest.
+   *
+   * <p>The cache may check whether files or parts of the tree structure are already present, and
+   * skip uploading them again.
+   *
+   * <p>Note that this method is only required for remote execution, not for caching itself.
+   * However, remote execution uses a cache to store input files, and that may be a separate
+   * end-point from the executor itself, so the functionality lives here. A pure remote caching
+   * implementation that does not support remote execution may choose not to implement this
+   * function, and throw {@link UnsupportedOperationException} instead. If so, it should be clearly
+   * documented that it cannot be used for remote execution.
+   */
+  public abstract void ensureInputsPresent(
+      TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
+      throws IOException, InterruptedException;
+
+  /**
+   * Attempts to look up the given action in the remote cache and return its result, if present.
+   * Returns {@code null} if there is no such entry. Note that a successful result from this method
+   * does not guarantee the availability of the corresponding output files in the remote cache.
+   *
+   * @throws IOException if the remote cache is unavailable.
+   */
+  abstract @Nullable ActionResult getCachedActionResult(DigestUtil.ActionKey actionKey)
+      throws IOException, InterruptedException;
+
+  /**
+   * Upload the result of a locally executed action to the cache by uploading any necessary files,
+   * stdout / stderr, as well as adding an entry for the given action key to the cache if
+   * {@code uploadAction} is true.
+   *
+   * @throws IOException if the remote cache is unavailable.
+   */
+  abstract void upload(
+      DigestUtil.ActionKey actionKey,
+      Path execRoot,
+      Collection<Path> files,
+      FileOutErr outErr,
+      boolean uploadAction)
+      throws IOException, InterruptedException;
+
+  /**
+   * Download a remote blob to a local destination.
+   *
+   * @param digest The digest of the remote blob.
+   * @param dest The path to the local file.
+   * @throws IOException if download failed.
+   */
+  protected abstract void downloadBlob(Digest digest, Path dest)
+      throws IOException, InterruptedException;
+
+  /**
+   * Download a remote blob and store it in memory.
+   *
+   * @param digest The digest of the remote blob.
+   * @return The remote blob.
+   * @throws IOException if download failed.
+   */
+  protected abstract byte[] downloadBlob(Digest digest) throws IOException, InterruptedException;
+
+  /**
+   * Download the output files and directory trees of a remotely executed action to the local
+   * machine, as well as its stdout / stderr to the given files.
+   *
+   * <p>In case of failure, this method deletes any output files it might have already created.
+   *
+   * @throws IOException in case of a cache miss or if the remote cache is unavailable.
+   * @throws ExecException if cleaning up after a failed download fails.
+   */
+  // TODO(olaola): will need to amend to include the TreeNodeRepository for updating.
+  public void download(ActionResult result, Path execRoot, FileOutErr outErr)
+      throws ExecException, IOException, InterruptedException {
+    try {
+      for (OutputFile file : result.getOutputFilesList()) {
+        Path path = execRoot.getRelative(file.getPath());
+        downloadFile(path, file.getDigest(), file.getIsExecutable(), file.getContent());
+      }
+      for (OutputDirectory dir : result.getOutputDirectoriesList()) {
+        Digest treeDigest = dir.getTreeDigest();
+        byte[] b = downloadBlob(treeDigest);
+        Digest receivedTreeDigest = digestUtil.compute(b);
+        if (!receivedTreeDigest.equals(treeDigest)) {
+          throw new IOException(
+              "Digest does not match " + receivedTreeDigest + " != " + treeDigest);
+        }
+        Tree tree = Tree.parseFrom(b);
+        Map<Digest, Directory> childrenMap = new HashMap<>();
+        for (Directory child : tree.getChildrenList()) {
+          childrenMap.put(digestUtil.compute(child), child);
+        }
+        Path path = execRoot.getRelative(dir.getPath());
+        downloadDirectory(path, tree.getRoot(), childrenMap);
+      }
+      // TODO(ulfjack): use same code as above also for stdout / stderr if applicable.
+      downloadOutErr(result, outErr);
+    } catch (IOException downloadException) {
+      try {
+        // Delete any (partially) downloaded output files, since any subsequent local execution
+        // of this action may expect none of the output files to exist.
+        for (OutputFile file : result.getOutputFilesList()) {
+          execRoot.getRelative(file.getPath()).delete();
+        }
+        for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+          execRoot.getRelative(directory.getPath()).delete();
+        }
+        if (outErr != null) {
+          outErr.getOutputPath().delete();
+          outErr.getErrorPath().delete();
+        }
+      } catch (IOException e) {
+        // If deleting the output files fails, we abort the build with a clear error message, as
+        // any subsequent local execution failure would likely be incomprehensible.
+
+        // We don't propagate the downloadException, as this is a recoverable error and the cause
+        // of the build failure is really that we couldn't delete output files.
+        throw new EnvironmentalExecException(
+            "Failed to delete output files after incomplete "
+                + "download. Cannot continue with local execution.",
+            e,
+            true);
+      }
+      throw downloadException;
+    }
+  }
+
+  /**
+   * Download a directory recursively. The directory is represented by a {@link Directory} protobuf
+   * message, and the descendant directories are in {@code childrenMap}, accessible through their
+   * digest.
+   */
+  private void downloadDirectory(Path path, Directory dir, Map<Digest, Directory> childrenMap)
+      throws IOException, InterruptedException {
+    // Create the directory eagerly, since it may be empty and would otherwise never be created.
+    FileSystemUtils.createDirectoryAndParents(path);
+
+    for (FileNode child : dir.getFilesList()) {
+      Path childPath = path.getRelative(child.getName());
+      downloadFile(childPath, child.getDigest(), child.getIsExecutable(), null);
+    }
+
+    for (DirectoryNode child : dir.getDirectoriesList()) {
+      Path childPath = path.getRelative(child.getName());
+      Digest childDigest = child.getDigest();
+      Directory childDir = childrenMap.get(childDigest);
+      if (childDir == null) {
+        throw new IOException(
+            "could not find subdirectory "
+                + child.getName()
+                + " of directory "
+                + path
+                + " for download: digest "
+                + childDigest
+                + " not found");
+      }
+      downloadDirectory(childPath, childDir, childrenMap);
+
+      // Prevent reuse.
+      childrenMap.remove(childDigest);
+    }
+  }
+
+  /**
+   * Download a single file (not a directory). If {@code content} is provided inline, it is
+   * written directly; otherwise the blob is fetched from the remote cache using {@code digest}.
+   */
+  protected void downloadFile(
+      Path path, Digest digest, boolean isExecutable, @Nullable ByteString content)
+      throws IOException, InterruptedException {
+    FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+    if (digest.getSizeBytes() == 0) {
+      // Handle empty file locally.
+      FileSystemUtils.writeContent(path, new byte[0]);
+    } else {
+      if (content != null && !content.isEmpty()) {
+        try (OutputStream stream = path.getOutputStream()) {
+          content.writeTo(stream);
+        }
+      } else {
+        downloadBlob(digest, path);
+        Digest receivedDigest = digestUtil.compute(path);
+        if (!receivedDigest.equals(digest)) {
+          throw new IOException("Digest does not match " + receivedDigest + " != " + digest);
+        }
+      }
+    }
+    path.setExecutable(isExecutable);
+  }
+
+  private void downloadOutErr(ActionResult result, FileOutErr outErr)
+      throws IOException, InterruptedException {
+    if (!result.getStdoutRaw().isEmpty()) {
+      result.getStdoutRaw().writeTo(outErr.getOutputStream());
+      outErr.getOutputStream().flush();
+    } else if (result.hasStdoutDigest()) {
+      byte[] stdoutBytes = downloadBlob(result.getStdoutDigest());
+      outErr.getOutputStream().write(stdoutBytes);
+      outErr.getOutputStream().flush();
+    }
+    if (!result.getStderrRaw().isEmpty()) {
+      result.getStderrRaw().writeTo(outErr.getErrorStream());
+      outErr.getErrorStream().flush();
+    } else if (result.hasStderrDigest()) {
+      byte[] stderrBytes = downloadBlob(result.getStderrDigest());
+      outErr.getErrorStream().write(stderrBytes);
+      outErr.getErrorStream().flush();
+    }
+  }
+
+  /**
+   * The UploadManifest centralizes the upload logic shared by the remote cache implementations.
+   */
+  public class UploadManifest {
+    private final ActionResult.Builder result;
+    private final Path execRoot;
+    private final Map<Digest, Path> digestToFile;
+    private final Map<Digest, Chunker> digestToChunkers;
+
+    /**
+     * Create an UploadManifest from an ActionResult builder and an exec root. The ActionResult
+     * builder is populated through calls to {@link #addFiles(Collection)}.
+     */
+    public UploadManifest(ActionResult.Builder result, Path execRoot) {
+      this.result = result;
+      this.execRoot = execRoot;
+
+      this.digestToFile = new HashMap<>();
+      this.digestToChunkers = new HashMap<>();
+    }
+
+    /**
+     * Add a collection of files (and directories) to the UploadManifest. Adding a directory has the
+     * effect of 1) uploading a {@link Tree} protobuf message from which the whole structure of the
+     * directory, including the descendants, can be reconstructed and 2) uploading all the
+     * non-directory descendant files.
+     */
+    public void addFiles(Collection<Path> files) throws IOException, InterruptedException {
+      for (Path file : files) {
+        // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
+        // rely on the local spawn runner to stat the files, instead of statting here.
+        if (!file.exists()) {
+          // We ignore requested results that have not been generated by the action.
+          continue;
+        }
+        if (file.isDirectory()) {
+          addDirectory(file);
+        } else {
+          Digest digest = digestUtil.compute(file);
+          addFile(digest, file);
+        }
+      }
+    }
+
+    /** Map of digests to file paths to upload. */
+    public Map<Digest, Path> getDigestToFile() {
+      return digestToFile;
+    }
+
+    /**
+     * Map of digests to chunkers to upload. When the output is a regular file, it is transmitted
+     * through {@link #getDigestToFile()}. When it is a directory, it is transmitted as a
+     * serialized {@link Tree} protobuf message through {@link #getDigestToChunkers()}.
+     */
+    public Map<Digest, Chunker> getDigestToChunkers() {
+      return digestToChunkers;
+    }
+
+    private void addFile(Digest digest, Path file) throws IOException {
+      result
+          .addOutputFilesBuilder()
+          .setPath(file.relativeTo(execRoot).getPathString())
+          .setDigest(digest)
+          .setIsExecutable(file.isExecutable());
+
+      digestToFile.put(digest, file);
+    }
+
+    private void addDirectory(Path dir) throws IOException {
+      Tree.Builder tree = Tree.newBuilder();
+      Directory root = computeDirectory(dir, tree);
+      tree.setRoot(root);
+
+      byte[] blob = tree.build().toByteArray();
+      Digest digest = digestUtil.compute(blob);
+      Chunker chunker = new Chunker(blob, blob.length, digestUtil);
+
+      if (result != null) {
+        result
+            .addOutputDirectoriesBuilder()
+            .setPath(dir.relativeTo(execRoot).getPathString())
+            .setTreeDigest(digest);
+      }
+
+      digestToChunkers.put(chunker.digest(), chunker);
+    }
+
+    private Directory computeDirectory(Path path, Tree.Builder tree) throws IOException {
+      Directory.Builder b = Directory.newBuilder();
+
+      List<Dirent> sortedDirent = new ArrayList<>(path.readdir(TreeNodeRepository.SYMLINK_POLICY));
+      sortedDirent.sort(Comparator.comparing(Dirent::getName));
+
+      for (Dirent dirent : sortedDirent) {
+        String name = dirent.getName();
+        Path child = path.getRelative(name);
+        if (dirent.getType() == Dirent.Type.DIRECTORY) {
+          Directory dir = computeDirectory(child, tree);
+          b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir));
+          tree.addChildren(dir);
+        } else {
+          Digest digest = digestUtil.compute(child);
+          b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(child.isExecutable());
+          digestToFile.put(digest, child);
+        }
+      }
+
+      return b.build();
+    }
+  }
+
+  /** Release resources associated with the cache. The cache may not be used after calling this. */
+  @Override
+  public abstract void close();
+}
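
For reference, the wire format this class produces and consumes: each output directory is stored as a single Tree blob containing the root Directory plus every descendant Directory, and the OutputDirectory entry in the ActionResult references that blob through its tree_digest. Below is a minimal round-trip sketch, assuming SHA-256 digests as DigestUtil computes them; sha256Digest is a hypothetical stand-in for DigestUtil.compute and is not part of this change.

import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.devtools.remoteexecution.v1test.Tree;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

class TreeRoundTripSketch {
  // Hypothetical stand-in for DigestUtil.compute(): SHA-256 over the raw bytes.
  static Digest sha256Digest(byte[] data) throws Exception {
    byte[] hash = MessageDigest.getInstance("SHA-256").digest(data);
    StringBuilder hex = new StringBuilder();
    for (byte b : hash) {
      hex.append(String.format("%02x", b));
    }
    return Digest.newBuilder().setHash(hex.toString()).setSizeBytes(data.length).build();
  }

  public static void main(String[] args) throws Exception {
    // Upload side (mirrors UploadManifest.addDirectory / computeDirectory):
    // a directory "sub" containing the single file "data.txt".
    byte[] fileContent = "hello".getBytes(StandardCharsets.UTF_8);
    Directory.Builder sub = Directory.newBuilder();
    sub.addFilesBuilder()
        .setName("data.txt")
        .setDigest(sha256Digest(fileContent))
        .setIsExecutable(false);
    Directory subDir = sub.build();

    Directory.Builder root = Directory.newBuilder();
    root.addDirectoriesBuilder().setName("sub").setDigest(sha256Digest(subDir.toByteArray()));

    Tree tree = Tree.newBuilder().setRoot(root.build()).addChildren(subDir).build();
    byte[] blob = tree.toByteArray();
    Digest treeDigest = sha256Digest(blob); // becomes OutputDirectory.tree_digest

    // Download side (mirrors download / downloadDirectory): parse the blob, index
    // the children by digest, then resolve each DirectoryNode against that index.
    Tree parsed = Tree.parseFrom(blob);
    Map<Digest, Directory> childrenMap = new HashMap<>();
    for (Directory child : parsed.getChildrenList()) {
      childrenMap.put(sha256Digest(child.toByteArray()), child);
    }
    Directory resolved = childrenMap.get(parsed.getRoot().getDirectories(0).getDigest());
    System.out.println(resolved.getFiles(0).getName()); // prints "data.txt"
  }
}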
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 908fb9b..d865d0c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -25,15 +25,12 @@
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.EnvironmentalExecException;
-import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.MetadataProvider;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
 import com.google.devtools.build.lib.remote.Retrier.RetryException;
 import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
 import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
 import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
@@ -46,7 +43,6 @@
 import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
 import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
 import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
 import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
 import io.grpc.CallCredentials;
 import io.grpc.Channel;
@@ -57,7 +53,6 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -67,13 +62,12 @@
 
 /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
 @ThreadSafe
-public class GrpcRemoteCache implements RemoteActionCache {
+public class GrpcRemoteCache extends AbstractRemoteActionCache {
   private final RemoteOptions options;
   private final CallCredentials credentials;
   private final Channel channel;
   private final RemoteRetrier retrier;
   private final ByteStreamUploader uploader;
-  private final DigestUtil digestUtil;
   private final ListeningScheduledExecutorService retryScheduler =
       MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
 
@@ -84,11 +78,11 @@
       RemoteOptions options,
       RemoteRetrier retrier,
       DigestUtil digestUtil) {
+    super(digestUtil);
     this.options = options;
     this.credentials = credentials;
     this.channel = channel;
     this.retrier = retrier;
-    this.digestUtil = digestUtil;
 
     uploader = new ByteStreamUploader(options.remoteInstanceName, channel, credentials,
         options.remoteTimeout, retrier, retryScheduler);
@@ -180,90 +174,6 @@
   }
 
   /**
-   * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
-   * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
-   */
-  @Override
-  public void download(ActionResult result, Path execRoot, FileOutErr outErr)
-      throws ExecException, IOException, InterruptedException {
-    try {
-      for (OutputFile file : result.getOutputFilesList()) {
-        Path path = execRoot.getRelative(file.getPath());
-        FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
-        Digest digest = file.getDigest();
-        if (digest.getSizeBytes() == 0) {
-          // Handle empty file locally.
-          FileSystemUtils.writeContent(path, new byte[0]);
-        } else {
-          if (!file.getContent().isEmpty()) {
-            try (OutputStream stream = path.getOutputStream()) {
-              file.getContent().writeTo(stream);
-            }
-          } else {
-            retrier.execute(
-                () -> {
-                  try (OutputStream stream = path.getOutputStream()) {
-                    readBlob(digest, stream);
-                  }
-                  return null;
-                });
-            Digest receivedDigest = digestUtil.compute(path);
-            if (!receivedDigest.equals(digest)) {
-              throw new IOException(
-                  "Digest does not match " + receivedDigest + " != " + digest);
-            }
-          }
-        }
-        path.setExecutable(file.getIsExecutable());
-      }
-      if (!result.getOutputDirectoriesList().isEmpty()) {
-        throw new UnsupportedOperationException();
-      }
-      // TODO(ulfjack): use same code as above also for stdout / stderr if applicable.
-      downloadOutErr(result, outErr);
-    } catch (IOException downloadException) {
-      try {
-        // Delete any (partially) downloaded output files, since any subsequent local execution
-        // of this action may expect none of the output files to exist.
-        for (OutputFile file : result.getOutputFilesList()) {
-          execRoot.getRelative(file.getPath()).delete();
-        }
-        outErr.getOutputPath().delete();
-        outErr.getErrorPath().delete();
-      } catch (IOException e) {
-        // If deleting of output files failed, we abort the build with a decent error message as
-        // any subsequent local execution failure would likely be incomprehensible.
-
-        // We don't propagate the downloadException, as this is a recoverable error and the cause
-        // of the build failure is really that we couldn't delete output files.
-        throw new EnvironmentalExecException("Failed to delete output files after incomplete "
-            + "download. Cannot continue with local execution.", e, true);
-      }
-      throw downloadException;
-    }
-  }
-
-  private void downloadOutErr(ActionResult result, FileOutErr outErr)
-      throws IOException, InterruptedException {
-    if (!result.getStdoutRaw().isEmpty()) {
-      result.getStdoutRaw().writeTo(outErr.getOutputStream());
-      outErr.getOutputStream().flush();
-    } else if (result.hasStdoutDigest()) {
-      byte[] stdoutBytes = downloadBlob(result.getStdoutDigest());
-      outErr.getOutputStream().write(stdoutBytes);
-      outErr.getOutputStream().flush();
-    }
-    if (!result.getStderrRaw().isEmpty()) {
-      result.getStderrRaw().writeTo(outErr.getErrorStream());
-      outErr.getErrorStream().flush();
-    } else if (result.hasStderrDigest()) {
-      byte[] stderrBytes = downloadBlob(result.getStderrDigest());
-      outErr.getErrorStream().write(stderrBytes);
-      outErr.getErrorStream().flush();
-    }
-  }
-
-  /**
    * This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not
    * allow throwing such an exception. Any caller must make sure to catch the
    * {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
@@ -296,6 +206,30 @@
   }
 
   @Override
+  protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
+    retrier.execute(
+        () -> {
+          try (OutputStream stream = dest.getOutputStream()) {
+            readBlob(digest, stream);
+          }
+          return null;
+        });
+  }
+
+  @Override
+  protected byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
+    if (digest.getSizeBytes() == 0) {
+      return new byte[0];
+    }
+    return retrier.execute(
+        () -> {
+          ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
+          readBlob(digest, stream);
+          return stream.toByteArray();
+        });
+  }
+
+  @Override
   public void upload(
       ActionKey actionKey,
       Path execRoot,
@@ -329,38 +263,33 @@
 
   void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
       throws IOException, InterruptedException {
-    Map<Digest, Path> digestToFile = new HashMap<>();
-    for (Path file : files) {
-      if (!file.exists()) {
-        // We ignore requested results that have not been generated by the action.
-        continue;
-      }
-      if (file.isDirectory()) {
-        // TODO(olaola): to implement this for a directory, will need to create or pass a
-        // TreeNodeRepository to call uploadTree.
-        throw new UnsupportedOperationException("Storing a directory is not yet supported.");
-      }
+    UploadManifest manifest = new UploadManifest(result, execRoot);
+    manifest.addFiles(files);
 
-      Digest digest = digestUtil.compute(file);
-      // TODO(olaola): inline small results here.
-      result
-          .addOutputFilesBuilder()
-          .setPath(file.relativeTo(execRoot).getPathString())
-          .setDigest(digest)
-          .setIsExecutable(file.isExecutable());
-      digestToFile.put(digest, file);
-    }
-
-    ImmutableSet<Digest> digestsToUpload = getMissingDigests(digestToFile.keySet());
     List<Chunker> filesToUpload = new ArrayList<>();
+
+    Map<Digest, Path> digestToFile = manifest.getDigestToFile();
+    Map<Digest, Chunker> digestToChunkers = manifest.getDigestToChunkers();
+    Collection<Digest> digests = new ArrayList<>();
+    digests.addAll(digestToFile.keySet());
+    digests.addAll(digestToChunkers.keySet());
+
+    ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
     for (Digest digest : digestsToUpload) {
+      Chunker chunker;
       Path file = digestToFile.get(digest);
-      if (file == null) {
-        String message = "FindMissingBlobs call returned an unknown digest: " + digest;
-        throw new IOException(message);
+      if (file != null) {
+        chunker = new Chunker(file);
+      } else {
+        chunker = digestToChunkers.get(digest);
+        if (chunker == null) {
+          String message = "FindMissingBlobs call returned an unknown digest: " + digest;
+          throw new IOException(message);
+        }
       }
-      filesToUpload.add(new Chunker(file));
+      filesToUpload.add(chunker);
     }
+
     if (!filesToUpload.isEmpty()) {
       uploader.uploadBlobs(filesToUpload);
     }
@@ -416,19 +345,6 @@
     return digest;
   }
 
-  byte[] downloadBlob(Digest digest)
-      throws IOException, InterruptedException {
-    if (digest.getSizeBytes() == 0) {
-      return new byte[0];
-    }
-    return retrier.execute(
-        () -> {
-          ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
-          readBlob(digest, stream);
-          return stream.toByteArray();
-        });
-  }
-
   // Execution Cache API
 
   @Override
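
The upload path now collects the digests of regular files and of serialized Tree blobs into one set and asks the CAS which of them are missing, so both kinds of blobs are deduplicated in a single round trip. Roughly what the existing getMissingDigests helper (unchanged by this commit, hence not shown above) amounts to on the wire; casBlockingStub() is assumed to be this file's existing helper for building the ContentAddressableStorage stub:

// Sketch only: the FindMissingBlobs round trip behind getMissingDigests.
private ImmutableSet<Digest> missingDigests(Iterable<Digest> digests)
    throws IOException, InterruptedException {
  FindMissingBlobsRequest request =
      FindMissingBlobsRequest.newBuilder()
          .setInstanceName(options.remoteInstanceName)
          .addAllBlobDigests(digests)
          .build();
  FindMissingBlobsResponse response =
      retrier.execute(() -> casBlockingStub().findMissingBlobs(request));
  return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}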
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
deleted file mode 100644
index 6bc53aa..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.actions.ExecException;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
-import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
-import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command;
-import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
-
-/** A cache for storing artifacts (input and output) as well as the output of running an action. */
-@ThreadCompatible
-interface RemoteActionCache {
-  // CAS API
-
-  // TODO(buchgr): consider removing the CacheNotFoundException, and replacing it with other
-  // ways to signal a cache miss.
-
-  /**
-   * Ensures that the tree structure of the inputs, the input files themselves, and the command are
-   * available in the remote cache, such that the tree can be reassembled and executed on another
-   * machine given the root digest.
-   *
-   * <p>The cache may check whether files or parts of the tree structure are already present, and do
-   * not need to be uploaded again.
-   *
-   * <p>Note that this method is only required for remote execution, not for caching itself.
-   * However, remote execution uses a cache to store input files, and that may be a separate
-   * end-point from the executor itself, so the functionality lives here. A pure remote caching
-   * implementation that does not support remote execution may choose not to implement this
-   * function, and throw {@link UnsupportedOperationException} instead. If so, it should be clearly
-   * documented that it cannot be used for remote execution.
-   */
-  void ensureInputsPresent(
-      TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
-          throws IOException, InterruptedException;
-
-  /**
-   * Download the output files and directory trees of a remotely executed action to the local
-   * machine, as well stdin / stdout to the given files.
-   *
-   * <p>In case of failure, this method must delete any output files it might have already created.
-   *
-   * @throws CacheNotFoundException in case of a cache miss.
-   * @throws ExecException in case clean up after a failed download failed.
-   */
-  // TODO(olaola): will need to amend to include the TreeNodeRepository for updating.
-  void download(ActionResult result, Path execRoot, FileOutErr outErr)
-      throws ExecException, IOException, InterruptedException;
-  /**
-   * Attempts to look up the given action in the remote cache and return its result, if present.
-   * Returns {@code null} if there is no such entry. Note that a successful result from this method
-   * does not guarantee the availability of the corresponding output files in the remote cache.
-   */
-  @Nullable
-  ActionResult getCachedActionResult(ActionKey actionKey) throws IOException, InterruptedException;
-
-  /**
-   * Upload the result of a locally executed action to the cache by uploading any necessary files,
-   * stdin / stdout, as well as adding an entry for the given action key to the cache if
-   * uploadAction is true.
-   */
-  void upload(
-      ActionKey actionKey,
-      Path execRoot,
-      Collection<Path> files,
-      FileOutErr outErr,
-      boolean uploadAction)
-      throws IOException, InterruptedException;
-
-  /** Release resources associated with the cache. The cache may not be used after calling this. */
-  void close();
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index c66015a..a20b51f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -34,13 +34,13 @@
  */
 final class RemoteActionContextProvider extends ActionContextProvider {
   private final CommandEnvironment env;
-  private final RemoteActionCache cache;
+  private final AbstractRemoteActionCache cache;
   private final GrpcRemoteExecutor executor;
   private final DigestUtil digestUtil;
 
   RemoteActionContextProvider(
       CommandEnvironment env,
-      @Nullable RemoteActionCache cache,
+      @Nullable AbstractRemoteActionCache cache,
       @Nullable GrpcRemoteExecutor executor,
       DigestUtil digestUtil) {
     this.env = env;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 2cd1222..235f971 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -117,7 +117,7 @@
               remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
       // TODO(davido): The naming is wrong here. "Remote"-prefix in RemoteActionCache class has no
       // meaning.
-      final RemoteActionCache cache;
+      final AbstractRemoteActionCache cache;
       if (remoteOrLocalCache) {
         cache =
             new SimpleBlobStoreActionCache(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index b0487a7..e8af197 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -52,7 +52,7 @@
   private final Path execRoot;
   private final RemoteOptions options;
 
-  private final RemoteActionCache remoteCache;
+  private final AbstractRemoteActionCache remoteCache;
   private final String buildRequestId;
   private final String commandId;
   private final boolean verboseFailures;
@@ -67,7 +67,7 @@
   RemoteSpawnCache(
       Path execRoot,
       RemoteOptions options,
-      RemoteActionCache remoteCache,
+      AbstractRemoteActionCache remoteCache,
       String buildRequestId,
       String commandId,
       boolean verboseFailures,
@@ -95,6 +95,7 @@
     Command command = RemoteSpawnRunner.buildCommand(spawn.getArguments(), spawn.getEnvironment());
     Action action =
         RemoteSpawnRunner.buildAction(
+            execRoot,
             spawn.getOutputFiles(),
             digestUtil.compute(command),
             repository.getMerkleDigest(inputRoot),
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 695f6ea..55a64df 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -78,7 +78,7 @@
   private final boolean verboseFailures;
 
   @Nullable private final Reporter cmdlineReporter;
-  @Nullable private final RemoteActionCache remoteCache;
+  @Nullable private final AbstractRemoteActionCache remoteCache;
   @Nullable private final GrpcRemoteExecutor remoteExecutor;
   private final String buildRequestId;
   private final String commandId;
@@ -95,7 +95,7 @@
       @Nullable Reporter cmdlineReporter,
       String buildRequestId,
       String commandId,
-      @Nullable RemoteActionCache remoteCache,
+      @Nullable AbstractRemoteActionCache remoteCache,
       @Nullable GrpcRemoteExecutor remoteExecutor,
       DigestUtil digestUtil) {
     this.execRoot = execRoot;
@@ -127,6 +127,7 @@
     Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment());
     Action action =
         buildAction(
+            execRoot,
             spawn.getOutputFiles(),
             digestUtil.compute(command),
             repository.getMerkleDigest(inputRoot),
@@ -260,6 +261,7 @@
   }
 
   static Action buildAction(
+      Path execRoot,
       Collection<? extends ActionInput> outputs,
       Digest command,
       Digest inputRoot,
@@ -271,11 +273,18 @@
     action.setCommandDigest(command);
     action.setInputRootDigest(inputRoot);
     ArrayList<String> outputPaths = new ArrayList<>();
+    ArrayList<String> outputDirectoryPaths = new ArrayList<>();
     for (ActionInput output : outputs) {
-      outputPaths.add(output.getExecPathString());
+      String pathString = output.getExecPathString();
+      if (execRoot.getRelative(pathString).isDirectory()) {
+        outputDirectoryPaths.add(pathString);
+      } else {
+        outputPaths.add(pathString);
+      }
     }
     Collections.sort(outputPaths);
-    // TODO: output directories should be handled here, when they are supported.
+    Collections.sort(outputDirectoryPaths);
     action.addAllOutputFiles(outputPaths);
+    action.addAllOutputDirectories(outputDirectoryPaths);
 
     // Get the remote platform properties.
@@ -350,7 +358,7 @@
       SpawnExecutionPolicy policy,
       SortedMap<PathFragment, ActionInput> inputMap,
       boolean uploadToCache,
-      @Nullable RemoteActionCache remoteCache,
+      @Nullable AbstractRemoteActionCache remoteCache,
       @Nullable ActionKey actionKey)
       throws ExecException, IOException, InterruptedException {
     if (uploadToCache && remoteCache != null && actionKey != null) {
@@ -364,7 +372,7 @@
       Spawn spawn,
       SpawnExecutionPolicy policy,
       SortedMap<PathFragment, ActionInput> inputMap,
-      RemoteActionCache remoteCache,
+      AbstractRemoteActionCache remoteCache,
       ActionKey actionKey)
       throws ExecException, IOException, InterruptedException {
     Map<Path, Long> ctimesBefore = getInputCtimes(inputMap);
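
With the exec root threaded into buildAction, each declared output is stat'ed and routed into the matching field of the Action proto: regular files into output_files, existing directories into output_directories. Illustrative sketch only; the digest arguments are placeholders, not values from this change:

import com.google.devtools.remoteexecution.v1test.Action;
import com.google.devtools.remoteexecution.v1test.Digest;

class BuildActionSketch {
  // For a spawn declaring "out/lib.a" (a regular file under the exec root) and
  // "out/gen" (an existing directory), buildAction now produces the equivalent of:
  static Action example(Digest commandDigest, Digest inputRootDigest) {
    return Action.newBuilder()
        .setCommandDigest(commandDigest)      // placeholder digest
        .setInputRootDigest(inputRootDigest)  // placeholder digest
        .addOutputFiles("out/lib.a")
        .addOutputDirectories("out/gen")
        .build();
  }
}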
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index db60846..d9c67dc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -15,8 +15,6 @@
 package com.google.devtools.build.lib.remote;
 
 import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.EnvironmentalExecException;
-import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.MetadataProvider;
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
@@ -32,7 +30,6 @@
 import com.google.devtools.remoteexecution.v1test.Directory;
 import com.google.devtools.remoteexecution.v1test.DirectoryNode;
 import com.google.devtools.remoteexecution.v1test.FileNode;
-import com.google.devtools.remoteexecution.v1test.OutputFile;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.ByteArrayInputStream;
@@ -41,6 +38,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files
@@ -51,15 +49,14 @@
  * <p>Note that this class is used from src/tools/remote.
  */
 @ThreadSafe
-public final class SimpleBlobStoreActionCache implements RemoteActionCache {
+public final class SimpleBlobStoreActionCache extends AbstractRemoteActionCache {
   private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024;
 
   private final SimpleBlobStore blobStore;
-  private final DigestUtil digestUtil;
 
   public SimpleBlobStoreActionCache(SimpleBlobStore blobStore, DigestUtil digestUtil) {
+    super(digestUtil);
     this.blobStore = blobStore;
-    this.digestUtil = digestUtil;
   }
 
   @Override
@@ -79,10 +76,11 @@
 
   public void downloadTree(Digest rootDigest, Path rootLocation)
       throws IOException, InterruptedException {
+    FileSystemUtils.createDirectoryAndParents(rootLocation);
     Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
     for (FileNode file : directory.getFilesList()) {
-      downloadFileContents(
-          file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable());
+      downloadFile(
+          rootLocation.getRelative(file.getName()), file.getDigest(), file.getIsExecutable(), null);
     }
     for (DirectoryNode child : directory.getDirectoriesList()) {
       downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
@@ -109,63 +107,6 @@
   }
 
   @Override
-  public void download(ActionResult result, Path execRoot, FileOutErr outErr)
-      throws ExecException, IOException, InterruptedException {
-    try {
-      for (OutputFile file : result.getOutputFilesList()) {
-        if (!file.getContent().isEmpty()) {
-          createFile(
-              file.getContent().toByteArray(),
-              execRoot.getRelative(file.getPath()),
-              file.getIsExecutable());
-        } else {
-          downloadFileContents(
-              file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable());
-        }
-      }
-      if (!result.getOutputDirectoriesList().isEmpty()) {
-        throw new UnsupportedOperationException();
-      }
-      downloadOutErr(result, outErr);
-    } catch (IOException downloadException) {
-      try {
-        // Delete any (partially) downloaded output files, since any subsequent local execution
-        // of this action may expect none of the output files to exist.
-        for (OutputFile file : result.getOutputFilesList()) {
-          execRoot.getRelative(file.getPath()).delete();
-        }
-        outErr.getOutputPath().delete();
-        outErr.getErrorPath().delete();
-      } catch (IOException e) {
-        // If deleting of output files failed, we abort the build with a decent error message as
-        // any subsequent local execution failure would likely be incomprehensible.
-
-        // We don't propagate the downloadException, as this is a recoverable error and the cause
-        // of the build failure is really that we couldn't delete output files.
-        throw new EnvironmentalExecException("Failed to delete output files after incomplete "
-            + "download. Cannot continue with local execution.", e, true);
-      }
-      throw downloadException;
-    }
-  }
-
-  private void downloadOutErr(ActionResult result, FileOutErr outErr)
-          throws IOException, InterruptedException {
-    if (!result.getStdoutRaw().isEmpty()) {
-      result.getStdoutRaw().writeTo(outErr.getOutputStream());
-      outErr.getOutputStream().flush();
-    } else if (result.hasStdoutDigest()) {
-      downloadFileContents(result.getStdoutDigest(), outErr.getOutputPath(), /*executable=*/false);
-    }
-    if (!result.getStderrRaw().isEmpty()) {
-      result.getStderrRaw().writeTo(outErr.getErrorStream());
-      outErr.getErrorStream().flush();
-    } else if (result.hasStderrDigest()) {
-      downloadFileContents(result.getStderrDigest(), outErr.getErrorPath(), /*executable=*/false);
-    }
-  }
-
-  @Override
   public void upload(
       ActionKey actionKey,
       Path execRoot,
@@ -190,26 +131,17 @@
 
   public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files)
       throws IOException, InterruptedException {
-    for (Path file : files) {
-      // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
-      // rely on the local spawn runner to stat the files, instead of statting here.
-      if (!file.exists()) {
-        continue;
+    UploadManifest manifest = new UploadManifest(result, execRoot);
+    manifest.addFiles(files);
+
+    for (Map.Entry<Digest, Path> entry : manifest.getDigestToFile().entrySet()) {
+      try (InputStream in = entry.getValue().getInputStream()) {
+        uploadStream(entry.getKey(), in);
       }
-      if (file.isDirectory()) {
-        // TODO(olaola): to implement this for a directory, will need to create or pass a
-        // TreeNodeRepository to call uploadTree.
-        throw new IOException("Storing a directory is not yet supported.");
-      }
-      // TODO(olaola): inline small file contents here.
-      // First put the file content to cache.
-      Digest digest = uploadFileContents(file);
-      // Add to protobuf.
-      result
-          .addOutputFilesBuilder()
-          .setPath(file.relativeTo(execRoot).getPathString())
-          .setDigest(digest)
-          .setIsExecutable(file.isExecutable());
+    }
+
+    for (Map.Entry<Digest, Chunker> entry : manifest.getDigestToChunkers().entrySet()) {
+      uploadBlob(entry.getValue().next().getData().toByteArray(), entry.getKey());
     }
   }
 
@@ -227,23 +159,6 @@
     }
   }
 
-  private void downloadFileContents(Digest digest, Path dest, boolean executable)
-      throws IOException, InterruptedException {
-    FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
-    try (OutputStream out = dest.getOutputStream()) {
-      downloadBlob(digest, out);
-    }
-    dest.setExecutable(executable);
-  }
-
-  private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
-    FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
-    try (OutputStream stream = dest.getOutputStream()) {
-      stream.write(contents);
-    }
-    dest.setExecutable(executable);
-  }
-
   public Digest uploadBlob(byte[] blob) throws IOException, InterruptedException {
     return uploadBlob(blob, digestUtil.compute(blob));
   }
@@ -258,27 +173,6 @@
     return digest;
   }
 
-  private void downloadBlob(Digest digest, OutputStream out)
-      throws IOException, InterruptedException {
-    if (digest.getSizeBytes() == 0) {
-      return;
-    }
-    boolean success = blobStore.get(digest.getHash(), out);
-    if (!success) {
-      throw new CacheNotFoundException(digest);
-    }
-  }
-
-  public byte[] downloadBlob(Digest digest)
-      throws IOException, InterruptedException {
-    if (digest.getSizeBytes() == 0) {
-      return new byte[0];
-    }
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    downloadBlob(digest, out);
-    return out.toByteArray();
-  }
-
   public boolean containsKey(Digest digest) throws IOException, InterruptedException {
     return blobStore.containsKey(digest.getHash());
   }
@@ -316,4 +210,28 @@
   public void close() {
     blobStore.close();
   }
+
+  @Override
+  protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
+    try (OutputStream out = dest.getOutputStream()) {
+      boolean success = blobStore.get(digest.getHash(), out);
+      if (!success) {
+        throw new CacheNotFoundException(digest);
+      }
+    }
+  }
+
+  @Override
+  public byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
+    if (digest.getSizeBytes() == 0) {
+      return new byte[0];
+    }
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      boolean success = blobStore.get(digest.getHash(), out);
+      if (!success) {
+        throw new CacheNotFoundException(digest);
+      }
+      return out.toByteArray();
+    }
+  }
 }
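
One detail the rewritten upload loop relies on: addDirectory constructs each Tree chunker as new Chunker(blob, blob.length, digestUtil), so the chunk size equals the blob size and a single next() call yields the complete serialized Tree. A small sketch of that invariant; the helper is illustrative only, with blob and digestUtil standing for the values in scope in addDirectory:

import com.google.protobuf.ByteString;

final class ChunkerInvariantSketch {
  // blob: a serialized Tree message; digestUtil: as in the enclosing cache class.
  static byte[] wholeTree(byte[] blob, DigestUtil digestUtil) throws Exception {
    Chunker chunker = new Chunker(blob, blob.length, digestUtil);
    ByteString data = chunker.next().getData(); // one chunk covers the whole message
    return data.toByteArray(); // data.size() == blob.length
  }
}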
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
index 0554682..1f836a9 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
@@ -24,19 +24,24 @@
 import com.google.common.collect.TreeTraverser;
 import com.google.devtools.build.lib.actions.ActionInput;
 import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
+import com.google.devtools.build.lib.actions.cache.Metadata;
 import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
 import com.google.devtools.build.lib.concurrent.BlazeInterners;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.exec.SpawnInputExpander;
+import com.google.devtools.build.lib.vfs.Dirent;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.vfs.Symlinks;
 import com.google.devtools.remoteexecution.v1test.Digest;
 import com.google.devtools.remoteexecution.v1test.Directory;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,10 +56,22 @@
  */
 @ThreadSafe
 public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.TreeNode> {
+  // In this implementation, symlinks are NOT followed when expanding directory artifacts.
+  public static final Symlinks SYMLINK_POLICY = Symlinks.NOFOLLOW;
+
   /**
    * A single node in a hierarchical directory structure. Leaves are the Artifacts, although we only
    * use the ActionInput interface. We assume that the objects used for the ActionInputs are unique
    * (same data corresponds to a canonical object in memory).
+   *
+   * <p>There are three cases:
+   *
+   * <ol>
+   *   <li>The node is a leaf that represents an artifact file.
+   *   <li>The node is a directory optionally associated with an artifact (an "artifact directory").
+   *   <li>The node is a leaf that is the descendant of an artifact directory. In this case, the
+   *       node is associated with a BasicActionInput, not a full Artifact.
+   * </ol>
    */
   @Immutable
   @ThreadSafe
@@ -62,7 +79,8 @@
 
     private final int hashCode;
     private final ImmutableList<ChildEntry> childEntries; // no need to make it a map thus far.
-    @Nullable private final ActionInput actionInput; // Null iff this is a directory.
+    @Nullable private final ActionInput actionInput;
+    private final boolean isLeaf;
 
     /** A pair of path segment, TreeNode. */
     @Immutable
@@ -105,15 +123,23 @@
     }
 
     // Should only be called by the TreeNodeRepository.
-    private TreeNode(Iterable<ChildEntry> childEntries) {
-      this.actionInput = null;
+    private TreeNode(Iterable<ChildEntry> childEntries, @Nullable ActionInput actionInput) {
+      isLeaf = false;
+      this.actionInput = actionInput;
       this.childEntries = ImmutableList.copyOf(childEntries);
-      hashCode = Arrays.hashCode(this.childEntries.toArray());
+      if (actionInput != null) {
+        hashCode = actionInput.hashCode(); // This will ensure efficient interning of TreeNodes as
+        // long as all ActionInputs either implement data-based hashCode or are interned themselves.
+      } else {
+        hashCode = Arrays.hashCode(this.childEntries.toArray());
+      }
     }
 
     // Should only be called by the TreeNodeRepository.
     private TreeNode(ActionInput actionInput) {
-      this.actionInput = actionInput;
+      isLeaf = true;
+      this.actionInput =
+          Preconditions.checkNotNull(actionInput, "a TreeNode leaf should have an ActionInput");
       this.childEntries = ImmutableList.of();
       hashCode = actionInput.hashCode(); // This will ensure efficient interning of TreeNodes as
       // long as all ActionInputs either implement data-based hashCode or are interned themselves.
@@ -128,7 +154,7 @@
     }
 
     public boolean isLeaf() {
-      return actionInput != null;
+      return isLeaf;
     }
 
     @Override
@@ -176,7 +202,8 @@
     }
   }
 
-  private static final TreeNode EMPTY_NODE = new TreeNode(ImmutableList.<TreeNode.ChildEntry>of());
+  private static final TreeNode EMPTY_NODE =
+      new TreeNode(ImmutableList.<TreeNode.ChildEntry>of(), null);
 
   // Keep only one canonical instance of every TreeNode in the repository.
   private final Interner<TreeNode> interner = BlazeInterners.newWeakInterner();
@@ -184,6 +211,8 @@
   // be part of the state.
   private final Path execRoot;
   private final ActionInputFileCache inputFileCache;
+  // For directories that are themselves artifacts, map of the ActionInput to the Merkle hash
+  private final Map<ActionInput, Digest> inputDirectoryDigestCache = new HashMap<>();
   private final Map<TreeNode, Digest> treeNodeDigestCache = new HashMap<>();
   private final Map<Digest, TreeNode> digestTreeNodeCache = new HashMap<>();
   private final Map<TreeNode, Directory> directoryCache = new HashMap<>();
@@ -226,7 +255,7 @@
         });
   }
 
-  public TreeNode buildFromActionInputs(Iterable<? extends ActionInput> inputs) {
+  public TreeNode buildFromActionInputs(Iterable<? extends ActionInput> inputs) throws IOException {
     TreeMap<PathFragment, ActionInput> sortedMap = new TreeMap<>();
     for (ActionInput input : inputs) {
       sortedMap.put(PathFragment.create(input.getExecPathString()), input);
@@ -239,7 +268,8 @@
    * of input files. TODO(olaola): switch to creating and maintaining the TreeNodeRepository based
    * on the build graph structure.
    */
-  public TreeNode buildFromActionInputs(SortedMap<PathFragment, ActionInput> sortedMap) {
+  public TreeNode buildFromActionInputs(SortedMap<PathFragment, ActionInput> sortedMap)
+      throws IOException {
     ImmutableList.Builder<ImmutableList<String>> segments = ImmutableList.builder();
     for (PathFragment path : sortedMap.keySet()) {
       segments.add(path.getSegments());
@@ -255,13 +285,35 @@
     return buildParentNode(inputs, segments.build(), 0, inputs.size(), 0);
   }
 
+  // Expand the descendants of an artifact (input) directory into sorted child entries.
+  private List<TreeNode.ChildEntry> buildInputDirectoryEntries(Path path) throws IOException {
+    List<Dirent> sortedDirent = new ArrayList<>(path.readdir(SYMLINK_POLICY));
+    sortedDirent.sort(Comparator.comparing(Dirent::getName));
+
+    List<TreeNode.ChildEntry> entries = new ArrayList<>(sortedDirent.size());
+    for (Dirent dirent : sortedDirent) {
+      String name = dirent.getName();
+      Path child = path.getRelative(name);
+      TreeNode childNode;
+      if (dirent.getType() == Dirent.Type.DIRECTORY) {
+        childNode = interner.intern(new TreeNode(buildInputDirectoryEntries(child), null));
+      } else {
+        childNode = interner.intern(new TreeNode(ActionInputHelper.fromPath(child.asFragment())));
+      }
+      entries.add(new TreeNode.ChildEntry(name, childNode));
+    }
+
+    return entries;
+  }
+
   @SuppressWarnings("ReferenceEquality") // Segments are interned.
   private TreeNode buildParentNode(
       List<ActionInput> inputs,
       ImmutableList<ImmutableList<String>> segments,
       int inputsStart,
       int inputsEnd,
-      int segmentIndex) {
+      int segmentIndex)
+      throws IOException {
     if (segments.isEmpty()) {
       // We sometimes have actions with no inputs (e.g., echo "xyz" > $@), so we need to handle that
       // case here.
@@ -273,7 +325,12 @@
       Preconditions.checkArgument(
           inputsStart == inputsEnd - 1, "Encountered two inputs with the same path.");
       // TODO: check that the actionInput is a single file!
-      return interner.intern(new TreeNode(inputs.get(inputsStart)));
+      ActionInput input = inputs.get(inputsStart);
+      Path leafPath = execRoot.getRelative(input.getExecPathString());
+      if (leafPath.isDirectory()) {
+        return interner.intern(new TreeNode(buildInputDirectoryEntries(leafPath), input));
+      }
+      return interner.intern(new TreeNode(input));
     }
     ArrayList<TreeNode.ChildEntry> entries = new ArrayList<>();
     String segment = segments.get(inputsStart).get(segmentIndex);
@@ -290,7 +347,7 @@
         }
       }
     }
-    return interner.intern(new TreeNode(entries));
+    return interner.intern(new TreeNode(entries, null));
   }
 
   private synchronized Directory getOrComputeDirectory(TreeNode node) throws IOException {
@@ -322,6 +379,9 @@
           }
         } else {
           Digest childDigest = Preconditions.checkNotNull(treeNodeDigestCache.get(child));
+          if (child.getActionInput() != null) {
+            inputDirectoryDigestCache.put(child.getActionInput(), childDigest);
+          }
           b.addDirectoriesBuilder().setName(entry.getSegment()).setDigest(childDigest);
         }
       }
@@ -380,6 +440,15 @@
     if (input instanceof VirtualActionInput) {
       return Preconditions.checkNotNull(virtualInputDigestCache.get(input));
     }
+    Metadata metadata = Preconditions.checkNotNull(inputFileCache.getMetadata(input));
+    byte[] digest = metadata.getDigest();
+    if (digest == null) {
+      // If the artifact does not have a digest, it is because it is a directory.
+      // We get the digest from the set of Merkle hashes computed in this TreeNodeRepository.
+      return Preconditions.checkNotNull(
+          inputDirectoryDigestCache.get(input),
+          "a directory should have a precomputed Merkle hash (instead of a digest)");
+    }
     return DigestUtil.getFromInputCache(input, inputFileCache);
   }