blob: 5325b8fd4bd8fd707e909ee1637256f022e2f72f [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.remote;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
import com.google.devtools.build.lib.remote.ContentDigests;
import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
import com.google.devtools.build.lib.remote.RemoteOptions;
import com.google.devtools.build.lib.remote.RemoteProtocol;
import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
import com.google.devtools.build.lib.shell.AbnormalTerminationException;
import com.google.devtools.build.lib.shell.Command;
import com.google.devtools.build.lib.shell.CommandException;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.UnixFileSystem;
import com.google.devtools.common.options.OptionsParser;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Implements a remote worker that accepts work items as protobufs. The server implementation is
* based on grpc.
*/
public class RemoteWorker extends ExecuteServiceImplBase {
private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
private final Path workPath;
private final RemoteOptions remoteOptions;
private final RemoteWorkerOptions options;
private final ConcurrentMapActionCache cache;
public RemoteWorker(
Path workPath,
RemoteOptions remoteOptions,
RemoteWorkerOptions options,
ConcurrentMapActionCache cache) {
this.workPath = workPath;
this.remoteOptions = remoteOptions;
this.options = options;
this.cache = cache;
}
private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
HashMap<String, String> result = new HashMap<>();
for (EnvironmentEntry entry : command.getEnvironmentList()) {
result.put(entry.getVariable(), entry.getValue());
}
return result;
}
public ExecuteReply execute(Action action, Path execRoot)
throws IOException, InterruptedException {
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
try {
RemoteProtocol.Command command =
RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
cache.downloadTree(action.getInputRootDigest(), execRoot);
List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
for (String output : action.getOutputPathList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output file already exists: " + file);
}
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
// TODO(olaola): time out after specified server-side deadline.
Command cmd =
new Command(
command.getArgvList().toArray(new String[] {}),
getEnvironmentVariables(command),
new File(execRoot.getPathString()));
cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
// Execute throws a CommandException on non-zero return values, so action has succeeded.
ImmutableList<ContentDigest> outErrDigests =
cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
ActionResult.Builder result =
ActionResult.newBuilder()
.setReturnCode(0)
.setStdoutDigest(outErrDigests.get(0))
.setStderrDigest(outErrDigests.get(1));
cache.uploadAllResults(execRoot, outputs, result);
cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
return ExecuteReply.newBuilder()
.setResult(result)
.setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
.build();
} catch (CommandException e) {
ImmutableList<ContentDigest> outErrDigests =
cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
final int returnCode =
e instanceof AbnormalTerminationException
? ((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode()
: -1;
return ExecuteReply.newBuilder()
.setResult(
ActionResult.newBuilder()
.setReturnCode(returnCode)
.setStdoutDigest(outErrDigests.get(0))
.setStderrDigest(outErrDigests.get(1)))
.setStatus(
ExecutionStatus.newBuilder()
.setExecuted(true)
.setSucceeded(false)
.setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
.setErrorDetail(e.toString()))
.build();
} catch (CacheNotFoundException e) {
LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
return ExecuteReply.newBuilder()
.setCasError(
CasStatus.newBuilder()
.setSucceeded(false)
.addMissingDigest(e.getMissingDigest())
.setError(CasStatus.ErrorCode.MISSING_DIGEST)
.setErrorDetail(e.toString()))
.setStatus(
ExecutionStatus.newBuilder()
.setExecuted(false)
.setSucceeded(false)
.setError(
e.getMissingDigest() == action.getCommandDigest()
? ExecutionStatus.ErrorCode.MISSING_COMMAND
: ExecutionStatus.ErrorCode.MISSING_INPUT)
.setErrorDetail(e.toString()))
.build();
}
}
@Override
public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
try {
tempRoot.createDirectory();
if (LOG_FINER) {
LOG.fine(
"Work received has "
+ request.getTotalInputFileCount()
+ " input files and "
+ request.getAction().getOutputPathCount()
+ " output files.");
}
ExecuteReply reply = execute(request.getAction(), tempRoot);
responseObserver.onNext(reply);
if (options.debug) {
if (!reply.getStatus().getSucceeded()) {
LOG.warning("Work failed. Request: " + request.toString() + ".");
} else if (LOG_FINER) {
LOG.fine("Work completed.");
}
}
if (!options.debug) {
FileSystemUtils.deleteTree(tempRoot);
} else {
LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
}
} catch (IOException | InterruptedException e) {
ExecuteReply.Builder reply = ExecuteReply.newBuilder();
reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
responseObserver.onNext(reply.build());
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} finally {
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws Exception {
OptionsParser parser =
OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
parser.parseAndExitUponError(args);
RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class);
RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class);
if (remoteWorkerOptions.workPath == null) {
printUsage(parser);
return;
}
System.out.println("*** Starting Hazelcast server.");
ConcurrentMapActionCache cache =
new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
System.out.println(
"*** Starting grpc server on all locally bound IPs on port "
+ remoteWorkerOptions.listenPort
+ ".");
Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache);
final Server server =
ServerBuilder.forPort(remoteWorkerOptions.listenPort).addService(worker).build();
server.start();
final Path pidFile;
if (remoteWorkerOptions.pidFile != null) {
pidFile = getFileSystem().getPath(remoteWorkerOptions.pidFile);
PrintWriter writer = new PrintWriter(pidFile.getOutputStream());
writer.append(Integer.toString(ProcessUtils.getpid()));
writer.append("\n");
writer.close();
} else {
pidFile = null;
}
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
System.err.println("*** Shutting down grpc server.");
server.shutdown();
if (pidFile != null) {
try {
pidFile.delete();
} catch (IOException e) {
System.err.println("Cannot remove pid file: " + pidFile.toString());
}
}
System.err.println("*** Server shut down.");
}
});
server.awaitTermination();
}
public static void printUsage(OptionsParser parser) {
System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a RPC service.");
System.out.println(
parser.describeOptions(
Collections.<String, String>emptyMap(), OptionsParser.HelpVerbosity.LONG));
}
static FileSystem getFileSystem() {
return OS.getCurrent() == OS.WINDOWS ? new JavaIoFileSystem() : new UnixFileSystem();
}
}