remote: make the dynamic spawn scheduler work. Fixes #8646

This change fixes a correctness issue in the dynamic spawn scheduler when it is used with remote execution. See #8646 for more details.

A performance issue remains: #8647

Closes #8648.

PiperOrigin-RevId: 253998300
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
index 1e9b650..78c7e3d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
@@ -46,6 +46,7 @@
 import com.google.devtools.build.lib.actions.UserExecException;
 import com.google.devtools.build.lib.actions.cache.MetadataInjector;
 import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.profiler.Profiler;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
 import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata;
@@ -70,6 +71,7 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -83,6 +85,12 @@
 @ThreadSafety.ThreadSafe
 public abstract class AbstractRemoteActionCache implements AutoCloseable {
 
+  /** Sole-writer lock for outputs; see {@link SpawnExecutionContext#lockOutputFiles()}. */
+  @FunctionalInterface
+  interface OutputFilesLocker {
+    void lock() throws InterruptedException;
+  }
+
   private static final ListenableFuture<Void> COMPLETED_SUCCESS = SettableFuture.create();
   private static final ListenableFuture<byte[]> EMPTY_BYTES = SettableFuture.create();
 
@@ -161,16 +169,26 @@
     return outerF;
   }
 
+  private static Path toTmpDownloadPath(Path actualPath) { // e.g. out/foo -> out/foo.tmp
+    return actualPath.getParentDirectory().getRelative(actualPath.getBaseName() + ".tmp");
+  }
+
   /**
    * Download the output files and directory trees of a remotely executed action to the local
    * machine, as well stdin / stdout to the given files.
    *
    * <p>In case of failure, this method deletes any output files it might have already created.
    *
+   * @param outputFilesLocker ensures that we are the only ones writing to the output files when
+   *     using the dynamic spawn strategy.
    * @throws IOException in case of a cache miss or if the remote cache is unavailable.
    * @throws ExecException in case clean up after a failed download failed.
    */
-  public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
+  public void download(
+      ActionResult result,
+      Path execRoot,
+      FileOutErr origOutErr,
+      OutputFilesLocker outputFilesLocker)
       throws ExecException, IOException, InterruptedException {
     ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);
 
@@ -182,7 +200,8 @@
             .map(
                 (file) -> {
                   try {
-                    ListenableFuture<Void> download = downloadFile(file.path(), file.digest());
+                    ListenableFuture<Void> download =
+                        downloadFile(toTmpDownloadPath(file.path()), file.digest());
                     return Futures.transform(download, (d) -> file, directExecutor());
                   } catch (IOException e) {
                     return Futures.<FileMetadata>immediateFailedFuture(e);
@@ -209,10 +228,8 @@
 
     for (ListenableFuture<FileMetadata> download : downloads) {
       try {
-        FileMetadata outputFile = getFromFuture(download);
-        if (outputFile != null) {
-          outputFile.path().setExecutable(outputFile.isExecutable());
-        }
+        // Wait for all downloads to finish.
+        getFromFuture(download);
       } catch (IOException e) {
         downloadException = downloadException == null ? e : downloadException;
       } catch (InterruptedException e) {
@@ -222,10 +239,9 @@
 
     if (downloadException != null || interruptedException != null) {
       try {
-        // Delete any (partially) downloaded output files, since any subsequent local execution
-        // of this action may expect none of the output files to exist.
+        // Delete any (partially) downloaded output files.
         for (OutputFile file : result.getOutputFilesList()) {
-          execRoot.getRelative(file.getPath()).delete();
+          toTmpDownloadPath(execRoot.getRelative(file.getPath())).delete();
         }
         for (OutputDirectory directory : result.getOutputDirectoriesList()) {
           // Only delete the directories below the output directories because the output
@@ -261,6 +277,12 @@
       tmpOutErr.clearErr();
     }
 
+    // Ensure that we are the only ones writing to the output files when using the dynamic spawn
+    // strategy.
+    outputFilesLocker.lock();
+
+    moveOutputsToFinalLocation(downloads);
+
     List<SymlinkMetadata> symlinksInDirectories = new ArrayList<>();
     for (Entry<Path, DirectoryMetadata> entry : metadata.directories()) {
       entry.getKey().createDirectoryAndParents();
@@ -275,6 +297,36 @@
     createSymlinks(symlinks);
   }
 
+  /**
+   * Moves the downloaded outputs from their temporary download paths to their declared paths.
+   */
+  private void moveOutputsToFinalLocation(List<ListenableFuture<FileMetadata>> downloads)
+      throws IOException, InterruptedException {
+    List<FileMetadata> finishedDownloads = new ArrayList<>(downloads.size());
+    for (ListenableFuture<FileMetadata> finishedDownload : downloads) {
+      FileMetadata outputFile = getFromFuture(finishedDownload);
+      if (outputFile != null) {
+        finishedDownloads.add(outputFile);
+      }
+    }
+    /*
+     * Sort the downloads lexicographically by their temporary download path in order to avoid
+     * filename clashes when moving the files:
+     *
+     * Consider an action that produces two outputs foo and foo.tmp. These outputs would initially
+     * be downloaded to foo.tmp and foo.tmp.tmp. When renaming them to foo and foo.tmp we need to
+     * ensure that rename(foo.tmp, foo) happens before rename(foo.tmp.tmp, foo.tmp). We ensure this
+     * by doing the renames in lexicographical order of the download names.
+     */
+    Collections.sort(finishedDownloads, Comparator.comparing(f -> toTmpDownloadPath(f.path())));
+
+    // Rename each download to its declared name, then apply its executable bit.
+    for (FileMetadata outputFile : finishedDownloads) {
+      FileSystemUtils.moveFile(toTmpDownloadPath(outputFile.path()), outputFile.path());
+      outputFile.path().setExecutable(outputFile.isExecutable());
+    }
+  }
+
   private void createSymlinks(Iterable<SymlinkMetadata> symlinks) throws IOException {
     for (SymlinkMetadata symlink : symlinks) {
       if (symlink.target().isAbsolute()) {
@@ -376,6 +428,8 @@
    * @param execRoot the execution root
    * @param metadataInjector the action's metadata injector that allows this method to inject
    *     metadata about an action output instead of downloading the output
+   * @param outputFilesLocker ensures that we are the only ones writing to the output files when
+   *     using the dynamic spawn strategy.
    * @throws IOException in case of failure
    * @throws InterruptedException in case of receiving an interrupt
    */
@@ -386,7 +440,8 @@
       @Nullable PathFragment inMemoryOutputPath,
       OutErr outErr,
       Path execRoot,
-      MetadataInjector metadataInjector)
+      MetadataInjector metadataInjector,
+      OutputFilesLocker outputFilesLocker)
       throws IOException, InterruptedException {
     Preconditions.checkState(
         result.getExitCode() == 0,
@@ -403,6 +458,10 @@
               + "--experimental_remote_download_outputs=minimal");
     }
 
+    // When using the dynamic spawn strategy, ensure that we are the only ones writing to the
+    // output files.
+    outputFilesLocker.lock();
+
     ActionInput inMemoryOutput = null;
     Digest inMemoryOutputDigest = null;
     for (ActionInput output : outputs) {