// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package com.google.devtools.build.skyframe;
15
16import com.google.common.annotations.VisibleForTesting;
17import com.google.common.base.Function;
18import com.google.common.collect.Sets;
19import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
20import com.google.devtools.build.lib.concurrent.ErrorClassifier;
21import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
22import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
23import java.util.Collection;
24import java.util.Set;
25import java.util.concurrent.CountDownLatch;
26import java.util.concurrent.ForkJoinPool;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.atomic.AtomicBoolean;
29
30/**
31 * Threadpool manager for {@link ParallelEvaluator}. Wraps a {@link QuiescingExecutor} and keeps
32 * track of pending nodes.
33 */
34class NodeEntryVisitor {
35 private static final ErrorClassifier NODE_ENTRY_VISITOR_ERROR_CLASSIFIER =
36 new ErrorClassifier() {
37 @Override
38 protected ErrorClassification classifyException(Exception e) {
39 if (e instanceof SchedulerException) {
40 return ErrorClassification.CRITICAL;
41 }
42 if (e instanceof RuntimeException) {
43 // We treat non-SchedulerException RuntimeExceptions as more severe than
44 // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the
45 // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation
46 // control flow.
47 return ErrorClassification.CRITICAL_AND_LOG;
48 }
49 return ErrorClassification.NOT_CRITICAL;
50 }
51 };
52
53 private final QuiescingExecutor quiescingExecutor;
54 private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
55 private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
56 private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
57 private final DirtyKeyTracker dirtyKeyTracker;
58 private final EvaluationProgressReceiver progressReceiver;
59 /**
60 * Function that allows this visitor to execute the appropriate {@link Runnable} when given a
61 * {@link SkyKey} to evaluate.
62 */
63 private final Function<SkyKey, Runnable> runnableMaker;
64
65 NodeEntryVisitor(
66 ForkJoinPool forkJoinPool,
67 DirtyKeyTracker dirtyKeyTracker,
68 EvaluationProgressReceiver progressReceiver,
69 Function<SkyKey, Runnable> runnableMaker) {
70 quiescingExecutor =
71 new ForkJoinQuiescingExecutor(forkJoinPool, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
72 this.dirtyKeyTracker = dirtyKeyTracker;
73 this.progressReceiver = progressReceiver;
74 this.runnableMaker = runnableMaker;
75 }
76
77 NodeEntryVisitor(
78 int threadCount,
79 DirtyKeyTracker dirtyKeyTracker,
80 EvaluationProgressReceiver progressReceiver,
81 Function<SkyKey, Runnable> runnableMaker) {
82 quiescingExecutor =
83 new AbstractQueueVisitor(
84 /*concurrent*/ true,
85 threadCount,
86 /*keepAliveTime=*/ 1,
87 TimeUnit.SECONDS,
88 /*failFastOnException*/ true,
89 "skyframe-evaluator",
90 NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
91 this.dirtyKeyTracker = dirtyKeyTracker;
92 this.progressReceiver = progressReceiver;
93 this.runnableMaker = runnableMaker;
94 }
95
96 void waitForCompletion() throws InterruptedException {
97 quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
98 }
99
100 void enqueueEvaluation(SkyKey key) {
101 // We unconditionally add the key to the set of in-flight nodes because even if evaluation is
102 // never scheduled we still want to remove the previously created NodeEntry from the graph.
103 // Otherwise we would leave the graph in a weird state (wasteful garbage in the best case and
104 // inconsistent in the worst case).
105 boolean newlyEnqueued = inflightNodes.add(key);
106 // All nodes enqueued for evaluation will be either verified clean, re-evaluated, or cleaned
107 // up after being in-flight when an error happens in nokeep_going mode or in the event of an
108 // interrupt. In any of these cases, they won't be dirty anymore.
109 if (newlyEnqueued) {
110 dirtyKeyTracker.notDirty(key);
111 }
112 if (preventNewEvaluations.get()) {
113 return;
114 }
115 if (newlyEnqueued && progressReceiver != null) {
116 progressReceiver.enqueueing(key);
117 }
118 quiescingExecutor.execute(runnableMaker.apply(key));
119 }
120
121 /**
122 * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
123 * request a halt. If true, this thread should proceed to throw an exception. If false, another
124 * thread already requested a halt and will throw an exception, and so this thread can simply end.
125 */
126 boolean preventNewEvaluations() {
127 return preventNewEvaluations.compareAndSet(false, true);
128 }
129
130 void noteCrash(RuntimeException e) {
131 crashes.add(e);
132 }
133
134 Collection<RuntimeException> getCrashes() {
135 return crashes;
136 }
137
138 void notifyDone(SkyKey key) {
139 inflightNodes.remove(key);
140 }
141
142 boolean isInflight(SkyKey key) {
143 return inflightNodes.contains(key);
144 }
145
146 Set<SkyKey> getInflightNodes() {
147 return inflightNodes;
148 }
149
150 @VisibleForTesting
151 CountDownLatch getExceptionLatchForTestingOnly() {
152 return quiescingExecutor.getExceptionLatchForTestingOnly();
153 }
154}