Update the docs in WorkerMultiplexer to be clearer, based on my recent understanding of how it works. RELNOTES: None PiperOrigin-RevId: 324005622
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 75b218d..c70f1a3 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
@@ -32,20 +32,27 @@ import java.util.Map; import java.util.concurrent.Semaphore; -/** An intermediate worker that sends request and receives response from the worker process. */ +/** + * An intermediate worker that sends requests and receives responses from the worker processes. + * There is at most one of these per {@code WorkerKey}, corresponding to one worker process. {@code + * WorkerMultiplexer} objects run in separate long-lived threads. {@code WorkerProxy} objects call + * into them to send requests. When a worker process returns a {@code WorkResponse}, {@code + * WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response. + */ public class WorkerMultiplexer extends Thread { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); /** - * WorkerMultiplexer is running as a thread on its own. When worker process returns the - * WorkResponse, it is stored in this map and wait for WorkerProxy to retrieve the response. + * A map of {@code WorkResponse}s received from the worker process. They are stored in this map + * until the corresponding {@code WorkerProxy} picks them up. */ private Map<Integer, InputStream> workerProcessResponse; - /** A semaphore to protect workerProcessResponse object. */ + /** A semaphore to protect {@code workerProcessResponse} object. */ private Semaphore semWorkerProcessResponse; /** - * After sending the WorkRequest, WorkerProxy will wait on a semaphore to be released. - * WorkerMultiplexer is responsible to release the corresponding semaphore in order to signal - * WorkerProxy that the WorkerResponse has been received. + * A map of semaphores corresponding to {@code WorkerProxy} objects. After sending the {@code + * WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code + * WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal + * {@code WorkerProxy} that the {@code WorkResponse} has been received. 
*/ private Map<Integer, Semaphore> responseChecker; /** A semaphore to protect responseChecker object. */ @@ -53,15 +60,18 @@ /** The worker process that this WorkerMultiplexer should be talking to. */ private Subprocess process; /** - * If one of the worker processes returns unparseable response, discard all the responses from - * other worker processes. + * Set to true if one of the worker processes returns an unparseable response. We then discard all + * the responses from other worker processes and abort. */ private boolean isUnparseable; - /** InputStream from worker process. */ + /** InputStream from the worker process. */ private RecordingInputStream recordingStream; - /** If worker process returns null, return null to WorkerProxy and discard all the responses. */ - private boolean isNull; - /** A flag to stop multiplexer thread. */ + /** + * True if we have received EOF on the stream from the worker process. We then stop processing, + * and all workers still waiting for responses will fail. + */ + private boolean isWorkerStreamClosed; + /** True if the multiplexer thread has been interrupted. */ private boolean isInterrupted; WorkerMultiplexer() { @@ -70,11 +80,14 @@ responseChecker = new HashMap<>(); workerProcessResponse = new HashMap<>(); isUnparseable = false; - isNull = false; + isWorkerStreamClosed = false; isInterrupted = false; } - /** Only start one worker process for each WorkerMultiplexer, if it hasn't. */ + /** + * Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already + * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread. + */ public synchronized void createProcess(WorkerKey workerKey, Path workDir, Path logFile) throws IOException { if (this.process == null) { @@ -97,6 +110,10 @@ } } + /** + * Signals this object to destroy itself, including the worker process. The object might not be + * fully destroyed at the end of this call, but will terminate soon. 
+ */ public synchronized void destroyMultiplexer() { if (this.process != null) { destroyProcess(this.process); @@ -104,6 +121,7 @@ isInterrupted = true; } + /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */ private void destroyProcess(Subprocess process) { boolean wasInterrupted = false; try { @@ -124,34 +142,40 @@ } } + /** Returns whether the worker subprocess is alive (not finished yet). */ public boolean isProcessAlive() { return !this.process.finished(); } - /** Send the WorkRequest to worker process. */ + /** + * Sends the WorkRequest to worker process. This method is called on the thread of a {@code + * WorkerProxy}. + */ public synchronized void putRequest(WorkRequest request) throws IOException { request.writeDelimitedTo(process.getOutputStream()); process.getOutputStream().flush(); } - /** Wait on a semaphore for the WorkResponse returned from worker process. */ + /** + * Waits on a semaphore for the {@code WorkResponse} returned from worker process. This method is + * called on the thread of a {@code WorkerProxy}. + */ public InputStream getResponse(Integer workerId) throws IOException, InterruptedException { semResponseChecker.acquire(); Semaphore waitForResponse = responseChecker.get(workerId); semResponseChecker.release(); if (waitForResponse == null) { - /** - * If multiplexer is interrupted when WorkerProxy is trying to send request, request is not - * sent so no need to wait for response. - */ + // If the multiplexer is interrupted when a WorkerProxy is trying to send a request, + // the request is not sent, so there is no need to wait for a response. return null; } - // The semaphore will throw InterruptedException when the multiplexer is terminated. + // Wait for the multiplexer to get our response and release this semaphore. The semaphore will + // throw InterruptedException when the multiplexer is terminated. 
waitForResponse.acquire(); - if (isNull) { + if (isWorkerStreamClosed) { return null; } @@ -166,7 +190,10 @@ return response; } - /** Reset the semaphore map before sending request to worker process. */ + /** + * Resets the semaphore map for {@code workerId} before sending a request to the worker process. + * This method is called on the thread of a {@code WorkerProxy}. + */ public void resetResponseChecker(Integer workerId) throws InterruptedException { semResponseChecker.acquire(); responseChecker.put(workerId, new Semaphore(0)); @@ -174,16 +201,17 @@ } /** - * When it gets a WorkResponse from worker process, put that WorkResponse in workerProcessResponse - * and signal responseChecker. + * Waits to read a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in + * {@code workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}. */ private void waitResponse() throws InterruptedException, IOException { recordingStream = new RecordingInputStream(process.getInputStream()); recordingStream.startRecording(4096); WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + // This can only happen if the input stream is closed. if (parsedResponse == null) { - isNull = true; + isWorkerStreamClosed = true; releaseAllSemaphores(); return; } @@ -201,7 +229,7 @@ semResponseChecker.release(); } - /** A multiplexer thread that listens to the WorkResponse from worker process. */ + /** The multiplexer thread that listens to the WorkResponse from worker process. */ @Override public void run() { while (!isInterrupted) {
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index d3582d4..5387cd4 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java
@@ -22,16 +22,19 @@ import java.util.Map; import java.util.concurrent.Semaphore; -/** A manager to instantiate and destroy multiplexers. */ +/** + * A manager to instantiate and destroy multiplexers. There should only be one {@code + * WorkerMultiplexer} corresponding to workers with the same {@code WorkerKey}. If the {@code + * WorkerMultiplexer} has been constructed, other workers should point to the same one. + */ public class WorkerMultiplexerManager { /** - * There should only be one WorkerMultiplexer corresponding to workers with the same mnemonic. If - * the WorkerMultiplexer has been constructed, other workers should point to the same one. The - * hash of WorkerKey is used as key. + * A map from the hash of {@code WorkerKey} objects to the corresponding information about the + * multiplexer instance. */ private static Map<Integer, InstanceInfo> multiplexerInstance; - /** A semaphore to protect multiplexerInstance and multiplexerRefCount objects. */ + /** A semaphore to protect {@code multiplexerInstance} and {@code multiplexerRefCount} objects. */ private static Semaphore semMultiplexer; static { @@ -42,9 +45,9 @@ private WorkerMultiplexerManager() {} /** - * Returns a WorkerMultiplexer instance to WorkerProxy. WorkerProxies with the same workerHash - * talk to the same WorkerMultiplexer. Also, record how many WorkerProxies are talking to this - * WorkerMultiplexer. + * Returns a {@code WorkerMultiplexer} instance to {@code WorkerProxy}. {@code WorkerProxy} + * objects with the same workerHash talk to the same {@code WorkerMultiplexer}. Also records how + * many {@code WorkerProxy} objects are talking to this {@code WorkerMultiplexer}. */ public static WorkerMultiplexer getInstance(Integer workerHash) throws InterruptedException { semMultiplexer.acquire(); @@ -58,7 +61,10 @@ return workerMultiplexer; } - /** Remove the WorkerMultiplexer instance and reference count since it is no longer in use. 
*/ + /** + * Removes the {@code WorkerMultiplexer} instance and reference count since it is no longer in + * use. + */ public static void removeInstance(Integer workerHash) throws InterruptedException, UserExecException { semMultiplexer.acquire(); @@ -108,7 +114,7 @@ .build()); } - /** Contains the WorkerMultiplexer instance and reference count */ + /** Contains the WorkerMultiplexer instance and reference count. */ static class InstanceInfo { private WorkerMultiplexer workerMultiplexer; private Integer refCount;