Move worker allocation to ResourceManager.

This a step to make a workers as resource. Now if flag **experimental_worker_as_resource** enabled then workers allocated in ResourceManager. The logic of allocation wasn't changed (still sequential allocation).

PiperOrigin-RevId: 427967810
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index cff07db..1859062 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -246,6 +246,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
         "//src/main/java/com/google/devtools/build/lib/actions:middleman_type",
         "//src/main/java/com/google/devtools/build/lib/actions:package_roots",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster",
         "//src/main/java/com/google/devtools/build/lib/analysis:analysis_options",
         "//src/main/java/com/google/devtools/build/lib/analysis:analysis_phase_complete_event",
diff --git a/src/main/java/com/google/devtools/build/lib/actions/BUILD b/src/main/java/com/google/devtools/build/lib/actions/BUILD
index 0f99219..0fe4c4f 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/actions/BUILD
@@ -53,6 +53,7 @@
             "LocalHostResourceManagerDarwin.java",
             "LocalHostResourceFallback.java",
             "MiddlemanType.java",
+            "ResourceManager.java",
             "ResourceSet.java",
             "ResourceSetOrBuilder.java",
             "PackageRootResolver.java",
@@ -97,7 +98,6 @@
         "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec",
         "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:serialization-constant",
         "//src/main/java/com/google/devtools/build/lib/starlarkbuildapi",
-        "//src/main/java/com/google/devtools/build/lib/unix:procmeminfo_parser",
         "//src/main/java/com/google/devtools/build/lib/unsafe:string",
         "//src/main/java/com/google/devtools/build/lib/util",
         "//src/main/java/com/google/devtools/build/lib/util:command",
@@ -353,3 +353,21 @@
     ],
     deps = ["//src/main/protobuf:failure_details_java_proto"],
 )
+
+java_library(
+    name = "resource_manager",
+    srcs = [
+        "ResourceManager.java",
+    ],
+    deps = [
+        ":actions",
+        ":localhost_capacity",
+        "//src/main/java/com/google/devtools/build/lib/concurrent",
+        "//src/main/java/com/google/devtools/build/lib/profiler",
+        "//src/main/java/com/google/devtools/build/lib/unix:procmeminfo_parser",
+        "//src/main/java/com/google/devtools/build/lib/util",
+        "//src/main/java/com/google/devtools/build/lib/util:os",
+        "//src/main/java/com/google/devtools/build/lib/worker",
+        "//third_party:guava",
+    ],
+)
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
index 81f1936..145fd8c 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
@@ -24,6 +24,9 @@
 import com.google.devtools.build.lib.unix.ProcMeminfoParser;
 import com.google.devtools.build.lib.util.OS;
 import com.google.devtools.build.lib.util.Pair;
+import com.google.devtools.build.lib.worker.Worker;
+import com.google.devtools.build.lib.worker.WorkerKey;
+import com.google.devtools.build.lib.worker.WorkerPool;
 import java.io.IOException;
 import java.util.Deque;
 import java.util.Iterator;
@@ -86,6 +89,30 @@
     }
   }
 
+  /**
+   * A handle returned by {@link #acquireWorkerResources(ActionExecutionMetadata, ResourceSet,
+   * WorkerKey, ResourcePriority)} that must be closed in order to free the resources again.
+   */
+  public static class ResourceHandleWithWorker implements AutoCloseable {
+    final ResourceHandle resourceHandle;
+    final Worker worker;
+
+    public ResourceHandleWithWorker(ResourceHandle resourceHandle, Worker worker) {
+      this.resourceHandle = resourceHandle;
+      this.worker = worker;
+    }
+
+    public Worker getWorker() {
+      return worker;
+    }
+
+    /** Closing the ResourceHandle releases the resources associated with it. */
+    @Override
+    public void close() {
+      this.resourceHandle.close();
+    }
+  }
+
   private final ThreadLocal<Boolean> threadLocked = new ThreadLocal<Boolean>() {
     @Override
     protected Boolean initialValue() {
@@ -138,6 +165,8 @@
   private final Deque<Pair<ResourceSet, CountDownLatch>> dynamicStandaloneRequests =
       new LinkedList<>();
 
+  private WorkerPool workerPool;
+
   // The total amount of resources on the local host. Must be set by
   // an explicit call to setAvailableResources(), often using
   // LocalHostCapacity.getLocalHostCapacity() as an argument.
@@ -217,12 +246,32 @@
     localMemoryEstimate = value;
   }
 
+  /** Sets worker pool for taking the workers. Must be called before requesting the workers. */
+  public void setWorkerPool(WorkerPool workerPool) {
+    this.workerPool = workerPool;
+  }
+
   /** Sets whether to prioritize local-only actions in resource allocation. */
   public void setPrioritizeLocalActions(boolean prioritizeLocalActions) {
     this.prioritizeLocalActions = prioritizeLocalActions;
   }
 
   /**
+   * Acuqires requested resource set and worker. Will block if resource is not available. The worker
+   * isn't released as part of the AutoCloseable.
+   */
+  public ResourceHandleWithWorker acquireWorkerResources(
+      ActionExecutionMetadata owner,
+      ResourceSet resources,
+      WorkerKey workerKey,
+      ResourcePriority priority)
+      throws InterruptedException, IOException {
+    Worker worker = this.workerPool.borrowObject(workerKey);
+    ResourceHandle handle = acquireResources(owner, resources, priority);
+    return new ResourceHandleWithWorker(handle, worker);
+  }
+
+  /**
    * Acquires requested resource set. Will block if resource is not available. NB! This method must
    * be thread-safe!
    */
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BUILD b/src/main/java/com/google/devtools/build/lib/bazel/BUILD
index 47e7e4e..30c9c6e 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/bazel/BUILD
@@ -156,7 +156,7 @@
         "//src/main/java/com/google/devtools/build/lib/sandbox",
         "//src/main/java/com/google/devtools/build/lib/standalone",
         "//src/main/java/com/google/devtools/build/lib/starlarkdebug/module",
-        "//src/main/java/com/google/devtools/build/lib/worker",
+        "//src/main/java/com/google/devtools/build/lib/worker:worker_module",
         "//third_party:guava",
     ],
 )
diff --git a/src/main/java/com/google/devtools/build/lib/exec/local/BUILD b/src/main/java/com/google/devtools/build/lib/exec/local/BUILD
index 41df34a..8e46af7 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/local/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/exec/local/BUILD
@@ -25,6 +25,7 @@
         "//src/main/java/com/google/devtools/build/lib:runtime",
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/concurrent",
         "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
         "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index a5745bf..7f8d359 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -84,7 +84,7 @@
         "//src/main/java/com/google/devtools/build/lib/remote/options",
         "//src/main/java/com/google/devtools/build/lib/remote/util",
         "//src/main/java/com/google/devtools/build/lib/remote/zstd",
-        "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib/skyframe:mutable_supplier",
         "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
         "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
diff --git a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
index 4ceba33..102ff1e 100644
--- a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
@@ -10,21 +10,24 @@
 
 java_library(
     name = "sandbox",
-    srcs = glob(["*.java"]),
+    srcs = glob(
+        ["*.java"],
+        exclude = ["SandboxHelpers.java"],
+    ),
     data = [
         "//src/main/tools:linux-sandbox",
     ],
     deps = [
+        ":sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib:runtime",
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
         "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
         "//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories",
-        "//src/main/java/com/google/devtools/build/lib/analysis:test/test_configuration",
         "//src/main/java/com/google/devtools/build/lib/analysis/platform:platform_utils",
-        "//src/main/java/com/google/devtools/build/lib/cmdline",
         "//src/main/java/com/google/devtools/build/lib/events",
         "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy",
         "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
@@ -52,7 +55,6 @@
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/common/options",
         "//src/main/protobuf:failure_details_java_proto",
-        "//third_party:auto_value",
         "//third_party:flogger",
         "//third_party:gson",
         "//third_party:guava",
@@ -60,3 +62,20 @@
         "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
     ],
 )
+
+java_library(
+    name = "sandbox_helpers",
+    srcs = ["SandboxHelpers.java"],
+    deps = [
+        "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
+        "//src/main/java/com/google/devtools/build/lib/analysis:test/test_configuration",
+        "//src/main/java/com/google/devtools/build/lib/cmdline:cmdline-primitives",
+        "//src/main/java/com/google/devtools/build/lib/vfs",
+        "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
+        "//src/main/java/com/google/devtools/common/options",
+        "//third_party:auto_value",
+        "//third_party:flogger",
+        "//third_party:guava",
+    ],
+)
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/BUILD b/src/main/java/com/google/devtools/build/lib/skyframe/BUILD
index e1ae9f6..3232a0b 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/BUILD
@@ -229,6 +229,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
         "//src/main/java/com/google/devtools/build/lib/actions:fileset_output_symlink",
         "//src/main/java/com/google/devtools/build/lib/actions:package_roots",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/parameter_file_write_action",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/substitution",
diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD
index 0a3c3ba..38b2d03 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -10,9 +10,20 @@
 
 java_library(
     name = "worker",
-    srcs = glob(["*.java"]),
+    srcs = glob(
+        ["*.java"],
+        exclude = [
+            "WorkerSpawnRunner.java",
+            "WorkerModule.java",
+            "WorkerParser.java",
+            "WorkerSpawnStrategy.java",
+        ],
+    ),
     deps = [
-        "//src/main/java/com/google/devtools/build/lib:runtime",
+        # ":worker_spawn_runner",
+        "//third_party:guava",
+        "//third_party/protobuf:protobuf_java",
+        "//third_party/protobuf:protobuf_java_util",
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
@@ -20,21 +31,10 @@
         "//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
         "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
         "//src/main/java/com/google/devtools/build/lib/events",
-        "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy",
-        "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
-        "//src/main/java/com/google/devtools/build/lib/exec:execution_options",
-        "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
-        "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
-        "//src/main/java/com/google/devtools/build/lib/exec:spawn_strategy_registry",
-        "//src/main/java/com/google/devtools/build/lib/exec/local",
-        "//src/main/java/com/google/devtools/build/lib/profiler",
-        "//src/main/java/com/google/devtools/build/lib/runtime/commands/events",
-        "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib/shell",
         "//src/main/java/com/google/devtools/build/lib/util:command",
-        "//src/main/java/com/google/devtools/build/lib/util:os",
         "//src/main/java/com/google/devtools/build/lib/util:resource_converter",
-        "//src/main/java/com/google/devtools/build/lib/util/io",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/common/options",
@@ -44,10 +44,65 @@
         "//third_party:auto_value",
         "//third_party:flogger",
         "//third_party:gson",
-        "//third_party:guava",
         "//third_party:jsr305",
+    ],
+)
+
+java_library(
+    name = "worker_spawn_runner",
+    srcs = [
+        "WorkerParser.java",
+        "WorkerSpawnRunner.java",
+    ],
+    deps = [
+        ":worker",
+        "//src/main/java/com/google/devtools/build/lib:runtime",
+        "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",
+        "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
+        "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
+        "//src/main/java/com/google/devtools/build/lib/events",
+        "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
+        "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
+        "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
+        "//src/main/java/com/google/devtools/build/lib/exec/local",
+        "//src/main/java/com/google/devtools/build/lib/profiler",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
+        "//src/main/java/com/google/devtools/build/lib/util:os",
+        "//src/main/java/com/google/devtools/build/lib/util/io",
+        "//src/main/java/com/google/devtools/build/lib/vfs",
+        "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
+        "//src/main/protobuf:failure_details_java_proto",
+        "//src/main/protobuf:worker_protocol_java_proto",
+        "//third_party:flogger",
+        "//third_party:guava",
         "//third_party/protobuf:protobuf_java",
-        "//third_party/protobuf:protobuf_java_util",
+    ],
+)
+
+java_library(
+    name = "worker_module",
+    srcs = [
+        "WorkerModule.java",
+        "WorkerSpawnStrategy.java",
+    ],
+    deps = [
+        ":worker",
+        ":worker_spawn_runner",
+        "//src/main/java/com/google/devtools/build/lib:runtime",
+        "//src/main/java/com/google/devtools/build/lib/events",
+        "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy",
+        "//src/main/java/com/google/devtools/build/lib/exec:execution_options",
+        "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
+        "//src/main/java/com/google/devtools/build/lib/exec:spawn_strategy_registry",
+        "//src/main/java/com/google/devtools/build/lib/exec/local",
+        "//src/main/java/com/google/devtools/build/lib/runtime/commands/events",
+        "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
+        "//src/main/java/com/google/devtools/build/lib/vfs",
+        "//src/main/java/com/google/devtools/common/options",
+        "//third_party:guava",
     ],
 )
 
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java
index 2d804d0..6d55a54 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java
@@ -32,7 +32,7 @@
  * <p>We expect a small number of WorkerKeys per mnemonic. Unbounded creation of WorkerKeys will
  * break various things as well as render the workers less useful.
  */
-final class WorkerKey {
+public final class WorkerKey {
   /** Build options. */
   private final ImmutableList<String> args;
   /** Environment variables. */
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
index cf40bb3..837589a 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
@@ -55,7 +55,7 @@
   public void beforeCommand(CommandEnvironment env) {
     this.env = env;
     env.getEventBus().register(this);
-    WorkerMultiplexerManager.beforeCommand(env);
+    WorkerMultiplexerManager.beforeCommand(env.getReporter());
   }
 
   @Subscribe
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java
index d34e84c..d1cedd1 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java
@@ -17,7 +17,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.devtools.build.lib.actions.UserExecException;
 import com.google.devtools.build.lib.events.EventHandler;
-import com.google.devtools.build.lib.runtime.CommandEnvironment;
+import com.google.devtools.build.lib.events.Reporter;
 import com.google.devtools.build.lib.server.FailureDetails;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
 import com.google.devtools.build.lib.server.FailureDetails.Worker.Code;
@@ -53,8 +53,8 @@
     return instanceInfo.getWorkerMultiplexer();
   }
 
-  static void beforeCommand(CommandEnvironment env) {
-    setReporter(env.getReporter());
+  static void beforeCommand(Reporter reporter) {
+    setReporter(reporter);
   }
 
   static void afterCommand() {
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
index 70d5c78..1780854 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
@@ -188,6 +188,14 @@
   public boolean workerCancellation;
 
   @Option(
+      name = "experimental_worker_as_resource",
+      defaultValue = "false",
+      documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+      effectTags = {OptionEffectTag.EXECUTION},
+      help = "If enabled, workers are acquired as resources from ResourceManager.")
+  public boolean workerAsResource;
+
+  @Option(
       name = "experimental_worker_multiplex_sandboxing",
       defaultValue = "false",
       documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
index 7bb8f64..c8d0b83 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
@@ -39,7 +39,7 @@
  * requests, but do so through WorkerProxy instances.
  */
 @ThreadSafe
-final class WorkerPool {
+public final class WorkerPool {
   /** Unless otherwise specified, the max number of workers per WorkerKey. */
   private static final int DEFAULT_MAX_WORKERS = 4;
   /** Unless otherwise specified, the max number of multiplex workers per WorkerKey. */
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
index bb65704..597bf85 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
@@ -33,6 +33,7 @@
 import com.google.devtools.build.lib.actions.MetadataProvider;
 import com.google.devtools.build.lib.actions.ResourceManager;
 import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
+import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandleWithWorker;
 import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority;
 import com.google.devtools.build.lib.actions.Spawn;
 import com.google.devtools.build.lib.actions.SpawnExecutedEvent;
@@ -138,6 +139,7 @@
     this.workerParser = new WorkerParser(execRoot, workerOptions, localEnvProvider, binTools);
     this.workerOptions = workerOptions;
     this.runtime = runtime;
+    this.resourceManager.setWorkerPool(workers);
     eventBus.register(this);
   }
 
@@ -354,7 +356,7 @@
       MetadataProvider inputFileCache,
       SpawnMetrics.Builder spawnMetrics)
       throws InterruptedException, ExecException {
-    Worker worker = null;
+    WorkerOwner workerOwner = new WorkerOwner();
     WorkResponse response;
     WorkRequest request;
     ActionExecutionMetadata owner = spawn.getResourceOwner();
@@ -382,113 +384,58 @@
         }
       }
       Duration setupInputsTime = setupInputsStopwatch.elapsed();
+      spawnMetrics.setSetupTime(setupInputsTime);
 
       Stopwatch queueStopwatch = Stopwatch.createStarted();
-      try (SilentCloseable c =
-          Profiler.instance().profile(ProfilerTask.WORKER_BORROW, "Waiting to borrow worker")) {
-        worker = workers.borrowObject(key);
-        worker.setReporter(workerOptions.workerVerbose ? reporter : null);
-        request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key);
-      } catch (IOException e) {
-        restoreInterrupt(e);
-        String message = "IOException while borrowing a worker from the pool:";
-        throw createUserExecException(e, message, Code.BORROW_FAILURE);
-      }
-
-      try (ResourceHandle handle =
-          resourceManager.acquireResources(
-              owner,
-              spawn.getLocalResources(),
-              context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) {
-        // We acquired a worker and resources -- mark that as queuing time.
-        spawnMetrics.setQueueTime(queueStopwatch.elapsed());
-
-        context.report(SpawnExecutingEvent.create(key.getWorkerTypeName()));
-        try (SilentCloseable c =
-            Profiler.instance()
-                .profile(
-                    ProfilerTask.WORKER_SETUP,
-                    String.format("Worker #%d preparing execution", worker.getWorkerId()))) {
-          // We consider `prepareExecution` to be also part of setup.
-          Stopwatch prepareExecutionStopwatch = Stopwatch.createStarted();
-          worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithDigests().keySet());
-          initializeMetricsSet(key, worker);
-          spawnMetrics.setSetupTime(setupInputsTime.plus(prepareExecutionStopwatch.elapsed()));
-        } catch (IOException e) {
-          restoreInterrupt(e);
-          String message =
-              ErrorMessage.builder()
-                  .message("IOException while preparing the execution environment of a worker:")
-                  .logFile(worker.getLogFile())
-                  .exception(e)
-                  .build()
-                  .toString();
-          throw createUserExecException(message, Code.PREPARE_FAILURE);
-        }
-
-        Stopwatch executionStopwatch = Stopwatch.createStarted();
-        try {
-          worker.putRequest(request);
-        } catch (IOException e) {
-          restoreInterrupt(e);
-          String message =
-              ErrorMessage.builder()
-                  .message(
-                      "Worker process quit or closed its stdin stream when we tried to send a"
-                          + " WorkRequest:")
-                  .logFile(worker.getLogFile())
-                  .exception(e)
-                  .build()
-                  .toString();
-          throw createUserExecException(message, Code.REQUEST_FAILURE);
-        }
-
-        try (SilentCloseable c =
-            Profiler.instance()
-                .profile(
-                    ProfilerTask.WORKER_WORKING,
-                    String.format("Worker #%d working", worker.getWorkerId()))) {
-          response = worker.getResponse(request.getRequestId());
-        } catch (InterruptedException e) {
-          if (worker.isSandboxed()) {
-            // Sandboxed workers can safely finish their work async.
-            finishWorkAsync(
+      if (workerOptions.workerAsResource) {
+        // Worker doesn't automatically return to pool after closing of the handle.
+        try (ResourceHandleWithWorker handle =
+            resourceManager.acquireWorkerResources(
+                owner,
+                spawn.getLocalResources(),
                 key,
-                worker,
-                request,
-                workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn));
-            worker = null;
-          } else if (!context.speculating()) {
-            // Non-sandboxed workers interrupted outside of dynamic execution can only mean that
-            // the user interrupted the build, and we don't want to delay finishing. Instead we
-            // kill the worker.
-            // Technically, workers are always sandboxed under dynamic execution, at least for now.
-            try {
-              workers.invalidateObject(key, worker);
-            } catch (IOException e1) {
-              // Nothing useful we can do here, in fact it may not be possible to get here.
-            } finally {
-              worker = null;
-            }
-          }
-          throw e;
+                context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) {
+          workerOwner.setWorker(handle.getWorker());
+          workerOwner.getWorker().setReporter(workerOptions.workerVerbose ? reporter : null);
+          request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key);
+
+          // We acquired a worker and resources -- mark that as queuing time.
+          spawnMetrics.setQueueTime(queueStopwatch.elapsed());
+          response =
+              executeRequest(
+                  spawn, context, inputFiles, outputs, workerOwner, key, request, spawnMetrics);
         } catch (IOException e) {
           restoreInterrupt(e);
-          // If protobuf or json reader couldn't parse the response, try to print whatever the
-          // failing worker wrote to stdout - it's probably a stack trace or some kind of error
-          // message that will help the user figure out why the compiler is failing.
-          String recordingStreamMessage = worker.getRecordingStreamMessage();
-          if (recordingStreamMessage.isEmpty()) {
-            throw createEmptyResponseException(worker.getLogFile());
-          } else {
-            throw createUnparsableResponseException(recordingStreamMessage, worker.getLogFile(), e);
-          }
+          String message = "IOException while borrowing a worker from the pool:";
+          throw createUserExecException(e, message, Code.BORROW_FAILURE);
         }
-        spawnMetrics.setExecutionWallTime(executionStopwatch.elapsed());
+      } else {
+        try (SilentCloseable c =
+            Profiler.instance().profile(ProfilerTask.WORKER_BORROW, "Waiting to borrow worker")) {
+          workerOwner.setWorker(workers.borrowObject(key));
+          workerOwner.getWorker().setReporter(workerOptions.workerVerbose ? reporter : null);
+          request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key);
+        } catch (IOException e) {
+          restoreInterrupt(e);
+          String message = "IOException while borrowing a worker from the pool:";
+          throw createUserExecException(e, message, Code.BORROW_FAILURE);
+        }
+
+        try (ResourceHandle handle =
+            resourceManager.acquireResources(
+                owner,
+                spawn.getLocalResources(),
+                context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) {
+          // We acquired a worker and resources -- mark that as queuing time.
+          spawnMetrics.setQueueTime(queueStopwatch.elapsed());
+          response =
+              executeRequest(
+                  spawn, context, inputFiles, outputs, workerOwner, key, request, spawnMetrics);
+        }
       }
 
       if (response == null) {
-        throw createEmptyResponseException(worker.getLogFile());
+        throw createEmptyResponseException(workerOwner.getWorker().getLogFile());
       }
 
       if (response.getWasCancelled()) {
@@ -501,38 +448,39 @@
           Profiler.instance()
               .profile(
                   ProfilerTask.WORKER_COPYING_OUTPUTS,
-                  String.format("Worker #%d copying output files", worker.getWorkerId()))) {
+                  String.format(
+                      "Worker #%d copying output files", workerOwner.getWorker().getWorkerId()))) {
         Stopwatch processOutputsStopwatch = Stopwatch.createStarted();
         context.lockOutputFiles(response.getExitCode(), response.getOutput(), null);
-        worker.finishExecution(execRoot, outputs);
+        workerOwner.getWorker().finishExecution(execRoot, outputs);
         spawnMetrics.setProcessOutputsTime(processOutputsStopwatch.elapsed());
       } catch (IOException e) {
         restoreInterrupt(e);
         String message =
             ErrorMessage.builder()
                 .message("IOException while finishing worker execution:")
-                .logFile(worker.getLogFile())
+                .logFile(workerOwner.getWorker().getLogFile())
                 .exception(e)
                 .build()
                 .toString();
         throw createUserExecException(message, Code.FINISH_FAILURE);
       }
     } catch (UserExecException e) {
-      if (worker != null) {
+      if (workerOwner.getWorker() != null) {
         try {
-          workers.invalidateObject(key, worker);
+          workers.invalidateObject(key, workerOwner.getWorker());
         } catch (IOException e1) {
           // The original exception is more important / helpful, so we'll just ignore this one.
           restoreInterrupt(e1);
         } finally {
-          worker = null;
+          workerOwner.setWorker(null);
         }
       }
 
       throw e;
     } finally {
-      if (worker != null) {
-        workers.returnObject(key, worker);
+      if (workerOwner.getWorker() != null) {
+        workers.returnObject(key, workerOwner.getWorker());
       }
     }
 
@@ -540,6 +488,110 @@
   }
 
   /**
+   * Executes worker request in worker, waits until the response is ready. Worker and resources
+   * should be allocated before call.
+   */
+  private WorkResponse executeRequest(
+      Spawn spawn,
+      SpawnExecutionContext context,
+      SandboxInputs inputFiles,
+      SandboxOutputs outputs,
+      WorkerOwner workerOwner,
+      WorkerKey key,
+      WorkRequest request,
+      SpawnMetrics.Builder spawnMetrics)
+      throws ExecException, InterruptedException {
+    WorkResponse response;
+    context.report(SpawnExecutingEvent.create(key.getWorkerTypeName()));
+    Worker worker = workerOwner.getWorker();
+
+    try (SilentCloseable c =
+        Profiler.instance()
+            .profile(
+                ProfilerTask.WORKER_SETUP,
+                String.format("Worker #%d preparing execution", worker.getWorkerId()))) {
+      // We consider `prepareExecution` to be also part of setup.
+      Stopwatch prepareExecutionStopwatch = Stopwatch.createStarted();
+      worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithDigests().keySet());
+      initializeMetricsSet(key, worker);
+      spawnMetrics.addSetupTime(prepareExecutionStopwatch.elapsed());
+    } catch (IOException e) {
+      restoreInterrupt(e);
+      String message =
+          ErrorMessage.builder()
+              .message("IOException while preparing the execution environment of a worker:")
+              .logFile(worker.getLogFile())
+              .exception(e)
+              .build()
+              .toString();
+      throw createUserExecException(message, Code.PREPARE_FAILURE);
+    }
+
+    Stopwatch executionStopwatch = Stopwatch.createStarted();
+    try {
+      worker.putRequest(request);
+    } catch (IOException e) {
+      restoreInterrupt(e);
+      String message =
+          ErrorMessage.builder()
+              .message(
+                  "Worker process quit or closed its stdin stream when we tried to send a"
+                      + " WorkRequest:")
+              .logFile(worker.getLogFile())
+              .exception(e)
+              .build()
+              .toString();
+      throw createUserExecException(message, Code.REQUEST_FAILURE);
+    }
+
+    try (SilentCloseable c =
+        Profiler.instance()
+            .profile(
+                ProfilerTask.WORKER_WORKING,
+                String.format("Worker #%d working", worker.getWorkerId()))) {
+      response = worker.getResponse(request.getRequestId());
+    } catch (InterruptedException e) {
+      if (worker.isSandboxed()) {
+        // Sandboxed workers can safely finish their work async.
+        finishWorkAsync(
+            key,
+            worker,
+            request,
+            workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn));
+        workerOwner.setWorker(null);
+      } else if (!context.speculating()) {
+        // Non-sandboxed workers interrupted outside of dynamic execution can only mean that
+        // the user interrupted the build, and we don't want to delay finishing. Instead we
+        // kill the worker.
+        // Technically, workers are always sandboxed under dynamic execution, at least for now.
+        try {
+          workers.invalidateObject(key, workerOwner.getWorker());
+        } catch (IOException e1) {
+          // Nothing useful we can do here, in fact it may not be possible to get here.
+        } finally {
+          workerOwner.setWorker(null);
+        }
+      }
+      throw e;
+    } catch (IOException e) {
+      restoreInterrupt(e);
+      // If protobuf or json reader couldn't parse the response, try to print whatever the
+      // failing worker wrote to stdout - it's probably a stack trace or some kind of error
+      // message that will help the user figure out why the compiler is failing.
+      String recordingStreamMessage = worker.getRecordingStreamMessage();
+      if (recordingStreamMessage.isEmpty()) {
+        throw createEmptyResponseException(worker.getLogFile());
+      } else {
+        throw createUnparsableResponseException(recordingStreamMessage, worker.getLogFile(), e);
+      }
+    }
+
+    spawnMetrics.setExecutionWallTime(executionStopwatch.elapsed());
+
+    return response;
+  }
+
+  /**
    * Initializes metricsSet for workers. If worker metrics already exists for this worker, does
    * nothing
    */
@@ -668,6 +720,22 @@
     reaper.start();
   }
 
+  /**
+   * The structure helps to pass the worker's ownership from one function to another. If worker is
+   * set to null, then the ownership is taken by another function. E.g. used in finishWorkAsync.
+   */
+  private static class WorkerOwner {
+    Worker worker;
+
+    public void setWorker(Worker worker) {
+      this.worker = worker;
+    }
+
+    public Worker getWorker() {
+      return worker;
+    }
+  }
+
   private static void restoreInterrupt(IOException e) {
     if (e instanceof InterruptedIOException) {
       Thread.currentThread().interrupt();
diff --git a/src/test/java/com/google/devtools/build/lib/actions/BUILD b/src/test/java/com/google/devtools/build/lib/actions/BUILD
index b2de3ed..08fc0a1 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/actions/BUILD
@@ -38,6 +38,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
         "//src/main/java/com/google/devtools/build/lib/actions:middleman_type",
         "//src/main/java/com/google/devtools/build/lib/actions:package_roots",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/custom_command_line",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/symlink_action",
diff --git a/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD b/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD
index 374a3f1..1e5b735 100644
--- a/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD
@@ -83,7 +83,7 @@
         "//src/main/java/com/google/devtools/build/lib/util/io:out-err",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
-        "//src/main/java/com/google/devtools/build/lib/worker",
+        "//src/main/java/com/google/devtools/build/lib/worker:worker_module",
         "//src/main/java/com/google/devtools/common/options",
         "//src/main/java/com/google/devtools/common/options:invocation_policy",
         "//src/main/protobuf:failure_details_java_proto",
diff --git a/src/test/java/com/google/devtools/build/lib/exec/local/BUILD b/src/test/java/com/google/devtools/build/lib/exec/local/BUILD
index 4a053aa..c644af9 100644
--- a/src/test/java/com/google/devtools/build/lib/exec/local/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/exec/local/BUILD
@@ -26,6 +26,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
         "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
         "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
         "//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander",
diff --git a/src/test/java/com/google/devtools/build/lib/sandbox/BUILD b/src/test/java/com/google/devtools/build/lib/sandbox/BUILD
index 09f9f14..e566edc 100644
--- a/src/test/java/com/google/devtools/build/lib/sandbox/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/sandbox/BUILD
@@ -85,6 +85,7 @@
         "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
         "//src/main/java/com/google/devtools/build/lib/exec:tree_deleter",
         "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib/util:os",
         "//src/main/java/com/google/devtools/build/lib/util/io",
         "//src/main/java/com/google/devtools/build/lib/vfs",
diff --git a/src/test/java/com/google/devtools/build/lib/skyframe/BUILD b/src/test/java/com/google/devtools/build/lib/skyframe/BUILD
index 4d99669..ea830e81 100644
--- a/src/test/java/com/google/devtools/build/lib/skyframe/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/skyframe/BUILD
@@ -124,6 +124,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
         "//src/main/java/com/google/devtools/build/lib/actions:middleman_type",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/custom_command_line",
         "//src/main/java/com/google/devtools/build/lib/analysis:actions/spawn_action_template",
diff --git a/src/test/java/com/google/devtools/build/lib/standalone/BUILD b/src/test/java/com/google/devtools/build/lib/standalone/BUILD
index 1dbf38a..e2569d2 100644
--- a/src/test/java/com/google/devtools/build/lib/standalone/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/standalone/BUILD
@@ -19,6 +19,7 @@
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:artifacts",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver",
         "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories",
         "//src/main/java/com/google/devtools/build/lib/analysis:server_directories",
diff --git a/src/test/java/com/google/devtools/build/lib/worker/BUILD b/src/test/java/com/google/devtools/build/lib/worker/BUILD
index 58ab4fd..ced018c 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/worker/BUILD
@@ -43,11 +43,12 @@
         "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
         "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
         "//src/main/java/com/google/devtools/build/lib/collect/nestedset",
-        "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib/shell",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/build/lib/worker",
+        "//src/main/java/com/google/devtools/build/lib/worker:worker_spawn_runner",
         "//src/test/java/com/google/devtools/build/lib/actions/util",
         "//third_party:guava",
     ],
@@ -76,6 +77,7 @@
         "//src/main/java/com/google/devtools/build/lib:runtime",
         "//src/main/java/com/google/devtools/build/lib/actions",
         "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
+        "//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
         "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories",
         "//src/main/java/com/google/devtools/build/lib/analysis:server_directories",
         "//src/main/java/com/google/devtools/build/lib/clock",
@@ -83,7 +85,7 @@
         "//src/main/java/com/google/devtools/build/lib/events",
         "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
         "//src/main/java/com/google/devtools/build/lib/exec/local",
-        "//src/main/java/com/google/devtools/build/lib/sandbox",
+        "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers",
         "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
         "//src/main/java/com/google/devtools/build/lib/util:os",
         "//src/main/java/com/google/devtools/build/lib/util:resource_converter",
@@ -91,6 +93,8 @@
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
         "//src/main/java/com/google/devtools/build/lib/worker",
+        "//src/main/java/com/google/devtools/build/lib/worker:worker_module",
+        "//src/main/java/com/google/devtools/build/lib/worker:worker_spawn_runner",
         "//src/main/java/com/google/devtools/common/options",
         "//src/main/protobuf:worker_protocol_java_proto",
         "//src/test/java/com/google/devtools/build/lib/testutil:TestUtils",