blob: ca4c0f3496084b441bbd3c6ac123e23c21c2f89d [file] [log] [blame]
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.skyframe;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.skyframe.SkyFunction.Environment;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
 * A {@link SkyKeyComputeState} that owns a non-Skyframe virtual worker thread which outlives any
 * single invocation of a SkyFunction.
 *
 * <p>The worker thread operates on a {@link SkyFunction.Environment} handed to it by the host
 * thread. Whenever the worker needs a new Skyframe dependency it does not restart itself; it
 * merely signals the host thread to restart so that a fresh Environment can be obtained and passed
 * back to the worker.
 *
 * <p>Like other {@link SkyKeyComputeState} implementations, this avoids repeating expensive work
 * when a new Skyframe dependency is needed. Because it pins an entire worker thread, however, it
 * is best suited to situations where the intermediate result of the expensive work cannot be
 * easily serialized (in particular an ongoing Starlark evaluation, as is the case in repo
 * fetching).
 */
public class WorkerSkyKeyComputeState<T> implements SkyKeyComputeState {

  /**
   * A semaphore holding 0 or 1 permit. The worker releases a permit either when it has finished
   * (successfully or otherwise), or to tell the host thread to return {@code null} and thereby
   * cause a Skyframe restart. In the latter case the worker immediately blocks on {@code
   * delegateEnvQueue} until the host thread sends over a fresh {@link SkyFunction.Environment}.
   */
  // A Semaphore fits here because, crucially, releasing a permit never blocks and therefore can
  // never be interrupted.
  private final Semaphore signalSemaphore = new Semaphore(0);

  /**
   * The channel through which the host Skyframe thread passes fresh {@link
   * SkyFunction.Environment} objects to the worker thread.
   */
  // An ArrayBlockingQueue of size 1 is used rather than a SynchronousQueue, so that the host
  // thread doesn't hang forever if the worker gets interrupted before the host thread restarts.
  private final BlockingQueue<SkyFunction.Environment> delegateEnvQueue =
      new ArrayBlockingQueue<>(1);

  /**
   * Handle on the worker thread, used to cancel it when necessary. A non-null value also indicates
   * that a worker thread is already running.
   */
  @GuardedBy("this")
  @Nullable
  private ListenableFuture<T> workerFuture = null;

  /** The executor service managing the worker thread. */
  // Kept next to `workerFuture` because its blocking `close()` method is a convenient way to make
  // sure the worker thread has shut down.
  @GuardedBy("this")
  @Nullable
  private ListeningExecutorService workerExecutorService = null;

  /**
   * Represents work that will be performed on the worker thread, yielding a result of type {@code
   * T}. The worker thread should exclusively use the provided {@code workerEnv} for Skyframe
   * access.
   */
  @FunctionalInterface
  public interface WorkerCallable<T> {
    T call(Environment workerEnv) throws Exception;
  }

  /**
   * Starts a worker running the given {@link WorkerCallable}; if a worker already exists and is
   * waiting for a Skyframe restart, hands it a fresh Environment and lets it continue instead.
   *
   * <p>This method blocks until the worker thread finishes (successfully or otherwise),
   * <em>or</em> until the worker needs a Skyframe restart. In the latter case the worker suspends
   * itself and waits for the next invocation of this method by a restarted host SkyFunction with a
   * fresh Environment.
   *
   * @param env The Skyframe Environment of the host SkyFunction.
   * @param workerThreadName The name of the worker thread to be started by this method, if one
   *     doesn't already exist.
   * @param workerCallable The work to be performed on the worker thread. Note that code in this
   *     callable should exclusively use the Environment passed to {@link
   *     WorkerCallable#call(Environment)}, <em>not</em> the original host Environment.
   * @return If the worker finishes successfully, whatever {@code workerCallable} returns. If the
   *     worker needs a Skyframe restart, null.
   * @throws InterruptedException if the caller (host) thread is interrupted.
   * @throws CancellationException if the worker thread is interrupted (most likely by {@link
   *     #close()})
   * @throws ExecutionException if the worker callable throws an exception.
   */
  @Nullable
  public T startOrContinueWork(
      Environment env, String workerThreadName, WorkerCallable<T> workerCallable)
      throws InterruptedException, CancellationException, ExecutionException {
    // Deliberately not named `workerFuture`, to avoid shadowing the guarded field.
    ListenableFuture<T> worker = getOrStartWorker(workerThreadName, workerCallable);
    try {
      // Pass the current Environment to the worker, then wait for it to signal us.
      delegateEnvQueue.put(env);
      signalSemaphore.acquire();
      // If the worker is not done, it is still running and expecting a fresh Environment: return
      // null to trigger a Skyframe restart, and *don't* shut down the worker executor.
      return worker.isDone() ? worker.get() : null;
    } finally {
      // Unless we know the worker is waiting on a fresh Environment, we should *always* shut down
      // the worker executor by the time we finish executing (successfully or otherwise). This
      // ensures that 1) no background work happens without our knowledge, and 2) if the
      // SkyFunction is re-entered for any reason (for example b/330892334 and
      // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new
      // worker from scratch.
      if (worker.isDone()) {
        close();
      }
    }
  }

  /**
   * Returns the current worker future, first starting a worker thread that runs the given callable
   * if no worker is running yet. A listener on the future guarantees that a permit is released on
   * the {@code signalSemaphore} when the worker finishes, successfully or otherwise. This may only
   * be called from the host Skyframe thread.
   */
  @SuppressWarnings("AllowVirtualThreads")
  private synchronized ListenableFuture<T> getOrStartWorker(
      String workerThreadName, WorkerCallable<T> workerCallable) {
    if (workerFuture == null) {
      // Reset the state object back to its very initial state, since the host SkyFunction may
      // have been re-entered (for example b/330892334 and
      // https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may
      // have been interrupted while the host SkyFunction was inactive.
      var virtualThreadFactory = Thread.ofVirtual().name(workerThreadName).factory();
      workerExecutorService =
          MoreExecutors.listeningDecorator(
              Executors.newThreadPerTaskExecutor(virtualThreadFactory));
      signalSemaphore.drainPermits();
      delegateEnvQueue.clear();

      // Start the worker. Its first action is to wait for the host's Environment.
      workerFuture =
          workerExecutorService.submit(
              () -> {
                SkyFunction.Environment initialEnv = delegateEnvQueue.take();
                return workerCallable.call(
                    new WorkerSkyFunctionEnvironment(initialEnv, this::signalForFreshEnv));
              });
      workerFuture.addListener(signalSemaphore::release, directExecutor());
    }
    return workerFuture;
  }

  /**
   * Releases a permit on the {@code signalSemaphore} and immediately expects a fresh Environment
   * back. This may only be called from the worker thread.
   */
  private SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
    signalSemaphore.release();
    return delegateEnvQueue.take();
  }

  /**
   * Closes the state object, blocking until all pending async work is finished. Once this method
   * returns, the state object has been reset to a clean slate.
   */
  // May be called from any thread, including the host Skyframe thread and the
  // high-memory-pressure listener thread.
  @Override
  public synchronized void close() {
    if (workerFuture != null) {
      workerFuture.cancel(true);
      workerFuture = null;
    }
    if (workerExecutorService != null) {
      workerExecutorService.close(); // This blocks
    }
  }
}