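Delay cleaning of in-flight nodes until the start of the following build. This allows us to respond to an interrupt in constant time, because the evaluator no longer waits uninterruptibly for the graph to be cleaned before returning control to the caller.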

Some ParallelEvaluator tests that implicitly relied on cleaning happening before the next evaluation were moved into MemoizingEvaluatorTest as a result.

--
MOS_MIGRATED_REVID=102696653
diff --git a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
index 324be40..87db7349 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
@@ -17,6 +17,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Receiver;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -30,6 +31,7 @@
 import com.google.devtools.build.skyframe.NodeEntry.DependencyState;
 
 import java.io.PrintStream;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -157,9 +159,29 @@
       performInvalidation();
       injectValues(intVersion);
 
-      ParallelEvaluator evaluator = new ParallelEvaluator(graph, intVersion,
-          skyFunctions, eventHandler, emittedEventState, DEFAULT_STORED_EVENT_FILTER, keepGoing,
-          numThreads, progressReceiver, dirtyKeyTracker);
+      // We must delete all nodes that are still in-flight at the end of the evaluation (in case the
+      // evaluation is aborted for some reason). In order to quickly return control to the caller,
+      // we store the set of such nodes for deletion at the start of the next evaluation.
+      Receiver<Collection<SkyKey>> lazyDeletingReceiver =
+          new Receiver<Collection<SkyKey>>() {
+            @Override
+            public void accept(Collection<SkyKey> skyKeys) {
+              valuesToDelete.addAll(skyKeys);
+            }
+          };
+      ParallelEvaluator evaluator =
+          new ParallelEvaluator(
+              graph,
+              intVersion,
+              skyFunctions,
+              eventHandler,
+              emittedEventState,
+              DEFAULT_STORED_EVENT_FILTER,
+              keepGoing,
+              numThreads,
+              progressReceiver,
+              dirtyKeyTracker,
+              lazyDeletingReceiver);
       EvaluationResult<T> result = evaluator.eval(roots);
       return EvaluationResult.<T>builder()
           .mergeFrom(result)
diff --git a/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java b/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
index a47626d..66b84cb 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
@@ -169,8 +169,10 @@
       return (buildingState.getLastBuildValue() == null)
               ? null
           : ValueWithMetadata.justValue(buildingState.getLastBuildValue());
+    } else {
+      // Value has not finished evaluating. It's probably about to be cleaned from the graph.
+      return null;
     }
-    throw new AssertionError("Value in bad state: " + this);
   }
 
   @Override
@@ -281,9 +283,6 @@
   @Override
   public synchronized Collection<SkyKey> getReverseDeps() {
     assertKeepEdges();
-    Preconditions.checkState(isDone() || buildingState.getReverseDepsToSignal().isEmpty(),
-        "Reverse deps should only be queried before the build has begun "
-            + "or after the node is done %s", this);
     return REVERSE_DEPS_UTIL.getReverseDeps(this);
   }
 
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 3f4a863..60978b7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -219,52 +219,42 @@
       }
       final Pair<SkyKey, InvalidationType> invalidationPair = Pair.of(key, invalidationType);
       pendingVisitations.add(invalidationPair);
-      enqueue(new Runnable() {
-        @Override
-        public void run() {
-          NodeEntry entry = graph.get(key);
-          if (entry == null) {
-            pendingVisitations.remove(invalidationPair);
-            return;
-          }
+      enqueue(
+          new Runnable() {
+            @Override
+            public void run() {
+              NodeEntry entry = graph.get(key);
+              if (entry == null) {
+                pendingVisitations.remove(invalidationPair);
+                return;
+              }
 
-          if (traverseGraph) {
-            // Propagate deletion upwards.
-            for (SkyKey reverseDep : entry.getReverseDeps()) {
-              visit(reverseDep, InvalidationType.DELETED, !MUST_EXIST);
-            }
-          }
-
-          if (entry.isDone()) {
-            // Only process this node's value and children if it is done, since dirty nodes have
-            // no awareness of either.
-
-            // Unregister this node from direct deps, since reverse dep edges cannot point to
-            // non-existent nodes.
-            if (traverseGraph) {
-              for (SkyKey directDep : entry.getDirectDeps()) {
-                NodeEntry dep = graph.get(directDep);
-                if (dep != null) {
-                  dep.removeReverseDep(key);
+              if (traverseGraph) {
+                // Propagate deletion upwards.
+                for (SkyKey reverseDep : entry.getReverseDeps()) {
+                  visit(reverseDep, InvalidationType.DELETED, !MUST_EXIST);
+                }
+                Iterable<SkyKey> directDeps =
+                    entry.isDone() ? entry.getDirectDeps() : entry.getTemporaryDirectDeps();
+                // Unregister this node from direct deps, since reverse dep edges cannot point to
+                // non-existent nodes.
+                for (SkyKey directDep : directDeps) {
+                  NodeEntry dep = graph.get(directDep);
+                  if (dep != null) {
+                    dep.removeReverseDep(key);
+                  }
                 }
               }
-            }
-            // Allow custom key-specific logic to update dirtiness status.
-            informInvalidationReceiver(key, EvaluationProgressReceiver.InvalidationState.DELETED);
-          }
-          if (traverseGraph) {
-            // Force reverseDeps consolidation (validates that attempts to remove reverse deps were
-            // really successful.
-            entry.getReverseDeps();
-          }
-          // Actually remove the node.
-          graph.remove(key);
-          dirtyKeyTracker.notDirty(key);
+              // Allow custom key-specific logic to update dirtiness status.
+              informInvalidationReceiver(key, EvaluationProgressReceiver.InvalidationState.DELETED);
+              // Actually remove the node.
+              graph.remove(key);
+              dirtyKeyTracker.notDirty(key);
 
-          // Remove the node from the set as the last operation.
-          pendingVisitations.remove(invalidationPair);
-        }
-      });
+              // Remove the node from the set as the last operation.
+              pendingVisitations.remove(invalidationPair);
+            }
+          });
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index 991dad6..18384c9 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -16,6 +16,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.base.Receiver;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
@@ -26,14 +27,11 @@
 import com.google.common.collect.Interners;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.collect.nestedset.NestedSet;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
 import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.concurrent.ThrowableRecordingRunnableWrapper;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.EventHandler;
 import com.google.devtools.build.lib.events.StoredEventHandler;
@@ -59,8 +57,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -130,20 +126,27 @@
   private final int threadCount;
   @Nullable private final EvaluationProgressReceiver progressReceiver;
   private final DirtyKeyTracker dirtyKeyTracker;
+  private final Receiver<Collection<SkyKey>> inflightKeysReceiver;
   private final Predicate<Event> storedEventFilter;
 
   private static final Interner<SkyKey> KEY_CANONICALIZER =  Interners.newWeakInterner();
 
-  public ParallelEvaluator(ProcessableGraph graph, Version graphVersion,
+  public ParallelEvaluator(
+      ProcessableGraph graph,
+      Version graphVersion,
       ImmutableMap<? extends SkyFunctionName, ? extends SkyFunction> skyFunctions,
       final EventHandler reporter,
       EmittedEventState emittedEventState,
-      Predicate<Event> storedEventFilter, boolean keepGoing, int threadCount,
+      Predicate<Event> storedEventFilter,
+      boolean keepGoing,
+      int threadCount,
       @Nullable EvaluationProgressReceiver progressReceiver,
-      DirtyKeyTracker dirtyKeyTracker) {
+      DirtyKeyTracker dirtyKeyTracker,
+      Receiver<Collection<SkyKey>> inflightKeysReceiver) {
     this.graph = graph;
     this.skyFunctions = skyFunctions;
     this.graphVersion = graphVersion;
+    this.inflightKeysReceiver = inflightKeysReceiver;
     this.reporter = Preconditions.checkNotNull(reporter);
     this.keepGoing = keepGoing;
     this.threadCount = threadCount;
@@ -1039,71 +1042,10 @@
           throw new IllegalStateException(entry + " for " + skyKey + " in unknown state");
       }
     }
-    boolean shouldClean = false;
     try {
-      EvaluationResult<T> result = waitForCompletionAndConstructResult(visitor, skyKeys);
-      shouldClean = true;
-      return result;
-    } catch (InterruptedException e) {
-      shouldClean = true;
-      throw e;
+      return waitForCompletionAndConstructResult(visitor, skyKeys);
     } finally {
-      if (shouldClean) {
-        // TODO(bazel-team): In nokeep_going mode or in case of an interrupt, we need to remove
-        // partial values from the graph. Find a better way to handle those cases.
-        clean(visitor.inflightNodes);
-      }
-      // If we're going to crash anyway, things might be in a weird state such that 'clean' is
-      // likely to crash too. So propagating the original exception is much more useful.
-      // Note that this is safe in the sense that this decision doesn't prevent weird
-      // SkyFunctions from doing weird things with unchecked exceptions - that's already
-      // prevented directly by SkyFunctionException#validateExceptionType.
-    }
-  }
-
-  private void clean(Set<SkyKey> inflightNodes) throws InterruptedException {
-    boolean alreadyInterrupted = Thread.interrupted();
-    // This parallel computation is fully cpu-bound, so we use a thread for each processor.
-    ExecutorService executor = Executors.newFixedThreadPool(
-        Runtime.getRuntime().availableProcessors(),
-        new ThreadFactoryBuilder().setNameFormat("ParallelEvaluator#clean %d").build());
-    ThrowableRecordingRunnableWrapper wrapper =
-        new ThrowableRecordingRunnableWrapper("ParallelEvaluator#clean");
-    for (final SkyKey key : inflightNodes) {
-      final NodeEntry entry = graph.get(key);
-      Preconditions.checkState(!entry.isDone(), "%s %s", key, entry);
-      executor.execute(wrapper.wrap(new Runnable() {
-        @Override
-        public void run() {
-          cleanInflightNode(key, entry);
-        }
-      }));
-    }
-    // We uninterruptibly wait for all nodes to be cleaned because we want to make sure the graph
-    // is left in a good state.
-    //
-    // TODO(bazel-team): Come up with a better design for graph cleaning such that we can respond
-    // to interrupts in constant time.
-    boolean newlyInterrupted = ExecutorUtil.uninterruptibleShutdown(executor);
-    Throwables.propagateIfPossible(wrapper.getFirstThrownError());
-    if (newlyInterrupted || alreadyInterrupted) {
-      throw new InterruptedException();
-    }
-  }
-
-  private void cleanInflightNode(SkyKey key, NodeEntry entry) {
-    Set<SkyKey> temporaryDeps = entry.getTemporaryDirectDeps();
-    graph.remove(key);
-    for (SkyKey dep : temporaryDeps) {
-      NodeEntry nodeEntry = graph.get(dep);
-      // The direct dep might have already been cleaned from the graph.
-      if (nodeEntry != null) {
-        // Only bother removing the reverse dep on done nodes since other in-flight nodes will be
-        // cleaned too.
-        if (nodeEntry.isDone()) {
-          nodeEntry.removeReverseDep(key);
-        }
-      }
+      inflightKeysReceiver.accept(visitor.inflightNodes);
     }
   }