| // Copyright 2016 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.skyframe; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.collect.Sets; |
| import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; |
| import com.google.devtools.build.lib.concurrent.ErrorClassifier; |
| import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; |
| import com.google.devtools.build.lib.concurrent.QuiescingExecutor; |
| import java.util.Collection; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * Threadpool manager for {@link ParallelEvaluator}. Wraps a {@link QuiescingExecutor} and keeps |
| * track of pending nodes. |
| */ |
| class NodeEntryVisitor { |
| static final ErrorClassifier NODE_ENTRY_VISITOR_ERROR_CLASSIFIER = |
| new ErrorClassifier() { |
| @Override |
| protected ErrorClassification classifyException(Exception e) { |
| if (e instanceof SchedulerException) { |
| return ErrorClassification.CRITICAL; |
| } |
| if (e instanceof RuntimeException) { |
| // We treat non-SchedulerException RuntimeExceptions as more severe than |
| // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the |
| // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation |
| // control flow. |
| return ErrorClassification.CRITICAL_AND_LOG; |
| } |
| return ErrorClassification.NOT_CRITICAL; |
| } |
| }; |
| |
| private final QuiescingExecutor quiescingExecutor; |
| private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false); |
| private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet(); |
| private final DirtyTrackingProgressReceiver progressReceiver; |
| /** |
| * Function that allows this visitor to execute the appropriate {@link Runnable} when given a |
| * {@link SkyKey} to evaluate. |
| */ |
| private final Function<SkyKey, Runnable> runnableMaker; |
| |
| NodeEntryVisitor( |
| ForkJoinPool forkJoinPool, |
| DirtyTrackingProgressReceiver progressReceiver, |
| Function<SkyKey, Runnable> runnableMaker) { |
| this.quiescingExecutor = ForkJoinQuiescingExecutor.newBuilder() |
| .withOwnershipOf(forkJoinPool) |
| .setErrorClassifier(NODE_ENTRY_VISITOR_ERROR_CLASSIFIER) |
| .build(); |
| this.progressReceiver = progressReceiver; |
| this.runnableMaker = runnableMaker; |
| } |
| |
| NodeEntryVisitor( |
| int threadCount, |
| DirtyTrackingProgressReceiver progressReceiver, |
| Function<SkyKey, Runnable> runnableMaker) { |
| quiescingExecutor = |
| new AbstractQueueVisitor( |
| threadCount, |
| /*keepAliveTime=*/ 1, |
| TimeUnit.SECONDS, |
| /*failFastOnException*/ true, |
| "skyframe-evaluator", |
| AbstractQueueVisitor.EXECUTOR_FACTORY, |
| NODE_ENTRY_VISITOR_ERROR_CLASSIFIER); |
| this.progressReceiver = progressReceiver; |
| this.runnableMaker = runnableMaker; |
| } |
| |
| void waitForCompletion() throws InterruptedException { |
| quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true); |
| } |
| |
| void enqueueEvaluation(SkyKey key) { |
| if (preventNewEvaluations.get()) { |
| // If an error happens in nokeep_going mode, we still want to mark these nodes as inflight, |
| // otherwise cleanup will not happen properly. |
| progressReceiver.enqueueAfterError(key); |
| return; |
| } |
| progressReceiver.enqueueing(key); |
| quiescingExecutor.execute(runnableMaker.apply(key)); |
| } |
| |
| /** |
| * Stop any new evaluations from being enqueued. Returns whether this was the first thread to |
| * request a halt. |
| * |
| * <p>If called from within node evaluation, the caller may use the return value to determine |
| * whether it is responsible for throwing an exception to halt evaluation at the executor level. |
| */ |
| boolean preventNewEvaluations() { |
| return preventNewEvaluations.compareAndSet(false, true); |
| } |
| |
| void noteCrash(RuntimeException e) { |
| crashes.add(e); |
| } |
| |
| Collection<RuntimeException> getCrashes() { |
| return crashes; |
| } |
| |
| @VisibleForTesting |
| CountDownLatch getExceptionLatchForTestingOnly() { |
| return quiescingExecutor.getExceptionLatchForTestingOnly(); |
| } |
| } |