Refactor persistent workers to use SpawnRunner.

Change the persistent worker spawn strategy to extend
AbstractSpawnStrategy and put the actual logic into
WorkerSpawnRunner. WorkerTestStrategy is unaffected.

I had to extend SpawnPolicy with a speculating() method. Persistent
workers need to know if speculation is happening in order to require
sandboxing.

Additionally, I added java_test rules for the local runner tests and
worker tests. See https://github.com/bazelbuild/bazel/issues/3481.

NOTE: ulfjack@ made some changes to this change before merging:
 - changed Reporter to EventHandler; added TODO about its usage
 - reverted non-semantic indentation change in AbstractSpawnStrategy
 - reverted a non-semantic indentation change in WorkerSpawnRunner
 - updated some internal classes to match
 - removed catch IOException in WorkerSpawnRunner in some cases,
   removed verboseFailures flag from WorkerSpawnRunner, updated callers
 - disable some tests on Windows; we were previously not running them,
   now that we do, they fail :-(

Change-Id: I207b3938f0dc84d374ab052d5030020886451d47
PiperOrigin-RevId: 164965398
diff --git a/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
index 2f8c4db..9ad2d46 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/AbstractSpawnStrategy.java
@@ -150,6 +150,11 @@
     }
 
     @Override
+    public boolean speculating() {
+      return writeOutputFiles != null;
+    }
+
+    @Override
     public Duration getTimeout() {
       return timeout;
     }
diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
index e7f6704..fdcb5ea 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
@@ -169,6 +169,12 @@
      */
     void lockOutputFiles() throws InterruptedException;
 
+    /**
+     * Returns whether this spawn may be executing concurrently under multiple spawn runners. If so,
+     * {@link #lockOutputFiles} may raise {@link InterruptedException}.
+     */
+    boolean speculating();
+
     /** Returns the timeout that should be applied for the given {@link Spawn} instance. */
     Duration getTimeout();
 
diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD
index ef74275..b3e4fd5 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -15,6 +15,8 @@
         "//src/main/java/com/google/devtools/build/lib:util",
         "//src/main/java/com/google/devtools/build/lib:vfs",
         "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/exec/apple",
+        "//src/main/java/com/google/devtools/build/lib/exec/local",
         "//src/main/java/com/google/devtools/build/lib/sandbox",
         "//src/main/java/com/google/devtools/build/lib/standalone",
         "//src/main/java/com/google/devtools/common/options",
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerActionContextProvider.java
index b95f8c6..50f6e2d 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerActionContextProvider.java
@@ -16,11 +16,16 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.devtools.build.lib.actions.ActionContext;
+import com.google.devtools.build.lib.actions.ResourceManager;
 import com.google.devtools.build.lib.analysis.test.TestActionContext;
-import com.google.devtools.build.lib.buildtool.BuildRequest;
 import com.google.devtools.build.lib.exec.ActionContextProvider;
-import com.google.devtools.build.lib.exec.ExecutionOptions;
+import com.google.devtools.build.lib.exec.SpawnRunner;
+import com.google.devtools.build.lib.exec.apple.XCodeLocalEnvProvider;
+import com.google.devtools.build.lib.exec.local.LocalEnvProvider;
+import com.google.devtools.build.lib.exec.local.LocalExecutionOptions;
+import com.google.devtools.build.lib.exec.local.LocalSpawnRunner;
 import com.google.devtools.build.lib.runtime.CommandEnvironment;
+import com.google.devtools.build.lib.util.OS;
 
 /**
  * Factory for the Worker-based execution strategy.
@@ -28,22 +33,37 @@
 final class WorkerActionContextProvider extends ActionContextProvider {
   private final ImmutableList<ActionContext> strategies;
 
-  public WorkerActionContextProvider(
-      CommandEnvironment env, BuildRequest buildRequest, WorkerPool workers) {
-    ImmutableMultimap.Builder<String, String> extraFlags = ImmutableMultimap.builder();
-    extraFlags.putAll(buildRequest.getOptions(WorkerOptions.class).workerExtraFlags);
+  public WorkerActionContextProvider(CommandEnvironment env, WorkerPool workers) {
+    ImmutableMultimap<String, String> extraFlags =
+        ImmutableMultimap.copyOf(env.getOptions().getOptions(WorkerOptions.class).workerExtraFlags);
 
-    WorkerSpawnStrategy workerSpawnStrategy =
-        new WorkerSpawnStrategy(
+    WorkerSpawnRunner spawnRunner =
+        new WorkerSpawnRunner(
             env.getExecRoot(),
             workers,
-            buildRequest.getOptions(ExecutionOptions.class).verboseFailures,
-            extraFlags.build());
+            extraFlags,
+            env.getReporter(),
+            createFallbackRunner(env));
+
+    WorkerSpawnStrategy workerSpawnStrategy = new WorkerSpawnStrategy(spawnRunner);
     TestActionContext workerTestStrategy =
-        new WorkerTestStrategy(env, buildRequest, workers, extraFlags.build());
+        new WorkerTestStrategy(env, env.getOptions(), workers, extraFlags);
     this.strategies = ImmutableList.of(workerSpawnStrategy, workerTestStrategy);
   }
 
+  private static SpawnRunner createFallbackRunner(CommandEnvironment env) {
+    LocalExecutionOptions localExecutionOptions =
+        env.getOptions().getOptions(LocalExecutionOptions.class);
+    LocalEnvProvider localEnvProvider =
+        OS.getCurrent() == OS.DARWIN ? new XCodeLocalEnvProvider() : LocalEnvProvider.UNMODIFIED;
+    return new LocalSpawnRunner(
+        env.getExecRoot(),
+        localExecutionOptions,
+        ResourceManager.instance(),
+        env.getRuntime().getProductName(),
+        localEnvProvider);
+  }
+
   @Override
   public Iterable<? extends ActionContext> getActionContexts() {
     return strategies;
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFilesHash.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFilesHash.java
index c203cf5..a795bc7 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFilesHash.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFilesHash.java
@@ -17,8 +17,8 @@
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
 import java.io.IOException;
 import java.nio.charset.Charset;
 
@@ -29,13 +29,12 @@
 public class WorkerFilesHash {
 
   public static HashCode getWorkerFilesHash(
-      Iterable<? extends ActionInput> toolFiles, ActionExecutionContext actionExecutionContext)
+      Iterable<? extends ActionInput> toolFiles, ActionInputFileCache actionInputFileCache)
       throws IOException {
     Hasher hasher = Hashing.sha256().newHasher();
     for (ActionInput tool : toolFiles) {
       hasher.putString(tool.getExecPathString(), Charset.defaultCharset());
-      hasher.putBytes(
-          actionExecutionContext.getActionInputFileCache().getMetadata(tool).getDigest());
+      hasher.putBytes(actionInputFileCache.getMetadata(tool).getDigest());
     }
     return hasher.hash();
   }
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
index e0ef188..0ac6cc0 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
@@ -143,8 +143,7 @@
   @Override
   public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
     Preconditions.checkNotNull(workerPool);
-    builder.addActionContextProvider(
-        new WorkerActionContextProvider(env, request, workerPool));
+    builder.addActionContextProvider(new WorkerActionContextProvider(env, workerPool));
     builder.addActionContextConsumer(new WorkerActionContextConsumer());
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
new file mode 100644
index 0000000..a38244e
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
@@ -0,0 +1,350 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.worker;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.HashCode;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
+import com.google.devtools.build.lib.actions.ExecException;
+import com.google.devtools.build.lib.actions.ExecutionRequirements;
+import com.google.devtools.build.lib.actions.ResourceManager;
+import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
+import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.exec.SpawnRunner;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.io.FileOutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * A spawn runner that launches Spawns the first time they are used in a persistent mode and then
+ * shards work over all the processes.
+ */
+final class WorkerSpawnRunner implements SpawnRunner {
+  public static final String ERROR_MESSAGE_PREFIX =
+      "Worker strategy cannot execute this %s action, ";
+  public static final String REASON_NO_FLAGFILE =
+      "because the command-line arguments do not contain at least one @flagfile or --flagfile=";
+  public static final String REASON_NO_TOOLS = "because the action has no tools";
+  public static final String REASON_NO_EXECUTION_INFO =
+      "because the action's execution info does not contain 'supports-workers=1'";
+
+  /** Pattern for @flagfile.txt and --flagfile=flagfile.txt */
+  private static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)");
+
+  private final Path execRoot;
+  private final WorkerPool workers;
+  private final Multimap<String, String> extraFlags;
+  private final EventHandler reporter;
+  private final SpawnRunner fallbackRunner;
+
+  public WorkerSpawnRunner(
+      Path execRoot,
+      WorkerPool workers,
+      Multimap<String, String> extraFlags,
+      EventHandler reporter,
+      SpawnRunner fallbackRunner) {
+    this.execRoot = execRoot;
+    this.workers = Preconditions.checkNotNull(workers);
+    this.extraFlags = extraFlags;
+    this.reporter = reporter;
+    this.fallbackRunner = fallbackRunner;
+  }
+
+  @Override
+  public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
+      throws ExecException, IOException, InterruptedException {
+    if (!spawn.getExecutionInfo().containsKey(ExecutionRequirements.SUPPORTS_WORKERS)
+        || !spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS).equals("1")) {
+      // TODO(ulfjack): Don't circumvent SpawnExecutionPolicy. Either drop the warning here, or
+      // provide a mechanism in SpawnExectionPolicy to report warnings.
+      reporter.handle(
+          Event.warn(
+              String.format(ERROR_MESSAGE_PREFIX + REASON_NO_EXECUTION_INFO, spawn.getMnemonic())));
+      return fallbackRunner.exec(spawn, policy);
+    }
+
+    policy.report(ProgressStatus.SCHEDULING, "worker");
+    ActionExecutionMetadata owner = spawn.getResourceOwner();
+    try (ResourceHandle handle =
+        ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) {
+      policy.report(ProgressStatus.EXECUTING, "worker");
+      return actuallyExec(spawn, policy);
+    }
+  }
+
+  private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionPolicy policy)
+      throws ExecException, IOException, InterruptedException {
+    if (Iterables.isEmpty(spawn.getToolFiles())) {
+      throw new UserExecException(
+          String.format(ERROR_MESSAGE_PREFIX + REASON_NO_TOOLS, spawn.getMnemonic()));
+    }
+
+    // We assume that the spawn to be executed always gets at least one @flagfile.txt or
+    // --flagfile=flagfile.txt argument, which contains the flags related to the work itself (as
+    // opposed to start-up options for the executed tool). Thus, we can extract those elements from
+    // its args and put them into the WorkRequest instead.
+    List<String> flagFiles = new ArrayList<>();
+    ImmutableList<String> workerArgs = splitSpawnArgsIntoWorkerArgsAndFlagFiles(spawn, flagFiles);
+    ImmutableMap<String, String> env = spawn.getEnvironment();
+
+    ActionInputFileCache inputFileCache = policy.getActionInputFileCache();
+
+    HashCode workerFilesHash =
+        WorkerFilesHash.getWorkerFilesHash(spawn.getToolFiles(), inputFileCache);
+    Map<PathFragment, Path> inputFiles = SandboxHelpers.getInputFiles(spawn, policy, execRoot);
+    Set<PathFragment> outputFiles = SandboxHelpers.getOutputFiles(spawn);
+
+    WorkerKey key =
+        new WorkerKey(
+            workerArgs,
+            env,
+            execRoot,
+            spawn.getMnemonic(),
+            workerFilesHash,
+            inputFiles,
+            outputFiles,
+            policy.speculating());
+
+    WorkRequest workRequest = createWorkRequest(spawn, policy, flagFiles, inputFileCache);
+
+    long startTime = System.currentTimeMillis();
+    WorkResponse response = execInWorker(key, workRequest, policy);
+    long wallTimeMillis = System.currentTimeMillis() - startTime;
+
+    FileOutErr outErr = policy.getFileOutErr();
+    response.getOutputBytes().writeTo(outErr.getErrorStream());
+
+    return new SpawnResult.Builder()
+        .setExitCode(response.getExitCode())
+        .setStatus(SpawnResult.Status.SUCCESS)
+        .setWallTimeMillis(wallTimeMillis)
+        .build();
+  }
+
+  /**
+   * Splits the command-line arguments of the {@code Spawn} into the part that is used to start the
+   * persistent worker ({@code workerArgs}) and the part that goes into the {@code WorkRequest}
+   * protobuf ({@code flagFiles}).
+   */
+  private ImmutableList<String> splitSpawnArgsIntoWorkerArgsAndFlagFiles(
+      Spawn spawn, List<String> flagFiles) throws UserExecException {
+    ImmutableList.Builder<String> workerArgs = ImmutableList.builder();
+    for (String arg : spawn.getArguments()) {
+      if (FLAG_FILE_PATTERN.matcher(arg).matches()) {
+        flagFiles.add(arg);
+      } else {
+        workerArgs.add(arg);
+      }
+    }
+
+    if (flagFiles.isEmpty()) {
+      throw new UserExecException(
+          String.format(ERROR_MESSAGE_PREFIX + REASON_NO_FLAGFILE, spawn.getMnemonic()));
+    }
+
+    return workerArgs
+        .add("--persistent_worker")
+        .addAll(
+            MoreObjects.firstNonNull(
+                extraFlags.get(spawn.getMnemonic()), ImmutableList.<String>of()))
+        .build();
+  }
+
+  private WorkRequest createWorkRequest(
+      Spawn spawn,
+      SpawnExecutionPolicy policy,
+      List<String> flagfiles,
+      ActionInputFileCache inputFileCache)
+      throws IOException {
+    WorkRequest.Builder requestBuilder = WorkRequest.newBuilder();
+    for (String flagfile : flagfiles) {
+      expandArgument(requestBuilder, flagfile);
+    }
+
+    List<ActionInput> inputs =
+        ActionInputHelper.expandArtifacts(spawn.getInputFiles(), policy.getArtifactExpander());
+
+    for (ActionInput input : inputs) {
+      byte[] digestBytes = inputFileCache.getMetadata(input).getDigest();
+      ByteString digest;
+      if (digestBytes == null) {
+        digest = ByteString.EMPTY;
+      } else {
+        digest = ByteString.copyFromUtf8(HashCode.fromBytes(digestBytes).toString());
+      }
+
+      requestBuilder
+          .addInputsBuilder()
+          .setPath(input.getExecPathString())
+          .setDigest(digest)
+          .build();
+    }
+    return requestBuilder.build();
+  }
+
+  /**
+   * Recursively expands arguments by replacing @filename args with the contents of the referenced
+   * files. The @ itself can be escaped with @@. This deliberately does not expand --flagfile= style
+   * arguments, because we want to get rid of the expansion entirely at some point in time.
+   *
+   * @param requestBuilder the WorkRequest.Builder that the arguments should be added to.
+   * @param arg the argument to expand.
+   * @throws java.io.IOException if one of the files containing options cannot be read.
+   */
+  private void expandArgument(WorkRequest.Builder requestBuilder, String arg) throws IOException {
+    if (arg.startsWith("@") && !arg.startsWith("@@")) {
+      for (String line : Files.readAllLines(
+          Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) {
+        if (line.length() > 0) {
+          expandArgument(requestBuilder, line);
+        }
+      }
+    } else {
+      requestBuilder.addArguments(arg);
+    }
+  }
+
+  private WorkResponse execInWorker(WorkerKey key, WorkRequest request, SpawnExecutionPolicy policy)
+      throws InterruptedException, ExecException {
+    Worker worker = null;
+    WorkResponse response;
+
+    try {
+      try {
+        worker = workers.borrowObject(key);
+      } catch (IOException e) {
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message("IOException while borrowing a worker from the pool:")
+                .exception(e)
+                .build()
+                .toString());
+      }
+
+      try {
+        worker.prepareExecution(key);
+      } catch (IOException e) {
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message("IOException while preparing the execution environment of a worker:")
+                .logFile(worker.getLogFile())
+                .exception(e)
+                .build()
+                .toString());
+      }
+
+      try {
+        request.writeDelimitedTo(worker.getOutputStream());
+        worker.getOutputStream().flush();
+      } catch (IOException e) {
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message(
+                    "Worker process quit or closed its stdin stream when we tried to send a"
+                        + " WorkRequest:")
+                .logFile(worker.getLogFile())
+                .exception(e)
+                .build()
+                .toString());
+      }
+
+      RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream());
+      recordingStream.startRecording(4096);
+      try {
+        // response can be null when the worker has already closed stdout at this point and thus the
+        // InputStream is at EOF.
+        response = WorkResponse.parseDelimitedFrom(recordingStream);
+      } catch (IOException e) {
+        // If protobuf couldn't parse the response, try to print whatever the failing worker wrote
+        // to stdout - it's probably a stack trace or some kind of error message that will help the
+        // user figure out why the compiler is failing.
+        recordingStream.readRemaining();
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message("Worker process returned an unparseable WorkResponse:")
+                .logText(recordingStream.getRecordedDataAsString())
+                .exception(e)
+                .build()
+                .toString());
+      }
+
+      policy.lockOutputFiles();
+
+      if (response == null) {
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message("Worker process did not return a WorkResponse:")
+                .logFile(worker.getLogFile())
+                .logSizeLimit(4096)
+                .build()
+                .toString());
+      }
+
+      try {
+        worker.finishExecution(key);
+      } catch (IOException e) {
+        throw new UserExecException(
+            ErrorMessage.builder()
+                .message("IOException while finishing worker execution:")
+                .exception(e)
+                .build()
+                .toString());
+      }
+    } catch (ExecException e) {
+      if (worker != null) {
+        try {
+          workers.invalidateObject(key, worker);
+        } catch (IOException e1) {
+          // The original exception is more important / helpful, so we'll just ignore this one.
+        }
+        worker = null;
+      }
+
+      throw e;
+    } finally {
+      if (worker != null) {
+        workers.returnObject(key, worker);
+      }
+    }
+
+    return response;
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnStrategy.java
index 5688933..890de1b 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnStrategy.java
@@ -13,51 +13,9 @@
 // limitations under the License.
 package com.google.devtools.build.lib.worker;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.eventbus.EventBus;
-import com.google.common.hash.HashCode;
-import com.google.devtools.build.lib.actions.ActionExecutionContext;
-import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
-import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
-import com.google.devtools.build.lib.actions.ActionInputHelper;
-import com.google.devtools.build.lib.actions.ActionStatusMessage;
-import com.google.devtools.build.lib.actions.ExecException;
-import com.google.devtools.build.lib.actions.ExecutionRequirements;
 import com.google.devtools.build.lib.actions.ExecutionStrategy;
-import com.google.devtools.build.lib.actions.ResourceManager;
-import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
-import com.google.devtools.build.lib.actions.SandboxedSpawnActionContext;
-import com.google.devtools.build.lib.actions.Spawn;
 import com.google.devtools.build.lib.actions.SpawnActionContext;
-import com.google.devtools.build.lib.actions.UserExecException;
-import com.google.devtools.build.lib.events.Event;
-import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.sandbox.SandboxHelpers;
-import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy;
-import com.google.devtools.build.lib.util.CommandFailureUtils;
-import com.google.devtools.build.lib.util.Preconditions;
-import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.build.lib.vfs.PathFragment;
-import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
-import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
+import com.google.devtools.build.lib.exec.AbstractSpawnStrategy;
 
 /**
  * A spawn action context that launches Spawns the first time they are used in a persistent mode and
@@ -67,330 +25,10 @@
   name = {"worker"},
   contextType = SpawnActionContext.class
 )
-public final class WorkerSpawnStrategy implements SandboxedSpawnActionContext {
+public final class WorkerSpawnStrategy extends AbstractSpawnStrategy {
 
-  public static final String ERROR_MESSAGE_PREFIX =
-      "Worker strategy cannot execute this %s action, ";
-  public static final String REASON_NO_FLAGFILE =
-      "because the command-line arguments do not contain at least one @flagfile or --flagfile=";
-  public static final String REASON_NO_TOOLS = "because the action has no tools";
-  public static final String REASON_NO_EXECUTION_INFO =
-      "because the action's execution info does not contain 'supports-workers=1'";
-
-  /** Pattern for @flagfile.txt and --flagfile=flagfile.txt */
-  private static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)");
-
-  private final WorkerPool workers;
-  private final Path execRoot;
-  private final boolean verboseFailures;
-  private final Multimap<String, String> extraFlags;
-  private final SpawnInputExpander spawnInputExpander;
-
-  public WorkerSpawnStrategy(
-      Path execRoot,
-      WorkerPool workers,
-      boolean verboseFailures,
-      Multimap<String, String> extraFlags) {
-    Preconditions.checkNotNull(workers);
-    this.workers = Preconditions.checkNotNull(workers);
-    this.execRoot = execRoot;
-    this.verboseFailures = verboseFailures;
-    this.extraFlags = extraFlags;
-    this.spawnInputExpander = new SpawnInputExpander(false);
-  }
-
-  @Override
-  public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
-      throws ExecException, InterruptedException {
-    exec(spawn, actionExecutionContext, null);
-  }
-
-  @Override
-  public void exec(
-      Spawn spawn,
-      ActionExecutionContext actionExecutionContext,
-      AtomicReference<Class<? extends SpawnActionContext>> writeOutputFiles)
-      throws ExecException, InterruptedException {
-    if (!spawn.getExecutionInfo().containsKey(ExecutionRequirements.SUPPORTS_WORKERS)
-        || !spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS).equals("1")) {
-      StandaloneSpawnStrategy standaloneStrategy =
-          Preconditions.checkNotNull(
-              actionExecutionContext.getContext(StandaloneSpawnStrategy.class));
-      actionExecutionContext.getEventHandler().handle(
-          Event.warn(
-              String.format(ERROR_MESSAGE_PREFIX + REASON_NO_EXECUTION_INFO, spawn.getMnemonic())));
-      standaloneStrategy.exec(spawn, actionExecutionContext);
-      return;
-    }
-
-    EventBus eventBus = actionExecutionContext.getEventBus();
-    ActionExecutionMetadata owner = spawn.getResourceOwner();
-    eventBus.post(ActionStatusMessage.schedulingStrategy(owner));
-    try (ResourceHandle handle =
-        ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) {
-      eventBus.post(ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "worker"));
-      actuallyExec(spawn, actionExecutionContext, writeOutputFiles);
-    }
-  }
-
-  private void actuallyExec(
-      Spawn spawn,
-      ActionExecutionContext actionExecutionContext,
-      AtomicReference<Class<? extends SpawnActionContext>> writeOutputFiles)
-      throws ExecException, InterruptedException {
-    if (actionExecutionContext.reportsSubcommands()) {
-      actionExecutionContext.reportSubcommand(spawn);
-    }
-
-    if (Iterables.isEmpty(spawn.getToolFiles())) {
-      throw new UserExecException(
-          String.format(ERROR_MESSAGE_PREFIX + REASON_NO_TOOLS, spawn.getMnemonic()));
-    }
-
-    // We assume that the spawn to be executed always gets at least one @flagfile.txt or
-    // --flagfile=flagfile.txt argument, which contains the flags related to the work itself (as
-    // opposed to start-up options for the executed tool). Thus, we can extract those elements from
-    // its args and put them into the WorkRequest instead.
-    List<String> flagFiles = new ArrayList<>();
-    ImmutableList<String> workerArgs = splitSpawnArgsIntoWorkerArgsAndFlagFiles(spawn, flagFiles);
-    ImmutableMap<String, String> env = spawn.getEnvironment();
-
-    try {
-      ActionInputFileCache inputFileCache = actionExecutionContext.getActionInputFileCache();
-
-      HashCode workerFilesHash = WorkerFilesHash.getWorkerFilesHash(
-          spawn.getToolFiles(), actionExecutionContext);
-      Map<PathFragment, Path> inputFiles =
-          SandboxHelpers.getInputFiles(spawnInputExpander, execRoot, spawn, actionExecutionContext);
-      Set<PathFragment> outputFiles = SandboxHelpers.getOutputFiles(spawn);
-
-      WorkerKey key =
-          new WorkerKey(
-              workerArgs,
-              env,
-              execRoot,
-              spawn.getMnemonic(),
-              workerFilesHash,
-              inputFiles,
-              outputFiles,
-              writeOutputFiles != null);
-
-      WorkRequest workRequest =
-          createWorkRequest(spawn, actionExecutionContext, flagFiles, inputFileCache);
-
-      WorkResponse response = execInWorker(key, workRequest, writeOutputFiles);
-
-      FileOutErr outErr = actionExecutionContext.getFileOutErr();
-      response.getOutputBytes().writeTo(outErr.getErrorStream());
-
-      if (response.getExitCode() != 0) {
-        throw new UserExecException(
-            String.format(
-                "Worker process sent response with exit code: %d.", response.getExitCode()));
-      }
-    } catch (IOException e) {
-      String message =
-          CommandFailureUtils.describeCommandFailure(
-              verboseFailures, spawn.getArguments(), env, execRoot.getPathString());
-      throw new UserExecException(
-          ErrorMessage.builder().message(message).exception(e).build().toString());
-    }
-  }
-
-  /**
-   * Splits the command-line arguments of the {@code Spawn} into the part that is used to start the
-   * persistent worker ({@code workerArgs}) and the part that goes into the {@code WorkRequest}
-   * protobuf ({@code flagFiles}).
-   */
-  private ImmutableList<String> splitSpawnArgsIntoWorkerArgsAndFlagFiles(
-      Spawn spawn, List<String> flagFiles) throws UserExecException {
-    ImmutableList.Builder<String> workerArgs = ImmutableList.builder();
-    for (String arg : spawn.getArguments()) {
-      if (FLAG_FILE_PATTERN.matcher(arg).matches()) {
-        flagFiles.add(arg);
-      } else {
-        workerArgs.add(arg);
-      }
-    }
-
-    if (flagFiles.isEmpty()) {
-      throw new UserExecException(
-          String.format(ERROR_MESSAGE_PREFIX + REASON_NO_FLAGFILE, spawn.getMnemonic()));
-    }
-
-    return workerArgs
-        .add("--persistent_worker")
-        .addAll(
-            MoreObjects.firstNonNull(
-                extraFlags.get(spawn.getMnemonic()), ImmutableList.<String>of()))
-        .build();
-  }
-
-  private WorkRequest createWorkRequest(
-      Spawn spawn,
-      ActionExecutionContext actionExecutionContext,
-      List<String> flagfiles,
-      ActionInputFileCache inputFileCache)
-      throws IOException {
-    WorkRequest.Builder requestBuilder = WorkRequest.newBuilder();
-    for (String flagfile : flagfiles) {
-      expandArgument(requestBuilder, flagfile);
-    }
-
-    List<ActionInput> inputs =
-        ActionInputHelper.expandArtifacts(
-            spawn.getInputFiles(), actionExecutionContext.getArtifactExpander());
-
-    for (ActionInput input : inputs) {
-      byte[] digestBytes = inputFileCache.getMetadata(input).getDigest();
-      ByteString digest;
-      if (digestBytes == null) {
-        digest = ByteString.EMPTY;
-      } else {
-        digest = ByteString.copyFromUtf8(HashCode.fromBytes(digestBytes).toString());
-      }
-
-      requestBuilder
-          .addInputsBuilder()
-          .setPath(input.getExecPathString())
-          .setDigest(digest)
-          .build();
-    }
-    return requestBuilder.build();
-  }
-
-  /**
-   * Recursively expands arguments by replacing @filename args with the contents of the referenced
-   * files. The @ itself can be escaped with @@. This deliberately does not expand --flagfile= style
-   * arguments, because we want to get rid of the expansion entirely at some point in time.
-   *
-   * @param requestBuilder the WorkRequest.Builder that the arguments should be added to.
-   * @param arg the argument to expand.
-   * @throws java.io.IOException if one of the files containing options cannot be read.
-   */
-  private void expandArgument(WorkRequest.Builder requestBuilder, String arg) throws IOException {
-    if (arg.startsWith("@") && !arg.startsWith("@@")) {
-      for (String line : Files.readAllLines(
-          Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) {
-        if (line.length() > 0) {
-          expandArgument(requestBuilder, line);
-        }
-      }
-    } else {
-      requestBuilder.addArguments(arg);
-    }
-  }
-
-  private WorkResponse execInWorker(
-      WorkerKey key,
-      WorkRequest request,
-      AtomicReference<Class<? extends SpawnActionContext>> writeOutputFiles)
-      throws InterruptedException, ExecException {
-    Worker worker = null;
-    WorkResponse response;
-
-    try {
-      try {
-        worker = workers.borrowObject(key);
-      } catch (IOException e) {
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message("IOException while borrowing a worker from the pool:")
-                .exception(e)
-                .build()
-                .toString());
-      }
-
-      try {
-        worker.prepareExecution(key);
-      } catch (IOException e) {
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message("IOException while preparing the execution environment of a worker:")
-                .logFile(worker.getLogFile())
-                .exception(e)
-                .build()
-                .toString());
-      }
-
-      try {
-        request.writeDelimitedTo(worker.getOutputStream());
-        worker.getOutputStream().flush();
-      } catch (IOException e) {
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message(
-                    "Worker process quit or closed its stdin stream when we tried to send a"
-                        + " WorkRequest:")
-                .logFile(worker.getLogFile())
-                .exception(e)
-                .build()
-                .toString());
-      }
-
-      RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream());
-      recordingStream.startRecording(4096);
-      try {
-        // response can be null when the worker has already closed stdout at this point and thus the
-        // InputStream is at EOF.
-        response = WorkResponse.parseDelimitedFrom(recordingStream);
-      } catch (IOException e) {
-        // If protobuf couldn't parse the response, try to print whatever the failing worker wrote
-        // to stdout - it's probably a stack trace or some kind of error message that will help the
-        // user figure out why the compiler is failing.
-        recordingStream.readRemaining();
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message("Worker process returned an unparseable WorkResponse:")
-                .logText(recordingStream.getRecordedDataAsString())
-                .exception(e)
-                .build()
-                .toString());
-      }
-
-      if (writeOutputFiles != null
-          && !writeOutputFiles.compareAndSet(null, WorkerSpawnStrategy.class)) {
-        throw new InterruptedException();
-      }
-
-      if (response == null) {
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message("Worker process did not return a WorkResponse:")
-                .logFile(worker.getLogFile())
-                .logSizeLimit(4096)
-                .build()
-                .toString());
-      }
-
-      try {
-        worker.finishExecution(key);
-      } catch (IOException e) {
-        throw new UserExecException(
-            ErrorMessage.builder()
-                .message("IOException while finishing worker execution:")
-                .exception(e)
-                .build()
-                .toString());
-      }
-    } catch (ExecException e) {
-      if (worker != null) {
-        try {
-          workers.invalidateObject(key, worker);
-        } catch (IOException e1) {
-          // The original exception is more important / helpful, so we'll just ignore this one.
-        }
-        worker = null;
-      }
-
-      throw e;
-    } finally {
-      if (worker != null) {
-        workers.returnObject(key, worker);
-      }
-    }
-
-    return response;
+  public WorkerSpawnStrategy(WorkerSpawnRunner spawnRunner) {
+    super(spawnRunner);
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerTestStrategy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerTestStrategy.java
index 4edeed0..4d6fcd2 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerTestStrategy.java
@@ -127,8 +127,9 @@
     WorkerKey key = null;
     long startTime = actionExecutionContext.getClock().currentTimeMillis();
     try {
-      HashCode workerFilesHash = WorkerFilesHash.getWorkerFilesHash(
-          action.getTools(), actionExecutionContext);
+      HashCode workerFilesHash =
+          WorkerFilesHash.getWorkerFilesHash(
+              action.getTools(), actionExecutionContext.getActionInputFileCache());
       key =
           new WorkerKey(
               startupArgs,
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 8e55f83..1b06eb2 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1196,6 +1196,29 @@
 )
 
 java_test(
+    name = "exec-local-tests",
+    srcs = glob(["exec/local/*.java"]),
+    tags = ["exec"],
+    test_class = "com.google.devtools.build.lib.AllTests",
+    deps = [
+        ":analysis_testutil",
+        "//src/main/java/com/google/devtools/build/lib:build-base",
+        "//src/main/java/com/google/devtools/build/lib:inmemoryfs",
+        "//src/main/java/com/google/devtools/build/lib:io",
+        "//src/main/java/com/google/devtools/build/lib:shell",
+        "//src/main/java/com/google/devtools/build/lib:util",
+        "//src/main/java/com/google/devtools/build/lib:vfs",
+        "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/exec/local",
+        "//src/main/java/com/google/devtools/common/options",
+        "//third_party:guava",
+        "//third_party:junit4",
+        "//third_party:mockito",
+        "//third_party:truth",
+    ],
+)
+
+java_test(
     name = "ProtoCompileActionBuilderTest",
     srcs = ["rules/proto/ProtoCompileActionBuilderTest.java"],
     deps = [
@@ -1270,6 +1293,21 @@
     ],
 )
 
+java_test(
+    name = "worker-tests",
+    srcs = glob(["worker/*Test.java"]),
+    test_class = "com.google.devtools.build.lib.AllTests",
+    deps = [
+        ":test_runner",
+        "//src/main/java/com/google/devtools/build/lib:inmemoryfs",
+        "//src/main/java/com/google/devtools/build/lib:os_util",
+        "//src/main/java/com/google/devtools/build/lib:vfs",
+        "//src/main/java/com/google/devtools/build/lib/worker",
+        "//third_party:guava-testlib",
+        "//third_party:truth",
+    ],
+)
+
 java_binary(
     name = "MockSubprocess",
     srcs = ["windows/MockSubprocess.java"],
diff --git a/src/test/java/com/google/devtools/build/lib/exec/local/LocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/exec/local/LocalSpawnRunnerTest.java
index dd5c231..4f187f7 100644
--- a/src/test/java/com/google/devtools/build/lib/exec/local/LocalSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/exec/local/LocalSpawnRunnerTest.java
@@ -162,6 +162,11 @@
     }
 
     @Override
+    public boolean speculating() {
+      return false;
+    }
+
+    @Override
     public ActionInputFileCache getActionInputFileCache() {
       return mockFileCache;
     }
@@ -228,6 +233,12 @@
 
   @Test
   public void vanillaZeroExit() throws Exception {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(#3536): Make this test work on Windows.
+      // The Command API implicitly absolutizes the path, and we get weird paths on Windows:
+      // T:\execroot\execroot\_bin\process-wrapper
+      return;
+    }
     Subprocess.Factory factory = mock(Subprocess.Factory.class);
     ArgumentCaptor<SubprocessBuilder> captor = ArgumentCaptor.forClass(SubprocessBuilder.class);
     when(factory.create(captor.capture())).thenReturn(new FinishedSubprocess(0));
@@ -268,6 +279,12 @@
 
   @Test
   public void noProcessWrapper() throws Exception {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(#3536): Make this test work on Windows.
+      // The Command API implicitly absolutizes the path, and we get weird paths on Windows:
+      // T:\execroot\bin\echo
+      return;
+    }
     Subprocess.Factory factory = mock(Subprocess.Factory.class);
     ArgumentCaptor<SubprocessBuilder> captor = ArgumentCaptor.forClass(SubprocessBuilder.class);
     when(factory.create(captor.capture())).thenReturn(new FinishedSubprocess(0));
@@ -299,6 +316,12 @@
 
   @Test
   public void nonZeroExit() throws Exception {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(#3536): Make this test work on Windows.
+      // The Command API implicitly absolutizes the path, and we get weird paths on Windows:
+      // T:\execroot\execroot\_bin\process-wrapper
+      return;
+    }
     Subprocess.Factory factory = mock(Subprocess.Factory.class);
     ArgumentCaptor<SubprocessBuilder> captor = ArgumentCaptor.forClass(SubprocessBuilder.class);
     when(factory.create(captor.capture())).thenReturn(new FinishedSubprocess(3));
@@ -478,6 +501,12 @@
 
   @Test
   public void useCorrectExtensionOnWindows() throws Exception {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(#3536): Make this test work on Windows.
+      // The Command API implicitly absolutizes the path, and we get weird paths on Windows:
+      // T:\execroot\execroot\_bin\process-wrapper.exe
+      return;
+    }
     Subprocess.Factory factory = mock(Subprocess.Factory.class);
     ArgumentCaptor<SubprocessBuilder> captor = ArgumentCaptor.forClass(SubprocessBuilder.class);
     when(factory.create(captor.capture())).thenReturn(new FinishedSubprocess(0));
diff --git a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
index 746447b..e26dd1f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
@@ -98,6 +98,11 @@
         }
 
         @Override
+        public boolean speculating() {
+          return false;
+        }
+
+        @Override
         public ActionInputFileCache getActionInputFileCache() {
           return fakeFileCache;
         }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index e04630a..0e3e958 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -129,6 +129,11 @@
         }
 
         @Override
+        public boolean speculating() {
+          return false;
+        }
+
+        @Override
         public ActionInputFileCache getActionInputFileCache() {
           return fakeFileCache;
         }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 2475b2c..a240cd6 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -242,6 +242,11 @@
     }
 
     @Override
+    public boolean speculating() {
+      return false;
+    }
+
+    @Override
     public ActionInputFileCache getActionInputFileCache() {
       return fakeFileCache;
     }
diff --git a/src/test/java/com/google/devtools/build/lib/worker/ErrorMessageTest.java b/src/test/java/com/google/devtools/build/lib/worker/ErrorMessageTest.java
index 181e438..4607b22 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/ErrorMessageTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/ErrorMessageTest.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.devtools.build.lib.util.OS;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
@@ -99,6 +100,10 @@
 
   @Test
   public void testErrorMessageWithUnreadableLogFile() {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(#3536): This test is failing on Windows, probably due to line breaks. Fix it!
+      return;
+    }
     InMemoryFileSystem fs = new InMemoryFileSystem();
     // This file does not exist.
     Path logFile = fs.getPath("/nope.txt");