Handle CPU limiting of local branches with a queue rather than dropping the excess on the floor.
This will later allow us to be smarter about which actions to schedule for local execution.
PiperOrigin-RevId: 421317543
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java b/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
index 6cb515b..e218988 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -124,6 +125,11 @@
+ (isDone() ? "done" : "not done");
}
+ /** Submits this branch to {@code executor} and makes {@code future} track the submitted task. */
+ public void execute(ListeningExecutorService executor) {
+ future.setFuture(executor.submit(this));
+ }
+
/**
* Moves a set of stdout/stderr files over another one. Errors during the move are logged and
* swallowed.
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
index 3eeccd9..ada5cab 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
@@ -23,7 +23,6 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionContext;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
@@ -45,6 +44,8 @@
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -106,6 +107,9 @@
/** Limit on how many threads we should use for dynamic execution. */
private final Semaphore threadLimiter;
+ /** Set of jobs that are waiting for local execution. */
+ private final Deque<LocalBranch> waitingLocalJobs = new ArrayDeque<>();
+
/**
* Constructs a {@code DynamicSpawnStrategy}.
*
@@ -203,69 +207,76 @@
return nonDynamicResults;
}
- // True if we got the threads we need for actual dynamic execution.
- boolean gotThreads = false;
+ // Extra logging to debug b/194373457
+ logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
+ "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
+ debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
+ // else both can exec. Fallthrough to below.
+
+ AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);
+
+ LocalBranch localBranch =
+ new LocalBranch(
+ actionExecutionContext,
+ spawn,
+ strategyThatCancelled,
+ options,
+ ignoreFailureCheck,
+ getExtraSpawnForLocalExecution,
+ delayLocalExecution);
+ RemoteBranch remoteBranch =
+ new RemoteBranch(
+ actionExecutionContext,
+ spawn,
+ strategyThatCancelled,
+ options,
+ ignoreFailureCheck,
+ delayLocalExecution);
+
+ localBranch.prepareFuture(remoteBranch);
+ remoteBranch.prepareFuture(localBranch);
+ synchronized (waitingLocalJobs) {
+ waitingLocalJobs.add(localBranch);
+ }
+ tryScheduleLocalJob();
+ remoteBranch.execute(executorService);
+
try {
- if (threadLimiter.tryAcquire()) {
- gotThreads = true;
- } else {
- // If there are no threads available for dynamic execution because we're limited
- // to the number of CPUs, we can just execute remotely.
- return RemoteBranch.runRemotely(spawn, actionExecutionContext, null, delayLocalExecution);
- }
-
- // Extra logging to debug b/194373457
- logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
- "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
- debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
- // else both can exec. Fallthrough to below.
-
- AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);
-
- LocalBranch localBranch =
- new LocalBranch(
- actionExecutionContext,
- spawn,
- strategyThatCancelled,
- options,
- ignoreFailureCheck,
- getExtraSpawnForLocalExecution,
- delayLocalExecution);
- RemoteBranch remoteBranch =
- new RemoteBranch(
- actionExecutionContext,
- spawn,
- strategyThatCancelled,
- options,
- ignoreFailureCheck,
- delayLocalExecution);
-
- SettableFuture<ImmutableList<SpawnResult>> localFuture =
- localBranch.prepareFuture(remoteBranch);
- SettableFuture<ImmutableList<SpawnResult>> remoteFuture =
- remoteBranch.prepareFuture(localBranch);
- localFuture.setFuture(executorService.submit(localBranch));
- remoteFuture.setFuture(executorService.submit(remoteBranch));
-
- try {
- return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
- } finally {
- checkState(localBranch.isDone());
- checkState(remoteBranch.isDone());
- logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
- "Dynamic execution of %s ended with local %s, remote %s%n",
- spawn.getResourceOwner().prettyPrint(),
- localBranch.isCancelled() ? "cancelled" : "done",
- remoteBranch.isCancelled() ? "cancelled" : "done");
- debugLog(
- "Dynamic execution of %s ended with local %s, remote %s%n",
- spawn.getResourceOwner().prettyPrint(),
- localBranch.isCancelled() ? "cancelled" : "done",
- remoteBranch.isCancelled() ? "cancelled" : "done");
- }
+ return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
} finally {
- if (gotThreads) {
- threadLimiter.release();
+ checkState(localBranch.isDone());
+ checkState(remoteBranch.isDone());
+ if (!waitingLocalJobs.contains(localBranch)) {
+ synchronized (waitingLocalJobs) {
+ if (!waitingLocalJobs.contains(localBranch)) {
+ threadLimiter.release();
+ tryScheduleLocalJob();
+ }
+ }
+ }
+ logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
+ "Dynamic execution of %s ended with local %s, remote %s%n",
+ spawn.getResourceOwner().prettyPrint(),
+ localBranch.isCancelled() ? "cancelled" : "done",
+ remoteBranch.isCancelled() ? "cancelled" : "done");
+ debugLog(
+ "Dynamic execution of %s ended with local %s, remote %s%n",
+ spawn.getResourceOwner().prettyPrint(),
+ localBranch.isCancelled() ? "cancelled" : "done",
+ remoteBranch.isCancelled() ? "cancelled" : "done");
+ }
+ }
+
+ /**
+ * Starts queued local jobs, newest first, while {@link #threadLimiter} still grants permits; a
+ * permit is only taken when a job is waiting, and each started job keeps it. "Scheduling" just
+ * submits the job to the executor: it still has to wait for resources before it actually runs.
+ */
+ private void tryScheduleLocalJob() {
+ synchronized (waitingLocalJobs) {
+ while (!waitingLocalJobs.isEmpty() && threadLimiter.tryAcquire()) {
+ LocalBranch job = waitingLocalJobs.pollLast();
+ job.execute(executorService);
+ }
+ }
+ }
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java b/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
index 90ff49b..70feb2b 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -132,7 +131,7 @@
}
/** Sets up the {@link Future} used in the local branch to know what remote branch to cancel. */
- protected SettableFuture<ImmutableList<SpawnResult>> prepareFuture(RemoteBranch remoteBranch) {
+ protected void prepareFuture(RemoteBranch remoteBranch) {
// TODO(b/203094728): Maybe generify this method and move it up.
this.remoteBranch = remoteBranch;
future.addListener(
@@ -147,7 +146,6 @@
}
},
MoreExecutors.directExecutor());
- return future;
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java b/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
index 02b28dc..4968e4c 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -111,7 +110,7 @@
}
/** Sets up the future for this branch, once the other branch is available. */
- public SettableFuture<ImmutableList<SpawnResult>> prepareFuture(LocalBranch localBranch) {
+ public void prepareFuture(LocalBranch localBranch) {
this.localBranch = localBranch;
future.addListener(
() -> {
@@ -125,7 +124,6 @@
}
},
MoreExecutors.directExecutor());
- return future;
}
@Override