Delay cleaning of in-flight nodes until the following build. This allows evaluation to be interrupted in constant time.
As a result, some ParallelEvaluator tests that implicitly relied on in-flight nodes being cleaned before the next evaluation were moved into MemoizingEvaluatorTest.
--
MOS_MIGRATED_REVID=102696653
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index 991dad6..18384c9 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -16,6 +16,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
+import com.google.common.base.Receiver;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
@@ -26,14 +27,11 @@
import com.google.common.collect.Interners;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.concurrent.ThrowableRecordingRunnableWrapper;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.events.StoredEventHandler;
@@ -59,8 +57,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -130,20 +126,27 @@
private final int threadCount;
@Nullable private final EvaluationProgressReceiver progressReceiver;
private final DirtyKeyTracker dirtyKeyTracker;
+ private final Receiver<Collection<SkyKey>> inflightKeysReceiver;
private final Predicate<Event> storedEventFilter;
private static final Interner<SkyKey> KEY_CANONICALIZER = Interners.newWeakInterner();
- public ParallelEvaluator(ProcessableGraph graph, Version graphVersion,
+ public ParallelEvaluator(
+ ProcessableGraph graph,
+ Version graphVersion,
ImmutableMap<? extends SkyFunctionName, ? extends SkyFunction> skyFunctions,
final EventHandler reporter,
EmittedEventState emittedEventState,
- Predicate<Event> storedEventFilter, boolean keepGoing, int threadCount,
+ Predicate<Event> storedEventFilter,
+ boolean keepGoing,
+ int threadCount,
@Nullable EvaluationProgressReceiver progressReceiver,
- DirtyKeyTracker dirtyKeyTracker) {
+ DirtyKeyTracker dirtyKeyTracker,
+ Receiver<Collection<SkyKey>> inflightKeysReceiver) {
this.graph = graph;
this.skyFunctions = skyFunctions;
this.graphVersion = graphVersion;
+ this.inflightKeysReceiver = inflightKeysReceiver;
this.reporter = Preconditions.checkNotNull(reporter);
this.keepGoing = keepGoing;
this.threadCount = threadCount;
@@ -1039,71 +1042,10 @@
throw new IllegalStateException(entry + " for " + skyKey + " in unknown state");
}
}
- boolean shouldClean = false;
try {
- EvaluationResult<T> result = waitForCompletionAndConstructResult(visitor, skyKeys);
- shouldClean = true;
- return result;
- } catch (InterruptedException e) {
- shouldClean = true;
- throw e;
+ return waitForCompletionAndConstructResult(visitor, skyKeys);
} finally {
- if (shouldClean) {
- // TODO(bazel-team): In nokeep_going mode or in case of an interrupt, we need to remove
- // partial values from the graph. Find a better way to handle those cases.
- clean(visitor.inflightNodes);
- }
- // If we're going to crash anyway, things might be in a weird state such that 'clean' is
- // likely to crash too. So propagating the original exception is much more useful.
- // Note that this is safe in the sense that this decision doesn't prevent weird
- // SkyFunctions from doing weird things with unchecked exceptions - that's already
- // prevented directly by SkyFunctionException#validateExceptionType.
- }
- }
-
- private void clean(Set<SkyKey> inflightNodes) throws InterruptedException {
- boolean alreadyInterrupted = Thread.interrupted();
- // This parallel computation is fully cpu-bound, so we use a thread for each processor.
- ExecutorService executor = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setNameFormat("ParallelEvaluator#clean %d").build());
- ThrowableRecordingRunnableWrapper wrapper =
- new ThrowableRecordingRunnableWrapper("ParallelEvaluator#clean");
- for (final SkyKey key : inflightNodes) {
- final NodeEntry entry = graph.get(key);
- Preconditions.checkState(!entry.isDone(), "%s %s", key, entry);
- executor.execute(wrapper.wrap(new Runnable() {
- @Override
- public void run() {
- cleanInflightNode(key, entry);
- }
- }));
- }
- // We uninterruptibly wait for all nodes to be cleaned because we want to make sure the graph
- // is left in a good state.
- //
- // TODO(bazel-team): Come up with a better design for graph cleaning such that we can respond
- // to interrupts in constant time.
- boolean newlyInterrupted = ExecutorUtil.uninterruptibleShutdown(executor);
- Throwables.propagateIfPossible(wrapper.getFirstThrownError());
- if (newlyInterrupted || alreadyInterrupted) {
- throw new InterruptedException();
- }
- }
-
- private void cleanInflightNode(SkyKey key, NodeEntry entry) {
- Set<SkyKey> temporaryDeps = entry.getTemporaryDirectDeps();
- graph.remove(key);
- for (SkyKey dep : temporaryDeps) {
- NodeEntry nodeEntry = graph.get(dep);
- // The direct dep might have already been cleaned from the graph.
- if (nodeEntry != null) {
- // Only bother removing the reverse dep on done nodes since other in-flight nodes will be
- // cleaned too.
- if (nodeEntry.isDone()) {
- nodeEntry.removeReverseDep(key);
- }
- }
+ inflightKeysReceiver.accept(visitor.inflightNodes);
}
}