Make WorkerProxy extend an abstract Worker superclass instead of the singleplex worker.

The singleplex worker makes assumptions that do not apply to multiplex workers, such as owning its own subprocess and the recordingInputStream. This change resolves an outstanding TODO to rearrange the class hierarchy.
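
As a rough sketch, the resulting hierarchy looks like this (only some members
shown; the full definitions are in the diff below):

  // Abstract superclass: knows its WorkerKey, worker id and log file, but owns
  // neither a subprocess nor a RecordingInputStream.
  public abstract class Worker {
    abstract void putRequest(WorkRequest request) throws IOException, InterruptedException;
    abstract WorkResponse getResponse(int requestId) throws IOException, InterruptedException;
    abstract void destroy();
    abstract boolean diedUnexpectedly();
  }

  // Owns the Subprocess and the RecordingInputStream.
  class SingleplexWorker extends Worker { ... }

  // Runs a SingleplexWorker inside a sandboxed execution root.
  final class SandboxedWorker extends SingleplexWorker { ... }

  // Delegates process handling to a shared WorkerMultiplexer.
  final class WorkerProxy extends Worker { ... }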

RELNOTES: n/a
PiperOrigin-RevId: 341413486
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
index fc0c9f7..92c5641 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
@@ -14,6 +14,7 @@
 
 package com.google.devtools.build.lib.worker;
 
+import com.google.common.flogger.GoogleLogger;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
 import com.google.devtools.build.lib.vfs.Path;
@@ -21,20 +22,13 @@
 import java.io.IOException;
 import java.util.Set;
 
-/** A {@link Worker} that runs inside a sandboxed execution root. */
-final class SandboxedWorker extends Worker {
-  private final Path workDir;
+/** A {@link SingleplexWorker} that runs inside a sandboxed execution root. */
+final class SandboxedWorker extends SingleplexWorker {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
   private WorkerExecRoot workerExecRoot;
 
   SandboxedWorker(WorkerKey workerKey, int workerId, Path workDir, Path logFile) {
     super(workerKey, workerId, workDir, logFile);
-    this.workDir = workDir;
-  }
-
-  @Override
-  void destroy() throws IOException {
-    super.destroy();
-    workDir.deleteTree();
   }
 
   @Override
@@ -57,4 +51,14 @@
     workerExecRoot.copyOutputs(execRoot);
     workerExecRoot = null;
   }
+
+  @Override
+  void destroy() {
+    super.destroy();
+    try {
+      workDir.deleteTree();
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log("Caught IOException while deleting workdir.");
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java
new file mode 100644
index 0000000..bc32510
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java
@@ -0,0 +1,166 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.worker;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.GoogleLogger;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
+import com.google.devtools.build.lib.shell.Subprocess;
+import com.google.devtools.build.lib.shell.SubprocessBuilder;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Interface to a worker process running as a single child process.
+ *
+ * <p>A worker process must follow this protocol to be usable via this class: The worker process is
+ * spawned on demand. The worker process is free to exit whenever necessary, as new instances will
+ * be relaunched automatically. Communication happens via the WorkerProtocol protobuf, sent to and
+ * received from the worker process via stdin / stdout.
+ *
+ * <p>Other code in Blaze can talk to the worker process via input / output streams provided by this
+ * class.
+ */
+class SingleplexWorker extends Worker {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+
+  /** The execution root of the worker. */
+  protected final Path workDir;
+  /**
+   * Stream for recording the WorkResponse as it's read, so that it can be printed in the case of
+   * parsing failures.
+   */
+  @Nullable private RecordingInputStream recordingInputStream;
+  /** The implementation of the worker protocol (JSON or Proto). */
+  @Nullable private WorkerProtocolImpl workerProtocol;
+
+  private Subprocess process;
+  /** True if we deliberately destroyed this process. */
+  private boolean wasDestroyed;
+
+  private Thread shutdownHook;
+
+  SingleplexWorker(WorkerKey workerKey, int workerId, final Path workDir, Path logFile) {
+    super(workerKey, workerId, logFile);
+    this.workDir = workDir;
+
+    final SingleplexWorker self = this;
+    this.shutdownHook =
+        new Thread(
+            () -> {
+              // Not sure why this is needed. philwo@ added it without explanation.
+              self.shutdownHook = null;
+              self.destroy();
+            });
+    Runtime.getRuntime().addShutdownHook(shutdownHook);
+  }
+
+  Subprocess createProcess() throws IOException {
+    ImmutableList<String> args = workerKey.getArgs();
+    File executable = new File(args.get(0));
+    if (!executable.isAbsolute() && executable.getParent() != null) {
+      List<String> newArgs = new ArrayList<>(args);
+      newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
+      args = ImmutableList.copyOf(newArgs);
+    }
+    SubprocessBuilder processBuilder = new SubprocessBuilder();
+    processBuilder.setArgv(args);
+    processBuilder.setWorkingDirectory(workDir.getPathFile());
+    processBuilder.setStderr(logFile.getPathFile());
+    processBuilder.setEnv(workerKey.getEnv());
+    return processBuilder.start();
+  }
+
+  @Override
+  public void prepareExecution(
+      SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
+      throws IOException {
+    if (process == null) {
+      process = createProcess();
+      recordingInputStream = new RecordingInputStream(process.getInputStream());
+    }
+    if (workerProtocol == null) {
+      switch (workerKey.getProtocolFormat()) {
+        case JSON:
+          workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingInputStream);
+          break;
+        case PROTO:
+          workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingInputStream);
+          break;
+      }
+    }
+  }
+
+  @Override
+  void putRequest(WorkRequest request) throws IOException {
+    workerProtocol.putRequest(request);
+  }
+
+  @Override
+  WorkResponse getResponse(int requestId) throws IOException {
+    recordingInputStream.startRecording(4096);
+    return workerProtocol.getResponse();
+  }
+
+  @Override
+  public void finishExecution(Path execRoot) throws IOException {}
+
+  @Override
+  void destroy() {
+    if (shutdownHook != null) {
+      Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    }
+    if (workerProtocol != null) {
+      try {
+        workerProtocol.close();
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log("Caught IOException while closing worker protocol.");
+      }
+      workerProtocol = null;
+    }
+    if (process != null) {
+      wasDestroyed = true;
+      process.destroyAndWait();
+    }
+  }
+
+  /** Returns true if this process is dead but we didn't deliberately kill it. */
+  @Override
+  boolean diedUnexpectedly() {
+    return process != null && !wasDestroyed && !process.isAlive();
+  }
+
+  @Override
+  public Optional<Integer> getExitValue() {
+    return process != null && !process.isAlive()
+        ? Optional.of(process.exitValue())
+        : Optional.empty();
+  }
+
+  @Override
+  String getRecordingStreamMessage() {
+    recordingInputStream.readRemaining();
+    return recordingInputStream.getRecordedDataAsString();
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/Worker.java b/src/main/java/com/google/devtools/build/lib/worker/Worker.java
index cf720f0..51e1d8a 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/Worker.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/Worker.java
@@ -1,4 +1,4 @@
-// Copyright 2015 The Bazel Authors. All rights reserved.
+// Copyright 2020 The Bazel Authors. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -13,107 +13,35 @@
 // limitations under the License.
 package com.google.devtools.build.lib.worker;
 
-
-import com.google.common.collect.ImmutableList;
 import com.google.common.hash.HashCode;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
-import com.google.devtools.build.lib.shell.Subprocess;
-import com.google.devtools.build.lib.shell.SubprocessBuilder;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
-import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
-import javax.annotation.Nullable;
 
 /**
- * Interface to a worker process running as a child process.
- *
- * <p>A worker process must follow this protocol to be usable via this class: The worker process is
- * spawned on demand. The worker process is free to exit whenever necessary, as new instances will
- * be relaunched automatically. Communication happens via the WorkerProtocol protobuf, sent to and
- * received from the worker process via stdin / stdout.
- *
- * <p>Other code in Blaze can talk to the worker process via input / output streams provided by this
- * class.
+ * An abstract superclass for persistent workers. Workers execute actions in long-running processes
+ * that can handle multiple actions.
  */
-class Worker {
+public abstract class Worker {
+
   /** A unique identifier of the work process. */
   protected final WorkerKey workerKey;
   /** A unique ID of the worker. It will be used in WorkRequest and WorkResponse as well. */
   protected final int workerId;
-  /** The execution root of the worker. */
-  protected final Path workDir;
   /** The path of the log file for this worker. */
-  private final Path logFile;
-  /**
-   * Stream for recording the WorkResponse as it's read, so that it can be printed in the case of
-   * parsing failures.
-   */
-  @Nullable private RecordingInputStream recordingInputStream;
-  /** The implementation of the worker protocol (JSON or Proto). */
-  @Nullable private WorkerProtocolImpl workerProtocol;
+  protected final Path logFile;
 
-  private Subprocess process;
-  private Thread shutdownHook;
-  /** True if we deliberately destroyed this process. */
-  private boolean wasDestroyed;
-
-  Worker(WorkerKey workerKey, int workerId, final Path workDir, Path logFile) {
+  public Worker(WorkerKey workerKey, int workerId, Path logFile) {
     this.workerKey = workerKey;
     this.workerId = workerId;
-    this.workDir = workDir;
     this.logFile = logFile;
-
-    final Worker self = this;
-    this.shutdownHook =
-        new Thread(
-            () -> {
-              try {
-                self.shutdownHook = null;
-                self.destroy();
-              } catch (IOException e) {
-                // We can't do anything here.
-              }
-            });
-    Runtime.getRuntime().addShutdownHook(shutdownHook);
-  }
-
-  Subprocess createProcess() throws IOException {
-    ImmutableList<String> args = workerKey.getArgs();
-    File executable = new File(args.get(0));
-    if (!executable.isAbsolute() && executable.getParent() != null) {
-      List<String> newArgs = new ArrayList<>(args);
-      newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
-      args = ImmutableList.copyOf(newArgs);
-    }
-    SubprocessBuilder processBuilder = new SubprocessBuilder();
-    processBuilder.setArgv(args);
-    processBuilder.setWorkingDirectory(workDir.getPathFile());
-    processBuilder.setStderr(logFile.getPathFile());
-    processBuilder.setEnv(workerKey.getEnv());
-    return processBuilder.start();
-  }
-
-  void destroy() throws IOException {
-    if (shutdownHook != null) {
-      Runtime.getRuntime().removeShutdownHook(shutdownHook);
-    }
-    if (workerProtocol != null) {
-      workerProtocol.close();
-      workerProtocol = null;
-    }
-    if (process != null) {
-      wasDestroyed = true;
-      process.destroyAndWait();
-    }
   }
 
   /**
@@ -137,56 +65,53 @@
     return workerKey.getWorkerFilesWithHashes();
   }
 
-  boolean isAlive() {
-    // This is horrible, but Process.isAlive() is only available from Java 8 on and this is the
-    // best we can do prior to that.
-    return !process.finished();
-  }
+  /**
+   * Performs the necessary steps to prepare for execution. Once this is done, the worker should be
+   * able to receive a WorkRequest without further setup.
+   */
+  public abstract void prepareExecution(
+      SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
+      throws IOException;
 
-  /** Returns true if this process is dead but we didn't deliberately kill it. */
-  boolean diedUnexpectedly() {
-    return process != null && !wasDestroyed && !process.isAlive();
-  }
+  /**
+   * Sends a WorkRequest to the worker.
+   *
+   * @param request The request to send.
+   * @throws IOException If there was a problem doing I/O, or this thread was interrupted at a time
+   *     when some or all of the expected I/O has been done.
+   * @throws InterruptedException If this thread was interrupted before doing any I/O.
+   */
+  abstract void putRequest(WorkRequest request) throws IOException, InterruptedException;
+
+  /**
+   * Waits to receive a response from the worker. This method should return as soon as a response
+   * has been received; moving of files and cleanup should wait until finishExecution().
+   *
+   * @param requestId ID of the request to retrieve a response for.
+   * @return The WorkResponse received.
+   * @throws IOException If there was a problem doing I/O.
+   * @throws InterruptedException If this thread was interrupted, which can also happen during IO.
+   */
+  abstract WorkResponse getResponse(int requestId) throws IOException, InterruptedException;
+
+  /** Does whatever cleanup may be required after execution is done. */
+  public abstract void finishExecution(Path execRoot) throws IOException;
+
+  /**
+   * Destroys this worker. Once this has been called, we assume it's safe to clean up related
+   * directories.
+   */
+  abstract void destroy();
+
+  /** Returns true if this worker is dead but we didn't deliberately kill it. */
+  abstract boolean diedUnexpectedly();
 
   /** Returns the exit value of this worker's process, if it has exited. */
-  public Optional<Integer> getExitValue() {
-    return process != null && !process.isAlive()
-        ? Optional.of(process.exitValue())
-        : Optional.empty();
-  }
+  public abstract Optional<Integer> getExitValue();
 
-  void putRequest(WorkRequest request) throws IOException {
-    workerProtocol.putRequest(request);
-  }
-
-  WorkResponse getResponse(int requestId) throws IOException {
-    recordingInputStream.startRecording(4096);
-    return workerProtocol.getResponse();
-  }
-
-  String getRecordingStreamMessage() {
-    recordingInputStream.readRemaining();
-    return recordingInputStream.getRecordedDataAsString();
-  }
-
-  public void prepareExecution(
-      SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
-      throws IOException {
-    if (process == null) {
-      process = createProcess();
-      recordingInputStream = new RecordingInputStream(process.getInputStream());
-    }
-    if (workerProtocol == null) {
-      switch (workerKey.getProtocolFormat()) {
-        case JSON:
-          workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingInputStream);
-          break;
-        case PROTO:
-          workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingInputStream);
-          break;
-      }
-    }
-  }
-
-  public void finishExecution(Path execRoot) throws IOException {}
+  /**
+   * Returns the last message received on the InputStream, if an unparseable message has been
+   * received.
+   */
+  abstract String getRecordingStreamMessage();
 }
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
index 45f7561..182c0c4 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
@@ -70,7 +70,7 @@
           new WorkerProxy(
               key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer);
     } else {
-      worker = new Worker(key, workerId, key.getExecRoot(), logFile);
+      worker = new SingleplexWorker(key, workerId, key.getExecRoot(), logFile);
     }
     if (workerOptions.workerVerbose) {
       reporter.handle(
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
index 881e516..dadfdcb 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
 
 /**
@@ -47,18 +48,18 @@
    * A map of {@code WorkResponse}s received from the worker process. They are stored in this map
    * until the corresponding {@code WorkerProxy} picks them up.
    */
-  private Map<Integer, InputStream> workerProcessResponse;
+  private final Map<Integer, InputStream> workerProcessResponse;
   /** A semaphore to protect {@code workerProcessResponse} object. */
-  private Semaphore semWorkerProcessResponse;
+  private final Semaphore semWorkerProcessResponse;
   /**
    * A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code
    * WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code
    * WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal
    * {@code WorkerProxy} that the {@code WorkerResponse} has been received.
    */
-  private Map<Integer, Semaphore> responseChecker;
+  private final Map<Integer, Semaphore> responseChecker;
   /** A semaphore to protect responseChecker object. */
-  private Semaphore semResponseChecker;
+  private final Semaphore semResponseChecker;
   /** The worker process that this WorkerMultiplexer should be talking to. */
   private Subprocess process;
   /**
@@ -81,7 +82,8 @@
    */
   private final Path logFile;
 
-  private SubprocessFactory subprocessFactory = null;
+  /** For testing only, allows injecting a factory that creates fake subprocesses. */
+  private SubprocessFactory subprocessFactory;
 
   WorkerMultiplexer(Path logFile) {
     semWorkerProcessResponse = new Semaphore(1);
@@ -164,11 +166,6 @@
     }
   }
 
-  /** Returns whether the worker subprocess is alive (not finished yet). */
-  public boolean isProcessAlive() {
-    return !this.process.finished();
-  }
-
   /**
    * Sends the WorkRequest to worker process. This method is called on the thread of a {@code
    * WorkerProxy}.
@@ -226,7 +223,7 @@
    * Resets the semaphore map for {@code requestId} before sending a request to the worker process.
    * This method is called on the thread of a {@code WorkerProxy}.
    */
-  public void resetResponseChecker(Integer requestId) throws InterruptedException {
+  void resetResponseChecker(Integer requestId) throws InterruptedException {
     semResponseChecker.acquire();
     responseChecker.put(requestId, new Semaphore(0));
     semResponseChecker.release();
@@ -241,7 +238,7 @@
     recordingStream.startRecording(4096);
     WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
 
-    // This can only happen if the input stream is closed.
+    // A null parsedResponse can only happen if the input stream is closed.
     if (parsedResponse == null) {
       isWorkerStreamClosed = true;
       releaseAllSemaphores();
@@ -300,13 +297,30 @@
     }
   }
 
+  String getRecordingStreamMessage() {
+    return recordingStream.getRecordedDataAsString();
+  }
+
+  /** Returns true if this process has died for reasons other than a call to {@code #destroy()}. */
+  boolean diedUnexpectedly() {
+    return process != null && !process.isAlive() && !isInterrupted;
+  }
+
+  /** Returns the exit value of multiplexer's process, if it has exited. */
+  Optional<Integer> getExitValue() {
+    return process != null && !process.isAlive()
+        ? Optional.of(process.exitValue())
+        : Optional.empty();
+  }
+
   /** For testing only, to verify that maps are cleared after responses are reaped. */
   @VisibleForTesting
   boolean noOutstandingRequests() {
     return responseChecker.isEmpty() && workerProcessResponse.isEmpty();
   }
 
-  public void setProcessFactory(SubprocessFactory factory) {
+  @VisibleForTesting
+  void setProcessFactory(SubprocessFactory factory) {
     subprocessFactory = factory;
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
index 7463926..f7241c6 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
@@ -18,21 +18,23 @@
 import com.google.devtools.build.lib.actions.UserExecException;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
 import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
-import com.google.devtools.build.lib.shell.Subprocess;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 import java.util.Set;
 
-// TODO(karlgray): Refactor WorkerProxy so that it does not inherit from class Worker.
 /** A proxy that talks to the multiplexer */
 final class WorkerProxy extends Worker {
   private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
   private final WorkerMultiplexer workerMultiplexer;
-  private String recordingStreamMessage;
+  /** The execution root of the worker. */
+  private final Path workDir;
+
+  private Thread shutdownHook;
 
   WorkerProxy(
       WorkerKey workerKey,
@@ -40,20 +42,17 @@
       Path workDir,
       Path logFile,
       WorkerMultiplexer workerMultiplexer) {
-    super(workerKey, workerId, workDir, logFile);
+    super(workerKey, workerId, logFile);
+    this.workDir = workDir;
     this.workerMultiplexer = workerMultiplexer;
-  }
-
-  @Override
-  Subprocess createProcess() {
-    throw new IllegalStateException(
-        "WorkerProxy does not override createProcess(), the multiplexer process is started in"
-            + " prepareExecution");
-  }
-
-  @Override
-  boolean isAlive() {
-    return workerMultiplexer.isProcessAlive();
+    final WorkerProxy self = this;
+    this.shutdownHook =
+        new Thread(
+            () -> {
+              self.shutdownHook = null;
+              self.destroy();
+            });
+    Runtime.getRuntime().addShutdownHook(shutdownHook);
   }
 
   @Override
@@ -63,8 +62,9 @@
     workerMultiplexer.createProcess(workerKey, workDir);
   }
 
+  /** Destroys this proxy and removes its multiplexer instance from the manager. */
   @Override
-  synchronized void destroy() throws IOException {
+  synchronized void destroy() {
     try {
       WorkerMultiplexerManager.removeInstance(workerKey);
     } catch (InterruptedException e) {
@@ -104,7 +104,6 @@
       }
       return WorkResponse.parseDelimitedFrom(inputStream);
     } catch (IOException e) {
-      recordingStreamMessage = e.toString();
       throw new IOException(
           "IOException was caught while waiting for worker response. "
               + "It could because the worker returned unparseable response.");
@@ -124,7 +123,20 @@
   }
 
   @Override
+  public void finishExecution(Path execRoot) throws IOException {}
+
+  @Override
+  boolean diedUnexpectedly() {
+    return workerMultiplexer.diedUnexpectedly();
+  }
+
+  @Override
+  public Optional<Integer> getExitValue() {
+    return workerMultiplexer.getExitValue();
+  }
+
+  @Override
   String getRecordingStreamMessage() {
-    return recordingStreamMessage;
+    return workerMultiplexer.getRecordingStreamMessage();
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
index 16b80aa..b480e5d 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
@@ -62,7 +62,7 @@
   }
 
   /** A worker that uses a fake subprocess for I/O. */
-  static class TestWorker extends Worker {
+  static class TestWorker extends SingleplexWorker {
     private final FakeSubprocess fakeSubprocess;
 
     TestWorker(
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
index 4407d47..0d69cf32 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
@@ -73,7 +73,7 @@
     WorkerKey nonProxiedWorkerKey =
         createWorkerKey(/* mustBeSandboxed */ false, /* proxied */ false);
     Worker nonProxiedWorker = workerFactory.create(nonProxiedWorkerKey);
-    assertThat(nonProxiedWorker.getClass()).isEqualTo(Worker.class);
+    assertThat(nonProxiedWorker.getClass()).isEqualTo(SingleplexWorker.class);
 
     WorkerKey proxiedWorkerKey = createWorkerKey(/* mustBeSandboxed */ false, /* proxied */ true);
     Worker proxiedWorker = workerFactory.create(proxiedWorkerKey);
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
index fd82c58..036ca7b 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
@@ -50,7 +50,7 @@
   private FileSystem fileSystem;
   private int workerIds = 1;
 
-  private static class TestWorker extends Worker {
+  private static class TestWorker extends SingleplexWorker {
     TestWorker(WorkerKey workerKey, int workerId, Path workDir, Path logFile) {
       super(workerKey, workerId, workDir, logFile);
     }
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
index 7f6fdf4..f753769 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
@@ -43,7 +43,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link Worker}. */
+/** Tests for {@link SingleplexWorker}. */
 @RunWith(JUnit4.class)
 public final class WorkerTest {
   final FileSystem fs = new InMemoryFileSystem(DigestHashFunction.SHA256);
@@ -89,7 +89,7 @@
   }
 
   @Test
-  public void testPutRequest_success() throws IOException {
+  public void testPutRequest_success() throws IOException, InterruptedException {
     WorkRequest request = WorkRequest.getDefaultInstance();
 
     TestWorker testWorker = createTestWorker(new byte[0], PROTO);
@@ -103,7 +103,7 @@
   }
 
   @Test
-  public void testGetResponse_success() throws IOException {
+  public void testGetResponse_success() throws IOException, InterruptedException {
     WorkResponse response = WorkResponse.getDefaultInstance();
 
     TestWorker testWorker = createTestWorker(serializeResponseToProtoBytes(response), PROTO);
@@ -113,7 +113,7 @@
   }
 
   @Test
-  public void testPutRequest_json_success() throws IOException {
+  public void testPutRequest_json_success() throws IOException, InterruptedException {
     TestWorker testWorker = createTestWorker(new byte[0], JSON);
     testWorker.putRequest(WorkRequest.getDefaultInstance());
 
@@ -122,7 +122,7 @@
   }
 
   @Test
-  public void testGetResponse_json_success() throws IOException {
+  public void testGetResponse_json_success() throws IOException, InterruptedException {
     TestWorker testWorker = createTestWorker("{}".getBytes(UTF_8), JSON);
     WorkResponse readResponse = testWorker.getResponse(0);
     WorkResponse response = WorkResponse.getDefaultInstance();
@@ -131,7 +131,8 @@
   }
 
   @Test
-  public void testPutRequest_json_populatedFields_success() throws IOException {
+  public void testPutRequest_json_populatedFields_success()
+      throws IOException, InterruptedException {
     WorkRequest request =
         WorkRequest.newBuilder()
             .addArguments("testRequest")
@@ -154,7 +155,8 @@
   }
 
   @Test
-  public void testGetResponse_json_populatedFields_success() throws IOException {
+  public void testGetResponse_json_populatedFields_success()
+      throws IOException, InterruptedException {
     TestWorker testWorker =
         createTestWorker(
             "{\"exitCode\":1,\"output\":\"test output\",\"requestId\":1}".getBytes(UTF_8), JSON);