Decouple Skyframe worker thread stuff from repo fetching, part 2
Following up to https://github.com/bazelbuild/bazel/commit/d24f94734266c4331ba8324a470d60ae68c26edc, this CL consolidates the API surface of using worker threads down to one class and one method, `WorkerSkyKeyComputeState.startOrContinueWork`. See Javadoc for that method for more information.
Also took this chance to move these two "Worker"-related classes to jcg/devtools/build/skyframe, as they're now completely decoupled from repo fetching.
Work towards https://github.com/bazelbuild/bazel/issues/22729
PiperOrigin-RevId: 669390115
Change-Id: I9f9a743c9f1a92bc65af8bccb98c53bc22439204
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
index 0096a0d..967afd0 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
@@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.RuleDefinition;
import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride;
@@ -58,6 +57,7 @@
import com.google.devtools.build.skyframe.SkyFunction.Environment;
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import com.google.devtools.build.skyframe.SkyKey;
+import com.google.devtools.build.skyframe.WorkerSkyKeyComputeState;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -141,23 +141,14 @@
// See below (the `catch CancellationException` clause) for why there's a `while` loop here.
while (true) {
var state = env.getState(WorkerSkyKeyComputeState<FetchResult>::new);
- ListenableFuture<FetchResult> workerFuture =
- state.getOrStartWorker(
- "starlark-repository-" + rule.getName(),
- () -> {
- Environment workerEnv = new WorkerSkyFunctionEnvironment(state);
- setupRepoRoot(outputDirectory);
- return fetchInternal(args.toWorkerArgs(workerEnv));
- });
try {
- state.delegateEnvQueue.put(env);
- state.signalSemaphore.acquire();
- if (!workerFuture.isDone()) {
- // This means that the worker is still running, and expecting a fresh Environment. Return
- // null to trigger a Skyframe restart, but *don't* shut down the worker executor.
- return null;
- }
- return workerFuture.get();
+ return state.startOrContinueWork(
+ env,
+ "starlark-repository-" + rule.getName(),
+ (workerEnv) -> {
+ setupRepoRoot(outputDirectory);
+ return fetchInternal(args.toWorkerArgs(workerEnv));
+ });
} catch (ExecutionException e) {
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e.getCause());
@@ -172,16 +163,6 @@
RepositoryFetchProgress.ongoing(
RepositoryName.createUnvalidated(rule.getName()),
"fetch interrupted due to memory pressure; restarting."));
- } finally {
- if (workerFuture.isDone()) {
- // Unless we know the worker is waiting on a fresh Environment, we should *always* shut
- // down the worker executor by the time we finish executing (successfully or otherwise).
- // This ensures that 1) no background work happens without our knowledge, and 2) if the
- // SkyFunction is re-entered for any reason (for example b/330892334 and
- // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new
- // worker from scratch.
- state.close();
- }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyKeyComputeState.java
deleted file mode 100644
index d1bcf99..0000000
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyKeyComputeState.java
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright 2023 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.bazel.repository.starlark;
-
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.skyframe.SkyFunction;
-import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * A {@link SkyKeyComputeState} that manages a non-Skyframe virtual worker thread that persists
- * across different invocations of a SkyFunction.
- *
- * <p>The worker thread uses a {@link SkyFunction.Environment} object acquired from the host thread.
- * When a new Skyframe dependency is needed, the worker thread itself does not need to restart;
- * instead, it can signal the host thread to restart to get a fresh Environment object.
- *
- * <p>Similar to other implementations of {@link SkyKeyComputeState}, this avoids redoing expensive
- * work when a new Skyframe dependency is needed; but because it holds on to an entire worker
- * thread, this class is more suited to cases where the intermediate result of expensive work cannot
- * be easily serialized (in particular, if there's an ongoing Starlark evaluation, as is the case in
- * repo fetching).
- */
-class WorkerSkyKeyComputeState<T> implements SkyKeyComputeState {
-
- /**
- * A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
- * (successfully or otherwise), or to indicate that the host thread should return {@code null},
- * causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
- * delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
- * over.
- */
- // A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
- // be interrupted.
- final Semaphore signalSemaphore = new Semaphore(0);
-
- /**
- * The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
- * back to the worker thread.
- */
- // We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
- // gets interrupted before the host thread restarts, the host thread doesn't hang forever.
- final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new ArrayBlockingQueue<>(1);
-
- /**
- * This future holds on to the worker thread in order to cancel it when necessary; it also serves
- * to tell whether a worker thread is already running.
- */
- @GuardedBy("this")
- @Nullable
- private ListenableFuture<T> workerFuture = null;
-
- /** The executor service that manages the worker thread. */
- // We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
- // sure the worker thread has shut down (with its blocking `close()` method).
- @GuardedBy("this")
- @Nullable
- private ListeningExecutorService workerExecutorService = null;
-
- /**
- * Releases a permit on the {@code signalSemaphore} and immediately expect a fresh Environment
- * back. This may only be called from the worker thread.
- */
- SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
- signalSemaphore.release();
- return delegateEnvQueue.take();
- }
-
- /**
- * Returns the worker future, or if a worker is not already running, starts a worker thread
- * running the given callable. This makes sure to release a permit on the {@code signalSemaphore}
- * when the worker finishes, successfully or otherwise. This may only be called from the host
- * Skyframe thread.
- */
- synchronized ListenableFuture<T> getOrStartWorker(String workerThreadName, Callable<T> c) {
- if (workerFuture != null) {
- return workerFuture;
- }
- // We reset the state object back to its very initial state, since the host SkyFunction may have
- // been re-entered (for example b/330892334 and
- // https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have
- // been interrupted while the host SkyFunction was inactive.
- workerExecutorService =
- MoreExecutors.listeningDecorator(
- Executors.newThreadPerTaskExecutor(
- Thread.ofVirtual().name(workerThreadName).factory()));
- signalSemaphore.drainPermits();
- delegateEnvQueue.clear();
-
- // Start the worker.
- workerFuture = workerExecutorService.submit(c);
- workerFuture.addListener(signalSemaphore::release, directExecutor());
- return workerFuture;
- }
-
- /**
- * Closes the state object, and blocks until all pending async work is finished. The state object
- * will reset to a clean slate after this method finishes.
- */
- // This may be called from any thread, including the host Skyframe thread and the
- // high-memory-pressure listener thread.
- @Override
- public synchronized void close() {
- if (workerFuture != null) {
- workerFuture.cancel(true);
- }
- workerFuture = null;
- if (workerExecutorService != null) {
- workerExecutorService.close(); // This blocks
- }
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/skyframe/WorkerSkyFunctionEnvironment.java
similarity index 88%
rename from src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyFunctionEnvironment.java
rename to src/main/java/com/google/devtools/build/skyframe/WorkerSkyFunctionEnvironment.java
index 1cdedfe..1ce7617 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/WorkerSkyFunctionEnvironment.java
+++ b/src/main/java/com/google/devtools/build/skyframe/WorkerSkyFunctionEnvironment.java
@@ -12,17 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.google.devtools.build.lib.bazel.repository.starlark;
+package com.google.devtools.build.skyframe;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
-import com.google.devtools.build.skyframe.SkyFunction;
-import com.google.devtools.build.skyframe.SkyKey;
-import com.google.devtools.build.skyframe.SkyValue;
-import com.google.devtools.build.skyframe.SkyframeLookupResult;
-import com.google.devtools.build.skyframe.Version;
+import com.google.devtools.build.lib.supplier.InterruptibleSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@@ -41,12 +37,14 @@
*/
class WorkerSkyFunctionEnvironment
implements SkyFunction.Environment, ExtendedEventHandler, SkyframeLookupResult {
- private final WorkerSkyKeyComputeState<?> state;
private SkyFunction.Environment delegate;
+ private final InterruptibleSupplier<SkyFunction.Environment> newDelegateSupplier;
- WorkerSkyFunctionEnvironment(WorkerSkyKeyComputeState<?> state) throws InterruptedException {
- this.state = state;
- this.delegate = state.delegateEnvQueue.take();
+ WorkerSkyFunctionEnvironment(
+ SkyFunction.Environment initialDelegate,
+ InterruptibleSupplier<SkyFunction.Environment> newDelegateSupplier) {
+ this.delegate = initialDelegate;
+ this.newDelegateSupplier = newDelegateSupplier;
}
@Override
@@ -60,14 +58,14 @@
delegate.getValuesAndExceptions(depKeys);
if (!delegate.valuesMissing()) {
// Do NOT just return the return value of `delegate.getValuesAndExceptions` here! That would
- // cause anyone holding onto the returned // result object to potentially use a stale version
+ // cause anyone holding onto the returned result object to potentially use a stale version
// of it after a skyfunction restart.
return this;
}
// We null out `delegate` before blocking for the fresh env so that the old one becomes
// eligible for GC.
delegate = null;
- delegate = state.signalForFreshEnv();
+ delegate = newDelegateSupplier.get();
delegate.getValuesAndExceptions(depKeys);
return this;
}
@@ -125,7 +123,7 @@
// We null out `delegate` before blocking for the fresh env so that the old one becomes
// eligible for GC.
delegate = null;
- delegate = state.signalForFreshEnv();
+ delegate = newDelegateSupplier.get();
return delegate.getValueOrThrow(depKey, e1, e2, e3, e4);
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/WorkerSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/skyframe/WorkerSkyKeyComputeState.java
new file mode 100644
index 0000000..1970411
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/WorkerSkyKeyComputeState.java
@@ -0,0 +1,202 @@
+// Copyright 2023 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.skyframe;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.skyframe.SkyFunction.Environment;
+import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A {@link SkyKeyComputeState} that manages a non-Skyframe virtual worker thread that persists
+ * across different invocations of a SkyFunction.
+ *
+ * <p>The worker thread uses a {@link SkyFunction.Environment} object acquired from the host thread.
+ * When a new Skyframe dependency is needed, the worker thread itself does not need to restart;
+ * instead, it can signal the host thread to restart to get a fresh Environment object.
+ *
+ * <p>Similar to other implementations of {@link SkyKeyComputeState}, this avoids redoing expensive
+ * work when a new Skyframe dependency is needed; but because it holds on to an entire worker
+ * thread, this class is more suited to cases where the intermediate result of expensive work cannot
+ * be easily serialized (in particular, if there's an ongoing Starlark evaluation, as is the case in
+ * repo fetching).
+ */
+public class WorkerSkyKeyComputeState<T> implements SkyKeyComputeState {
+
+ /**
+ * A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
+ * (successfully or otherwise), or to indicate that the host thread should return {@code null},
+ * causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
+ * delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
+ * over.
+ */
+ // A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
+ // be interrupted.
+ private final Semaphore signalSemaphore = new Semaphore(0);
+
+ /**
+ * The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
+ * back to the worker thread.
+ */
+ // We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
+ // gets interrupted before the host thread restarts, the host thread doesn't hang forever.
+ private final BlockingQueue<SkyFunction.Environment> delegateEnvQueue =
+ new ArrayBlockingQueue<>(1);
+
+ /**
+ * This future holds on to the worker thread in order to cancel it when necessary; it also serves
+ * to tell whether a worker thread is already running.
+ */
+ @GuardedBy("this")
+ @Nullable
+ private ListenableFuture<T> workerFuture = null;
+
+ /** The executor service that manages the worker thread. */
+ // We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
+ // sure the worker thread has shut down (with its blocking `close()` method).
+ @GuardedBy("this")
+ @Nullable
+ private ListeningExecutorService workerExecutorService = null;
+
+ /**
+ * Represents work that will will be performed on the worker thread, yielding a result of type
+ * {@code T}. The worker thread should exclusively use the provided {@code workerEnv} for Skyframe
+ * access.
+ */
+ @FunctionalInterface
+ public interface WorkerCallable<T> {
+ T call(Environment workerEnv) throws Exception;
+ }
+
+ /**
+ * Starts a worker performing the given {@link WorkerCallable}, or if such a worker already exists
+ * and is waiting for a Skyframe restart, sends over a fresh Environment and asks it to continue
+ * its work. This method blocks until the worker thread finishes (either successfully or
+ * otherwise), <em>or</em> until the worker needs a Skyframe restart, in which case the worker
+ * will suspend itself and wait for the next invocation of this method by a restarted host
+ * SkyFunction with a fresh Environment.
+ *
+ * @param env The Skyframe Environment of the host SkyFunction.
+ * @param workerThreadName The name of the worker thread to be started by this method, if one
+ * doesn't already exist.
+ * @param workerCallable The work to be performed on the worker thread. Note that code in this
+ * callable should exclusively use the Environment passed to {@link
+ * WorkerCallable#call(Environment)}, <em>not</em> the original host Environment.
+ * @return If the worker finishes successfully, this method returns whatever {@code
+ * workerCallable} returns. If the worker needs a Skyframe restart, returns null.
+ * @throws InterruptedException if the caller (host) thread is interrupted.
+ * @throws CancellationException if the worker thread is interrupted (most likely by {@link
+ * #close()})
+ * @throws ExecutionException if the worker callable throws an exception.
+ */
+ @Nullable
+ public T startOrContinueWork(
+ Environment env, String workerThreadName, WorkerCallable<T> workerCallable)
+ throws InterruptedException, CancellationException, ExecutionException {
+ ListenableFuture<T> workerFuture = getOrStartWorker(workerThreadName, workerCallable);
+ try {
+ delegateEnvQueue.put(env);
+ signalSemaphore.acquire();
+ if (!workerFuture.isDone()) {
+ // This means that the worker is still running, and expecting a fresh Environment. Return
+ // null to trigger a Skyframe restart, but *don't* shut down the worker executor.
+ return null;
+ }
+ return workerFuture.get();
+ } finally {
+ if (workerFuture.isDone()) {
+ // Unless we know the worker is waiting on a fresh Environment, we should *always* shut
+ // down the worker executor by the time we finish executing (successfully or otherwise).
+ // This ensures that 1) no background work happens without our knowledge, and 2) if the
+ // SkyFunction is re-entered for any reason (for example b/330892334 and
+ // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new
+ // worker from scratch.
+ close();
+ }
+ }
+ }
+
+ /**
+ * Returns the worker future, or if a worker is not already running, starts a worker thread
+ * running the given callable. This makes sure to release a permit on the {@code signalSemaphore}
+ * when the worker finishes, successfully or otherwise. This may only be called from the host
+ * Skyframe thread.
+ */
+ private synchronized ListenableFuture<T> getOrStartWorker(
+ String workerThreadName, WorkerCallable<T> workerCallable) {
+ if (workerFuture != null) {
+ return workerFuture;
+ }
+ // We reset the state object back to its very initial state, since the host SkyFunction may have
+ // been re-entered (for example b/330892334 and
+ // https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have
+ // been interrupted while the host SkyFunction was inactive.
+ workerExecutorService =
+ MoreExecutors.listeningDecorator(
+ Executors.newThreadPerTaskExecutor(
+ Thread.ofVirtual().name(workerThreadName).factory()));
+ signalSemaphore.drainPermits();
+ delegateEnvQueue.clear();
+
+ // Start the worker.
+ workerFuture =
+ workerExecutorService.submit(
+ () -> {
+ var workerEnv =
+ new WorkerSkyFunctionEnvironment(
+ delegateEnvQueue.take(), this::signalForFreshEnv);
+ return workerCallable.call(workerEnv);
+ });
+ workerFuture.addListener(signalSemaphore::release, directExecutor());
+ return workerFuture;
+ }
+
+ /**
+ * Releases a permit on the {@code signalSemaphore} and immediately expect a fresh Environment
+ * back. This may only be called from the worker thread.
+ */
+ private SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
+ signalSemaphore.release();
+ return delegateEnvQueue.take();
+ }
+
+ /**
+ * Closes the state object, and blocks until all pending async work is finished. The state object
+ * will reset to a clean slate after this method finishes.
+ */
+ // This may be called from any thread, including the host Skyframe thread and the
+ // high-memory-pressure listener thread.
+ @Override
+ public synchronized void close() {
+ if (workerFuture != null) {
+ workerFuture.cancel(true);
+ }
+ workerFuture = null;
+ if (workerExecutorService != null) {
+ workerExecutorService.close(); // This blocks
+ }
+ }
+}