blob: 6276d4e60bb34ff62fa46da00394fc7ab6a6f597 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.bugreport.BugReport;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.runtime.BlazeCommandResult;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.runtime.CommandDispatcher;
import com.google.devtools.build.lib.runtime.CommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy;
import com.google.devtools.build.lib.server.CommandManager.RunningCommand;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
import com.google.devtools.build.lib.server.CommandProtos.CancelResponse;
import com.google.devtools.build.lib.server.CommandProtos.PingRequest;
import com.google.devtools.build.lib.server.CommandProtos.PingResponse;
import com.google.devtools.build.lib.server.CommandProtos.RunRequest;
import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.server.CommandProtos.ServerInfo;
import com.google.devtools.build.lib.server.CommandProtos.StartupOption;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.common.options.InvocationPolicyParser;
import com.google.devtools.common.options.OptionsParsingException;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Server;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
* gRPC server class.
*
* <p>Only this class should depend on gRPC so that we only need to exclude this during
* bootstrapping.
*
* <p>This class is a little complicated and rich in multithreading, so an explanation of its
* innards follows.
*
* <p>We use the direct executor for gRPC so that it calls our methods directly on its event handler
* threads (which it creates itself). This is acceptable for {@code ping()} and {@code cancel()}
* because they run very quickly. For {@code run()}, we transfer the call to our own threads in
* {@code commandExecutorPool}. We do this instead of setting an executor on the server object
* because gRPC insists on serializing calls within a single RPC call, which means that the Runnable
* passed to {@code setOnReadyHandler} doesn't get called while the main RPC method is running,
* which means we can't use flow control, which we need so that gRPC doesn't buffer an unbounded
* amount of outgoing data.
*
* <p>Two threads are spawned for each command: one that handles the command in {@code
* commandExecutorPool} and one that streams the result back to the client in {@code
* streamExecutorPool}.
*
* <p>In addition to these threads, we maintain one extra thread for handling the server timeout and
* an interrupt watcher thread is started for each interrupt request that logs if it takes too long
* to take effect.
*
* <p>Each running RPC has a UUID associated with it that is used to identify it when a client wants
* to cancel it. Cancellation is done by the client sending the server a {@code cancel()} RPC call
* which results in the main thread of the command being interrupted.
*/
public class GrpcServerImpl extends CommandServerGrpc.CommandServerImplBase implements RPCServer {
private static final Logger logger = Logger.getLogger(GrpcServerImpl.class.getName());
private final boolean shutdownOnLowSysMem;
/**
* Factory class. Instantiated by reflection.
*
* <p>Used so that method calls using reflection are as simple as possible.
*/
public static class Factory implements RPCServer.Factory {
@Override
public RPCServer create(
CommandDispatcher dispatcher,
Clock clock,
int port,
Path serverDirectory,
int maxIdleSeconds,
boolean shutdownOnLowSysMem,
boolean idleServerTasks)
throws IOException {
SecureRandom random = new SecureRandom();
return new GrpcServerImpl(
dispatcher,
clock,
port,
generateCookie(random, 16),
generateCookie(random, 16),
serverDirectory,
maxIdleSeconds,
shutdownOnLowSysMem,
idleServerTasks);
}
}
@VisibleForTesting
enum StreamType {
STDOUT,
STDERR,
}
/**
* A wrapper for {@link StreamObserver} that blocks on {@link #onNext} calls if the underlying
* observer is not ready.
*
* <p>It does not react to the interrupt flag in order to allow Bazel to complete the current
* command while printing output as well as sending the final exit code to the client. However, it
* maintains the interrupt flag if it is already set.
*/
// TODO(ulfjack): Move FlowControl and its tests to top-level classes.
@VisibleForTesting
static class BlockingStreamObserver<T> {
private final ServerCallStreamObserver<T> observer;
BlockingStreamObserver(ServerCallStreamObserver<T> observer) {
this.observer = observer;
this.observer.setOnReadyHandler(this::notifyWaiters);
this.observer.setOnCancelHandler(this::notifyWaiters);
}
private synchronized void notifyWaiters() {
// This class does not restrict the number of concurrent calls to onNext, so we call notifyAll
// here. In practice we'll usually only see one concurrent call; the ExperimentalEventHandler
// uses synchronization to prevent multiple concurrent calls, but let's not rely on that here.
notifyAll();
}
public synchronized void onNext(T response) {
boolean interrupted = false;
while (!observer.isReady() && !observer.isCancelled()) {
try {
wait();
} catch (InterruptedException e) {
// We intentionally do not break or return here. The interrupt signal can be due the user
// pressing ctrl-c: it can take Bazel a while to shut down (e.g., it is not currently
// possible to interrupt persistent workers), and we must allow it to continue printing
// output until the current operation comes to a finish.
interrupted = true;
}
}
try {
// According to the documentation, if onNext is called in a canceled stream, it will be
// silently ignored.
observer.onNext(response);
} finally {
// onNext does not specify whether it can throw unchecked exceptions. We use a finally block
// here to make sure that the interrupt bit is not lost even if it does.
if (interrupted || observer.isCancelled()) {
Thread.currentThread().interrupt();
}
}
}
public void onCompleted() {
observer.onCompleted();
}
}
/**
* An output stream that forwards the data written to it over the gRPC command stream.
*
* <p>Note that wraping this class with a {@code Channel} can cause a deadlock if there is an
* {@link OutputStream} in between that synchronizes both on {@code #close()} and {@code #write()}
* because then if an interrupt happens in {@code FlowControl#onNext}, the thread on which {@code
* interrupt()} was called will wait until the {@code Channel} closes itself while holding a lock
* for interrupting the thread on which {@code FlowControl#onNext} is being executed and that
* thread will hold a lock that is needed for the {@code Channel} to be closed and call {@code
* interrupt()} in {@code FlowControl#onNext}, which will in turn try to acquire the interrupt
* lock.
*/
private static class RpcOutputStream extends OutputStream {
private static final int CHUNK_SIZE = 8192;
// Store commandId and responseCookie as ByteStrings to avoid String -> UTF8 bytes conversion
// for each serialized chunk of output.
private final ByteString commandIdBytes;
private final ByteString responseCookieBytes;
private final StreamType type;
private final BlockingStreamObserver<RunResponse> observer;
RpcOutputStream(
String commandId,
String responseCookie,
StreamType type,
BlockingStreamObserver<RunResponse> observer) {
this.commandIdBytes = ByteString.copyFromUtf8(commandId);
this.responseCookieBytes = ByteString.copyFromUtf8(responseCookie);
this.type = type;
this.observer = observer;
}
@Override
public void write(byte[] b, int off, int inlen) throws IOException {
for (int i = 0; i < inlen; i += CHUNK_SIZE) {
ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i));
RunResponse.Builder response = RunResponse
.newBuilder()
.setCookieBytes(responseCookieBytes)
.setCommandIdBytes(commandIdBytes);
switch (type) {
case STDOUT: response.setStandardOutput(input); break;
case STDERR: response.setStandardError(input); break;
default: throw new IllegalStateException();
}
try {
// This can block waiting for the client to read the available data.
observer.onNext(response.build());
} catch (StatusRuntimeException e) {
// I am not sure whether there are any circumstances under which this call could throw an
// exception, but I'd rather it be logged than that we crash silently. The documentation
// only says that onNext does not throw a CancelledException if the stream is canceled,
// but otherwise does not say anything about exceptions that can be thrown from onNext.
// Note that Blaze redirects System.{out,err} to this output stream, so attempting to call
// printStackTrace() from here could go into an infinite loop.
BugReport.sendBugReport(e);
Thread.currentThread().interrupt();
}
}
}
@Override
public void write(int byteAsInt) throws IOException {
write(new byte[] {(byte) byteAsInt}, 0, 1);
}
}
/**
* A thread that watches if the PID file changes and shuts down the server immediately if so.
*/
private class PidFileWatcherThread extends Thread {
private boolean shuttingDown = false;
private PidFileWatcherThread() {
super("pid-file-watcher");
setDaemon(true);
}
// The synchronized block is here so that if the "PID file deleted" timer kicks in during a
// regular shutdown, they don't race.
private synchronized void signalShutdown() {
shuttingDown = true;
}
@Override
public void run() {
while (true) {
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
boolean ok = false;
try {
String pidFileContents = new String(FileSystemUtils.readContentAsLatin1(pidFile));
ok = pidFileContents.equals(pidInFile);
} catch (IOException e) {
logger.info("Cannot read PID file: " + e.getMessage());
// Handled by virtue of ok not being set to true
}
if (!ok) {
synchronized (PidFileWatcherThread.this) {
if (shuttingDown) {
logger.warning("PID file deleted or overwritten but shutdown is already in progress");
break;
}
shuttingDown = true;
// Someone overwrote the PID file. Maybe it's another server, so shut down as quickly
// as possible without even running the shutdown hooks (that would delete it)
logger.severe("PID file deleted or overwritten, exiting as quickly as possible");
Runtime.getRuntime().halt(ExitCode.BLAZE_INTERNAL_ERROR.getNumericExitCode());
}
}
}
}
}
// These paths are all relative to the server directory
private static final String PORT_FILE = "command_port";
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
private static final String SERVER_INFO_FILE = "server_info.rawproto";
private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true);
private final CommandManager commandManager;
private final CommandDispatcher dispatcher;
private final Executor commandExecutorPool;
private final Clock clock;
private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
private final int maxIdleSeconds;
private final PidFileWatcherThread pidFileWatcherThread;
private final Path pidFile;
private final String pidInFile;
private final List<Path> filesToDeleteAtExit = new ArrayList<>();
private final int port;
private Server server;
private boolean serving;
@VisibleForTesting
GrpcServerImpl(
CommandDispatcher dispatcher,
Clock clock,
int port,
String requestCookie,
String responseCookie,
Path serverDirectory,
int maxIdleSeconds,
boolean shutdownOnLowSysMem,
boolean doIdleServerTasks)
throws IOException {
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdownHook()));
// server.pid was written in the C++ launcher after fork() but before exec() .
// The client only accesses the pid file after connecting to the socket
// which ensures that it gets the correct pid value.
pidFile = serverDirectory.getRelative("server.pid.txt");
pidInFile = new String(FileSystemUtils.readContentAsLatin1(pidFile));
deleteAtExit(pidFile);
this.dispatcher = dispatcher;
this.clock = clock;
this.serverDirectory = serverDirectory;
this.port = port;
this.maxIdleSeconds = maxIdleSeconds;
this.shutdownOnLowSysMem = shutdownOnLowSysMem;
this.serving = false;
this.commandExecutorPool =
Context.currentContextExecutor(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("grpc-command-%d")
.setDaemon(true)
.build()));
this.requestCookie = requestCookie;
this.responseCookie = responseCookie;
pidFileWatcherThread = new PidFileWatcherThread();
pidFileWatcherThread.start();
commandManager = new CommandManager(doIdleServerTasks);
}
private static String generateCookie(SecureRandom random, int byteCount) {
byte[] bytes = new byte[byteCount];
random.nextBytes(bytes);
StringBuilder result = new StringBuilder();
for (byte b : bytes) {
result.append(Integer.toHexString(b + 128));
}
return result.toString();
}
/**
* This is called when the server is shut down as a result of a "clean --expunge".
*
* <p>In this case, no files should be deleted on shutdown hooks, since clean also deletes the
* lock file, and there is a small possibility of the following sequence of events:
*
* <ol>
* <li>Client 1 runs "blaze clean --expunge"
* <li>Client 2 runs a command and waits for client 1 to finish
* <li>The clean command deletes everything including the lock file
* <li>Client 2 starts running and since the output base is empty, starts up a new server, which
* creates its own socket and PID files
* <li>The server used by client runs its shutdown hooks, deleting the PID files created by the
* new server
* </ol>
*
* It also disables the "die when the PID file changes" handler so that it doesn't kill the server
* while the "clean --expunge" command is running.
*/
@Override
public void prepareForAbruptShutdown() {
disableShutdownHooks();
pidFileWatcherThread.signalShutdown();
}
@Override
public void interrupt() {
commandManager.interruptInflightCommands();
}
@Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
// For reasons only Apple knows, you cannot bind to IPv4-localhost when you run in a sandbox
// that only allows loopback traffic, but binding to IPv6-localhost works fine. This would
// however break on systems that don't support IPv6. So what we'll do is to try to bind to IPv6
// and if that fails, try again with IPv4.
InetSocketAddress address = new InetSocketAddress("[::1]", port);
try {
server =
NettyServerBuilder.forAddress(address).addService(this).directExecutor().build().start();
} catch (IOException e) {
address = new InetSocketAddress("127.0.0.1", port);
server =
NettyServerBuilder.forAddress(address).addService(this).directExecutor().build().start();
}
if (maxIdleSeconds > 0) {
Thread timeoutAndMemoryCheckingThread =
new Thread(
new ServerWatcherRunnable(
server, maxIdleSeconds, shutdownOnLowSysMem, commandManager));
timeoutAndMemoryCheckingThread.setName("grpc-timeout-and-memory");
timeoutAndMemoryCheckingThread.setDaemon(true);
timeoutAndMemoryCheckingThread.start();
}
serving = true;
writeServerStatusFiles(address);
try {
server.awaitTermination();
} catch (InterruptedException e) {
// TODO(lberki): Handle SIGINT in a reasonable way
throw new IllegalStateException(e);
}
}
private void writeServerStatusFiles(InetSocketAddress address) throws IOException {
String addressString = InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort();
writeServerFile(PORT_FILE, addressString);
writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
writeServerFile(RESPONSE_COOKIE_FILE, responseCookie);
ServerInfo info =
ServerInfo.newBuilder()
.setPid(Integer.parseInt(pidInFile))
.setAddress(addressString)
.setRequestCookie(requestCookie)
.setResponseCookie(responseCookie)
.build();
// Write then mv so the user never sees incomplete contents.
Path serverInfoTmpFile = serverDirectory.getChild(SERVER_INFO_FILE + ".tmp");
try (OutputStream out = serverInfoTmpFile.getOutputStream()) {
info.writeTo(out);
}
Path serverInfoFile = serverDirectory.getChild(SERVER_INFO_FILE);
serverInfoTmpFile.renameTo(serverInfoFile);
deleteAtExit(serverInfoFile);
}
private void writeServerFile(String name, String contents) throws IOException {
Path file = serverDirectory.getChild(name);
FileSystemUtils.writeContentAsLatin1(file, contents);
deleteAtExit(file);
}
protected void disableShutdownHooks() {
runShutdownHooks.set(false);
}
private void shutdownHook() {
if (!runShutdownHooks.get()) {
return;
}
List<Path> files;
synchronized (filesToDeleteAtExit) {
files = new ArrayList<>(filesToDeleteAtExit);
}
for (Path path : files) {
try {
path.delete();
} catch (IOException e) {
printStack(e);
}
}
}
/**
* Schedule the specified file for (attempted) deletion at JVM exit.
*/
protected void deleteAtExit(final Path path) {
synchronized (filesToDeleteAtExit) {
filesToDeleteAtExit.add(path);
}
}
static void printStack(IOException e) {
/*
* Hopefully this never happens. It's not very nice to just write this
* to the user's console, but I'm not sure what better choice we have.
*/
StringWriter err = new StringWriter();
PrintWriter printErr = new PrintWriter(err);
printErr.println("=======[BAZEL SERVER: ENCOUNTERED IO EXCEPTION]=======");
e.printStackTrace(printErr);
printErr.println("=====================================================");
logger.severe(err.toString());
}
private void executeCommand(RunRequest request, BlockingStreamObserver<RunResponse> observer) {
if (!request.getCookie().equals(requestCookie) || request.getClientDescription().isEmpty()) {
try {
observer.onNext(
RunResponse.newBuilder()
.setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode())
.build());
observer.onCompleted();
} catch (StatusRuntimeException e) {
logger.info("Client cancelled command while rejecting it: " + e.getMessage());
}
return;
}
String commandId;
BlazeCommandResult result;
// TODO(b/63925394): This information needs to be passed to the GotOptionsEvent, which does not
// currently have the explicit startup options. See Improved Command Line Reporting design doc
// for details.
// Convert the startup options record to Java strings, source first.
ImmutableList.Builder<Pair<String, String>> startupOptions = ImmutableList.builder();
for (StartupOption option : request.getStartupOptionsList()) {
// UTF-8 won't do because we want to be able to pass arbitrary binary strings.
// Not that the internals of Bazel handle that correctly, but why not make at least this
// little part correct?
startupOptions.add(new Pair<>(
option.getSource().toString(StandardCharsets.ISO_8859_1),
option.getOption().toString(StandardCharsets.ISO_8859_1)));
}
try (RunningCommand command = commandManager.create()) {
commandId = command.getId();
try {
// Send the client the command id as soon as we know it.
observer.onNext(
RunResponse.newBuilder()
.setCookie(responseCookie)
.setCommandId(commandId)
.build());
} catch (StatusRuntimeException e) {
logger.info(
"The client cancelled the command before receiving the command id: " + e.getMessage());
}
OutErr rpcOutErr =
OutErr.create(
new RpcOutputStream(command.getId(), responseCookie, StreamType.STDOUT, observer),
new RpcOutputStream(command.getId(), responseCookie, StreamType.STDERR, observer));
try {
// UTF-8 won't do because we want to be able to pass arbitrary binary strings.
// Not that the internals of Bazel handle that correctly, but why not make at least this
// little part correct?
ImmutableList<String> args = request.getArgList().stream()
.map(arg -> arg.toString(StandardCharsets.ISO_8859_1))
.collect(ImmutableList.toImmutableList());
InvocationPolicy policy = InvocationPolicyParser.parsePolicy(request.getInvocationPolicy());
logger.info(BlazeRuntime.getRequestLogString(args));
result =
dispatcher.exec(
policy,
args,
rpcOutErr,
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(),
clock.currentTimeMillis(),
Optional.of(startupOptions.build()));
} catch (OptionsParsingException e) {
rpcOutErr.printErrLn(e.getMessage());
result = BlazeCommandResult.exitCode(ExitCode.COMMAND_LINE_ERROR);
}
} catch (InterruptedException e) {
result = BlazeCommandResult.exitCode(ExitCode.INTERRUPTED);
commandId = ""; // The default value, the client will ignore it
}
RunResponse.Builder response = RunResponse.newBuilder()
.setCookie(responseCookie)
.setCommandId(commandId)
.setFinished(true)
.setTerminationExpected(result.shutdown());
if (result.getExecRequest() != null) {
response.setExitCode(0);
response.setExecRequest(result.getExecRequest());
} else {
response.setExitCode(result.getExitCode().getNumericExitCode());
}
try {
observer.onNext(response.build());
observer.onCompleted();
} catch (StatusRuntimeException e) {
logger.info(
"The client cancelled the command before receiving the command id: " + e.getMessage());
}
if (result.shutdown()) {
server.shutdown();
}
}
@Override
public void run(final RunRequest request, final StreamObserver<RunResponse> observer) {
// Switch to our own threads so that onReadyStateHandler can be called (see class-level
// comment).
ServerCallStreamObserver<RunResponse> serverCallStreamObserver =
((ServerCallStreamObserver<RunResponse>) observer);
BlockingStreamObserver<RunResponse> blockingStreamObserver =
new BlockingStreamObserver<>(serverCallStreamObserver);
commandExecutorPool.execute(() -> executeCommand(request, blockingStreamObserver));
}
@Override
public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) {
try (RunningCommand command = commandManager.create()) {
PingResponse.Builder response = PingResponse.newBuilder();
if (pingRequest.getCookie().equals(requestCookie)) {
response.setCookie(responseCookie);
}
streamObserver.onNext(response.build());
streamObserver.onCompleted();
}
}
@Override
public void cancel(
final CancelRequest request, final StreamObserver<CancelResponse> streamObserver) {
logger.info(String.format("Got CancelRequest for command id %s", request.getCommandId()));
if (!request.getCookie().equals(requestCookie)) {
streamObserver.onCompleted();
return;
}
// Actually performing the cancellation can result in some blocking which we don't want
// to do on the dispatcher thread, instead offload to command pool.
commandExecutorPool.execute(() -> doCancel(request, streamObserver));
}
private void doCancel(CancelRequest request, StreamObserver<CancelResponse> streamObserver) {
commandManager.doCancel(request);
try {
streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());
streamObserver.onCompleted();
} catch (StatusRuntimeException e) {
// There is no one to report the failure to
logger.info("Client cancelled RPC of cancellation request for " + request.getCommandId());
}
}
}