Extract a SpawnRunner interface

The RemoteSpawnRunner now implements the SpawnRunner interface.

Note that Google's internal implementations were also retrofitted, and
SpawnRunner is intended as a stable interface; that's also why I decided to
move all params into SpawnExecutionPolicy, which is, unfortunately, not quite
done yet.

The specification of SpawnRunner is also still incomplete. In particular, it
is still missing execution info keys, as well as inputs and outputs handling.

This is a step towards unifying all SpawnStrategy implementations, with the
SpawnRunner implementations performing the actual Spawn execution.

There should be no user-visible semantic changes to the code, but one small
fix:
- GrpcActionCache was trying to download files even if there were none

PiperOrigin-RevId: 152105696
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/Label.java b/src/main/java/com/google/devtools/build/lib/cmdline/Label.java
index f276bb9..1d31f2e 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/Label.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/Label.java
@@ -548,7 +548,7 @@
    * Returns a suitable string for the user-friendly representation of the Label. Works even if the
    * argument is null.
    */
-  public static String print(Label label) {
+  public static String print(@Nullable Label label) {
     return label == null ? "(unknown)" : label.toString();
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
new file mode 100644
index 0000000..1f50827
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
@@ -0,0 +1,133 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.exec;
+
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ExecException;
+import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.util.io.FileOutErr;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import java.io.IOException;
+import java.util.SortedMap;
+
+/**
+ * A runner for spawns. Implementations can execute spawns on the local machine as a subprocess with
+ * or without sandboxing, on a remote machine, or only consult a remote cache.
+ *
+ * <h2>Environment Variables</h2>
+ * <ul>
+ *   <li>Implementations MUST set the specified environment variables.
+ *   <li>Implementations MAY add TMPDIR as an additional env variable, if it is not set already.
+ *   <li>If an implementation sets TMPDIR, it MUST be set to an absolute path.
+ *   <li>Implementations MUST NOT add any other environment variables.
+ * </ul>
+ *
+ * <h2>Command line</h2>
+ * <ul>
+ *   <li>Implementations MUST use the specified command line unmodified by default.
+ *   <li>Implementations MAY modify the specified command line if explicitly requested by the user.
+ * </ul>
+ *
+ * <h2>Process</h2>
+ * <ul>
+ *   <li>Implementations MUST ensure that all child processes (including transitive) exit in all
+ *       cases, including successful completion, interruption, and timeout
+ *   <li>Implementations MUST return the exit code as observed from the subprocess if the subprocess
+ *       exits naturally; they MUST not throw an exception for non-zero exit codes
+ *   <li>Implementations MUST be interruptible; they MUST throw {@link InterruptedException} from
+ *       {@link #exec} when interrupted
+ *   <li>Implementations MUST apply the specified timeout to the execution of the subprocess
+ *     <ul>
+ *       <li>If no timeout is specified, the implementation MAY apply an implementation-specific
+ *           timeout
+ *       <li>If the specified timeout is larger than an implementation-dependent maximum, then the
+ *           implementation MUST throw {@link IllegalArgumentException}; it MUST not silently change
+ *           the timeout to a smaller value
+ *       <li>If the timeout is exceeded, the implementation MUST throw TimeoutException, with the
+ *           timeout that was applied to the subprocess (TODO)
+ *     </ul>
+ * </ul>
+ *
+ * <h2>Optimistic Concurrency</h2>
+ * Bazel may choose to execute a spawn using multiple {@link SpawnRunner} implementations
+ * simultaneously in order to minimize total latency. This is especially useful for builds with few
+ * actions where remotely executing the actions incurs high round trip times.
+ * <ul>
+ *   <li>All implementations MUST call {@link SpawnExecutionPolicy#lockOutputFiles} before writing
+ *       to any of the output files, but may write to stdout and stderr without calling it. Instead,
+ *       all callers must provide temporary locations for stdout & stderr if they ever call multiple
+ *       {@link SpawnRunner} implementations concurrently. Spawn runners that use the local machine
+ *       MUST either call it before starting the subprocess, or ensure that subprocesses write to
+ *       temporary locations (for example by running in a mount namespace) and then copy or move the
+ *       outputs into place.
+ *   <li>Implementations SHOULD delay calling {@link SpawnExecutionPolicy#lockOutputFiles} until
+ *       just before writing.
+ * </ul>
+ */
+public interface SpawnRunner {
+  /**
+   * A helper class to provide additional tools and methods to {@link SpawnRunner} implementations.
+   *
+   * <p>This interface may change without notice.
+   */
+  public interface SpawnExecutionPolicy {
+    /**
+     * Returns whether inputs should be prefetched to the local machine using {@link
+     * ActionInputPrefetcher} if the spawn is executed locally (with or without sandboxing).
+     */
+    // TODO(ulfjack): Use an execution info value instead.
+    boolean shouldPrefetchInputsForLocalExecution(Spawn spawn);
+
+    /**
+     * The input file cache for this specific spawn.
+     */
+    ActionInputFileCache getActionInputFileCache();
+
+    /**
+     * All implementations must call this method before writing to the provided stdout / stderr or
+     * to any of the output file locations. This method is used to coordinate - implementations
+     * must throw an {@link InterruptedException} for all but one caller.
+     */
+    void lockOutputFiles() throws InterruptedException;
+
+    /**
+     * Returns the timeout that should be applied for the given {@link Spawn} instance.
+     */
+    long getTimeoutMillis();
+
+    /**
+     * The files to which to write stdout and stderr.
+     */
+    FileOutErr getFileOutErr();
+
+    SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException;
+  }
+
+  /**
+   * Run the given spawn.
+   *
+   * @param spawn the spawn to run
+   * @param policy a helper that provides additional parameters
+   * @return the result from running the spawn
+   * @throws InterruptedException if the calling thread was interrupted, or if the runner could not
+   *         lock the output files (see {@link SpawnExecutionPolicy#lockOutputFiles()})
+   * @throws IOException if something went wrong reading or writing to the local file system
+   * @throws ExecException if the request is malformed
+   */
+  SpawnResult exec(
+      Spawn spawn,
+      SpawnExecutionPolicy policy)
+          throws InterruptedException, IOException, ExecException;
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 3e70003..51ff22f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -22,12 +22,10 @@
 import com.google.common.eventbus.EventBus;
 import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
 import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
 import com.google.devtools.build.lib.actions.ActionStatusMessage;
-import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
 import com.google.devtools.build.lib.actions.Spawn;
-import com.google.devtools.build.lib.exec.SpawnInputExpander;
 import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.exec.SpawnRunner;
 import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
 import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
@@ -38,7 +36,7 @@
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
 import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
 import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
-import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.FileOutErr;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.protobuf.TextFormat;
@@ -55,26 +53,22 @@
 /**
  * A client for the remote execution service.
  */
-final class RemoteSpawnRunner {
+final class RemoteSpawnRunner implements SpawnRunner {
   private final EventBus eventBus;
   private final Path execRoot;
   private final RemoteOptions options;
   // TODO(olaola): This will be set on a per-action basis instead.
   private final Platform platform;
-  private final String workspaceName;
-  private final SpawnInputExpander spawnInputExpander = new SpawnInputExpander(/*strict=*/false);
 
   private final GrpcRemoteExecutor executor;
 
   RemoteSpawnRunner(
       Path execRoot,
       EventBus eventBus,
-      String workspaceName,
       RemoteOptions options,
       GrpcRemoteExecutor executor) {
     this.execRoot = execRoot;
     this.eventBus = eventBus;
-    this.workspaceName = workspaceName;
     this.options = options;
     if (options.experimentalRemotePlatformOverride != null) {
       Platform.Builder platformBuilder = Platform.newBuilder();
@@ -93,9 +87,8 @@
   RemoteSpawnRunner(
       Path execRoot,
       EventBus eventBus,
-      String workspaceName,
       RemoteOptions options) {
-    this(execRoot, eventBus, workspaceName, options, connect(options));
+    this(execRoot, eventBus, options, connect(options));
   }
 
   private static GrpcRemoteExecutor connect(RemoteOptions options) {
@@ -109,13 +102,10 @@
     return new GrpcRemoteExecutor(channel, options);
   }
 
+  @Override
   public SpawnResult exec(
       Spawn spawn,
-      // TODO(ulfjack): Change this back to FileOutErr.
-      OutErr outErr,
-      ActionInputFileCache actionInputFileCache,
-      ArtifactExpander artifactExpander,
-      float timeout) throws InterruptedException, IOException {
+      SpawnExecutionPolicy policy) throws InterruptedException, IOException {
     ActionExecutionMetadata owner = spawn.getResourceOwner();
     if (owner.getOwner() != null) {
       eventBus.post(ActionStatusMessage.runningStrategy(owner, "remote"));
@@ -124,13 +114,8 @@
     try {
       // Temporary hack: the TreeNodeRepository should be created and maintained upstream!
       TreeNodeRepository repository =
-          new TreeNodeRepository(execRoot, actionInputFileCache);
-      SortedMap<PathFragment, ActionInput> inputMap =
-          spawnInputExpander.getInputMapping(
-              spawn,
-              artifactExpander,
-              actionInputFileCache,
-              workspaceName);
+          new TreeNodeRepository(execRoot, policy.getActionInputFileCache());
+      SortedMap<PathFragment, ActionInput> inputMap = policy.getInputMapping();
       TreeNode inputRoot = repository.buildFromActionInputs(inputMap);
       repository.computeMerkleDigests(inputRoot);
       Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment());
@@ -155,7 +140,7 @@
                 .setAction(action)
                 .setAcceptCached(this.options.remoteAcceptCached)
                 .setTotalInputFileCount(inputMap.size())
-                .setTimeoutMillis((int) (1000 * timeout));
+                .setTimeoutMillis(policy.getTimeoutMillis());
         ExecuteReply reply = executor.executeRemotely(request.build());
         ExecutionStatus status = reply.getStatus();
 
@@ -171,7 +156,7 @@
       }
 
       // TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
-      passRemoteOutErr(executor, result, outErr);
+      passRemoteOutErr(executor, result, policy.getFileOutErr());
       executor.downloadAllResults(result, execRoot);
       return new SpawnResult.Builder()
           .setSetupSuccess(true)
@@ -211,7 +196,8 @@
   }
 
   private static void passRemoteOutErr(
-      RemoteActionCache cache, ActionResult result, OutErr outErr) throws CacheNotFoundException {
+      RemoteActionCache cache, ActionResult result, FileOutErr outErr)
+          throws CacheNotFoundException {
     ImmutableList<byte[]> streams =
         cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
     outErr.printOut(new String(streams.get(0), UTF_8));
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index f22dc73..97fa28a 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -14,7 +14,7 @@
 package com.google.devtools.build.lib.remote;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,7 +39,10 @@
 import com.google.devtools.build.lib.actions.ResourceSet;
 import com.google.devtools.build.lib.actions.RunfilesSupplier;
 import com.google.devtools.build.lib.actions.SimpleSpawn;
+import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.exec.SpawnInputExpander;
 import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
 import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
 import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
@@ -60,10 +63,11 @@
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
 import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.FileOutErr;
 import com.google.devtools.build.lib.vfs.FileSystem;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
 import com.google.devtools.common.options.Options;
 import com.google.protobuf.ByteString;
@@ -77,6 +81,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 import javax.annotation.Nullable;
 import org.junit.Before;
 import org.junit.Test;
@@ -363,6 +368,42 @@
   private SimpleSpawn simpleSpawn;
   private FakeActionInputFileCache fakeFileCache;
 
+  private FileOutErr outErr;
+  private long timeoutMillis = 0;
+
+  private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() {
+    @Override
+    public boolean shouldPrefetchInputsForLocalExecution(Spawn spawn) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void lockOutputFiles() throws InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ActionInputFileCache getActionInputFileCache() {
+      return fakeFileCache;
+    }
+
+    @Override
+    public long getTimeoutMillis() {
+      return timeoutMillis;
+    }
+
+    @Override
+    public FileOutErr getFileOutErr() {
+      return outErr;
+    }
+
+    @Override
+    public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
+      return new SpawnInputExpander(/*strict*/false)
+          .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
+    }
+  };
+
   @Before
   public final void setUp() throws Exception {
     fs = new InMemoryFileSystem();
@@ -379,6 +420,12 @@
         /*outputs=*/ImmutableList.<ActionInput>of(),
         ResourceSet.ZERO
     );
+
+    Path stdout = fs.getPath("/tmp/stdout");
+    Path stderr = fs.getPath("/tmp/stderr");
+    FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
+    FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
+    outErr = new FileOutErr(stdout, stderr);
   }
 
   private void scratch(ActionInput input, String content) throws IOException {
@@ -397,7 +444,7 @@
     GrpcRemoteExecutor executor =
         new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
     RemoteSpawnRunner client =
-        new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+        new RemoteSpawnRunner(execRoot, eventBus, options, executor);
 
     scratch(simpleSpawn.getInputFiles().get(0), "xyz");
 
@@ -407,16 +454,12 @@
         .build();
     when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
 
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayOutputStream err = new ByteArrayOutputStream();
-    OutErr outErr = OutErr.create(out, err);
-    SpawnResult result =
-        client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+    SpawnResult result = client.exec(simpleSpawn, simplePolicy);
     verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
     assertThat(result.setupSuccess()).isTrue();
     assertThat(result.exitCode()).isEqualTo(0);
-    assertThat(out.toByteArray()).isEmpty();
-    assertThat(err.toByteArray()).isEmpty();
+    assertThat(outErr.hasRecordedOutput()).isFalse();
+    assertThat(outErr.hasRecordedStderr()).isFalse();
   }
 
   @Test
@@ -428,7 +471,7 @@
     GrpcRemoteExecutor executor =
         new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
     RemoteSpawnRunner client =
-        new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+        new RemoteSpawnRunner(execRoot, eventBus, options, executor);
 
     scratch(simpleSpawn.getInputFiles().get(0), "xyz");
     byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
@@ -445,16 +488,12 @@
         .build();
     when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
 
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayOutputStream err = new ByteArrayOutputStream();
-    OutErr outErr = OutErr.create(out, err);
-    SpawnResult result =
-        client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+    SpawnResult result = client.exec(simpleSpawn, simplePolicy);
     verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
     assertThat(result.setupSuccess()).isTrue();
     assertThat(result.exitCode()).isEqualTo(0);
-    assertThat(out.toByteArray()).isEqualTo(cacheStdOut);
-    assertThat(err.toByteArray()).isEqualTo(cacheStdErr);
+    assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+    assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
   }
 
   @Test
@@ -466,7 +505,7 @@
     GrpcRemoteExecutor executor =
         new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
     RemoteSpawnRunner client =
-        new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+        new RemoteSpawnRunner(execRoot, eventBus, options, executor);
 
     scratch(simpleSpawn.getInputFiles().get(0), "xyz");
     byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
@@ -488,15 +527,11 @@
                 .setStderrDigest(stdErrDigest))
             .build()).iterator());
 
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayOutputStream err = new ByteArrayOutputStream();
-    OutErr outErr = OutErr.create(out, err);
-    SpawnResult result =
-        client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+    SpawnResult result = client.exec(simpleSpawn, simplePolicy);
     verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
     assertThat(result.setupSuccess()).isTrue();
     assertThat(result.exitCode()).isEqualTo(0);
-    assertThat(out.toByteArray()).isEqualTo(cacheStdOut);
-    assertThat(err.toByteArray()).isEqualTo(cacheStdErr);
+    assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+    assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
   }
 }