| // Copyright 2023 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.concurrent; |
| |
| import static com.google.common.base.MoreObjects.toStringHelper; |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Throwables.throwIfUnchecked; |
| import static java.lang.Thread.currentThread; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import java.lang.ref.Cleaner; |
| import java.lang.ref.PhantomReference; |
| import java.lang.ref.ReferenceQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ForkJoinPool; |
| import javax.annotation.Nullable; |
| |
| /** |
| * An executor that prioritizes tasks and supports work donation. |
| * |
| * <p>This executor divides work into two tiers, CPU-heavy and non-CPU-heavy. |
| * |
| * <ul> |
| * <li><i>Non-CPU-heavy</i> tasks tend to be leaf-level and are serviced first-come first serve |
| * with higher priority than CPU-heavy tasks. |
| * <li><i>CPU-heavy</i> tasks are placed into a priority queue and serviced based on priority, |
| * requiring an additional CPU permit to be scheduled, to avoid oversubscription. |
| * </ul> |
| * |
| * <p>The queue for non-CPUHeavy tasks has a fixed capacity. When full, callers of execute assist |
| * with enqueued work. |
| * |
| * <p>Threads may voluntarily assist with queued work by calling {@link |
| * TieredPriorityExecutor#tryDoQueuedWork} when a thread is about to block. If tasks are available, |
| * the current thread may perform a task inside {@link TieredPriorityExecutor#tryDoQueuedWork} |
| * before it returns. This may add latency to the donating thread but can reduce overhead. |
| */ |
| public final class TieredPriorityExecutor implements QuiescingExecutor { |
| /** A common cleaner shared by all executors. */ |
| private static final Cleaner poolCleaner = Cleaner.create(); |
| |
| private final PriorityWorkerPool pool; |
| |
| /** |
| * An unchecked exception when submitting a job is catastrophic. |
| * |
| * <p>It could mean an inconsistent state so {@link #awaitQuiescence} attempts to throw |
| * immediately instead of waiting if this occurs to avoid becoming unresponsive. |
| */ |
| private volatile Throwable catastrophe; |
| |
| public TieredPriorityExecutor( |
| String name, int poolSize, int cpuPermits, ErrorClassifier errorClassifier) { |
| checkArgument( |
| poolSize >= cpuPermits, "expected poolSize=%s >= cpuPermits=%s", poolSize, cpuPermits); |
| this.pool = new PriorityWorkerPool(poolCleaner, name, poolSize, cpuPermits, errorClassifier); |
| // Registers a cleanup procedure for the underlying pool when this object is eligible for |
| // garbage collection. This has to be done explicitly because threads are GC roots. |
| poolCleaner.register(this, pool::dispose); |
| } |
| |
| @Override |
| public void execute(Runnable task) { |
| try { |
| pool.execute(task); |
| } catch (Throwable uncaught) { |
| pool.cancel(); |
| catastrophe = uncaught; |
| synchronized (pool.quiescenceMonitor()) { |
| pool.quiescenceMonitor().notify(); |
| } |
| } |
| } |
| |
| /** |
| * Returns after waiting for all pending work to complete. |
| * |
| * <p>There are various error scenarios. Except in the case of <b>catastrophe</b>, it should be |
| * safe to reuse the pool. |
| * |
| * <ul> |
| * <li><b>Uncaught Error in Task</b>. |
| * <ul> |
| * <li>{@link ErrorClassifier.ErrorClassification#CRITICAL} or higher: Stops processing |
| * the queue and interrupts in-flight threads. Waits for in-flight threads to |
| * complete. Resets the executor to a clean (reusable) state and throws the error |
| * (based on prioritization given by the {@link ErrorClassifier} if there were |
| * multiple) to the caller. |
| * <li>{@link ErrorClassifier.ErrorClassification#NOT_CRITICAL}: completes processing as |
| * usual and throws the exception (unless one with higher priority occurred). |
| * </ul> |
| * <li><b>Interrupted</b>: the calling thread is interrupted. |
| * <ul> |
| * <li>{@code interruptWorkers=true}: Stops processing the queue and interrupts in-flight |
| * threads. Waits for in-flight threads to complete. Resets the executor to a clean |
| * (reusable) state. Finally rethrows the {@link InterruptedException}, unless there |
| * was another uncaught exception, which gets thrown instead. |
| * <li>{@code interruptWorkers=false}: allows work to drain normally. Throws {@link |
| * InterruptedException} unless there is another uncaught exception to throw. |
| * </ul> |
| * <li><b>Catastrophe</b>: uncaught error in the act of submitting a task. Stops queue |
| * processing and interrupts all in-flight threads. Throws the error without waiting for |
| * tasks to drain, leaving this executor in an inconsistent state. The goal here is to avoid |
| * becoming unresponsive. |
| * </ul> |
| */ |
| @Override |
| public void awaitQuiescence(boolean interruptWorkers) throws InterruptedException { |
| InterruptedException interruptedException = null; |
| while (true) { |
| try { |
| synchronized (pool.quiescenceMonitor()) { |
| while (!pool.isQuiescent() && catastrophe == null) { |
| pool.quiescenceMonitor().wait(); |
| } |
| } |
| break; |
| } catch (InterruptedException e) { |
| interruptedException = e; |
| if (interruptWorkers) { |
| pool.cancel(); |
| } |
| } |
| } |
| throwIfNonNull(catastrophe); |
| var unhandled = pool.unhandled(); |
| pool.reset(); |
| |
| throwIfNonNull(unhandled); |
| if (interruptedException != null) { |
| throw interruptedException; |
| } |
| } |
| |
| @Override |
| public void dependOnFuture(ListenableFuture<?> future) { |
| // TODO(shahan): improve support if needed. This executor is currently only used for analysis, |
| // which does not use futures. |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| @VisibleForTesting |
| public CountDownLatch getExceptionLatchForTestingOnly() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| @VisibleForTesting |
| public CountDownLatch getInterruptionLatchForTestingOnly() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Attempts to donate work on the current thread. |
| * |
| * <p>Calling this method may be useful if the current thread is about to block. Subject to |
| * scheduling constraints, attempts to poll work from the queue and execute it on the current |
| * thread. |
| * |
| * @return true if work was donated, false otherwise. |
| */ |
| public static boolean tryDoQueuedWork() { |
| var thread = currentThread(); |
| if (!(thread instanceof PriorityWorkerPool.WorkerThread)) { |
| return false; |
| } |
| return ((PriorityWorkerPool.WorkerThread) thread).tryDoQueuedWork(); |
| } |
| |
| /** |
| * The parallelism target of the underlying thread pool. |
| * |
| * <p>Public to allow clients to examine this executor's configuration and suitability for reuse. |
| */ |
| public int poolSize() { |
| return pool.poolSize(); |
| } |
| |
| /** |
| * The number of tokens available to CPU-heavy tasks, which must acquire them before execution. |
| * |
| * <p>This constrains parallelism of CPU-heavy tasks and must be less than {@link #poolSize}. |
| * |
| * <p>Public to allow clients to examine this executor's configuration and suitability for reuse. |
| */ |
| public int cpuPermits() { |
| return pool.cpuPermits(); |
| } |
| |
| /** |
| * True if this executor had a catastrophic failure, making it unsuitable for reuse. |
| * |
| * <p>Public to allow clients to assess this executor's suitability for reuse. |
| */ |
| public boolean hasCatastrophe() { |
| return catastrophe != null; |
| } |
| |
| /** |
| * Hook for testing that the underlying {@link ForkJoinPool} is properly garbage collected. |
| * |
| * <p>This is important for preventing memory leaks and subtle because active threads are garbage |
| * collection roots and have back references to their owning pools. |
| */ |
| @VisibleForTesting |
| PhantomReference<ForkJoinPool> registerPoolDisposalMonitorForTesting( |
| ReferenceQueue<ForkJoinPool> referenceQueue) { |
| return pool.registerPoolDisposalMonitorForTesting(referenceQueue); |
| } |
| |
| @VisibleForTesting |
| boolean isCancelledForTestingOnly() { |
| return pool.isCancelled(); |
| } |
| |
| @Override |
| public String toString() { |
| return toStringHelper(this).add("pool", pool).add("catastrophe", catastrophe).toString(); |
| } |
| |
| /** |
| * Throws an unchecked exception if {@code e} is non-null. |
| * |
| * <p>If {@code e} is an unchecked exception, it'll be thrown as-is. Otherwise, it'll be wrapped |
| * and thrown as an {@link IllegalArgumentException}, which isn't expected here. |
| */ |
| private static void throwIfNonNull(@Nullable Throwable e) { |
| if (e == null) { |
| return; |
| } |
| throwIfUnchecked(e); |
| throw new IllegalArgumentException("Unexpected checked exception.", e); |
| } |
| } |