Properly preserve state when interrupted during a deletion. Also use best Bazel practices for reinitializing maps by reassigning, since just clearing can retain memory. When enqueuing deleted nodes, don't use more threads than there are nodes. Finally, fix a test accidentally changed in https://github.com/bazelbuild/bazel/commit/3ea75594bfdb5e6794250155995c45a5adc00c28.
PiperOrigin-RevId: 392680387
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index a84da19..9cc4e01 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
+import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MINUTES;
import com.google.common.annotations.VisibleForTesting;
@@ -215,24 +216,36 @@
}
static final class DeletingInvalidationState extends InvalidationState {
- private final ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove =
- new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
+ private ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove;
+ private ConcurrentHashMap<SkyKey, Boolean> visitedKeysAcrossInterruptions;
DeletingInvalidationState() {
super(InvalidationType.DELETED);
+ initializeFields();
+ }
+
+ private void initializeFields() {
+ doneKeysWithRdepsToRemove =
+ new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
+ visitedKeysAcrossInterruptions =
+ new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
}
@Override
boolean isEmpty() {
return super.isEmpty() && doneKeysWithRdepsToRemove.isEmpty();
}
+
+ void clear() {
+ initializeFields();
+ }
}
/** A node-deleting implementation. */
static final class DeletingNodeVisitor extends InvalidatingNodeVisitor<InMemoryGraph> {
private final Set<SkyKey> visited = Sets.newConcurrentHashSet();
private final boolean traverseGraph;
- private final ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove;
+ private final DeletingInvalidationState state;
DeletingNodeVisitor(
InMemoryGraph graph,
@@ -245,7 +258,7 @@
state,
NamedForkJoinPool.newNamedPool("deleting node visitor", DEFAULT_THREAD_COUNT));
this.traverseGraph = traverseGraph;
- this.doneKeysWithRdepsToRemove = state.doneKeysWithRdepsToRemove;
+ this.state = state;
}
@Override
@@ -256,15 +269,16 @@
// To avoid contention and scheduling too many jobs for our #cpus, we start
// DEFAULT_THREAD_COUNT jobs, each processing a chunk of the pending visitations.
int listSize = pendingList.size();
- for (int i = 0; i < DEFAULT_THREAD_COUNT; i++) {
+ int numThreads = min(DEFAULT_THREAD_COUNT, listSize);
+ for (int i = 0; i < numThreads; i++) {
int index = i;
executor.execute(
() ->
visit(
Iterables.transform(
pendingList.subList(
- (index * listSize) / DEFAULT_THREAD_COUNT,
- ((index + 1) * listSize) / DEFAULT_THREAD_COUNT),
+ (index * listSize) / numThreads,
+ ((index + 1) * listSize) / numThreads),
Pair::getFirst),
InvalidationType.DELETED));
}
@@ -275,16 +289,17 @@
}
try (AutoProfiler ignored =
GoogleAutoProfilerUtils.logged("reverse dep removal", MIN_TIME_FOR_LOGGING)) {
- doneKeysWithRdepsToRemove.forEachEntry(
+ state.doneKeysWithRdepsToRemove.forEachEntry(
/*parallelismThreshold=*/ 1024,
e -> {
NodeEntry entry = graph.get(null, Reason.RDEP_REMOVAL, e.getKey());
if (entry == null) {
return;
}
- entry.removeReverseDepsFromDoneEntryDueToDeletion(visited);
+ entry.removeReverseDepsFromDoneEntryDueToDeletion(
+ state.visitedKeysAcrossInterruptions.keySet());
});
- doneKeysWithRdepsToRemove.clear();
+ state.clear();
}
}
@@ -350,7 +365,7 @@
Iterables.filter(
directDeps,
k ->
- !visited.contains(k)
+ !state.visitedKeysAcrossInterruptions.containsKey(k)
&& !pendingVisitations.contains(
Pair.of(k, InvalidationType.DELETED))));
if (!depMap.isEmpty()) {
@@ -363,7 +378,8 @@
continue;
}
if (dep.isDone()) {
- doneKeysWithRdepsToRemove.putIfAbsent(directDepEntry.getKey(), Boolean.TRUE);
+ state.doneKeysWithRdepsToRemove.putIfAbsent(
+ directDepEntry.getKey(), Boolean.TRUE);
continue;
}
if (!signalingDeps.contains(directDepEntry.getKey())) {
@@ -395,7 +411,8 @@
// Actually remove the node.
graph.remove(key);
- // Remove the node from the set as the last operation.
+ // Remove the node from the set and add it to global visited as the last operation.
+ state.visitedKeysAcrossInterruptions.put(key, Boolean.TRUE);
pendingVisitations.remove(invalidationPair);
});
}
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 9d2d260..9d26f88 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -484,6 +484,39 @@
}
@Test
+ public void interruptRecoversNextTime() throws InterruptedException {
+ graph = new InMemoryGraphImpl();
+ SkyKey dep = GraphTester.nonHermeticKey("dep");
+ SkyKey toDelete = GraphTester.nonHermeticKey("top");
+ tester.getOrCreate(toDelete).addDependency(dep).setConstantValue(new StringValue("top"));
+ tester.set(dep, new StringValue("dep"));
+ eval(/*keepGoing=*/ false, toDelete);
+ Thread mainThread = Thread.currentThread();
+ assertThrows(
+ InterruptedException.class,
+ () ->
+ invalidateWithoutError(
+ new DirtyTrackingProgressReceiver(null) {
+ @Override
+ public void invalidated(SkyKey skyKey, InvalidationState state) {
+ mainThread.interrupt();
+ // Wait for the main thread to be interrupted uninterruptibly, because the
+ // main thread is going to interrupt us, and we don't want to get into an
+ // interrupt fight. Only if we get interrupted without the main thread also
+ // being interrupted will this throw an InterruptedException.
+ TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions(
+ visitor.get().getInterruptionLatchForTestingOnly(),
+ "Main thread was not interrupted");
+ }
+ },
+ toDelete));
+ invalidateWithoutError(new DirtyTrackingProgressReceiver(null));
+ eval(/*keepGoing=*/ false, toDelete);
+ invalidateWithoutError(new DirtyTrackingProgressReceiver(null), toDelete);
+ eval(/*keepGoing=*/ false, toDelete);
+ }
+
+ @Test
public void interruptThreadInReceiver() throws Exception {
Random random = new Random(TestUtils.getRandomSeed());
int graphSize = 1000;
diff --git a/src/test/java/com/google/devtools/build/skyframe/ReverseDepsUtilityTest.java b/src/test/java/com/google/devtools/build/skyframe/ReverseDepsUtilityTest.java
index d2064341..58221fb 100644
--- a/src/test/java/com/google/devtools/build/skyframe/ReverseDepsUtilityTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/ReverseDepsUtilityTest.java
@@ -33,7 +33,7 @@
@Parameters(name = "numElements-{0}")
public static List<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
- for (int i = 1; i < 2; i++) {
+ for (int i = 0; i < 20; i++) {
params.add(new Object[] {i});
}
return params;
@@ -92,7 +92,7 @@
ReverseDepsUtility.addReverseDep(example, Key.create(0));
if (numElements == 0) {
// Will not throw.
- assertThat(ReverseDepsUtility.getReverseDeps(example, /*checkConsistency=*/ true)).isEmpty();
+ assertThat(ReverseDepsUtility.getReverseDeps(example, /*checkConsistency=*/ true)).hasSize(1);
} else {
assertThrows(
RuntimeException.class,