blob: 536088e7c8768bc49e1be052a36d132f629963b8 [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.shell.Subprocess;
import com.google.devtools.build.lib.shell.SubprocessBuilder;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;
/** An intermediate worker that sends request and receives response from the worker process. */
public class WorkerMultiplexer extends Thread {
private static final Logger logger = Logger.getLogger(WorkerMultiplexer.class.getName());
/**
* WorkerMultiplexer is running as a thread on its own. When worker process returns the
* WorkResponse, it is stored in this map and wait for WorkerProxy to retrieve the response.
*/
private Map<Integer, InputStream> workerProcessResponse;
/** A semaphore to protect workerProcessResponse object. */
private Semaphore semWorkerProcessResponse;
/**
* After sending the WorkRequest, WorkerProxy will wait on a semaphore to be released.
* WorkerMultiplexer is responsible to release the corresponding semaphore in order to signal
* WorkerProxy that the WorkerResponse has been received.
*/
private Map<Integer, Semaphore> responseChecker;
/** A semaphore to protect responseChecker object. */
private Semaphore semResponseChecker;
/** The worker process that this WorkerMultiplexer should be talking to. */
private Subprocess process;
/**
* If one of the worker processes returns unparseable response, discard all the responses from
* other worker processes.
*/
private boolean isUnparseable;
/** InputStream from worker process. */
private RecordingInputStream recordingStream;
/** If worker process returns null, return null to WorkerProxy and discard all the responses. */
private boolean isNull;
/** A flag to stop multiplexer thread. */
private boolean isInterrupted;
WorkerMultiplexer() {
semWorkerProcessResponse = new Semaphore(1);
semResponseChecker = new Semaphore(1);
responseChecker = new HashMap<>();
workerProcessResponse = new HashMap<>();
isUnparseable = false;
isNull = false;
isInterrupted = false;
}
/** Only start one worker process for each WorkerMultiplexer, if it hasn't. */
public synchronized void createProcess(WorkerKey workerKey, Path workDir, Path logFile)
throws IOException {
if (this.process == null) {
ImmutableList<String> args = workerKey.getArgs();
File executable = new File(args.get(0));
if (!executable.isAbsolute() && executable.getParent() != null) {
List<String> newArgs = new ArrayList<>(args);
newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath());
args = ImmutableList.copyOf(newArgs);
}
SubprocessBuilder processBuilder = new SubprocessBuilder();
processBuilder.setArgv(args);
processBuilder.setWorkingDirectory(workDir.getPathFile());
processBuilder.setStderr(logFile.getPathFile());
processBuilder.setEnv(workerKey.getEnv());
this.process = processBuilder.start();
}
if (!this.isAlive()) {
this.start();
}
}
public synchronized void destroyMultiplexer() {
if (this.process != null) {
destroyProcess(this.process);
}
isInterrupted = true;
}
private void destroyProcess(Subprocess process) {
boolean wasInterrupted = false;
try {
process.destroy();
while (true) {
try {
process.waitFor();
return;
} catch (InterruptedException ie) {
wasInterrupted = true;
}
}
} finally {
// Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/
if (wasInterrupted) {
Thread.currentThread().interrupt(); // preserve interrupted status
}
}
}
public boolean isProcessAlive() {
return !this.process.finished();
}
/** Send the WorkRequest to worker process. */
public synchronized void putRequest(WorkRequest request) throws IOException {
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
}
/** Wait on a semaphore for the WorkResponse returned from worker process. */
public InputStream getResponse(Integer workerId) throws IOException, InterruptedException {
semResponseChecker.acquire();
Semaphore waitForResponse = responseChecker.get(workerId);
semResponseChecker.release();
if (waitForResponse == null) {
/**
* If multiplexer is interrupted when WorkerProxy is trying to send request, request is not
* sent so no need to wait for response.
*/
return null;
}
// The semaphore will throw InterruptedException when the multiplexer is terminated.
waitForResponse.acquire();
if (isNull) {
return null;
}
if (isUnparseable) {
recordingStream.readRemaining();
throw new IOException(recordingStream.getRecordedDataAsString());
}
semWorkerProcessResponse.acquire();
InputStream response = workerProcessResponse.get(workerId);
semWorkerProcessResponse.release();
return response;
}
/** Reset the semaphore map before sending request to worker process. */
public void resetResponseChecker(Integer workerId) throws InterruptedException {
semResponseChecker.acquire();
responseChecker.put(workerId, new Semaphore(0));
semResponseChecker.release();
}
/**
* When it gets a WorkResponse from worker process, put that WorkResponse in workerProcessResponse
* and signal responseChecker.
*/
private void waitResponse() throws InterruptedException, IOException {
recordingStream = new RecordingInputStream(process.getInputStream());
recordingStream.startRecording(4096);
WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
if (parsedResponse == null) {
isNull = true;
releaseAllSemaphores();
return;
}
Integer workerId = parsedResponse.getRequestId();
ByteArrayOutputStream tempOs = new ByteArrayOutputStream();
parsedResponse.writeDelimitedTo(tempOs);
semWorkerProcessResponse.acquire();
workerProcessResponse.put(workerId, new ByteArrayInputStream(tempOs.toByteArray()));
semWorkerProcessResponse.release();
semResponseChecker.acquire();
responseChecker.get(workerId).release();
semResponseChecker.release();
}
/** A multiplexer thread that listens to the WorkResponse from worker process. */
@Override
public void run() {
while (!isInterrupted) {
try {
waitResponse();
} catch (IOException e) {
isUnparseable = true;
releaseAllSemaphores();
logger.warning(
"IOException was caught while waiting for worker response. "
+ "It could because the worker returned unparseable response.");
} catch (InterruptedException e) {
logger.warning(
"InterruptedException was caught while waiting for worker response. "
+ "It could because the multiplexer was interrupted.");
}
}
logger.warning(
"Multiplexer thread has been terminated. It could because the memory is running low on"
+ " your machine. There may be other reasons.");
}
/** Release all the semaphores */
private void releaseAllSemaphores() {
try {
semResponseChecker.acquire();
for (Integer workerId : responseChecker.keySet()) {
responseChecker.get(workerId).release();
}
} catch (InterruptedException e) {
// Do nothing
} finally {
semResponseChecker.release();
}
}
}