Make WorkerProxy not be a subclass of the singleplex worker, but rather of an abstract superclass.
The singleplex worker makes some assumptions that do not apply to multiplex workers, such as having a process and owning the recordingInputStream. This change fixes an outstanding TODO to rearrange the class hierarchy.
RELNOTES: n/a
PiperOrigin-RevId: 341413486
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
index fc0c9f7..92c5641 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/SandboxedWorker.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.worker;
+import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
import com.google.devtools.build.lib.vfs.Path;
@@ -21,20 +22,13 @@
import java.io.IOException;
import java.util.Set;
-/** A {@link Worker} that runs inside a sandboxed execution root. */
-final class SandboxedWorker extends Worker {
- private final Path workDir;
+/** A {@link SingleplexWorker} that runs inside a sandboxed execution root. */
+final class SandboxedWorker extends SingleplexWorker {
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private WorkerExecRoot workerExecRoot;
SandboxedWorker(WorkerKey workerKey, int workerId, Path workDir, Path logFile) {
super(workerKey, workerId, workDir, logFile);
- this.workDir = workDir;
- }
-
- @Override
- void destroy() throws IOException {
- super.destroy();
- workDir.deleteTree();
}
@Override
@@ -57,4 +51,14 @@
workerExecRoot.copyOutputs(execRoot);
workerExecRoot = null;
}
+
+ @Override
+ void destroy() {
+ super.destroy();
+ try {
+ workDir.deleteTree();
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log("Caught IOException while deleting workdir.");
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java
new file mode 100644
index 0000000..bc32510
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java
@@ -0,0 +1,166 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.worker;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.GoogleLogger;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
+import com.google.devtools.build.lib.shell.Subprocess;
+import com.google.devtools.build.lib.shell.SubprocessBuilder;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Interface to a worker process running as a single child process.
+ *
+ * <p>A worker process must follow this protocol to be usable via this class: The worker process is
+ * spawned on demand. The worker process is free to exit whenever necessary, as new instances will
+ * be relaunched automatically. Communication happens via the WorkerProtocol protobuf, sent to and
+ * received from the worker process via stdin / stdout.
+ *
+ * <p>Other code in Blaze can talk to the worker process via input / output streams provided by this
+ * class.
+ */
+class SingleplexWorker extends Worker {
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+
+ /** The execution root of the worker. */
+ protected final Path workDir;
+ /**
+ * Stream for recording the WorkResponse as it's read, so that it can be printed in the case of
+ * parsing failures.
+ */
+ @Nullable private RecordingInputStream recordingInputStream;
+ /** The implementation of the worker protocol (JSON or Proto). */
+ @Nullable private WorkerProtocolImpl workerProtocol;
+
+ private Subprocess process;
+ /** True if we deliberately destroyed this process. */
+ private boolean wasDestroyed;
+
+ private Thread shutdownHook;
+
+ SingleplexWorker(WorkerKey workerKey, int workerId, final Path workDir, Path logFile) {
+ super(workerKey, workerId, logFile);
+ this.workDir = workDir;
+
+ final SingleplexWorker self = this;
+ this.shutdownHook =
+ new Thread(
+ () -> {
+ // Not sure why this is needed. philwo@ added it without explanation.
+ self.shutdownHook = null;
+ self.destroy();
+ });
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ Subprocess createProcess() throws IOException {
+ ImmutableList<String> args = workerKey.getArgs();
+ File executable = new File(args.get(0));
+ if (!executable.isAbsolute() && executable.getParent() != null) {
+ List<String> newArgs = new ArrayList<>(args);
+ newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
+ args = ImmutableList.copyOf(newArgs);
+ }
+ SubprocessBuilder processBuilder = new SubprocessBuilder();
+ processBuilder.setArgv(args);
+ processBuilder.setWorkingDirectory(workDir.getPathFile());
+ processBuilder.setStderr(logFile.getPathFile());
+ processBuilder.setEnv(workerKey.getEnv());
+ return processBuilder.start();
+ }
+
+ @Override
+ public void prepareExecution(
+ SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
+ throws IOException {
+ if (process == null) {
+ process = createProcess();
+ recordingInputStream = new RecordingInputStream(process.getInputStream());
+ }
+ if (workerProtocol == null) {
+ switch (workerKey.getProtocolFormat()) {
+ case JSON:
+ workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingInputStream);
+ break;
+ case PROTO:
+ workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingInputStream);
+ break;
+ }
+ }
+ }
+
+ @Override
+ void putRequest(WorkRequest request) throws IOException {
+ workerProtocol.putRequest(request);
+ }
+
+ @Override
+ WorkResponse getResponse(int requestId) throws IOException {
+ recordingInputStream.startRecording(4096);
+ return workerProtocol.getResponse();
+ }
+
+ @Override
+ public void finishExecution(Path execRoot) throws IOException {}
+
+ @Override
+ void destroy() {
+ if (shutdownHook != null) {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ if (workerProtocol != null) {
+ try {
+ workerProtocol.close();
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log("Caught IOException while closing worker protocol.");
+ }
+ workerProtocol = null;
+ }
+ if (process != null) {
+ wasDestroyed = true;
+ process.destroyAndWait();
+ }
+ }
+
+ /** Returns true if this process is dead but we didn't deliberately kill it. */
+ @Override
+ boolean diedUnexpectedly() {
+ return process != null && !wasDestroyed && !process.isAlive();
+ }
+
+ @Override
+ public Optional<Integer> getExitValue() {
+ return process != null && !process.isAlive()
+ ? Optional.of(process.exitValue())
+ : Optional.empty();
+ }
+
+ @Override
+ String getRecordingStreamMessage() {
+ recordingInputStream.readRemaining();
+ return recordingInputStream.getRecordedDataAsString();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/Worker.java b/src/main/java/com/google/devtools/build/lib/worker/Worker.java
index cf720f0..51e1d8a 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/Worker.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/Worker.java
@@ -1,4 +1,4 @@
-// Copyright 2015 The Bazel Authors. All rights reserved.
+// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,107 +13,35 @@
// limitations under the License.
package com.google.devtools.build.lib.worker;
-
-import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashCode;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
-import com.google.devtools.build.lib.shell.Subprocess;
-import com.google.devtools.build.lib.shell.SubprocessBuilder;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
-import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
-import javax.annotation.Nullable;
/**
- * Interface to a worker process running as a child process.
- *
- * <p>A worker process must follow this protocol to be usable via this class: The worker process is
- * spawned on demand. The worker process is free to exit whenever necessary, as new instances will
- * be relaunched automatically. Communication happens via the WorkerProtocol protobuf, sent to and
- * received from the worker process via stdin / stdout.
- *
- * <p>Other code in Blaze can talk to the worker process via input / output streams provided by this
- * class.
+ * An abstract superclass for persistent workers. Workers execute actions in long-running processes
+ * that can handle multiple actions.
*/
-class Worker {
+public abstract class Worker {
+
/** An unique identifier of the work process. */
protected final WorkerKey workerKey;
/** An unique ID of the worker. It will be used in WorkRequest and WorkResponse as well. */
protected final int workerId;
- /** The execution root of the worker. */
- protected final Path workDir;
/** The path of the log file for this worker. */
- private final Path logFile;
- /**
- * Stream for recording the WorkResponse as it's read, so that it can be printed in the case of
- * parsing failures.
- */
- @Nullable private RecordingInputStream recordingInputStream;
- /** The implementation of the worker protocol (JSON or Proto). */
- @Nullable private WorkerProtocolImpl workerProtocol;
+ protected final Path logFile;
- private Subprocess process;
- private Thread shutdownHook;
- /** True if we deliberately destroyed this process. */
- private boolean wasDestroyed;
-
- Worker(WorkerKey workerKey, int workerId, final Path workDir, Path logFile) {
+ public Worker(WorkerKey workerKey, int workerId, Path logFile) {
this.workerKey = workerKey;
this.workerId = workerId;
- this.workDir = workDir;
this.logFile = logFile;
-
- final Worker self = this;
- this.shutdownHook =
- new Thread(
- () -> {
- try {
- self.shutdownHook = null;
- self.destroy();
- } catch (IOException e) {
- // We can't do anything here.
- }
- });
- Runtime.getRuntime().addShutdownHook(shutdownHook);
- }
-
- Subprocess createProcess() throws IOException {
- ImmutableList<String> args = workerKey.getArgs();
- File executable = new File(args.get(0));
- if (!executable.isAbsolute() && executable.getParent() != null) {
- List<String> newArgs = new ArrayList<>(args);
- newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
- args = ImmutableList.copyOf(newArgs);
- }
- SubprocessBuilder processBuilder = new SubprocessBuilder();
- processBuilder.setArgv(args);
- processBuilder.setWorkingDirectory(workDir.getPathFile());
- processBuilder.setStderr(logFile.getPathFile());
- processBuilder.setEnv(workerKey.getEnv());
- return processBuilder.start();
- }
-
- void destroy() throws IOException {
- if (shutdownHook != null) {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- if (workerProtocol != null) {
- workerProtocol.close();
- workerProtocol = null;
- }
- if (process != null) {
- wasDestroyed = true;
- process.destroyAndWait();
- }
}
/**
@@ -137,56 +65,53 @@
return workerKey.getWorkerFilesWithHashes();
}
- boolean isAlive() {
- // This is horrible, but Process.isAlive() is only available from Java 8 on and this is the
- // best we can do prior to that.
- return !process.finished();
- }
+ /**
+ * Performs the necessary steps to prepare for execution. Once this is done, the worker should be
+ * able to receive a WorkRequest without further setup.
+ */
+ public abstract void prepareExecution(
+ SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
+ throws IOException;
- /** Returns true if this process is dead but we didn't deliberately kill it. */
- boolean diedUnexpectedly() {
- return process != null && !wasDestroyed && !process.isAlive();
- }
+ /**
+ * Sends a WorkRequest to the worker.
+ *
+ * @param request The request to send.
+ * @throws IOException If there was a problem doing I/O, or this thread was interrupted at a time
+ * where some or all of the expected I/O has been done.
+ * @throws InterruptedException If this thread was interrupted before doing any I/O.
+ */
+ abstract void putRequest(WorkRequest request) throws IOException, InterruptedException;
+
+ /**
+ * Waits to receive a response from the worker. This method should return as soon as a response
+ * has been received, moving of files and cleanup should wait until finishExecution().
+ *
+ * @param requestId ID of the request to retrieve a response for.
+ * @return The WorkResponse received.
+ * @throws IOException If there was a problem doing I/O.
+ * @throws InterruptedException If this thread was interrupted, which can also happen during IO.
+ */
+ abstract WorkResponse getResponse(int requestId) throws IOException, InterruptedException;
+
+ /** Does whatever cleanup may be required after execution is done. */
+ public abstract void finishExecution(Path execRoot) throws IOException;
+
+ /**
+ * Destroys this worker. Once this has been called, we assume it's safe to clean up related
+ * directories.
+ */
+ abstract void destroy();
+
+ /** Returns true if this worker is dead but we didn't deliberately kill it. */
+ abstract boolean diedUnexpectedly();
/** Returns the exit value of this worker's process, if it has exited. */
- public Optional<Integer> getExitValue() {
- return process != null && !process.isAlive()
- ? Optional.of(process.exitValue())
- : Optional.empty();
- }
+ public abstract Optional<Integer> getExitValue();
- void putRequest(WorkRequest request) throws IOException {
- workerProtocol.putRequest(request);
- }
-
- WorkResponse getResponse(int requestId) throws IOException {
- recordingInputStream.startRecording(4096);
- return workerProtocol.getResponse();
- }
-
- String getRecordingStreamMessage() {
- recordingInputStream.readRemaining();
- return recordingInputStream.getRecordedDataAsString();
- }
-
- public void prepareExecution(
- SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
- throws IOException {
- if (process == null) {
- process = createProcess();
- recordingInputStream = new RecordingInputStream(process.getInputStream());
- }
- if (workerProtocol == null) {
- switch (workerKey.getProtocolFormat()) {
- case JSON:
- workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingInputStream);
- break;
- case PROTO:
- workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingInputStream);
- break;
- }
- }
- }
-
- public void finishExecution(Path execRoot) throws IOException {}
+ /**
+ * Returns the last message received on the InputStream, if an unparseable message has been
+ * received.
+ */
+ abstract String getRecordingStreamMessage();
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
index 45f7561..182c0c4 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java
@@ -70,7 +70,7 @@
new WorkerProxy(
key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer);
} else {
- worker = new Worker(key, workerId, key.getExecRoot(), logFile);
+ worker = new SingleplexWorker(key, workerId, key.getExecRoot(), logFile);
}
if (workerOptions.workerVerbose) {
reporter.handle(
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
index 881e516..dadfdcb 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Semaphore;
/**
@@ -47,18 +48,18 @@
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
* until the corresponding {@code WorkerProxy} picks them up.
*/
- private Map<Integer, InputStream> workerProcessResponse;
+ private final Map<Integer, InputStream> workerProcessResponse;
/** A semaphore to protect {@code workerProcessResponse} object. */
- private Semaphore semWorkerProcessResponse;
+ private final Semaphore semWorkerProcessResponse;
/**
* A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code
* WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code
* WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal
* {@code WorkerProxy} that the {@code WorkerResponse} has been received.
*/
- private Map<Integer, Semaphore> responseChecker;
+ private final Map<Integer, Semaphore> responseChecker;
/** A semaphore to protect responseChecker object. */
- private Semaphore semResponseChecker;
+ private final Semaphore semResponseChecker;
/** The worker process that this WorkerMultiplexer should be talking to. */
private Subprocess process;
/**
@@ -81,7 +82,8 @@
*/
private final Path logFile;
- private SubprocessFactory subprocessFactory = null;
+ /** For testing only, allow a way to fake subprocesses. */
+ private SubprocessFactory subprocessFactory;
WorkerMultiplexer(Path logFile) {
semWorkerProcessResponse = new Semaphore(1);
@@ -164,11 +166,6 @@
}
}
- /** Returns whether the worker subprocess is alive (not finished yet). */
- public boolean isProcessAlive() {
- return !this.process.finished();
- }
-
/**
* Sends the WorkRequest to worker process. This method is called on the thread of a {@code
* WorkerProxy}.
@@ -226,7 +223,7 @@
* Resets the semaphore map for {@code requestId} before sending a request to the worker process.
* This method is called on the thread of a {@code WorkerProxy}.
*/
- public void resetResponseChecker(Integer requestId) throws InterruptedException {
+ void resetResponseChecker(Integer requestId) throws InterruptedException {
semResponseChecker.acquire();
responseChecker.put(requestId, new Semaphore(0));
semResponseChecker.release();
@@ -241,7 +238,7 @@
recordingStream.startRecording(4096);
WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
- // This can only happen if the input stream is closed.
+ // A null parsedResponse can only happen if the input stream is closed.
if (parsedResponse == null) {
isWorkerStreamClosed = true;
releaseAllSemaphores();
@@ -300,13 +297,30 @@
}
}
+ String getRecordingStreamMessage() {
+ return recordingStream.getRecordedDataAsString();
+ }
+
+ /** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */
+ boolean diedUnexpectedly() {
+ return process != null && !process.isAlive() && !isInterrupted;
+ }
+
+ /** Returns the exit value of multiplexer's process, if it has exited. */
+ Optional<Integer> getExitValue() {
+ return process != null && !process.isAlive()
+ ? Optional.of(process.exitValue())
+ : Optional.empty();
+ }
+
/** For testing only, to verify that maps are cleared after responses are reaped. */
@VisibleForTesting
boolean noOutstandingRequests() {
return responseChecker.isEmpty() && workerProcessResponse.isEmpty();
}
- public void setProcessFactory(SubprocessFactory factory) {
+ @VisibleForTesting
+ void setProcessFactory(SubprocessFactory factory) {
subprocessFactory = factory;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
index 7463926..f7241c6 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java
@@ -18,21 +18,23 @@
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
-import com.google.devtools.build.lib.shell.Subprocess;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Optional;
import java.util.Set;
-// TODO(karlgray): Refactor WorkerProxy so that it does not inherit from class Worker.
/** A proxy that talks to the multiplexer */
final class WorkerProxy extends Worker {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final WorkerMultiplexer workerMultiplexer;
- private String recordingStreamMessage;
+ /** The execution root of the worker. */
+ private final Path workDir;
+
+ private Thread shutdownHook;
WorkerProxy(
WorkerKey workerKey,
@@ -40,20 +42,17 @@
Path workDir,
Path logFile,
WorkerMultiplexer workerMultiplexer) {
- super(workerKey, workerId, workDir, logFile);
+ super(workerKey, workerId, logFile);
+ this.workDir = workDir;
this.workerMultiplexer = workerMultiplexer;
- }
-
- @Override
- Subprocess createProcess() {
- throw new IllegalStateException(
- "WorkerProxy does not override createProcess(), the multiplexer process is started in"
- + " prepareExecution");
- }
-
- @Override
- boolean isAlive() {
- return workerMultiplexer.isProcessAlive();
+ final WorkerProxy self = this;
+ this.shutdownHook =
+ new Thread(
+ () -> {
+ self.shutdownHook = null;
+ self.destroy();
+ });
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
}
@Override
@@ -63,8 +62,9 @@
workerMultiplexer.createProcess(workerKey, workDir);
}
+ /** Send the WorkRequest to multiplexer. */
@Override
- synchronized void destroy() throws IOException {
+ synchronized void destroy() {
try {
WorkerMultiplexerManager.removeInstance(workerKey);
} catch (InterruptedException e) {
@@ -104,7 +104,6 @@
}
return WorkResponse.parseDelimitedFrom(inputStream);
} catch (IOException e) {
- recordingStreamMessage = e.toString();
throw new IOException(
"IOException was caught while waiting for worker response. "
+ "It could because the worker returned unparseable response.");
@@ -124,7 +123,20 @@
}
@Override
+ public void finishExecution(Path execRoot) throws IOException {}
+
+ @Override
+ boolean diedUnexpectedly() {
+ return workerMultiplexer.diedUnexpectedly();
+ }
+
+ @Override
+ public Optional<Integer> getExitValue() {
+ return workerMultiplexer.getExitValue();
+ }
+
+ @Override
String getRecordingStreamMessage() {
- return recordingStreamMessage;
+ return workerMultiplexer.getRecordingStreamMessage();
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
index 16b80aa..b480e5d 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java
@@ -62,7 +62,7 @@
}
/** A worker that uses a fake subprocess for I/O. */
- static class TestWorker extends Worker {
+ static class TestWorker extends SingleplexWorker {
private final FakeSubprocess fakeSubprocess;
TestWorker(
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
index 4407d47..0d69cf32 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java
@@ -73,7 +73,7 @@
WorkerKey nonProxiedWorkerKey =
createWorkerKey(/* mustBeSandboxed */ false, /* proxied */ false);
Worker nonProxiedWorker = workerFactory.create(nonProxiedWorkerKey);
- assertThat(nonProxiedWorker.getClass()).isEqualTo(Worker.class);
+ assertThat(nonProxiedWorker.getClass()).isEqualTo(SingleplexWorker.class);
WorkerKey proxiedWorkerKey = createWorkerKey(/* mustBeSandboxed */ false, /* proxied */ true);
Worker proxiedWorker = workerFactory.create(proxiedWorkerKey);
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
index fd82c58..036ca7b 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerPoolTest.java
@@ -50,7 +50,7 @@
private FileSystem fileSystem;
private int workerIds = 1;
- private static class TestWorker extends Worker {
+ private static class TestWorker extends SingleplexWorker {
TestWorker(WorkerKey workerKey, int workerId, Path workDir, Path logFile) {
super(workerKey, workerId, workDir, logFile);
}
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
index 7f6fdf4..f753769 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java
@@ -43,7 +43,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link Worker}. */
+/** Tests for {@link SingleplexWorker}. */
@RunWith(JUnit4.class)
public final class WorkerTest {
final FileSystem fs = new InMemoryFileSystem(DigestHashFunction.SHA256);
@@ -89,7 +89,7 @@
}
@Test
- public void testPutRequest_success() throws IOException {
+ public void testPutRequest_success() throws IOException, InterruptedException {
WorkRequest request = WorkRequest.getDefaultInstance();
TestWorker testWorker = createTestWorker(new byte[0], PROTO);
@@ -103,7 +103,7 @@
}
@Test
- public void testGetResponse_success() throws IOException {
+ public void testGetResponse_success() throws IOException, InterruptedException {
WorkResponse response = WorkResponse.getDefaultInstance();
TestWorker testWorker = createTestWorker(serializeResponseToProtoBytes(response), PROTO);
@@ -113,7 +113,7 @@
}
@Test
- public void testPutRequest_json_success() throws IOException {
+ public void testPutRequest_json_success() throws IOException, InterruptedException {
TestWorker testWorker = createTestWorker(new byte[0], JSON);
testWorker.putRequest(WorkRequest.getDefaultInstance());
@@ -122,7 +122,7 @@
}
@Test
- public void testGetResponse_json_success() throws IOException {
+ public void testGetResponse_json_success() throws IOException, InterruptedException {
TestWorker testWorker = createTestWorker("{}".getBytes(UTF_8), JSON);
WorkResponse readResponse = testWorker.getResponse(0);
WorkResponse response = WorkResponse.getDefaultInstance();
@@ -131,7 +131,8 @@
}
@Test
- public void testPutRequest_json_populatedFields_success() throws IOException {
+ public void testPutRequest_json_populatedFields_success()
+ throws IOException, InterruptedException {
WorkRequest request =
WorkRequest.newBuilder()
.addArguments("testRequest")
@@ -154,7 +155,8 @@
}
@Test
- public void testGetResponse_json_populatedFields_success() throws IOException {
+ public void testGetResponse_json_populatedFields_success()
+ throws IOException, InterruptedException {
TestWorker testWorker =
createTestWorker(
"{\"exitCode\":1,\"output\":\"test output\",\"requestId\":1}".getBytes(UTF_8), JSON);