blob: 0912b1f3753d5b66a4b4571f38f25293b5aa7b66 [file] [log] [blame]
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.remote.worker;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.ExecutionStatusException;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.shell.AbnormalTerminationException;
import com.google.devtools.build.lib.shell.Command;
import com.google.devtools.build.lib.shell.CommandException;
import com.google.devtools.build.lib.shell.CommandResult;
import com.google.devtools.build.lib.shell.FutureCommandResult;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Action;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
import com.google.devtools.remoteexecution.v1test.Platform;
import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.longrunning.Operation;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.Context;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/** A basic implementation of an {@link ExecutionImplBase} service. */
final class ExecutionServer extends ExecutionImplBase {
private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName());
private final Object lock = new Object();
// The name of the container image entry in the Platform proto
// (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
// experimental_remote_platform_override in
// src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
private static final String DOCKER_IMAGE_PREFIX = "docker://";
// How long to wait for the uid command.
private static final Duration uidTimeout = Duration.ofMillis(30);
private static final int LOCAL_EXEC_ERROR = -1;
private final Path workPath;
private final Path sandboxPath;
private final RemoteWorkerOptions workerOptions;
private final SimpleBlobStoreActionCache cache;
private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
private final ListeningExecutorService executorService;
private final DigestUtil digestUtil;
public ExecutionServer(
Path workPath,
Path sandboxPath,
RemoteWorkerOptions workerOptions,
SimpleBlobStoreActionCache cache,
ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache,
DigestUtil digestUtil) {
this.workPath = workPath;
this.sandboxPath = sandboxPath;
this.workerOptions = workerOptions;
this.cache = cache;
this.operationsCache = operationsCache;
this.digestUtil = digestUtil;
ThreadPoolExecutor realExecutor = new ThreadPoolExecutor(
// This is actually the max number of concurrent jobs.
workerOptions.jobs,
// Since we use an unbounded queue, the executor ignores this value, but it still checks
// that it is greater or equal to the value above.
workerOptions.jobs,
// Shut down idle threads after one minute. Threads aren't all that expensive, but we also
// don't need to keep them around if we don't need them.
1, TimeUnit.MINUTES,
// We use an unbounded queue for now.
// TODO(ulfjack): We need to reject work eventually.
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("subprocess-handler-%d").build());
// Allow the core threads to die.
realExecutor.allowCoreThreadTimeOut(true);
this.executorService = MoreExecutors.listeningDecorator(realExecutor);
}
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
final String opName = UUID.randomUUID().toString();
ListenableFuture<ActionResult> future =
executorService.submit(Context.current().wrap(() -> execute(request, opName)));
operationsCache.put(opName, future);
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
responseObserver.onCompleted();
}
private ActionResult execute(ExecuteRequest request, String id)
throws IOException, InterruptedException, StatusException {
Path tempRoot = workPath.getRelative("build-" + id);
String workDetails = "";
try {
tempRoot.createDirectory();
RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
workDetails =
String.format(
"build-request-id: %s command-id: %s action-id: %s",
meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId());
logger.log(FINE, "Received work for: {0}", workDetails);
ActionResult result = execute(request.getAction(), tempRoot);
logger.log(FINE, "Completed {0}.", workDetails);
return result;
} catch (Exception e) {
logger.log(Level.SEVERE, "Work failed: {0} {1}.", new Object[] {workDetails, e});
throw e;
} finally {
if (workerOptions.debug) {
logger.log(INFO, "Preserving work directory {0}.", tempRoot);
} else {
try {
FileSystemUtils.deleteTree(tempRoot);
} catch (IOException e) {
logger.log(SEVERE,
String.format(
"Failed to delete tmp directory %s: %s",
tempRoot, Throwables.getStackTraceAsString(e)));
}
}
}
}
private ActionResult execute(Action action, Path execRoot)
throws IOException, InterruptedException, StatusException {
com.google.devtools.remoteexecution.v1test.Command command = null;
try {
command =
com.google.devtools.remoteexecution.v1test.Command.parseFrom(
cache.downloadBlob(action.getCommandDigest()));
cache.downloadTree(action.getInputRootDigest(), execRoot);
} catch (CacheNotFoundException e) {
throw StatusUtils.notFoundError(e.getMissingDigest());
}
List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
for (String output : action.getOutputFilesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output file already exists: " + file);
}
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
for (String output : action.getOutputDirectoriesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output directory/file already exists: " + file);
}
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
// TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
// implementation instead of copying it.
Command cmd =
getCommand(
action,
command.getArgumentsList(),
getEnvironmentVariables(command),
execRoot.getPathString());
long startTime = System.currentTimeMillis();
CommandResult cmdResult = null;
FutureCommandResult futureCmdResult = null;
synchronized (lock) {
// Linux does not provide a safe API for a multi-threaded program to fork a subprocess.
// Consider the case where two threads both write an executable file and then try to execute
// it. It can happen that the first thread writes its executable file, with the file
// descriptor still being open when the second thread forks, with the fork inheriting a copy
// of the file descriptor. Then the first thread closes the original file descriptor, and
// proceeds to execute the file. At that point Linux sees an open file descriptor to the file
// and returns ETXTBSY (Text file busy) as an error. This race is inherent in the fork / exec
// duality, with fork always inheriting a copy of the file descriptor table; if there was a
// way to fork without copying the entire file descriptor table (e.g., only copy specific
// entries), we could avoid this race.
//
// I was able to reproduce this problem reliably by running significantly more threads than
// there are CPU cores on my workstation - the more threads the more likely it happens.
//
// As a workaround, we put a synchronized block around the fork.
try {
futureCmdResult = cmd.executeAsync();
} catch (CommandException e) {
Throwables.throwIfInstanceOf(e.getCause(), IOException.class);
}
}
if (futureCmdResult != null) {
try {
cmdResult = futureCmdResult.get();
} catch (AbnormalTerminationException e) {
cmdResult = e.getResult();
}
}
long timeoutMillis =
action.hasTimeout()
? Durations.toMillis(action.getTimeout())
: TimeUnit.MINUTES.toMillis(15);
boolean wasTimeout =
(cmdResult != null && cmdResult.getTerminationStatus().timedOut())
|| wasTimeout(timeoutMillis, System.currentTimeMillis() - startTime);
final int exitCode;
Status errStatus = null;
ExecuteResponse.Builder resp = ExecuteResponse.newBuilder();
if (wasTimeout) {
final String errMessage =
String.format(
"Command:\n%s\nexceeded deadline of %f seconds.",
Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0);
logger.warning(errMessage);
errStatus =
Status.newBuilder()
.setCode(Code.DEADLINE_EXCEEDED.getNumber())
.setMessage(errMessage)
.build();
exitCode = LOCAL_EXEC_ERROR;
} else if (cmdResult == null) {
exitCode = LOCAL_EXEC_ERROR;
} else {
exitCode = cmdResult.getTerminationStatus().getRawExitCode();
}
ActionResult.Builder result = ActionResult.newBuilder();
try {
cache.upload(result, execRoot, outputs);
} catch (ExecException e) {
if (errStatus == null) {
errStatus =
Status.newBuilder()
.setCode(Code.FAILED_PRECONDITION.getNumber())
.setMessage(e.getMessage())
.build();
}
}
byte[] stdout = cmdResult.getStdout();
byte[] stderr = cmdResult.getStderr();
cache.uploadOutErr(result, stdout, stderr);
ActionResult finalResult = result.setExitCode(exitCode).build();
resp.setResult(finalResult);
if (errStatus != null) {
resp.setStatus(errStatus);
throw new ExecutionStatusException(errStatus, resp.build());
} else if (exitCode == 0 && !action.getDoNotCache()) {
ActionKey actionKey = digestUtil.computeActionKey(action);
cache.setCachedActionResult(actionKey, finalResult);
}
return finalResult;
}
// Returns true if the OS being run on is Windows (or some close approximation thereof).
private boolean isWindows() {
return System.getProperty("os.name").startsWith("Windows");
}
private boolean wasTimeout(long timeoutMillis, long wallTimeMillis) {
return timeoutMillis > 0 && wallTimeMillis > timeoutMillis;
}
private Map<String, String> getEnvironmentVariables(
com.google.devtools.remoteexecution.v1test.Command command) {
HashMap<String, String> result = new HashMap<>();
for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
result.put(v.getName(), v.getValue());
}
return result;
}
// Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other
// platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric
// number, etc), logs a WARNING and return -1.
// This is used to set "-u UID" flag for commands running inside Docker containers. There are
// only a small handful of cases where uid is vital (e.g., if strict permissions are set on the
// output files), so most use cases would work without setting uid.
private long getUid() {
Command cmd =
new Command(
new String[] {"id", "-u"},
/*environmentVariables=*/null,
/*workingDirectory=*/null,
uidTimeout);
try {
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
cmd.execute(stdout, stderr);
return Long.parseLong(stdout.toString().trim());
} catch (CommandException | NumberFormatException e) {
logger.log(
WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e);
return -1;
}
}
// Checks Action for docker container definition. If no docker container specified, returns
// null. Otherwise returns docker container name from the parameters.
private String dockerContainer(Action action) throws StatusException {
String result = null;
for (Platform.Property property : action.getPlatform().getPropertiesList()) {
if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
if (result != null) {
// Multiple container name entries
throw StatusUtils.invalidArgumentError(
"platform", // Field name.
String.format(
"Multiple entries for %s in action.Platform", CONTAINER_IMAGE_ENTRY_NAME));
}
result = property.getValue();
if (!result.startsWith(DOCKER_IMAGE_PREFIX)) {
throw StatusUtils.invalidArgumentError(
"platform", // Field name.
String.format(
"%s: Docker images must be stored in gcr.io with an image spec in the form "
+ "'docker://gcr.io/{IMAGE_NAME}'",
CONTAINER_IMAGE_ENTRY_NAME));
}
result = result.substring(DOCKER_IMAGE_PREFIX.length());
}
}
return result;
}
// Takes an Action and parameters that can be used to create a Command. Returns the Command.
// If no docker container is specified inside Action, creates a Command straight from the
// arguments. Otherwise, returns a Command that would run the specified command inside the
// specified docker container.
private Command getCommand(
Action action,
List<String> commandLineElements,
Map<String, String> environmentVariables,
String pathString) throws StatusException {
String container = dockerContainer(action);
if (container != null) {
// Run command inside a docker container.
ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
newCommandLineElements.add("docker");
newCommandLineElements.add("run");
// -u doesn't currently make sense for Windows:
// https://github.com/docker/for-win/issues/636#issuecomment-293653788
if (!isWindows()) {
long uid = getUid();
if (uid >= 0) {
newCommandLineElements.add("-u");
newCommandLineElements.add(Long.toString(uid));
}
}
String dockerPathString = pathString + "-docker";
newCommandLineElements.add("-v");
newCommandLineElements.add(pathString + ":" + dockerPathString);
newCommandLineElements.add("-w");
newCommandLineElements.add(dockerPathString);
for (Map.Entry<String, String> entry : environmentVariables.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
newCommandLineElements.add("-e");
newCommandLineElements.add(key + "=" + value);
}
newCommandLineElements.add(container);
newCommandLineElements.addAll(commandLineElements);
return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString));
} else if (sandboxPath != null) {
// Run command with sandboxing.
ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
newCommandLineElements.add(sandboxPath.getPathString());
if (workerOptions.sandboxingBlockNetwork) {
newCommandLineElements.add("-N");
}
for (String writablePath : workerOptions.sandboxingWritablePaths) {
newCommandLineElements.add("-w");
newCommandLineElements.add(writablePath);
}
for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) {
newCommandLineElements.add("-e");
newCommandLineElements.add(tmpfsDir);
}
newCommandLineElements.add("--");
newCommandLineElements.addAll(commandLineElements);
return new Command(
newCommandLineElements.toArray(new String[0]),
environmentVariables,
new File(pathString));
} else {
// Just run the command.
return new Command(
commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString));
}
}
}