Move worker allocation to ResourceManager. This a step to make a workers as resource. Now if flag **experimental_worker_as_resource** enabled then workers allocated in ResourceManager. The logic of allocation wasn't changed (still sequential allocation). PiperOrigin-RevId: 427967810
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD index cff07db..1859062 100644 --- a/src/main/java/com/google/devtools/build/lib/BUILD +++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -246,6 +246,7 @@ "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", "//src/main/java/com/google/devtools/build/lib/actions:middleman_type", "//src/main/java/com/google/devtools/build/lib/actions:package_roots", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster", "//src/main/java/com/google/devtools/build/lib/analysis:analysis_options", "//src/main/java/com/google/devtools/build/lib/analysis:analysis_phase_complete_event",
diff --git a/src/main/java/com/google/devtools/build/lib/actions/BUILD b/src/main/java/com/google/devtools/build/lib/actions/BUILD index 0f99219..0fe4c4f 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/BUILD +++ b/src/main/java/com/google/devtools/build/lib/actions/BUILD
@@ -53,6 +53,7 @@ "LocalHostResourceManagerDarwin.java", "LocalHostResourceFallback.java", "MiddlemanType.java", + "ResourceManager.java", "ResourceSet.java", "ResourceSetOrBuilder.java", "PackageRootResolver.java", @@ -97,7 +98,6 @@ "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec", "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:serialization-constant", "//src/main/java/com/google/devtools/build/lib/starlarkbuildapi", - "//src/main/java/com/google/devtools/build/lib/unix:procmeminfo_parser", "//src/main/java/com/google/devtools/build/lib/unsafe:string", "//src/main/java/com/google/devtools/build/lib/util", "//src/main/java/com/google/devtools/build/lib/util:command", @@ -353,3 +353,21 @@ ], deps = ["//src/main/protobuf:failure_details_java_proto"], ) + +java_library( + name = "resource_manager", + srcs = [ + "ResourceManager.java", + ], + deps = [ + ":actions", + ":localhost_capacity", + "//src/main/java/com/google/devtools/build/lib/concurrent", + "//src/main/java/com/google/devtools/build/lib/profiler", + "//src/main/java/com/google/devtools/build/lib/unix:procmeminfo_parser", + "//src/main/java/com/google/devtools/build/lib/util", + "//src/main/java/com/google/devtools/build/lib/util:os", + "//src/main/java/com/google/devtools/build/lib/worker", + "//third_party:guava", + ], +)
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java index 81f1936..145fd8c 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
@@ -24,6 +24,9 @@ import com.google.devtools.build.lib.unix.ProcMeminfoParser; import com.google.devtools.build.lib.util.OS; import com.google.devtools.build.lib.util.Pair; +import com.google.devtools.build.lib.worker.Worker; +import com.google.devtools.build.lib.worker.WorkerKey; +import com.google.devtools.build.lib.worker.WorkerPool; import java.io.IOException; import java.util.Deque; import java.util.Iterator; @@ -86,6 +89,30 @@ } } + /** + * A handle returned by {@link #acquireWorkerResources(ActionExecutionMetadata, ResourceSet, + * WorkerKey, ResourcePriority)} that must be closed in order to free the resources again. + */ + public static class ResourceHandleWithWorker implements AutoCloseable { + final ResourceHandle resourceHandle; + final Worker worker; + + public ResourceHandleWithWorker(ResourceHandle resourceHandle, Worker worker) { + this.resourceHandle = resourceHandle; + this.worker = worker; + } + + public Worker getWorker() { + return worker; + } + + /** Closing the ResourceHandle releases the resources associated with it. */ + @Override + public void close() { + this.resourceHandle.close(); + } + } + private final ThreadLocal<Boolean> threadLocked = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { @@ -138,6 +165,8 @@ private final Deque<Pair<ResourceSet, CountDownLatch>> dynamicStandaloneRequests = new LinkedList<>(); + private WorkerPool workerPool; + // The total amount of resources on the local host. Must be set by // an explicit call to setAvailableResources(), often using // LocalHostCapacity.getLocalHostCapacity() as an argument. @@ -217,12 +246,32 @@ localMemoryEstimate = value; } + /** Sets worker pool for taking the workers. Must be called before requesting the workers. */ + public void setWorkerPool(WorkerPool workerPool) { + this.workerPool = workerPool; + } + /** Sets whether to prioritize local-only actions in resource allocation. */ public void setPrioritizeLocalActions(boolean prioritizeLocalActions) { this.prioritizeLocalActions = prioritizeLocalActions; } /** + * Acuqires requested resource set and worker. Will block if resource is not available. The worker + * isn't released as part of the AutoCloseable. + */ + public ResourceHandleWithWorker acquireWorkerResources( + ActionExecutionMetadata owner, + ResourceSet resources, + WorkerKey workerKey, + ResourcePriority priority) + throws InterruptedException, IOException { + Worker worker = this.workerPool.borrowObject(workerKey); + ResourceHandle handle = acquireResources(owner, resources, priority); + return new ResourceHandleWithWorker(handle, worker); + } + + /** * Acquires requested resource set. Will block if resource is not available. NB! This method must * be thread-safe! */
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BUILD b/src/main/java/com/google/devtools/build/lib/bazel/BUILD index 47e7e4e..30c9c6e 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/BUILD +++ b/src/main/java/com/google/devtools/build/lib/bazel/BUILD
@@ -156,7 +156,7 @@ "//src/main/java/com/google/devtools/build/lib/sandbox", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/starlarkdebug/module", - "//src/main/java/com/google/devtools/build/lib/worker", + "//src/main/java/com/google/devtools/build/lib/worker:worker_module", "//third_party:guava", ], )
diff --git a/src/main/java/com/google/devtools/build/lib/exec/local/BUILD b/src/main/java/com/google/devtools/build/lib/exec/local/BUILD index 41df34a..8e46af7 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/local/BUILD +++ b/src/main/java/com/google/devtools/build/lib/exec/local/BUILD
@@ -25,6 +25,7 @@ "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec:bin_tools", "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index a5745bf..7f8d359 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -84,7 +84,7 @@ "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/remote/zstd", - "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", "//src/main/java/com/google/devtools/build/lib/skyframe:mutable_supplier", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
diff --git a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD index 4ceba33..102ff1e 100644 --- a/src/main/java/com/google/devtools/build/lib/sandbox/BUILD +++ b/src/main/java/com/google/devtools/build/lib/sandbox/BUILD
@@ -10,21 +10,24 @@ java_library( name = "sandbox", - srcs = glob(["*.java"]), + srcs = glob( + ["*.java"], + exclude = ["SandboxHelpers.java"], + ), data = [ "//src/main/tools:linux-sandbox", ], deps = [ + ":sandbox_helpers", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories", - "//src/main/java/com/google/devtools/build/lib/analysis:test/test_configuration", "//src/main/java/com/google/devtools/build/lib/analysis/platform:platform_utils", - "//src/main/java/com/google/devtools/build/lib/cmdline", "//src/main/java/com/google/devtools/build/lib/events", "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy", "//src/main/java/com/google/devtools/build/lib/exec:bin_tools", @@ -52,7 +55,6 @@ "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/common/options", "//src/main/protobuf:failure_details_java_proto", - "//third_party:auto_value", "//third_party:flogger", "//third_party:gson", "//third_party:guava", @@ -60,3 +62,20 @@ "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) + +java_library( + name = "sandbox_helpers", + srcs = ["SandboxHelpers.java"], + deps = [ + "//src/main/java/com/google/devtools/build/lib/actions", + "//src/main/java/com/google/devtools/build/lib/actions:artifacts", + "//src/main/java/com/google/devtools/build/lib/analysis:test/test_configuration", + "//src/main/java/com/google/devtools/build/lib/cmdline:cmdline-primitives", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", + "//src/main/java/com/google/devtools/common/options", + "//third_party:auto_value", + "//third_party:flogger", + "//third_party:guava", + ], +)
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/BUILD b/src/main/java/com/google/devtools/build/lib/skyframe/BUILD index e1ae9f6..3232a0b 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/BUILD +++ b/src/main/java/com/google/devtools/build/lib/skyframe/BUILD
@@ -229,6 +229,7 @@ "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/actions:fileset_output_symlink", "//src/main/java/com/google/devtools/build/lib/actions:package_roots", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver", "//src/main/java/com/google/devtools/build/lib/analysis:actions/parameter_file_write_action", "//src/main/java/com/google/devtools/build/lib/analysis:actions/substitution",
diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD index 0a3c3ba..38b2d03 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/BUILD +++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -10,9 +10,20 @@ java_library( name = "worker", - srcs = glob(["*.java"]), + srcs = glob( + ["*.java"], + exclude = [ + "WorkerSpawnRunner.java", + "WorkerModule.java", + "WorkerParser.java", + "WorkerSpawnStrategy.java", + ], + ), deps = [ - "//src/main/java/com/google/devtools/build/lib:runtime", + # ":worker_spawn_runner", + "//third_party:guava", + "//third_party/protobuf:protobuf_java", + "//third_party/protobuf:protobuf_java_util", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:action_input_helper", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", @@ -20,21 +31,10 @@ "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", "//src/main/java/com/google/devtools/build/lib/events", - "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy", - "//src/main/java/com/google/devtools/build/lib/exec:bin_tools", - "//src/main/java/com/google/devtools/build/lib/exec:execution_options", - "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater", - "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", - "//src/main/java/com/google/devtools/build/lib/exec:spawn_strategy_registry", - "//src/main/java/com/google/devtools/build/lib/exec/local", - "//src/main/java/com/google/devtools/build/lib/profiler", - "//src/main/java/com/google/devtools/build/lib/runtime/commands/events", - "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/util:command", - "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/util:resource_converter", - "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/common/options", @@ -44,10 +44,65 @@ "//third_party:auto_value", "//third_party:flogger", "//third_party:gson", - "//third_party:guava", "//third_party:jsr305", + ], +) + +java_library( + name = "worker_spawn_runner", + srcs = [ + "WorkerParser.java", + "WorkerSpawnRunner.java", + ], + deps = [ + ":worker", + "//src/main/java/com/google/devtools/build/lib:runtime", + "//src/main/java/com/google/devtools/build/lib/actions", + "//src/main/java/com/google/devtools/build/lib/actions:action_input_helper", + "//src/main/java/com/google/devtools/build/lib/actions:artifacts", + "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", + "//src/main/java/com/google/devtools/build/lib/events", + "//src/main/java/com/google/devtools/build/lib/exec:bin_tools", + "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater", + "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", + "//src/main/java/com/google/devtools/build/lib/exec/local", + "//src/main/java/com/google/devtools/build/lib/profiler", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", + "//src/main/java/com/google/devtools/build/lib/util:os", + "//src/main/java/com/google/devtools/build/lib/util/io", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", + "//src/main/protobuf:failure_details_java_proto", + "//src/main/protobuf:worker_protocol_java_proto", + "//third_party:flogger", + "//third_party:guava", "//third_party/protobuf:protobuf_java", - "//third_party/protobuf:protobuf_java_util", + ], +) + +java_library( + name = "worker_module", + srcs = [ + "WorkerModule.java", + "WorkerSpawnStrategy.java", + ], + deps = [ + ":worker", + ":worker_spawn_runner", + "//src/main/java/com/google/devtools/build/lib:runtime", + "//src/main/java/com/google/devtools/build/lib/events", + "//src/main/java/com/google/devtools/build/lib/exec:abstract_spawn_strategy", + "//src/main/java/com/google/devtools/build/lib/exec:execution_options", + "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater", + "//src/main/java/com/google/devtools/build/lib/exec:spawn_strategy_registry", + "//src/main/java/com/google/devtools/build/lib/exec/local", + "//src/main/java/com/google/devtools/build/lib/runtime/commands/events", + "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/common/options", + "//third_party:guava", ], )
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java index 2d804d0..6d55a54 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java
@@ -32,7 +32,7 @@ * <p>We expect a small number of WorkerKeys per mnemonic. Unbounded creation of WorkerKeys will * break various things as well as render the workers less useful. */ -final class WorkerKey { +public final class WorkerKey { /** Build options. */ private final ImmutableList<String> args; /** Environment variables. */
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java index cf40bb3..837589a 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
@@ -55,7 +55,7 @@ public void beforeCommand(CommandEnvironment env) { this.env = env; env.getEventBus().register(this); - WorkerMultiplexerManager.beforeCommand(env); + WorkerMultiplexerManager.beforeCommand(env.getReporter()); } @Subscribe
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index d34e84c..d1cedd1 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java
@@ -17,7 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.events.EventHandler; -import com.google.devtools.build.lib.runtime.CommandEnvironment; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.server.FailureDetails; import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; import com.google.devtools.build.lib.server.FailureDetails.Worker.Code; @@ -53,8 +53,8 @@ return instanceInfo.getWorkerMultiplexer(); } - static void beforeCommand(CommandEnvironment env) { - setReporter(env.getReporter()); + static void beforeCommand(Reporter reporter) { + setReporter(reporter); } static void afterCommand() {
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java index 70d5c78..1780854 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
@@ -188,6 +188,14 @@ public boolean workerCancellation; @Option( + name = "experimental_worker_as_resource", + defaultValue = "false", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.EXECUTION}, + help = "If enabled, workers are acquired as resources from ResourceManager.") + public boolean workerAsResource; + + @Option( name = "experimental_worker_multiplex_sandboxing", defaultValue = "false", documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java index 7bb8f64..c8d0b83 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
@@ -39,7 +39,7 @@ * requests, but do so through WorkerProxy instances. */ @ThreadSafe -final class WorkerPool { +public final class WorkerPool { /** Unless otherwise specified, the max number of workers per WorkerKey. */ private static final int DEFAULT_MAX_WORKERS = 4; /** Unless otherwise specified, the max number of multiplex workers per WorkerKey. */
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index bb65704..597bf85 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
@@ -33,6 +33,7 @@ import com.google.devtools.build.lib.actions.MetadataProvider; import com.google.devtools.build.lib.actions.ResourceManager; import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle; +import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandleWithWorker; import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority; import com.google.devtools.build.lib.actions.Spawn; import com.google.devtools.build.lib.actions.SpawnExecutedEvent; @@ -138,6 +139,7 @@ this.workerParser = new WorkerParser(execRoot, workerOptions, localEnvProvider, binTools); this.workerOptions = workerOptions; this.runtime = runtime; + this.resourceManager.setWorkerPool(workers); eventBus.register(this); } @@ -354,7 +356,7 @@ MetadataProvider inputFileCache, SpawnMetrics.Builder spawnMetrics) throws InterruptedException, ExecException { - Worker worker = null; + WorkerOwner workerOwner = new WorkerOwner(); WorkResponse response; WorkRequest request; ActionExecutionMetadata owner = spawn.getResourceOwner(); @@ -382,113 +384,58 @@ } } Duration setupInputsTime = setupInputsStopwatch.elapsed(); + spawnMetrics.setSetupTime(setupInputsTime); Stopwatch queueStopwatch = Stopwatch.createStarted(); - try (SilentCloseable c = - Profiler.instance().profile(ProfilerTask.WORKER_BORROW, "Waiting to borrow worker")) { - worker = workers.borrowObject(key); - worker.setReporter(workerOptions.workerVerbose ? reporter : null); - request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key); - } catch (IOException e) { - restoreInterrupt(e); - String message = "IOException while borrowing a worker from the pool:"; - throw createUserExecException(e, message, Code.BORROW_FAILURE); - } - - try (ResourceHandle handle = - resourceManager.acquireResources( - owner, - spawn.getLocalResources(), - context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) { - // We acquired a worker and resources -- mark that as queuing time. - spawnMetrics.setQueueTime(queueStopwatch.elapsed()); - - context.report(SpawnExecutingEvent.create(key.getWorkerTypeName())); - try (SilentCloseable c = - Profiler.instance() - .profile( - ProfilerTask.WORKER_SETUP, - String.format("Worker #%d preparing execution", worker.getWorkerId()))) { - // We consider `prepareExecution` to be also part of setup. - Stopwatch prepareExecutionStopwatch = Stopwatch.createStarted(); - worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithDigests().keySet()); - initializeMetricsSet(key, worker); - spawnMetrics.setSetupTime(setupInputsTime.plus(prepareExecutionStopwatch.elapsed())); - } catch (IOException e) { - restoreInterrupt(e); - String message = - ErrorMessage.builder() - .message("IOException while preparing the execution environment of a worker:") - .logFile(worker.getLogFile()) - .exception(e) - .build() - .toString(); - throw createUserExecException(message, Code.PREPARE_FAILURE); - } - - Stopwatch executionStopwatch = Stopwatch.createStarted(); - try { - worker.putRequest(request); - } catch (IOException e) { - restoreInterrupt(e); - String message = - ErrorMessage.builder() - .message( - "Worker process quit or closed its stdin stream when we tried to send a" - + " WorkRequest:") - .logFile(worker.getLogFile()) - .exception(e) - .build() - .toString(); - throw createUserExecException(message, Code.REQUEST_FAILURE); - } - - try (SilentCloseable c = - Profiler.instance() - .profile( - ProfilerTask.WORKER_WORKING, - String.format("Worker #%d working", worker.getWorkerId()))) { - response = worker.getResponse(request.getRequestId()); - } catch (InterruptedException e) { - if (worker.isSandboxed()) { - // Sandboxed workers can safely finish their work async. - finishWorkAsync( + if (workerOptions.workerAsResource) { + // Worker doesn't automatically return to pool after closing of the handle. + try (ResourceHandleWithWorker handle = + resourceManager.acquireWorkerResources( + owner, + spawn.getLocalResources(), key, - worker, - request, - workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn)); - worker = null; - } else if (!context.speculating()) { - // Non-sandboxed workers interrupted outside of dynamic execution can only mean that - // the user interrupted the build, and we don't want to delay finishing. Instead we - // kill the worker. - // Technically, workers are always sandboxed under dynamic execution, at least for now. - try { - workers.invalidateObject(key, worker); - } catch (IOException e1) { - // Nothing useful we can do here, in fact it may not be possible to get here. - } finally { - worker = null; - } - } - throw e; + context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) { + workerOwner.setWorker(handle.getWorker()); + workerOwner.getWorker().setReporter(workerOptions.workerVerbose ? reporter : null); + request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key); + + // We acquired a worker and resources -- mark that as queuing time. + spawnMetrics.setQueueTime(queueStopwatch.elapsed()); + response = + executeRequest( + spawn, context, inputFiles, outputs, workerOwner, key, request, spawnMetrics); } catch (IOException e) { restoreInterrupt(e); - // If protobuf or json reader couldn't parse the response, try to print whatever the - // failing worker wrote to stdout - it's probably a stack trace or some kind of error - // message that will help the user figure out why the compiler is failing. - String recordingStreamMessage = worker.getRecordingStreamMessage(); - if (recordingStreamMessage.isEmpty()) { - throw createEmptyResponseException(worker.getLogFile()); - } else { - throw createUnparsableResponseException(recordingStreamMessage, worker.getLogFile(), e); - } + String message = "IOException while borrowing a worker from the pool:"; + throw createUserExecException(e, message, Code.BORROW_FAILURE); } - spawnMetrics.setExecutionWallTime(executionStopwatch.elapsed()); + } else { + try (SilentCloseable c = + Profiler.instance().profile(ProfilerTask.WORKER_BORROW, "Waiting to borrow worker")) { + workerOwner.setWorker(workers.borrowObject(key)); + workerOwner.getWorker().setReporter(workerOptions.workerVerbose ? reporter : null); + request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key); + } catch (IOException e) { + restoreInterrupt(e); + String message = "IOException while borrowing a worker from the pool:"; + throw createUserExecException(e, message, Code.BORROW_FAILURE); + } + + try (ResourceHandle handle = + resourceManager.acquireResources( + owner, + spawn.getLocalResources(), + context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) { + // We acquired a worker and resources -- mark that as queuing time. + spawnMetrics.setQueueTime(queueStopwatch.elapsed()); + response = + executeRequest( + spawn, context, inputFiles, outputs, workerOwner, key, request, spawnMetrics); + } } if (response == null) { - throw createEmptyResponseException(worker.getLogFile()); + throw createEmptyResponseException(workerOwner.getWorker().getLogFile()); } if (response.getWasCancelled()) { @@ -501,38 +448,39 @@ Profiler.instance() .profile( ProfilerTask.WORKER_COPYING_OUTPUTS, - String.format("Worker #%d copying output files", worker.getWorkerId()))) { + String.format( + "Worker #%d copying output files", workerOwner.getWorker().getWorkerId()))) { Stopwatch processOutputsStopwatch = Stopwatch.createStarted(); context.lockOutputFiles(response.getExitCode(), response.getOutput(), null); - worker.finishExecution(execRoot, outputs); + workerOwner.getWorker().finishExecution(execRoot, outputs); spawnMetrics.setProcessOutputsTime(processOutputsStopwatch.elapsed()); } catch (IOException e) { restoreInterrupt(e); String message = ErrorMessage.builder() .message("IOException while finishing worker execution:") - .logFile(worker.getLogFile()) + .logFile(workerOwner.getWorker().getLogFile()) .exception(e) .build() .toString(); throw createUserExecException(message, Code.FINISH_FAILURE); } } catch (UserExecException e) { - if (worker != null) { + if (workerOwner.getWorker() != null) { try { - workers.invalidateObject(key, worker); + workers.invalidateObject(key, workerOwner.getWorker()); } catch (IOException e1) { // The original exception is more important / helpful, so we'll just ignore this one. restoreInterrupt(e1); } finally { - worker = null; + workerOwner.setWorker(null); } } throw e; } finally { - if (worker != null) { - workers.returnObject(key, worker); + if (workerOwner.getWorker() != null) { + workers.returnObject(key, workerOwner.getWorker()); } } @@ -540,6 +488,110 @@ } /** + * Executes worker request in worker, waits until the response is ready. Worker and resources + * should be allocated before call. + */ + private WorkResponse executeRequest( + Spawn spawn, + SpawnExecutionContext context, + SandboxInputs inputFiles, + SandboxOutputs outputs, + WorkerOwner workerOwner, + WorkerKey key, + WorkRequest request, + SpawnMetrics.Builder spawnMetrics) + throws ExecException, InterruptedException { + WorkResponse response; + context.report(SpawnExecutingEvent.create(key.getWorkerTypeName())); + Worker worker = workerOwner.getWorker(); + + try (SilentCloseable c = + Profiler.instance() + .profile( + ProfilerTask.WORKER_SETUP, + String.format("Worker #%d preparing execution", worker.getWorkerId()))) { + // We consider `prepareExecution` to be also part of setup. + Stopwatch prepareExecutionStopwatch = Stopwatch.createStarted(); + worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithDigests().keySet()); + initializeMetricsSet(key, worker); + spawnMetrics.addSetupTime(prepareExecutionStopwatch.elapsed()); + } catch (IOException e) { + restoreInterrupt(e); + String message = + ErrorMessage.builder() + .message("IOException while preparing the execution environment of a worker:") + .logFile(worker.getLogFile()) + .exception(e) + .build() + .toString(); + throw createUserExecException(message, Code.PREPARE_FAILURE); + } + + Stopwatch executionStopwatch = Stopwatch.createStarted(); + try { + worker.putRequest(request); + } catch (IOException e) { + restoreInterrupt(e); + String message = + ErrorMessage.builder() + .message( + "Worker process quit or closed its stdin stream when we tried to send a" + + " WorkRequest:") + .logFile(worker.getLogFile()) + .exception(e) + .build() + .toString(); + throw createUserExecException(message, Code.REQUEST_FAILURE); + } + + try (SilentCloseable c = + Profiler.instance() + .profile( + ProfilerTask.WORKER_WORKING, + String.format("Worker #%d working", worker.getWorkerId()))) { + response = worker.getResponse(request.getRequestId()); + } catch (InterruptedException e) { + if (worker.isSandboxed()) { + // Sandboxed workers can safely finish their work async. + finishWorkAsync( + key, + worker, + request, + workerOptions.workerCancellation && Spawns.supportsWorkerCancellation(spawn)); + workerOwner.setWorker(null); + } else if (!context.speculating()) { + // Non-sandboxed workers interrupted outside of dynamic execution can only mean that + // the user interrupted the build, and we don't want to delay finishing. Instead we + // kill the worker. + // Technically, workers are always sandboxed under dynamic execution, at least for now. + try { + workers.invalidateObject(key, workerOwner.getWorker()); + } catch (IOException e1) { + // Nothing useful we can do here, in fact it may not be possible to get here. + } finally { + workerOwner.setWorker(null); + } + } + throw e; + } catch (IOException e) { + restoreInterrupt(e); + // If protobuf or json reader couldn't parse the response, try to print whatever the + // failing worker wrote to stdout - it's probably a stack trace or some kind of error + // message that will help the user figure out why the compiler is failing. + String recordingStreamMessage = worker.getRecordingStreamMessage(); + if (recordingStreamMessage.isEmpty()) { + throw createEmptyResponseException(worker.getLogFile()); + } else { + throw createUnparsableResponseException(recordingStreamMessage, worker.getLogFile(), e); + } + } + + spawnMetrics.setExecutionWallTime(executionStopwatch.elapsed()); + + return response; + } + + /** * Initializes metricsSet for workers. If worker metrics already exists for this worker, does * nothing */ @@ -668,6 +720,22 @@ reaper.start(); } + /** + * The structure helps to pass the worker's ownership from one function to another. If worker is + * set to null, then the ownership is taken by another function. E.g. used in finishWorkAsync. + */ + private static class WorkerOwner { + Worker worker; + + public void setWorker(Worker worker) { + this.worker = worker; + } + + public Worker getWorker() { + return worker; + } + } + private static void restoreInterrupt(IOException e) { if (e instanceof InterruptedIOException) { Thread.currentThread().interrupt();
diff --git a/src/test/java/com/google/devtools/build/lib/actions/BUILD b/src/test/java/com/google/devtools/build/lib/actions/BUILD index b2de3ed..08fc0a1 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/BUILD +++ b/src/test/java/com/google/devtools/build/lib/actions/BUILD
@@ -38,6 +38,7 @@ "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", "//src/main/java/com/google/devtools/build/lib/actions:middleman_type", "//src/main/java/com/google/devtools/build/lib/actions:package_roots", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver", "//src/main/java/com/google/devtools/build/lib/analysis:actions/custom_command_line", "//src/main/java/com/google/devtools/build/lib/analysis:actions/symlink_action",
diff --git a/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD b/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD index 374a3f1..1e5b735 100644 --- a/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD +++ b/src/test/java/com/google/devtools/build/lib/buildtool/util/BUILD
@@ -83,7 +83,7 @@ "//src/main/java/com/google/devtools/build/lib/util/io:out-err", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", - "//src/main/java/com/google/devtools/build/lib/worker", + "//src/main/java/com/google/devtools/build/lib/worker:worker_module", "//src/main/java/com/google/devtools/common/options", "//src/main/java/com/google/devtools/common/options:invocation_policy", "//src/main/protobuf:failure_details_java_proto",
diff --git a/src/test/java/com/google/devtools/build/lib/exec/local/BUILD b/src/test/java/com/google/devtools/build/lib/exec/local/BUILD index 4a053aa..c644af9 100644 --- a/src/test/java/com/google/devtools/build/lib/exec/local/BUILD +++ b/src/test/java/com/google/devtools/build/lib/exec/local/BUILD
@@ -26,6 +26,7 @@ "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/exec:bin_tools", "//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater", "//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander",
diff --git a/src/test/java/com/google/devtools/build/lib/sandbox/BUILD b/src/test/java/com/google/devtools/build/lib/sandbox/BUILD index 09f9f14..e566edc 100644 --- a/src/test/java/com/google/devtools/build/lib/sandbox/BUILD +++ b/src/test/java/com/google/devtools/build/lib/sandbox/BUILD
@@ -85,6 +85,7 @@ "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", "//src/main/java/com/google/devtools/build/lib/exec:tree_deleter", "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/java/com/google/devtools/build/lib/vfs",
diff --git a/src/test/java/com/google/devtools/build/lib/skyframe/BUILD b/src/test/java/com/google/devtools/build/lib/skyframe/BUILD index 4d99669..ea830e81 100644 --- a/src/test/java/com/google/devtools/build/lib/skyframe/BUILD +++ b/src/test/java/com/google/devtools/build/lib/skyframe/BUILD
@@ -124,6 +124,7 @@ "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/actions:middleman_type", "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver", "//src/main/java/com/google/devtools/build/lib/analysis:actions/custom_command_line", "//src/main/java/com/google/devtools/build/lib/analysis:actions/spawn_action_template",
diff --git a/src/test/java/com/google/devtools/build/lib/standalone/BUILD b/src/test/java/com/google/devtools/build/lib/standalone/BUILD index 1dbf38a..e2569d2 100644 --- a/src/test/java/com/google/devtools/build/lib/standalone/BUILD +++ b/src/test/java/com/google/devtools/build/lib/standalone/BUILD
@@ -19,6 +19,7 @@ "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/actions:thread_state_receiver", "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories", "//src/main/java/com/google/devtools/build/lib/analysis:server_directories",
diff --git a/src/test/java/com/google/devtools/build/lib/worker/BUILD b/src/test/java/com/google/devtools/build/lib/worker/BUILD index 58ab4fd..ced018c 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/BUILD +++ b/src/test/java/com/google/devtools/build/lib/worker/BUILD
@@ -43,11 +43,12 @@ "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", "//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity", "//src/main/java/com/google/devtools/build/lib/collect/nestedset", - "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/build/lib/worker", + "//src/main/java/com/google/devtools/build/lib/worker:worker_spawn_runner", "//src/test/java/com/google/devtools/build/lib/actions/util", "//third_party:guava", ], @@ -76,6 +77,7 @@ "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", + "//src/main/java/com/google/devtools/build/lib/actions:resource_manager", "//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories", "//src/main/java/com/google/devtools/build/lib/analysis:server_directories", "//src/main/java/com/google/devtools/build/lib/clock", @@ -83,7 +85,7 @@ "//src/main/java/com/google/devtools/build/lib/events", "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", "//src/main/java/com/google/devtools/build/lib/exec/local", - "//src/main/java/com/google/devtools/build/lib/sandbox", + "//src/main/java/com/google/devtools/build/lib/sandbox:sandbox_helpers", "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception", "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/util:resource_converter", @@ -91,6 +93,8 @@ "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", "//src/main/java/com/google/devtools/build/lib/worker", + "//src/main/java/com/google/devtools/build/lib/worker:worker_module", + "//src/main/java/com/google/devtools/build/lib/worker:worker_spawn_runner", "//src/main/java/com/google/devtools/common/options", "//src/main/protobuf:worker_protocol_java_proto", "//src/test/java/com/google/devtools/build/lib/testutil:TestUtils",