blob: 0c7c454d17533c61a65cc16d2cf8b3ef33101734 [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
import static;
import static;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
 * A spawn strategy that speeds up incremental builds while not slowing down full builds.
 *
 * <p>This strategy tries to run spawn actions on the local and remote machine at the same time, and
 * picks the spawn action that completes first. This gives the benefits of remote execution on full
 * builds, and local execution on incremental builds.
 *
 * <p>One might ask why we don't run spawns on the workstation all the time and just "spill over"
 * actions to remote execution when there are no local resources available. This would work, except
 * that the cost of transferring action inputs and outputs from the local machine to and from remote
 * executors over the network is way too high - there is no point in executing an action locally and
 * saving 0.5s of time, when it then takes us 5 seconds to upload the results to remote executors
 * for another action that's scheduled to run there.
 */
name = {"dynamic", "dynamic_worker"},
contextType = SpawnActionContext.class)
public class DynamicSpawnStrategy implements SpawnActionContext {
// Logger for non-fatal problems, e.g. failures to move a branch's action logs.
private static final Logger logger = Logger.getLogger(DynamicSpawnStrategy.class.getName());
// Executor used to run Spawn actions; the constructor wraps the caller's ExecutorService in a
// listening decorator before storing it here.
private final ListeningExecutorService executorService;
// Options of dynamic execution, as handed to the constructor.
private final DynamicExecutionOptions options;
// Maps a spawn to its ExecutionPolicy; exec() consults it to decide whether the spawn may only run
// locally or only remotely.
private final Function<Spawn, ExecutionPolicy> getExecutionPolicy;
/**
 * Set to true by the first action that completes remotely. Until that happens, all local actions
 * are delayed by the amount given in {@link DynamicExecutionOptions#localExecutionDelay}.
 *
 * <p>This is a rather simple approach to make it possible to score a cache hit on remote
 * execution before even trying to start the action locally. This saves resources that would
 * otherwise be wasted by continuously starting and immediately killing local processes. One
 * possibility for improvement would be to establish a reporting mechanism from strategies back to
 * here, where we delay starting locally until the remote strategy tells us that the action isn't
 * a cache hit.
 */
// Starts out as false; read by the local branch set up in exec().
private final AtomicBoolean delayLocalExecution = new AtomicBoolean(false);
// Local strategies keyed by mnemonic. Not initialized here (null) until executorCreated() assigns
// the map built by buildStrategiesMap().
private Map<String, List<SandboxedSpawnActionContext>> localStrategiesByMnemonic;
// Remote strategies keyed by mnemonic. Not initialized here (null) until executorCreated() assigns
// the map built by buildStrategiesMap().
private Map<String, List<SandboxedSpawnActionContext>> remoteStrategiesByMnemonic;
/**
 * Creates a {@code DynamicSpawnStrategy}.
 *
 * @param executorService the {@link ExecutorService} on which Spawn actions will be run; it is
 *     wrapped in a listening decorator before being stored
 * @param options the dynamic execution options to apply
 * @param getExecutionPolicy function yielding the {@link ExecutionPolicy} of a given spawn
 */
public DynamicSpawnStrategy(
    ExecutorService executorService,
    DynamicExecutionOptions options,
    Function<Spawn, ExecutionPolicy> getExecutionPolicy) {
  this.getExecutionPolicy = getExecutionPolicy;
  this.options = options;
  this.executorService = MoreExecutors.listeningDecorator(executorService);
}
* Searches for a sandboxed spawn strategy with the given name.
* @param usedContexts the action contexts used during the build
* @param name the name of the spawn strategy we are interested in
* @return a sandboxed spawn strategy
* @throws ExecutorInitException if the spawn strategy does not exist, or if it exists but is not
* sandboxed
private static SandboxedSpawnActionContext findStrategy(
Iterable<ActionContext> usedContexts, String name) throws ExecutorInitException {
for (ActionContext context : usedContexts) {
ExecutionStrategy strategy = context.getClass().getAnnotation(ExecutionStrategy.class);
if (strategy != null && Arrays.asList( {
if (!(context instanceof SandboxedSpawnActionContext)) {
throw new ExecutorInitException("Requested strategy " + name + " exists but does not "
+ "support sandboxing");
return (SandboxedSpawnActionContext) context;
throw new ExecutorInitException("Requested strategy " + name + " does not exist");
// Builds a map from mnemonic to the list of sandboxed strategies configured for it.
//
// usedContexts: the action contexts in which each configured strategy name is looked up.
// optionVals: (mnemonic, strategy names) pairs; every name is resolved through findStrategy().
// Returns the resulting map.
// Throws ExecutorInitException when findStrategy() rejects one of the names.
//
// NOTE(review): the closing braces of this method are missing from this copy of the file, so it
// cannot be told from here whether the put() below sits inside or outside the isEmpty() check,
// i.e. whether mnemonics with no strategy names get an empty list or no entry at all. Confirm
// against the upstream source before restructuring.
private static Map<String, List<SandboxedSpawnActionContext>> buildStrategiesMap(
Iterable<ActionContext> usedContexts, List<Map.Entry<String, List<String>>> optionVals)
throws ExecutorInitException {
Map<String, List<SandboxedSpawnActionContext>> strategiesByMnemonic = new HashMap<>();
for (Map.Entry<String, List<String>> entry : optionVals) {
// A fresh list per mnemonic, filled in the order the names were given.
List<SandboxedSpawnActionContext> strategies = Lists.newArrayList();
if (!entry.getValue().isEmpty()) {
for (String element : entry.getValue()) {
strategies.add(findStrategy(usedContexts, element));
strategiesByMnemonic.put(entry.getKey(), strategies);
return strategiesByMnemonic;
public void executorCreated(Iterable<ActionContext> usedContexts) throws ExecutorInitException {
localStrategiesByMnemonic =
buildStrategiesMap(usedContexts, DynamicExecutionModule.localStrategiesByMnemonic);
remoteStrategiesByMnemonic =
buildStrategiesMap(usedContexts, DynamicExecutionModule.remoteStrategiesByMnemonic);
* Cancels and waits for a branch (a spawn execution) to terminate.
* <p>This is intended to be used as the body of the {@link StopConcurrentSpawns} lambda passed to
* the spawn runners. Each strategy may call this at most once.
* @param branch the future running the spawn
* @param branchDone semaphore that is expected to receive a permit once the future terminates
* (after {@link InterruptedException} bubbles up through its call stack)
* @param cancellingStrategy identifier of the strategy that is performing the cancellation. Used
* to prevent cross-cancellations and to sanity-check that the same strategy doesn't issue the
* cancellation twice.
* @param strategyThatCancelled name of the first strategy that executed this method, or a null
* reference if this is the first time this method is called. If not null, we expect the value
* referenced by this to be different than {@code cancellingStrategy}, or else we have a bug.
* @throws InterruptedException if we get interrupted for any reason trying to cancel the future
* @throws DynamicInterruptedException if we lost a race against another strategy trying to cancel
* us
private static void stopBranch(
Future<List<SpawnResult>> branch,
Semaphore branchDone,
String cancellingStrategy,
AtomicReference<String> strategyThatCancelled)
throws InterruptedException {
// This multi-step, unlocked access to "strategyThatCancelled" is valid because, for a given
// value of "cancellingStrategy", we do not expect concurrent calls to this method. (If there
// are, we are in big trouble.)
String current = strategyThatCancelled.get();
if (current != null && current.equals(cancellingStrategy)) {
throw new AssertionError("stopBranch called more than once by " + cancellingStrategy);
} else {
// Protect against the two branches from cancelling each other. The first branch to set the
// reference to its own identifier wins and is allowed to issue the cancellation; the other
// branch just has to give up execution.
if (strategyThatCancelled.compareAndSet(null, cancellingStrategy)) {
boolean cancelled = branch.cancel(true);
checkState(cancelled, "Failed to cancel other branch from %s", cancellingStrategy);
} else {
throw new DynamicInterruptedException(
"Execution stopped because other strategy finished first");
* Waits for a branch (a spawn execution) to complete.
* @param branch the future running the spawn
* @return the spawn result if the execution terminated successfully, or null if the branch was
* cancelled
* @throws ExecException the execution error of the spawn if it failed
* @throws InterruptedException if we get interrupted while waiting for completion
private static List<SpawnResult> waitBranch(Future<List<SpawnResult>> branch)
throws ExecException, InterruptedException {
try {
return branch.get();
} catch (CancellationException e) {
return null;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ExecException) {
throw (ExecException) cause;
} else if (cause instanceof InterruptedException) {
// If the branch was interrupted, it might be due to a user interrupt or due to our request
// for cancellation. Assume the latter here because if this was actually a user interrupt,
// our own get() would have been interrupted as well. It makes no sense to propagate the
// interrupt status across threads.
return null;
} else {
// Even though we cannot enforce this in the future's signature (but we do in Branch#call),
// we only expect the exception types we validated above. Still, unchecked exceptions could
// propagate, so just let them bubble up.
throw new AssertionError("Unexpected exception type from strategy.exec()");
} catch (InterruptedException e) {
throw e;
* Waits for the two branches of a spawn's execution to complete.
* <p>This guarantees that the two branches are stopped both on successful termination and on an
* exception.
* @param branch1 the future running one side of the spawn (e.g. local). This future must cancel
* {@code branch2} at some point during its successful execution to guarantee termination. If
* we encounter an execution error, or if we are interrupted, then we handle such cancellation
* here.
* @param branch2 the future running the other side of the spawn (e.g. remote). Same restrictions
* apply as in {@code branch1}, but in the symmetric direction.
* @return the result of the branch that terminates first
* @throws ExecException the execution error of the spawn that terminated first
* @throws InterruptedException if we get interrupted while waiting for completion
private static List<SpawnResult> waitBranches(
Future<List<SpawnResult>> branch1, Future<List<SpawnResult>> branch2)
throws ExecException, InterruptedException {
List<SpawnResult> result1;
try {
result1 = waitBranch(branch1);
} catch (ExecException | InterruptedException | RuntimeException e) {
throw e;
List<SpawnResult> result2 = waitBranch(branch2);
if (result2 != null && result1 != null) {
throw new AssertionError("One branch did not cancel the other one");
} else if (result2 != null) {
return result2;
} else if (result1 != null) {
return result1;
} else {
throw new AssertionError("No branch completed, which might mean they cancelled each other");
// Executes the given spawn.
//
// The spawn's ExecutionPolicy is consulted first: spawns that can only run locally or only run
// remotely are dispatched directly to runLocally()/runRemotely() with no StopConcurrentSpawns
// callback. Otherwise a "local" and a "remote" Branch are set up, each one given a callback that
// calls stopBranch() on the other, and the result is whatever waitBranches() returns.
//
// NOTE(review): several statements of this method are missing from this copy of the file (for
// example the remaining arguments of runLocally(), the call that produces spawnResults, how the
// branches are submitted, and the bodies of the finally blocks), as are the closing braces.
// Confirm against the upstream source before changing any of the logic below.
public List<SpawnResult> exec(
final Spawn spawn, final ActionExecutionContext actionExecutionContext)
throws ExecException, InterruptedException {
ExecutionPolicy executionPolicy = getExecutionPolicy.apply(spawn);
// Single-sided spawns: there is no competing branch, hence the null callback.
if (executionPolicy.canRunLocallyOnly()) {
return runLocally(spawn, actionExecutionContext, null);
if (executionPolicy.canRunRemotelyOnly()) {
return runRemotely(spawn, actionExecutionContext, null);
// Semaphores to track termination of each branch. These are necessary to wait for the branch to
// finish its own cleanup (e.g. terminating subprocesses) once it has been cancelled.
Semaphore localDone = new Semaphore(0);
Semaphore remoteDone = new Semaphore(0);
// Shared by both stopBranch() callbacks to record which strategy issued the cancellation.
AtomicReference<String> strategyThatCancelled = new AtomicReference<>(null);
SettableFuture<List<SpawnResult>> remoteBranch = SettableFuture.create();
// Each flag is flipped from false to true with compareAndSet both at the start of the branch
// body and in a callback further below; whichever of the two runs first wins the flag.
AtomicBoolean localCanReportDone = new AtomicBoolean(false);
AtomicBoolean remoteCanReportDone = new AtomicBoolean(false);
ListenableFuture<List<SpawnResult>> localBranch =
new Branch("local", actionExecutionContext) {
List<SpawnResult> callImpl(ActionExecutionContext context)
throws InterruptedException, ExecException {
try {
if (!localCanReportDone.compareAndSet(false, true)) {
// If we ever get here, it's because we were cancelled and the listener ran
// first. Just make sure that's the case.
throw new InterruptedException();
// NOTE(review): the statement guarded by this check is not present in this copy.
if (delayLocalExecution.get()) {
return runLocally(
() -> stopBranch(remoteBranch, remoteDone, "local", strategyThatCancelled));
} finally {
() -> {
if (localCanReportDone.compareAndSet(false, true)) {
if (!localBranch.isCancelled()) {
new Branch("remote", actionExecutionContext) {
public List<SpawnResult> callImpl(ActionExecutionContext context)
throws InterruptedException, ExecException {
try {
if (!remoteCanReportDone.compareAndSet(false, true)) {
// If we ever get here, it's because we were cancelled and the listener ran
// first. Just make sure that's the case.
throw new InterruptedException();
List<SpawnResult> spawnResults =
() ->
stopBranch(localBranch, localDone, "remote", strategyThatCancelled));
return spawnResults;
} finally {
() -> {
if (remoteCanReportDone.compareAndSet(false, true)) {
if (!remoteBranch.isCancelled()) {
try {
// Blocks until the race between the two branches is decided.
return waitBranches(localBranch, remoteBranch);
} finally {
private static List<SandboxedSpawnActionContext> getValidStrategies(
Map<String, List<SandboxedSpawnActionContext>> strategiesByMnemonic, Spawn spawn) {
List<SandboxedSpawnActionContext> validStrategies = Lists.newArrayList();
if (strategiesByMnemonic.get(spawn.getMnemonic()) != null) {
if (strategiesByMnemonic.get("") != null) {
return validStrategies;
public boolean canExec(Spawn spawn) {
for (SandboxedSpawnActionContext strategy :
getValidStrategies(localStrategiesByMnemonic, spawn)) {
if (strategy.canExec(spawn)) {
return true;
for (SandboxedSpawnActionContext strategy :
getValidStrategies(remoteStrategiesByMnemonic, spawn)) {
if (strategy.canExec(spawn)) {
return true;
return false;
private static FileOutErr getSuffixedFileOutErr(FileOutErr fileOutErr, String suffix) {
Path outDir = checkNotNull(fileOutErr.getOutputPath().getParentDirectory());
String outBaseName = fileOutErr.getOutputPath().getBaseName();
Path errDir = checkNotNull(fileOutErr.getErrorPath().getParentDirectory());
String errBaseName = fileOutErr.getErrorPath().getBaseName();
return new FileOutErr(
outDir.getChild(outBaseName + suffix), errDir.getChild(errBaseName + suffix));
private List<SpawnResult> runLocally(
Spawn spawn,
ActionExecutionContext actionExecutionContext,
@Nullable StopConcurrentSpawns stopConcurrentSpawns)
throws ExecException, InterruptedException {
for (SandboxedSpawnActionContext strategy :
getValidStrategies(localStrategiesByMnemonic, spawn)) {
return strategy.exec(spawn, actionExecutionContext, stopConcurrentSpawns);
throw new RuntimeException(
"executorCreated not yet called or no default dynamic_local_strategy set");
private List<SpawnResult> runRemotely(
Spawn spawn,
ActionExecutionContext actionExecutionContext,
@Nullable StopConcurrentSpawns stopConcurrentSpawns)
throws ExecException, InterruptedException {
for (SandboxedSpawnActionContext strategy :
getValidStrategies(remoteStrategiesByMnemonic, spawn)) {
return strategy.exec(spawn, actionExecutionContext, stopConcurrentSpawns);
throw new RuntimeException(
"executorCreated not yet called or no default dynamic_remote_strategy set");
/**
 * Wraps the execution of a function that is supposed to execute a spawn via a strategy and only
 * updates the stdout/stderr files if this spawn succeeds.
 */
// One side of a dynamic execution. Implements Callable so that it can be handed to an executor;
// subclasses supply the actual spawn execution through callImpl().
private abstract static class Branch implements Callable<List<SpawnResult>> {
// Label of this branch; call() appends "." + name to the stdout/stderr file names it uses.
private final String name;
// Context given to the dynamic strategy; call() reads the final stdout/stderr location from it.
private final ActionExecutionContext context;
/**
 * Creates a new branch of dynamic execution.
 *
 * @param name a name to describe what this branch represents (e.g. {@code remote}). Used to
 *     qualify temporary files.
 * @param context the action execution context given to the dynamic strategy, used to obtain the
 *     final location of the stdout/stderr
 */
Branch(String name, ActionExecutionContext context) {
  this.name = name;
  this.context = context;
}
* Moves a set of stdout/stderr files over another one. Errors during the move are logged and
* swallowed.
* @param from the source location
* @param to the target location
private static void moveFileOutErr(FileOutErr from, FileOutErr to) {
try {
if (from.getOutputPath().exists()) {
Files.move(from.getOutputPath().getPathFile(), to.getOutputPath().getPathFile());
if (from.getErrorPath().exists()) {
Files.move(from.getErrorPath().getPathFile(), to.getErrorPath().getPathFile());
} catch (IOException e) {
logger.log(Level.WARNING, "Could not move action logs from execution", e);
/**
 * Hook to execute a spawn using an arbitrary strategy.
 *
 * @param context the action execution context where the spawn can write its stdout/stderr. The
 *     location of these files is specific to this branch.
 * @return the spawn results if execution was successful
 * @throws InterruptedException if the branch was cancelled or an interrupt was caught
 * @throws ExecException if the spawn execution fails
 */
// Implemented by each branch; call() invokes it with a context whose FileOutErr has been swapped
// for the branch-specific (suffixed) one.
abstract List<SpawnResult> callImpl(ActionExecutionContext context)
throws InterruptedException, ExecException;
/**
 * Executes the {@link #callImpl} hook and handles stdout/stderr.
 *
 * @return the spawn results if execution was successful
 * @throws InterruptedException if the branch was cancelled or an interrupt was caught
 * @throws ExecException if the spawn execution fails
 */
// Runs callImpl() against branch-specific stdout/stderr files and then moves those files over the
// final ones taken from the original context.
//
// NOTE(review): the closing braces and at least one statement are missing from this copy of the
// file: the try block inside "finally" is empty yet catches IOException, so some operation on
// fileOutErr (presumably closing it) belongs there. For the same reason it cannot be told from
// here whether moveFileOutErr() runs inside or after the finally block. Confirm against the
// upstream source before changing this method.
public final List<SpawnResult> call() throws InterruptedException, ExecException {
// Each branch writes to its own files, named after the final ones plus "." + name.
FileOutErr fileOutErr = getSuffixedFileOutErr(context.getFileOutErr(), "." + name);
List<SpawnResult> results = null;
// An ExecException is held back rather than thrown right away so that the statements below still
// run; it is rethrown at the end.
ExecException exception = null;
try {
results = callImpl(context.withFileOutErr(fileOutErr));
} catch (ExecException e) {
exception = e;
} finally {
try {
} catch (IOException ignored) {
// Nothing we can do here.
moveFileOutErr(fileOutErr, context.getFileOutErr());
if (exception != null) {
throw exception;
} else {
return results;