blob: 2374da9124ec4aa6eaaf9da731878a807b3bdbab [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.BlazeCommandResult;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy;
import com.google.devtools.build.lib.server.CommandManager.RunningCommand;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
import com.google.devtools.build.lib.server.CommandProtos.CancelResponse;
import com.google.devtools.build.lib.server.CommandProtos.PingRequest;
import com.google.devtools.build.lib.server.CommandProtos.PingResponse;
import com.google.devtools.build.lib.server.CommandProtos.RunRequest;
import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.server.CommandProtos.StartupOption;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.common.options.InvocationPolicyParser;
import com.google.devtools.common.options.OptionsParsingException;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
/**
* gRPC server class.
*
* <p>Only this class should depend on gRPC so that we only need to exclude this during
* bootstrapping.
*
* <p>This class is a little complicated and rich in multithreading, so an explanation of its
* innards follows.
*
* <p>We use the direct executor for gRPC so that it calls our methods directly on its event handler
* threads (which it creates itself). This is acceptable for {@code ping()} and {@code cancel()}
* because they run very quickly. For {@code run()}, we transfer the call to our own threads in
* {@code commandExecutorPool}. We do this instead of setting an executor on the server object
* because gRPC insists on serializing calls within a single RPC call, which means that the Runnable
* passed to {@code setOnReadyHandler} doesn't get called while the main RPC method is running,
* which means we can't use flow control, which we need so that gRPC doesn't buffer an unbounded
* amount of outgoing data.
*
* <p>Two threads are spawned for each command: one that handles the command in {@code
* commandExecutorPool} and one that streams the result back to the client in {@code
* streamExecutorPool}.
*
* <p>In addition to these threads, we maintain one extra thread for handling the server timeout and
* an interrupt watcher thread is started for each interrupt request that logs if it takes too long
* to take effect.
*
* <p>Each running RPC has a UUID associated with it that is used to identify it when a client wants
* to cancel it. Cancellation is done by the client sending the server a {@code cancel()} RPC call
* which results in the main thread of the command being interrupted.
*/
public class GrpcServerImpl implements RPCServer {
private static final Logger logger = Logger.getLogger(GrpcServerImpl.class.getName());
private final boolean shutdownOnLowSysMem;
/**
* Factory class. Instantiated by reflection.
*
* <p>Used so that method calls using reflection are as simple as possible.
*/
public static class Factory implements RPCServer.Factory {
@Override
public RPCServer create(
BlazeCommandDispatcher dispatcher,
Clock clock,
int port,
Path serverDirectory,
int maxIdleSeconds,
boolean shutdownOnLowSysMem,
boolean idleServerTasks)
throws IOException {
return new GrpcServerImpl(
dispatcher,
clock,
port,
serverDirectory,
maxIdleSeconds,
shutdownOnLowSysMem,
idleServerTasks);
}
}
@VisibleForTesting
enum StreamType {
STDOUT,
STDERR,
}
/** Actions {@link GrpcSink} can do. */
private enum SinkThreadAction {
DISCONNECT,
FINISH,
READY,
SEND,
}
/**
* Sent back and forth between threads wanting to write to the client stream and the stream
* handler thread.
*/
@Immutable
private static final class SinkThreadItem {
private final boolean success;
private final RunResponse message;
private SinkThreadItem(boolean success, RunResponse message) {
this.success = success;
this.message = message;
}
}
/**
* A class that handles communicating through a gRPC interface for a streaming rpc call.
*
* <p>It can do four things:
* <li>Send a response message over the wire. If the channel is ready, it's sent immediately, if
* it's not, blocks until it is. Note that there can always be only one thread in {@link
* #offer(RunResponse)} because it's synchronized. This results in the associated streams
* blocking if gRPC is not ready, which is how we implement pushback.
* <li>Be notified that gRPC is ready. If there is a pending message, it is then sent.
* <li>Be notified that the client disconnected. In this case, an {@link IOException} is reported
* and the thread from which the stream was written to is interrupted so that the server
* becomes free as soon as possible.
* <li>Processing can be terminated. It is reported whether the client disconnected before.
*/
@VisibleForTesting
static class GrpcSink {
private final LinkedBlockingQueue<SinkThreadAction> actionQueue;
private final Exchanger<SinkThreadItem> exchanger;
private final ServerCallStreamObserver<RunResponse> observer;
private final Future<?> future;
private final AtomicReference<Thread> commandThread = new AtomicReference<>();
private final AtomicBoolean disconnected = new AtomicBoolean(false);
private final AtomicLong receivedEventCount = new AtomicLong(0);
@VisibleForTesting
GrpcSink(
final String rpcCommandName,
ServerCallStreamObserver<RunResponse> observer,
ExecutorService executor) {
// This queue is intentionally unbounded: we always act on it fairly quickly so filling up
// RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers.
this.actionQueue = new LinkedBlockingQueue<>();
this.exchanger = new Exchanger<>();
this.observer = observer;
this.observer.setOnCancelHandler(
() -> {
Thread commandThread = GrpcSink.this.commandThread.get();
if (commandThread != null) {
logger.info(
String.format(
"Interrupting thread %s due to the streaming %s call being cancelled "
+ "(likely client hang up or explicit gRPC-level cancellation)",
commandThread.getName(), rpcCommandName));
commandThread.interrupt();
}
actionQueue.offer(SinkThreadAction.DISCONNECT);
});
this.observer.setOnReadyHandler(() -> actionQueue.offer(SinkThreadAction.READY));
this.future = executor.submit(GrpcSink.this::call);
}
@VisibleForTesting
long getReceivedEventCount() {
return receivedEventCount.get();
}
@VisibleForTesting
void setCommandThread(Thread thread) {
Thread old = commandThread.getAndSet(thread);
if (old != null) {
throw new IllegalStateException(String.format("Command state set twice (thread %s ->%s)",
old.getName(), Thread.currentThread().getName()));
}
}
/**
* Sends an item to the client.
*
* @return true if the item was sent successfully, false if the connection to the client was
* lost
*/
@VisibleForTesting
synchronized boolean offer(RunResponse item) {
SinkThreadItem queueItem = new SinkThreadItem(false, item);
actionQueue.offer(SinkThreadAction.SEND);
return exchange(queueItem, false).success;
}
private boolean disconnected() {
return disconnected.get();
}
@VisibleForTesting
boolean finish() {
actionQueue.offer(SinkThreadAction.FINISH);
try {
Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException e) {
throw new IllegalStateException(e);
}
// Reset the interrupted bit so that it doesn't stay set for the next command that is handled
// by this thread
Thread.interrupted();
return disconnected();
}
private SinkThreadItem exchange(SinkThreadItem item, boolean swallowInterrupts) {
boolean interrupted = false;
SinkThreadItem result;
while (true) {
try {
result = exchanger.exchange(item);
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted && !swallowInterrupts) {
Thread.currentThread().interrupt();
}
return result;
}
private void sendPendingItem() {
SinkThreadItem item = exchange(new SinkThreadItem(true, null), true);
try {
observer.onNext(item.message);
} catch (StatusRuntimeException e) {
// The RPC was cancelled e.g. by the client terminating unexpectedly. We'll eventually get
// notified about this and interrupt the command thread, but in the meantime, we can just
// ignore the error; the client is dead, so there isn't anyone to talk to so swallowing the
// output is fine.
logger.info(
String.format(
"Client cancelled command for streamer thread %s",
Thread.currentThread().getName()));
}
}
/** Main function of the streamer thread. */
private void call() {
boolean itemPending = false;
while (true) {
SinkThreadAction action;
action = Uninterruptibles.takeUninterruptibly(actionQueue);
receivedEventCount.incrementAndGet();
switch (action) {
case FINISH:
if (itemPending) {
exchange(new SinkThreadItem(false, null), true);
itemPending = false;
}
// Reset the interrupted bit so that it doesn't stay set for the next command that is
// handled by this thread
Thread.interrupted();
return;
case READY:
if (itemPending) {
sendPendingItem();
itemPending = false;
}
break;
case DISCONNECT:
logger.info(
"Client disconnected for stream thread " + Thread.currentThread().getName());
disconnected.set(true);
if (itemPending) {
exchange(new SinkThreadItem(false, null), true);
itemPending = false;
}
break;
case SEND:
if (disconnected()) {
exchange(new SinkThreadItem(false, null), true);
} else if (observer.isReady()) {
sendPendingItem();
} else {
itemPending = true;
}
}
}
}
}
/**
* An output stream that forwards the data written to it over the gRPC command stream.
*
* <p>Note that wraping this class with a {@code Channel} can cause a deadlock if there is an
* {@link OutputStream} in between that synchronizes both on {@code #close()} and {@code #write()}
* because then if an interrupt happens in {@link GrpcSink#exchange(SinkThreadItem, boolean)},
* the thread on which {@code interrupt()} was called will wait until the {@code Channel} closes
* itself while holding a lock for interrupting the thread on which {@code #exchange()} is
* being executed and that thread will hold a lock that is needed for the {@code Channel} to be
* closed and call {@code interrupt()} in {@code #exchange()}, which will in turn try to acquire
* the interrupt lock.
*/
@VisibleForTesting
static class RpcOutputStream extends OutputStream {
private static final int CHUNK_SIZE = 8192;
// Store commandId and responseCookie as ByteStrings to avoid String -> UTF8 bytes conversion
// for each serialized chunk of output.
private final ByteString commandIdBytes;
private final ByteString responseCookieBytes;
private final StreamType type;
private final GrpcSink sink;
RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) {
this.commandIdBytes = ByteString.copyFromUtf8(commandId);
this.responseCookieBytes = ByteString.copyFromUtf8(responseCookie);
this.type = type;
this.sink = sink;
}
@Override
public synchronized void write(byte[] b, int off, int inlen) throws IOException {
for (int i = 0; i < inlen; i += CHUNK_SIZE) {
ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i));
RunResponse.Builder response = RunResponse
.newBuilder()
.setCookieBytes(responseCookieBytes)
.setCommandIdBytes(commandIdBytes);
switch (type) {
case STDOUT: response.setStandardOutput(input); break;
case STDERR: response.setStandardError(input); break;
default: throw new IllegalStateException();
}
// Send the chunk to the streamer thread. May block.
if (!sink.offer(response.build())) {
// Client disconnected. Terminate the current command as soon as possible. Note that
// throwing IOException is not enough because we are in the habit of swallowing it. Note
// that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
// we interrupt the command thread, which should be enough to make the server come around
// as soon as possible.
logger.info(
String.format(
"Client disconnected received for command %s on thread %s",
commandIdBytes.toStringUtf8(), Thread.currentThread().getName()));
throw new IOException("Client disconnected");
}
}
}
@Override
public void write(int byteAsInt) throws IOException {
byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
write(new byte[] {b}, 0, 1);
}
}
/**
* A thread that watches if the PID file changes and shuts down the server immediately if so.
*/
private class PidFileWatcherThread extends Thread {
private boolean shuttingDown = false;
private PidFileWatcherThread() {
super("pid-file-watcher");
setDaemon(true);
}
// The synchronized block is here so that if the "PID file deleted" timer kicks in during a
// regular shutdown, they don't race.
private synchronized void signalShutdown() {
shuttingDown = true;
}
@Override
public void run() {
while (true) {
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
boolean ok = false;
try {
String pidFileContents = new String(FileSystemUtils.readContentAsLatin1(pidFile));
ok = pidFileContents.equals(pidInFile);
} catch (IOException e) {
logger.info("Cannot read PID file: " + e.getMessage());
// Handled by virtue of ok not being set to true
}
if (!ok) {
synchronized (PidFileWatcherThread.this) {
if (shuttingDown) {
logger.warning("PID file deleted or overwritten but shutdown is already in progress");
break;
}
shuttingDown = true;
// Someone overwrote the PID file. Maybe it's another server, so shut down as quickly
// as possible without even running the shutdown hooks (that would delete it)
logger.severe("PID file deleted or overwritten, exiting as quickly as possible");
Runtime.getRuntime().halt(ExitCode.BLAZE_INTERNAL_ERROR.getNumericExitCode());
}
}
}
}
}
// These paths are all relative to the server directory
private static final String PORT_FILE = "command_port";
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true);
private final CommandManager commandManager;
private final BlazeCommandDispatcher dispatcher;
private final ExecutorService streamExecutorPool;
private final ExecutorService commandExecutorPool;
private final Clock clock;
private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
private final int maxIdleSeconds;
private final PidFileWatcherThread pidFileWatcherThread;
private final Path pidFile;
private final String pidInFile;
private final List<Path> filesToDeleteAtExit = new ArrayList<>();
private final int port;
private Server server;
private boolean serving;
public GrpcServerImpl(
BlazeCommandDispatcher dispatcher,
Clock clock,
int port,
Path serverDirectory,
int maxIdleSeconds,
boolean shutdownOnLowSysMem,
boolean doIdleServerTasks)
throws IOException {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shutdownHook();
}
});
// server.pid was written in the C++ launcher after fork() but before exec() .
// The client only accesses the pid file after connecting to the socket
// which ensures that it gets the correct pid value.
pidFile = serverDirectory.getRelative("server.pid.txt");
pidInFile = new String(FileSystemUtils.readContentAsLatin1(pidFile));
deleteAtExit(pidFile);
this.dispatcher = dispatcher;
this.clock = clock;
this.serverDirectory = serverDirectory;
this.port = port;
this.maxIdleSeconds = maxIdleSeconds;
this.shutdownOnLowSysMem = shutdownOnLowSysMem;
this.serving = false;
this.streamExecutorPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("grpc-stream-%d").setDaemon(true).build());
this.commandExecutorPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("grpc-command-%d").setDaemon(true).build());
SecureRandom random = new SecureRandom();
requestCookie = generateCookie(random, 16);
responseCookie = generateCookie(random, 16);
pidFileWatcherThread = new PidFileWatcherThread();
pidFileWatcherThread.start();
commandManager = new CommandManager(doIdleServerTasks);
}
private static String generateCookie(SecureRandom random, int byteCount) {
byte[] bytes = new byte[byteCount];
random.nextBytes(bytes);
StringBuilder result = new StringBuilder();
for (byte b : bytes) {
result.append(Integer.toHexString(b + 128));
}
return result.toString();
}
/**
* This is called when the server is shut down as a result of a "clean --expunge".
*
* <p>In this case, no files should be deleted on shutdown hooks, since clean also deletes the
* lock file, and there is a small possibility of the following sequence of events:
*
* <ol>
* <li>Client 1 runs "blaze clean --expunge"
* <li>Client 2 runs a command and waits for client 1 to finish
* <li>The clean command deletes everything including the lock file
* <li>Client 2 starts running and since the output base is empty, starts up a new server, which
* creates its own socket and PID files
* <li>The server used by client runs its shutdown hooks, deleting the PID files created by the
* new server
* </ol>
*
* It also disables the "die when the PID file changes" handler so that it doesn't kill the server
* while the "clean --expunge" command is running.
*/
@Override
public void prepareForAbruptShutdown() {
disableShutdownHooks();
pidFileWatcherThread.signalShutdown();
}
@Override
public void interrupt() {
commandManager.interruptInflightCommands();
}
@Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
// For reasons only Apple knows, you cannot bind to IPv4-localhost when you run in a sandbox
// that only allows loopback traffic, but binding to IPv6-localhost works fine. This would
// however break on systems that don't support IPv6. So what we'll do is to try to bind to IPv6
// and if that fails, try again with IPv4.
InetSocketAddress address = new InetSocketAddress("[::1]", port);
try {
server =
NettyServerBuilder.forAddress(address)
.addService(commandServer)
.directExecutor()
.build()
.start();
} catch (IOException e) {
address = new InetSocketAddress("127.0.0.1", port);
server =
NettyServerBuilder.forAddress(address)
.addService(commandServer)
.directExecutor()
.build()
.start();
}
if (maxIdleSeconds > 0) {
Thread timeoutAndMemoryCheckingThread =
new Thread(
new ServerWatcherRunnable(
server, maxIdleSeconds, shutdownOnLowSysMem, commandManager));
timeoutAndMemoryCheckingThread.setName("grpc-timeout-and-memory");
timeoutAndMemoryCheckingThread.setDaemon(true);
timeoutAndMemoryCheckingThread.start();
}
serving = true;
writeServerFile(
PORT_FILE, InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort());
writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
writeServerFile(RESPONSE_COOKIE_FILE, responseCookie);
try {
server.awaitTermination();
} catch (InterruptedException e) {
// TODO(lberki): Handle SIGINT in a reasonable way
throw new IllegalStateException(e);
}
}
private void writeServerFile(String name, String contents) throws IOException {
Path file = serverDirectory.getChild(name);
FileSystemUtils.writeContentAsLatin1(file, contents);
deleteAtExit(file);
}
protected void disableShutdownHooks() {
runShutdownHooks.set(false);
}
private void shutdownHook() {
if (!runShutdownHooks.get()) {
return;
}
List<Path> files;
synchronized (filesToDeleteAtExit) {
files = new ArrayList<>(filesToDeleteAtExit);
}
for (Path path : files) {
try {
path.delete();
} catch (IOException e) {
printStack(e);
}
}
}
/**
* Schedule the specified file for (attempted) deletion at JVM exit.
*/
protected void deleteAtExit(final Path path) {
synchronized (filesToDeleteAtExit) {
filesToDeleteAtExit.add(path);
}
}
static void printStack(IOException e) {
/*
* Hopefully this never happens. It's not very nice to just write this
* to the user's console, but I'm not sure what better choice we have.
*/
StringWriter err = new StringWriter();
PrintWriter printErr = new PrintWriter(err);
printErr.println("=======[BAZEL SERVER: ENCOUNTERED IO EXCEPTION]=======");
e.printStackTrace(printErr);
printErr.println("=====================================================");
logger.severe(err.toString());
}
private void executeCommand(
RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) {
sink.setCommandThread(Thread.currentThread());
if (!request.getCookie().equals(requestCookie) || request.getClientDescription().isEmpty()) {
try {
observer.onNext(
RunResponse.newBuilder()
.setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode())
.build());
observer.onCompleted();
} catch (StatusRuntimeException e) {
logger.info("Client cancelled command while rejecting it: " + e.getMessage());
}
return;
}
// There is a small period of time between calling setOnCancelHandler() and setCommandThread()
// during which the command thread is not interrupted when a cancel is signaled. Cover that
// case by explicitly checking for disconnection here.
if (sink.disconnected()) {
return;
}
String commandId;
BlazeCommandResult result;
// TODO(b/63925394): This information needs to be passed to the GotOptionsEvent, which does not
// currently have the explicit startup options. See Improved Command Line Reporting design doc
// for details.
// Convert the startup options record to Java strings, source first.
ImmutableList.Builder<Pair<String, String>> startupOptions = ImmutableList.builder();
for (StartupOption option : request.getStartupOptionsList()) {
// UTF-8 won't do because we want to be able to pass arbitrary binary strings.
// Not that the internals of Bazel handle that correctly, but why not make at least this
// little part correct?
startupOptions.add(new Pair<>(
option.getSource().toString(StandardCharsets.ISO_8859_1),
option.getOption().toString(StandardCharsets.ISO_8859_1)));
}
try (RunningCommand command = commandManager.create()) {
commandId = command.getId();
try {
// Send the client the command id as soon as we know it.
observer.onNext(
RunResponse.newBuilder()
.setCookie(responseCookie)
.setCommandId(commandId)
.build());
} catch (StatusRuntimeException e) {
logger.info(
"The client cancelled the command before receiving the command id: " + e.getMessage());
}
OutErr rpcOutErr =
OutErr.create(
new RpcOutputStream(command.getId(), responseCookie, StreamType.STDOUT, sink),
new RpcOutputStream(command.getId(), responseCookie, StreamType.STDERR, sink));
try {
// UTF-8 won't do because we want to be able to pass arbitrary binary strings.
// Not that the internals of Bazel handle that correctly, but why not make at least this
// little part correct?
ImmutableList<String> args = request.getArgList().stream()
.map(arg -> arg.toString(StandardCharsets.ISO_8859_1))
.collect(ImmutableList.toImmutableList());
InvocationPolicy policy = InvocationPolicyParser.parsePolicy(request.getInvocationPolicy());
logger.info(BlazeRuntime.getRequestLogString(args));
result =
dispatcher.exec(
policy,
args,
rpcOutErr,
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(),
clock.currentTimeMillis(),
Optional.of(startupOptions.build()));
} catch (OptionsParsingException e) {
rpcOutErr.printErrLn(e.getMessage());
result = BlazeCommandResult.exitCode(ExitCode.COMMAND_LINE_ERROR);
}
} catch (InterruptedException e) {
result = BlazeCommandResult.exitCode(ExitCode.INTERRUPTED);
commandId = ""; // The default value, the client will ignore it
}
if (sink.finish()) {
// Client disconnected. Then we are not allowed to call any methods on the observer.
logger.info(
String.format(
"Client disconnected before we could send exit code for command %s", commandId));
return;
}
// There is a chance that an Uninterruptibles#getUninterruptibly() leaves us with the
// interrupt bit set. So we just reset the interruption state here to make these cancel
// requests not have any effect outside of command execution (after the try block above,
// the cancel request won't find the thread to interrupt)
Thread.interrupted();
if (result.shutdown()) {
server.shutdown();
}
RunResponse.Builder response = RunResponse.newBuilder()
.setCookie(responseCookie)
.setCommandId(commandId)
.setFinished(true)
.setTerminationExpected(result.shutdown());
if (result.getExecRequest() != null) {
response.setExitCode(0);
response.setExecRequest(result.getExecRequest());
} else {
response.setExitCode(result.getExitCode().getNumericExitCode());
}
try {
observer.onNext(response.build());
observer.onCompleted();
} catch (StatusRuntimeException e) {
// The client cancelled the call. Log an error and go on.
logger.info(
String.format(
"Client cancelled command %s just right before its end: %s",
commandId, e.getMessage()));
}
}
private final CommandServerGrpc.CommandServerImplBase commandServer =
new CommandServerGrpc.CommandServerImplBase() {
@Override
public void run(final RunRequest request, final StreamObserver<RunResponse> observer) {
final GrpcSink sink =
new GrpcSink(
"Run", (ServerCallStreamObserver<RunResponse>) observer, streamExecutorPool);
// Switch to our own threads so that onReadyStateHandler can be called (see class-level
// comment)
commandExecutorPool.execute(() -> executeCommand(request, observer, sink));
}
@Override
public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) {
Preconditions.checkState(serving);
try (RunningCommand command = commandManager.create()) {
PingResponse.Builder response = PingResponse.newBuilder();
if (pingRequest.getCookie().equals(requestCookie)) {
response.setCookie(responseCookie);
}
streamObserver.onNext(response.build());
streamObserver.onCompleted();
}
}
@Override
public void cancel(
final CancelRequest request, final StreamObserver<CancelResponse> streamObserver) {
logger.info(String.format("Got CancelRequest for command id %s", request.getCommandId()));
if (!request.getCookie().equals(requestCookie)) {
streamObserver.onCompleted();
return;
}
// Actually performing the cancellation can result in some blocking which we don't want
// to do on the dispatcher thread, instead offload to command pool.
commandExecutorPool.execute(() -> doCancel(request, streamObserver));
}
private void doCancel(
CancelRequest request, StreamObserver<CancelResponse> streamObserver) {
commandManager.doCancel(request);
try {
streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());
streamObserver.onCompleted();
} catch (StatusRuntimeException e) {
// There is no one to report the failure to
logger.info(
"Client cancelled RPC of cancellation request for " + request.getCommandId());
}
}
};
}