Handle CPU limiting of local branches with a queue rather than dropping the excess on the floor.

This will later allow us to be smarter about which actions to schedule for local execution.

PiperOrigin-RevId: 421317543
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java b/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
index 6cb515b..e218988 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/Branch.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.GoogleLogger;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -124,6 +125,11 @@
         + (isDone() ? "done" : "not done");
   }
 
+  /** Executes this branch using the provided executor. */
+  public void execute(ListeningExecutorService executor) {
+    future.setFuture(executor.submit(this));
+  }
+
   /**
    * Moves a set of stdout/stderr files over another one. Errors during the move are logged and
    * swallowed.
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
index 3eeccd9..ada5cab 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java
@@ -23,7 +23,6 @@
 import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionContext;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
@@ -45,6 +44,8 @@
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -106,6 +107,9 @@
   /** Limit on how many threads we should use for dynamic execution. */
   private final Semaphore threadLimiter;
 
+  /** Set of jobs that are waiting for local execution. */
+  private final Deque<LocalBranch> waitingLocalJobs = new ArrayDeque<>();
+
   /**
    * Constructs a {@code DynamicSpawnStrategy}.
    *
@@ -203,69 +207,76 @@
       return nonDynamicResults;
     }
 
-    // True if we got the threads we need for actual dynamic execution.
-    boolean gotThreads = false;
+    // Extra logging to debug b/194373457
+    logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
+        "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
+    debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
+    // else both can exec. Fallthrough to below.
+
+    AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);
+
+    LocalBranch localBranch =
+        new LocalBranch(
+            actionExecutionContext,
+            spawn,
+            strategyThatCancelled,
+            options,
+            ignoreFailureCheck,
+            getExtraSpawnForLocalExecution,
+            delayLocalExecution);
+    RemoteBranch remoteBranch =
+        new RemoteBranch(
+            actionExecutionContext,
+            spawn,
+            strategyThatCancelled,
+            options,
+            ignoreFailureCheck,
+            delayLocalExecution);
+
+    localBranch.prepareFuture(remoteBranch);
+    remoteBranch.prepareFuture(localBranch);
+    synchronized (waitingLocalJobs) {
+      waitingLocalJobs.add(localBranch);
+    }
+    tryScheduleLocalJob();
+    remoteBranch.execute(executorService);
+
     try {
-      if (threadLimiter.tryAcquire()) {
-        gotThreads = true;
-      } else {
-        // If there are no threads available for dynamic execution because we're limited
-        // to the number of CPUs, we can just execute remotely.
-        return RemoteBranch.runRemotely(spawn, actionExecutionContext, null, delayLocalExecution);
-      }
-
-      // Extra logging to debug b/194373457
-      logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
-          "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
-      debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
-      // else both can exec. Fallthrough to below.
-
-      AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);
-
-      LocalBranch localBranch =
-          new LocalBranch(
-              actionExecutionContext,
-              spawn,
-              strategyThatCancelled,
-              options,
-              ignoreFailureCheck,
-              getExtraSpawnForLocalExecution,
-              delayLocalExecution);
-      RemoteBranch remoteBranch =
-          new RemoteBranch(
-              actionExecutionContext,
-              spawn,
-              strategyThatCancelled,
-              options,
-              ignoreFailureCheck,
-              delayLocalExecution);
-
-      SettableFuture<ImmutableList<SpawnResult>> localFuture =
-          localBranch.prepareFuture(remoteBranch);
-      SettableFuture<ImmutableList<SpawnResult>> remoteFuture =
-          remoteBranch.prepareFuture(localBranch);
-      localFuture.setFuture(executorService.submit(localBranch));
-      remoteFuture.setFuture(executorService.submit(remoteBranch));
-
-      try {
-        return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
-      } finally {
-        checkState(localBranch.isDone());
-        checkState(remoteBranch.isDone());
-        logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
-            "Dynamic execution of %s ended with local %s, remote %s%n",
-            spawn.getResourceOwner().prettyPrint(),
-            localBranch.isCancelled() ? "cancelled" : "done",
-            remoteBranch.isCancelled() ? "cancelled" : "done");
-        debugLog(
-            "Dynamic execution of %s ended with local %s, remote %s%n",
-            spawn.getResourceOwner().prettyPrint(),
-            localBranch.isCancelled() ? "cancelled" : "done",
-            remoteBranch.isCancelled() ? "cancelled" : "done");
-      }
+      return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
     } finally {
-      if (gotThreads) {
-        threadLimiter.release();
+      checkState(localBranch.isDone());
+      checkState(remoteBranch.isDone());
+      if (!waitingLocalJobs.contains(localBranch)) {
+        synchronized (waitingLocalJobs) {
+          if (!waitingLocalJobs.contains(localBranch)) {
+            threadLimiter.release();
+            tryScheduleLocalJob();
+          }
+        }
+      }
+      logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
+          "Dynamic execution of %s ended with local %s, remote %s%n",
+          spawn.getResourceOwner().prettyPrint(),
+          localBranch.isCancelled() ? "cancelled" : "done",
+          remoteBranch.isCancelled() ? "cancelled" : "done");
+      debugLog(
+          "Dynamic execution of %s ended with local %s, remote %s%n",
+          spawn.getResourceOwner().prettyPrint(),
+          localBranch.isCancelled() ? "cancelled" : "done",
+          remoteBranch.isCancelled() ? "cancelled" : "done");
+    }
+  }
+
+  /**
+   * Tries to schedule as many local jobs as are permitted by {@link #threadLimiter}. "Scheduling"
+   * here means putting it on a thread and making it start the normal strategy execution, but it
+   * will still have to wait for resources, so it may not execute for a while.
+   */
+  private void tryScheduleLocalJob() {
+    synchronized (waitingLocalJobs) {
+      while (!waitingLocalJobs.isEmpty() && threadLimiter.tryAcquire()) {
+        LocalBranch job = waitingLocalJobs.pollLast();
+        job.execute(executorService);
       }
     }
   }
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java b/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
index 90ff49b..70feb2b 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/LocalBranch.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -132,7 +131,7 @@
   }
 
   /** Sets up the {@link Future} used in the local branch to know what remote branch to cancel. */
-  protected SettableFuture<ImmutableList<SpawnResult>> prepareFuture(RemoteBranch remoteBranch) {
+  protected void prepareFuture(RemoteBranch remoteBranch) {
     // TODO(b/203094728): Maybe generify this method and move it up.
     this.remoteBranch = remoteBranch;
     future.addListener(
@@ -147,7 +146,6 @@
           }
         },
         MoreExecutors.directExecutor());
-    return future;
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java b/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
index 02b28dc..4968e4c 100644
--- a/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
+++ b/src/main/java/com/google/devtools/build/lib/dynamic/RemoteBranch.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.GoogleLogger;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
 import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
@@ -111,7 +110,7 @@
   }
 
   /** Sets up the future for this branch, once the other branch is available. */
-  public SettableFuture<ImmutableList<SpawnResult>> prepareFuture(LocalBranch localBranch) {
+  public void prepareFuture(LocalBranch localBranch) {
     this.localBranch = localBranch;
     future.addListener(
         () -> {
@@ -125,7 +124,6 @@
           }
         },
         MoreExecutors.directExecutor());
-    return future;
   }
 
   @Override