|  | // Copyright 2015 The Bazel Authors. All rights reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  | package com.google.devtools.build.skyframe; | 
|  |  | 
|  | import static com.google.common.truth.Truth.assertThat; | 
|  | import static org.junit.Assert.fail; | 
|  |  | 
|  | import com.google.common.base.Preconditions; | 
|  | import com.google.common.collect.ImmutableList; | 
|  | import com.google.common.collect.Iterables; | 
|  | import com.google.common.collect.Sets; | 
|  | import com.google.devtools.build.lib.concurrent.ExecutorUtil; | 
|  | import com.google.devtools.build.lib.testutil.TestRunnableWrapper; | 
|  | import com.google.devtools.build.lib.testutil.TestUtils; | 
|  | import com.google.devtools.build.lib.util.GroupedList.GroupedListHelper; | 
|  | import com.google.devtools.build.skyframe.GraphTester.StringValue; | 
|  | import com.google.devtools.build.skyframe.NodeEntry.DependencyState; | 
|  | import com.google.devtools.build.skyframe.QueryableGraph.Reason; | 
|  | import com.google.devtools.build.skyframe.ThinNodeEntry.DirtyType; | 
|  | import java.util.ArrayList; | 
|  | import java.util.List; | 
|  | import java.util.Map; | 
|  | import java.util.Random; | 
|  | import java.util.Set; | 
|  | import java.util.concurrent.CountDownLatch; | 
|  | import java.util.concurrent.ExecutorService; | 
|  | import java.util.concurrent.Executors; | 
|  | import java.util.concurrent.TimeUnit; | 
|  | import org.junit.Before; | 
|  | import org.junit.Test; | 
|  |  | 
|  | /** Base class for sanity tests on {@link EvaluableGraph} implementations. */ | 
|  | public abstract class GraphTest { | 
|  | protected ProcessableGraph graph; | 
|  | protected TestRunnableWrapper wrapper; | 
|  | private final Version startingVersion = getStartingVersion(); | 
|  |  | 
|  | // This code should really be in a @Before method, but @Before methods are executed from the | 
|  | // top down, and this class's @Before method calls #getGraph, so makeGraph must have already | 
|  | // been called. | 
|  | protected abstract void makeGraph() throws Exception; | 
|  |  | 
|  | protected abstract ProcessableGraph getGraph(Version version) throws Exception; | 
|  |  | 
|  | protected abstract Version getStartingVersion(); | 
|  |  | 
|  | protected abstract Version getNextVersion(Version version); | 
|  |  | 
|  | protected boolean checkRdeps() { | 
|  | return true; | 
|  | } | 
|  |  | 
|  | @Before | 
|  | public void init() throws Exception { | 
|  | makeGraph(); | 
|  | Version startingVersion = getStartingVersion(); | 
|  | this.graph = getGraph(startingVersion); | 
|  | this.wrapper = new TestRunnableWrapper("GraphConcurrencyTest"); | 
|  | } | 
|  |  | 
|  | protected SkyKey key(String name) { | 
|  | return GraphTester.toSkyKey(name); | 
|  | } | 
|  |  | 
|  | @Test | 
|  | public void createIfAbsentBatchSanity() throws InterruptedException { | 
|  | graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key("cat"), key("dog"))); | 
|  | } | 
|  |  | 
|  | @Test | 
|  | public void createIfAbsentConcurrentWithGet() throws InterruptedException { | 
|  | int numIters = 50; | 
|  | final SkyKey key = key("key"); | 
|  | for (int i = 0; i < numIters; i++) { | 
|  | Thread t = | 
|  | new Thread( | 
|  | wrapper.wrap( | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | try { | 
|  | graph.get(null, Reason.OTHER, key); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | } | 
|  | })); | 
|  | t.start(); | 
|  | assertThat(graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key))).isNotEmpty(); | 
|  | graph.remove(key); | 
|  | } | 
|  | } | 
|  |  | 
|  | @Test | 
|  | public void testCreateIfAbsentWithConcurrentGet() throws Exception { | 
|  | final SkyKey key = key("foo"); | 
|  | int numThreads = 50; | 
|  | final CountDownLatch startThreads = new CountDownLatch(1); | 
|  | Runnable createRunnable = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions( | 
|  | startThreads, "threads not started"); | 
|  | try { | 
|  | graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key)); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | } | 
|  | }; | 
|  | Runnable noCreateRunnable = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions( | 
|  | startThreads, "threads not started"); | 
|  | try { | 
|  | graph.get(null, Reason.OTHER, key); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | } | 
|  | }; | 
|  | List<Thread> threads = new ArrayList<>(2 * numThreads); | 
|  | for (int i = 0; i < numThreads; i++) { | 
|  | Thread createThread = new Thread(createRunnable); | 
|  | createThread.start(); | 
|  | threads.add(createThread); | 
|  | Thread noCreateThread = new Thread(noCreateRunnable); | 
|  | noCreateThread.start(); | 
|  | threads.add(noCreateThread); | 
|  | } | 
|  | startThreads.countDown(); | 
|  | for (Thread thread : threads) { | 
|  | thread.join(); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Tests adding and removing Rdeps of a {@link NodeEntry} while a node transitions from | 
|  | // not done to done. | 
|  | @Test | 
|  | public void testAddRemoveRdeps() throws Exception { | 
|  | SkyKey key = key("foo"); | 
|  | final NodeEntry entry = | 
|  | Iterables.getOnlyElement( | 
|  | graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key)).values()); | 
|  | // These numbers are arbitrary. | 
|  | int numThreads = 50; | 
|  | int numKeys = numThreads; | 
|  | // One chunk will be used to add and remove rdeps before setting the node value.  The second | 
|  | // chunk of work will have the node value set and the last chunk will be to add and remove | 
|  | // rdeps after the value has been set. | 
|  | final int chunkSize = 40; | 
|  | final int numIterations = chunkSize * 2; | 
|  | // This latch is used to signal that the runnables have been submitted to the executor. | 
|  | final CountDownLatch waitForStart = new CountDownLatch(1); | 
|  | // This latch is used to signal to the main thread that we have begun the second chunk | 
|  | // for sufficiently many keys.  The minimum of numThreads and numKeys is used to prevent | 
|  | // thread starvation from causing a delay here. | 
|  | final CountDownLatch waitForAddedRdep = new CountDownLatch(numThreads); | 
|  | // This latch is used to guarantee that we set the node's value before we enter the third | 
|  | // chunk for any key. | 
|  | final CountDownLatch waitForSetValue = new CountDownLatch(1); | 
|  | ExecutorService pool = Executors.newFixedThreadPool(numThreads); | 
|  | // Add single rdep before transition to done. | 
|  | assertThat(entry.addReverseDepAndCheckIfDone(key("rdep"))) | 
|  | .isEqualTo(DependencyState.NEEDS_SCHEDULING); | 
|  | List<SkyKey> rdepKeys = new ArrayList<>(); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | rdepKeys.add(key("rdep" + i)); | 
|  | } | 
|  | graph.createIfAbsentBatch(null, Reason.OTHER, rdepKeys); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | final int j = i; | 
|  | Runnable r = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | try { | 
|  | waitForStart.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); | 
|  | assertThat(entry.addReverseDepAndCheckIfDone(key("rdep" + j))) | 
|  | .isNotEqualTo(DependencyState.DONE); | 
|  | waitForAddedRdep.countDown(); | 
|  | waitForSetValue.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); | 
|  | for (int k = chunkSize; k <= numIterations; k++) { | 
|  | entry.removeReverseDep(key("rdep" + j)); | 
|  | entry.addReverseDepAndCheckIfDone(key("rdep" + j)); | 
|  | if (checkRdeps()) { | 
|  | entry.getReverseDepsForDoneEntry(); | 
|  | } | 
|  | } | 
|  | } catch (InterruptedException e) { | 
|  | fail("Test failed: " + e.toString()); | 
|  | } | 
|  | } | 
|  | }; | 
|  | pool.execute(wrapper.wrap(r)); | 
|  | } | 
|  | waitForStart.countDown(); | 
|  | waitForAddedRdep.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); | 
|  | entry.markRebuilding(); | 
|  | entry.setValue(new StringValue("foo1"), startingVersion); | 
|  | waitForSetValue.countDown(); | 
|  | wrapper.waitForTasksAndMaybeThrow(); | 
|  | assertThat(ExecutorUtil.interruptibleShutdown(pool)).isFalse(); | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getValue()).isEqualTo(new StringValue("foo1")); | 
|  | if (checkRdeps()) { | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getReverseDepsForDoneEntry()) | 
|  | .hasSize(numKeys + 1); | 
|  | } | 
|  |  | 
|  | graph = getGraph(getNextVersion(startingVersion)); | 
|  | NodeEntry sameEntry = Preconditions.checkNotNull(graph.get(null, Reason.OTHER, key)); | 
|  | // Mark the node as dirty again and check that the reverse deps have been preserved. | 
|  | sameEntry.markDirty(DirtyType.CHANGE); | 
|  | startEvaluation(sameEntry); | 
|  | sameEntry.markRebuilding(); | 
|  | sameEntry.setValue(new StringValue("foo2"), getNextVersion(startingVersion)); | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getValue()).isEqualTo(new StringValue("foo2")); | 
|  | if (checkRdeps()) { | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getReverseDepsForDoneEntry()) | 
|  | .hasSize(numKeys + 1); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Tests adding inflight nodes with a given key while an existing node with the same key | 
|  | // undergoes a transition from not done to done. | 
|  | @Test | 
|  | public void testAddingInflightNodes() throws Exception { | 
|  | int numThreads = 50; | 
|  | ExecutorService pool = Executors.newFixedThreadPool(numThreads); | 
|  | final int numKeys = 500; | 
|  | // Add each pair of keys 10 times. | 
|  | final Set<SkyKey> nodeCreated = Sets.newConcurrentHashSet(); | 
|  | final Set<SkyKey> valuesSet = Sets.newConcurrentHashSet(); | 
|  | for (int i = 0; i < 10; i++) { | 
|  | for (int j = 0; j < numKeys; j++) { | 
|  | for (int k = j + 1; k < numKeys; k++) { | 
|  | final int keyNum1 = j; | 
|  | final int keyNum2 = k; | 
|  | final SkyKey key1 = key("foo" + keyNum1); | 
|  | final SkyKey key2 = key("foo" + keyNum2); | 
|  | final Iterable<SkyKey> keys = ImmutableList.of(key1, key2); | 
|  | Runnable r = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | for (SkyKey key : keys) { | 
|  | NodeEntry entry = null; | 
|  | try { | 
|  | entry = graph.get(null, Reason.OTHER, key); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | if (entry == null) { | 
|  | nodeCreated.add(key); | 
|  | } | 
|  | } | 
|  | Map<SkyKey, ? extends NodeEntry> entries; | 
|  | try { | 
|  | entries = graph.createIfAbsentBatch(null, Reason.OTHER, keys); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | for (Integer keyNum : ImmutableList.of(keyNum1, keyNum2)) { | 
|  | SkyKey key = key("foo" + keyNum); | 
|  | NodeEntry entry = entries.get(key); | 
|  | // {@code entry.addReverseDepAndCheckIfDone(null)} should return | 
|  | // NEEDS_SCHEDULING at most once. | 
|  | try { | 
|  | if (startEvaluation(entry).equals(DependencyState.NEEDS_SCHEDULING)) { | 
|  | entry.markRebuilding(); | 
|  | assertThat(valuesSet.add(key)).isTrue(); | 
|  | // Set to done. | 
|  | entry.setValue(new StringValue("bar" + keyNum), startingVersion); | 
|  | assertThat(entry.isDone()).isTrue(); | 
|  | } | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(key + ", " + entry, e); | 
|  | } | 
|  | } | 
|  | // This shouldn't cause any problems from the other threads. | 
|  | try { | 
|  | graph.createIfAbsentBatch(null, Reason.OTHER, keys); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | } | 
|  | }; | 
|  | pool.execute(wrapper.wrap(r)); | 
|  | } | 
|  | } | 
|  | } | 
|  | wrapper.waitForTasksAndMaybeThrow(); | 
|  | assertThat(ExecutorUtil.interruptibleShutdown(pool)).isFalse(); | 
|  | // Check that all the values are as expected. | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | SkyKey key = key("foo" + i); | 
|  | assertThat(nodeCreated).contains(key); | 
|  | assertThat(valuesSet).contains(key); | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getValue()) | 
|  | .isEqualTo(new StringValue("bar" + i)); | 
|  | assertThat(graph.get(null, Reason.OTHER, key).getVersion()).isEqualTo(startingVersion); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Initially calling {@link NodeEntry#setValue} and then making sure concurrent calls to {@link | 
|  | * QueryableGraph#get} and {@link QueryableGraph#getBatch} do not interfere with the node. | 
|  | */ | 
|  | @Test | 
|  | public void testDoneToDirty() throws Exception { | 
|  | final int numKeys = 1000; | 
|  | int numThreads = 50; | 
|  | final int numBatchRequests = 100; | 
|  | // Create a bunch of done nodes. | 
|  | ArrayList<SkyKey> keys = new ArrayList<>(); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | keys.add(key("foo" + i)); | 
|  | } | 
|  | Map<SkyKey, ? extends NodeEntry> entries = graph.createIfAbsentBatch(null, Reason.OTHER, keys); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | NodeEntry entry = entries.get(key("foo" + i)); | 
|  | startEvaluation(entry); | 
|  | entry.markRebuilding(); | 
|  | entry.setValue(new StringValue("bar"), startingVersion); | 
|  | } | 
|  |  | 
|  | assertThat(graph.get(null, Reason.OTHER, key("foo" + 0))).isNotNull(); | 
|  | graph = getGraph(getNextVersion(startingVersion)); | 
|  | assertThat(graph.get(null, Reason.OTHER, key("foo" + 0))).isNotNull(); | 
|  | ExecutorService pool1 = Executors.newFixedThreadPool(numThreads); | 
|  | ExecutorService pool2 = Executors.newFixedThreadPool(numThreads); | 
|  | ExecutorService pool3 = Executors.newFixedThreadPool(numThreads); | 
|  |  | 
|  | // Only start all the threads once the batch requests are ready. | 
|  | final CountDownLatch makeBatchCountDownLatch = new CountDownLatch(numBatchRequests); | 
|  | // Do at least 5 single requests and batch requests before transitioning node. | 
|  | final CountDownLatch getBatchCountDownLatch = new CountDownLatch(5); | 
|  | final CountDownLatch getCountDownLatch = new CountDownLatch(5); | 
|  |  | 
|  | final SkyKey dep = key("dep"); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | final int keyNum = i; | 
|  | // Transition the nodes from done to dirty and then back to done. | 
|  | Runnable r1 = | 
|  | () -> { | 
|  | try { | 
|  | makeBatchCountDownLatch.await(); | 
|  | getBatchCountDownLatch.await(); | 
|  | getCountDownLatch.await(); | 
|  | } catch (InterruptedException e) { | 
|  | throw new AssertionError(e); | 
|  | } | 
|  | NodeEntry entry = null; | 
|  | try { | 
|  | entry = graph.get(null, Reason.OTHER, key("foo" + keyNum)); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | try { | 
|  | entry.markDirty(DirtyType.CHANGE); | 
|  |  | 
|  | // Make some changes, like adding a dep and rdep. | 
|  | entry.addReverseDepAndCheckIfDone(key("rdep")); | 
|  | entry.markRebuilding(); | 
|  | addTemporaryDirectDep(entry, dep); | 
|  | Version nextVersion = getNextVersion(startingVersion); | 
|  | entry.signalDep(nextVersion, dep); | 
|  |  | 
|  | entry.setValue(new StringValue("bar" + keyNum), nextVersion); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(keyNum + ", " + entry, e); | 
|  | } | 
|  | }; | 
|  |  | 
|  | // Start a bunch of get() calls while the node transitions from dirty to done and back. | 
|  | Runnable r2 = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | try { | 
|  | makeBatchCountDownLatch.await(); | 
|  | } catch (InterruptedException e) { | 
|  | throw new AssertionError(e); | 
|  | } | 
|  | NodeEntry entry = null; | 
|  | try { | 
|  | entry = graph.get(null, Reason.OTHER, key("foo" + keyNum)); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | assertThat(entry).isNotNull(); | 
|  | // Requests for the value are made at the same time that the version increments from | 
|  | // the base. Check that there is no problem in requesting the version and that the | 
|  | // number is sane. | 
|  | assertThat(entry.getVersion()) | 
|  | .isAnyOf(startingVersion, getNextVersion(startingVersion)); | 
|  | getCountDownLatch.countDown(); | 
|  | } | 
|  | }; | 
|  | pool1.execute(wrapper.wrap(r1)); | 
|  | pool2.execute(wrapper.wrap(r2)); | 
|  | } | 
|  | Random r = new Random(TestUtils.getRandomSeed()); | 
|  | // Start a bunch of getBatch() calls while the node transitions from dirty to done and back. | 
|  | for (int i = 0; i < numBatchRequests; i++) { | 
|  | final List<SkyKey> batch = new ArrayList<>(numKeys); | 
|  | // Pseudorandomly uniformly sample the powerset of the keys. | 
|  | for (int j = 0; j < numKeys; j++) { | 
|  | if (r.nextBoolean()) { | 
|  | batch.add(key("foo" + j)); | 
|  | } | 
|  | } | 
|  | makeBatchCountDownLatch.countDown(); | 
|  | Runnable r3 = | 
|  | new Runnable() { | 
|  | @Override | 
|  | public void run() { | 
|  | try { | 
|  | makeBatchCountDownLatch.await(); | 
|  | } catch (InterruptedException e) { | 
|  | throw new AssertionError(e); | 
|  | } | 
|  | Map<SkyKey, ? extends NodeEntry> batchMap = null; | 
|  | try { | 
|  | batchMap = graph.getBatch(null, Reason.OTHER, batch); | 
|  | } catch (InterruptedException e) { | 
|  | throw new IllegalStateException(e); | 
|  | } | 
|  | getBatchCountDownLatch.countDown(); | 
|  | assertThat(batchMap).hasSize(batch.size()); | 
|  | for (NodeEntry entry : batchMap.values()) { | 
|  | // Batch requests are made at the same time that the version increments from the | 
|  | // base. Check that there is no problem in requesting the version and that the | 
|  | // number is sane. | 
|  | assertThat(entry.getVersion()) | 
|  | .isAnyOf(startingVersion, getNextVersion(startingVersion)); | 
|  | } | 
|  | } | 
|  | }; | 
|  | pool3.execute(wrapper.wrap(r3)); | 
|  | } | 
|  | wrapper.waitForTasksAndMaybeThrow(); | 
|  | assertThat(ExecutorUtil.interruptibleShutdown(pool1)).isFalse(); | 
|  | assertThat(ExecutorUtil.interruptibleShutdown(pool2)).isFalse(); | 
|  | assertThat(ExecutorUtil.interruptibleShutdown(pool3)).isFalse(); | 
|  | for (int i = 0; i < numKeys; i++) { | 
|  | NodeEntry entry = graph.get(null, Reason.OTHER, key("foo" + i)); | 
|  | assertThat(entry.getValue()).isEqualTo(new StringValue("bar" + i)); | 
|  | assertThat(entry.getVersion()).isEqualTo(getNextVersion(startingVersion)); | 
|  | if (checkRdeps()) { | 
|  | for (SkyKey key : entry.getReverseDepsForDoneEntry()) { | 
|  | assertThat(key).isEqualTo(key("rdep")); | 
|  | } | 
|  | } | 
|  | for (SkyKey key : entry.getDirectDeps()) { | 
|  | assertThat(key).isEqualTo(dep); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | private static DependencyState startEvaluation(NodeEntry entry) throws InterruptedException { | 
|  | return entry.addReverseDepAndCheckIfDone(null); | 
|  | } | 
|  |  | 
|  | private static void addTemporaryDirectDep(NodeEntry entry, SkyKey key) { | 
|  | GroupedListHelper<SkyKey> helper = new GroupedListHelper<>(); | 
|  | helper.add(key); | 
|  | entry.addTemporaryDirectDeps(helper); | 
|  | } | 
|  | } |