// Copyright 2015 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.skyframe;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.ExecutorUtil;
import com.google.devtools.build.lib.testutil.TestRunnableWrapper;
import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.util.GroupedList.GroupedListHelper;
import com.google.devtools.build.skyframe.GraphTester.StringValue;
import com.google.devtools.build.skyframe.NodeEntry.DependencyState;
import com.google.devtools.build.skyframe.QueryableGraph.Reason;
import com.google.devtools.build.skyframe.ThinNodeEntry.DirtyType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

/** Base class for sanity tests on {@link EvaluableGraph} implementations. */
public abstract class GraphTest {
  protected ProcessableGraph graph;
  protected TestRunnableWrapper wrapper;
  private final Version startingVersion = getStartingVersion();

  // This code should really be in a @Before method, but @Before methods are executed from the
  // top down, and this class's @Before method calls #getGraph, so makeGraph must have already
  // been called.
  protected abstract void makeGraph() throws Exception;

  protected abstract ProcessableGraph getGraph(Version version) throws Exception;

  protected abstract Version getStartingVersion();

  protected abstract Version getNextVersion(Version version);

  protected boolean checkRdeps() {
    return true;
  }

  @Before
  public void init() throws Exception {
    makeGraph();
    Version startingVersion = getStartingVersion();
    this.graph = getGraph(startingVersion);
    this.wrapper = new TestRunnableWrapper("GraphConcurrencyTest");
  }

  protected SkyKey key(String name) {
    return GraphTester.toSkyKey(name);
  }

  @Test
  public void createIfAbsentBatchSanity() throws InterruptedException {
    graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key("cat"), key("dog")));
  }

  @Test
  public void createIfAbsentConcurrentWithGet() throws InterruptedException {
    int numIters = 50;
    final SkyKey key = key("key");
    for (int i = 0; i < numIters; i++) {
      Thread t =
          new Thread(
              wrapper.wrap(
                  new Runnable() {
                    @Override
                    public void run() {
                      try {
                        graph.get(null, Reason.OTHER, key);
                      } catch (InterruptedException e) {
                        throw new IllegalStateException(e);
                      }
                    }
                  }));
      t.start();
      assertThat(graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key))).isNotEmpty();
      graph.remove(key);
    }
  }

  @Test
  public void testCreateIfAbsentWithConcurrentGet() throws Exception {
    final SkyKey key = key("foo");
    int numThreads = 50;
    final CountDownLatch startThreads = new CountDownLatch(1);
    Runnable createRunnable =
        new Runnable() {
          @Override
          public void run() {
            TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions(
                startThreads, "threads not started");
            try {
              graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key));
            } catch (InterruptedException e) {
              throw new IllegalStateException(e);
            }
          }
        };
    Runnable noCreateRunnable =
        new Runnable() {
          @Override
          public void run() {
            TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions(
                startThreads, "threads not started");
            try {
              graph.get(null, Reason.OTHER, key);
            } catch (InterruptedException e) {
              throw new IllegalStateException(e);
            }
          }
        };
    List<Thread> threads = new ArrayList<>(2 * numThreads);
    for (int i = 0; i < numThreads; i++) {
      Thread createThread = new Thread(createRunnable);
      createThread.start();
      threads.add(createThread);
      Thread noCreateThread = new Thread(noCreateRunnable);
      noCreateThread.start();
      threads.add(noCreateThread);
    }
    startThreads.countDown();
    for (Thread thread : threads) {
      thread.join();
    }
  }

  // Tests adding and removing Rdeps of a {@link NodeEntry} while a node transitions from
  // not done to done.
  @Test
  public void testAddRemoveRdeps() throws Exception {
    SkyKey key = key("foo");
    final NodeEntry entry =
        Iterables.getOnlyElement(
            graph.createIfAbsentBatch(null, Reason.OTHER, ImmutableList.of(key)).values());
    // These numbers are arbitrary.
    int numThreads = 50;
    int numKeys = numThreads;
    // One chunk will be used to add and remove rdeps before setting the node value.  The second
    // chunk of work will have the node value set and the last chunk will be to add and remove
    // rdeps after the value has been set.
    final int chunkSize = 40;
    final int numIterations = chunkSize * 2;
    // This latch is used to signal that the runnables have been submitted to the executor.
    final CountDownLatch waitForStart = new CountDownLatch(1);
    // This latch is used to signal to the main thread that we have begun the second chunk
    // for sufficiently many keys.  The minimum of numThreads and numKeys is used to prevent
    // thread starvation from causing a delay here.
    final CountDownLatch waitForAddedRdep = new CountDownLatch(numThreads);
    // This latch is used to guarantee that we set the node's value before we enter the third
    // chunk for any key.
    final CountDownLatch waitForSetValue = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    // Add single rdep before transition to done.
    assertThat(entry.addReverseDepAndCheckIfDone(key("rdep")))
        .isEqualTo(DependencyState.NEEDS_SCHEDULING);
    List<SkyKey> rdepKeys = new ArrayList<>();
    for (int i = 0; i < numKeys; i++) {
      rdepKeys.add(key("rdep" + i));
    }
    graph.createIfAbsentBatch(null, Reason.OTHER, rdepKeys);
    for (int i = 0; i < numKeys; i++) {
      final int j = i;
      Runnable r =
          new Runnable() {
            @Override
            public void run() {
              try {
                waitForStart.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                assertThat(entry.addReverseDepAndCheckIfDone(key("rdep" + j)))
                    .isNotEqualTo(DependencyState.DONE);
                waitForAddedRdep.countDown();
                waitForSetValue.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                for (int k = chunkSize; k <= numIterations; k++) {
                  entry.removeReverseDep(key("rdep" + j));
                  entry.addReverseDepAndCheckIfDone(key("rdep" + j));
                  if (checkRdeps()) {
                    entry.getReverseDepsForDoneEntry();
                  }
                }
              } catch (InterruptedException e) {
                fail("Test failed: " + e.toString());
              }
            }
          };
      pool.execute(wrapper.wrap(r));
    }
    waitForStart.countDown();
    waitForAddedRdep.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    entry.markRebuilding();
    entry.setValue(new StringValue("foo1"), startingVersion, null);
    waitForSetValue.countDown();
    wrapper.waitForTasksAndMaybeThrow();
    assertThat(ExecutorUtil.interruptibleShutdown(pool)).isFalse();
    assertThat(graph.get(null, Reason.OTHER, key).getValue()).isEqualTo(new StringValue("foo1"));
    if (checkRdeps()) {
      assertThat(graph.get(null, Reason.OTHER, key).getReverseDepsForDoneEntry())
          .hasSize(numKeys + 1);
    }

    graph = getGraph(getNextVersion(startingVersion));
    NodeEntry sameEntry = Preconditions.checkNotNull(graph.get(null, Reason.OTHER, key));
    // Mark the node as dirty again and check that the reverse deps have been preserved.
    sameEntry.markDirty(DirtyType.CHANGE);
    startEvaluation(sameEntry);
    sameEntry.markRebuilding();
    sameEntry.setValue(new StringValue("foo2"), getNextVersion(startingVersion), null);
    assertThat(graph.get(null, Reason.OTHER, key).getValue()).isEqualTo(new StringValue("foo2"));
    if (checkRdeps()) {
      assertThat(graph.get(null, Reason.OTHER, key).getReverseDepsForDoneEntry())
          .hasSize(numKeys + 1);
    }
  }

  // Tests adding inflight nodes with a given key while an existing node with the same key
  // undergoes a transition from not done to done.
  @Test
  public void testAddingInflightNodes() throws Exception {
    int numThreads = 50;
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    final int numKeys = 500;
    // Add each pair of keys 10 times.
    final Set<SkyKey> nodeCreated = Sets.newConcurrentHashSet();
    final Set<SkyKey> valuesSet = Sets.newConcurrentHashSet();
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < numKeys; j++) {
        for (int k = j + 1; k < numKeys; k++) {
          final int keyNum1 = j;
          final int keyNum2 = k;
          final SkyKey key1 = key("foo" + keyNum1);
          final SkyKey key2 = key("foo" + keyNum2);
          final Iterable<SkyKey> keys = ImmutableList.of(key1, key2);
          Runnable r =
              new Runnable() {
                @Override
                public void run() {
                  for (SkyKey key : keys) {
                    NodeEntry entry = null;
                    try {
                      entry = graph.get(null, Reason.OTHER, key);
                    } catch (InterruptedException e) {
                      throw new IllegalStateException(e);
                    }
                    if (entry == null) {
                      nodeCreated.add(key);
                    }
                  }
                  Map<SkyKey, ? extends NodeEntry> entries;
                  try {
                    entries = graph.createIfAbsentBatch(null, Reason.OTHER, keys);
                  } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                  }
                  for (Integer keyNum : ImmutableList.of(keyNum1, keyNum2)) {
                    SkyKey key = key("foo" + keyNum);
                    NodeEntry entry = entries.get(key);
                    // {@code entry.addReverseDepAndCheckIfDone(null)} should return
                    // NEEDS_SCHEDULING at most once.
                    try {
                      if (startEvaluation(entry).equals(DependencyState.NEEDS_SCHEDULING)) {
                        entry.markRebuilding();
                        assertThat(valuesSet.add(key)).isTrue();
                        // Set to done.
                        entry.setValue(new StringValue("bar" + keyNum), startingVersion, null);
                        assertThat(entry.isDone()).isTrue();
                      }
                    } catch (InterruptedException e) {
                      throw new IllegalStateException(key + ", " + entry, e);
                    }
                  }
                  // This shouldn't cause any problems from the other threads.
                  try {
                    graph.createIfAbsentBatch(null, Reason.OTHER, keys);
                  } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                  }
                }
              };
          pool.execute(wrapper.wrap(r));
        }
      }
    }
    wrapper.waitForTasksAndMaybeThrow();
    assertThat(ExecutorUtil.interruptibleShutdown(pool)).isFalse();
    // Check that all the values are as expected.
    for (int i = 0; i < numKeys; i++) {
      SkyKey key = key("foo" + i);
      assertThat(nodeCreated).contains(key);
      assertThat(valuesSet).contains(key);
      assertThat(graph.get(null, Reason.OTHER, key).getValue())
          .isEqualTo(new StringValue("bar" + i));
      assertThat(graph.get(null, Reason.OTHER, key).getVersion()).isEqualTo(startingVersion);
    }
  }

  /**
   * Initially calling {@link NodeEntry#setValue} and then making sure concurrent calls to {@link
   * QueryableGraph#get} and {@link QueryableGraph#getBatch} do not interfere with the node.
   */
  @Test
  public void testDoneToDirty() throws Exception {
    final int numKeys = 1000;
    int numThreads = 50;
    final int numBatchRequests = 100;
    // Create a bunch of done nodes.
    ArrayList<SkyKey> keys = new ArrayList<>();
    for (int i = 0; i < numKeys; i++) {
      keys.add(key("foo" + i));
    }
    Map<SkyKey, ? extends NodeEntry> entries = graph.createIfAbsentBatch(null, Reason.OTHER, keys);
    for (int i = 0; i < numKeys; i++) {
      NodeEntry entry = entries.get(key("foo" + i));
      startEvaluation(entry);
      entry.markRebuilding();
      entry.setValue(new StringValue("bar"), startingVersion, null);
    }

    assertThat(graph.get(null, Reason.OTHER, key("foo" + 0))).isNotNull();
    graph = getGraph(getNextVersion(startingVersion));
    assertThat(graph.get(null, Reason.OTHER, key("foo" + 0))).isNotNull();
    ExecutorService pool1 = Executors.newFixedThreadPool(numThreads);
    ExecutorService pool2 = Executors.newFixedThreadPool(numThreads);
    ExecutorService pool3 = Executors.newFixedThreadPool(numThreads);

    // Only start all the threads once the batch requests are ready.
    final CountDownLatch makeBatchCountDownLatch = new CountDownLatch(numBatchRequests);
    // Do at least 5 single requests and batch requests before transitioning node.
    final CountDownLatch getBatchCountDownLatch = new CountDownLatch(5);
    final CountDownLatch getCountDownLatch = new CountDownLatch(5);

    final SkyKey dep = key("dep");
    for (int i = 0; i < numKeys; i++) {
      final int keyNum = i;
      // Transition the nodes from done to dirty and then back to done.
      Runnable r1 =
          () -> {
            try {
              makeBatchCountDownLatch.await();
              getBatchCountDownLatch.await();
              getCountDownLatch.await();
            } catch (InterruptedException e) {
              throw new AssertionError(e);
            }
            NodeEntry entry = null;
            try {
              entry = graph.get(null, Reason.OTHER, key("foo" + keyNum));
            } catch (InterruptedException e) {
              throw new IllegalStateException(e);
            }
            try {
              entry.markDirty(DirtyType.CHANGE);

              // Make some changes, like adding a dep and rdep.
              entry.addReverseDepAndCheckIfDone(key("rdep"));
              entry.markRebuilding();
              addTemporaryDirectDep(entry, dep);
              Version nextVersion = getNextVersion(startingVersion);
              entry.signalDep(nextVersion, dep);

              entry.setValue(new StringValue("bar" + keyNum), nextVersion, null);
            } catch (InterruptedException e) {
              throw new IllegalStateException(keyNum + ", " + entry, e);
            }
          };

      // Start a bunch of get() calls while the node transitions from dirty to done and back.
      Runnable r2 =
          new Runnable() {
            @Override
            public void run() {
              try {
                makeBatchCountDownLatch.await();
              } catch (InterruptedException e) {
                throw new AssertionError(e);
              }
              NodeEntry entry = null;
              try {
                entry = graph.get(null, Reason.OTHER, key("foo" + keyNum));
              } catch (InterruptedException e) {
                throw new IllegalStateException(e);
              }
              assertThat(entry).isNotNull();
              // Requests for the value are made at the same time that the version increments from
              // the base. Check that there is no problem in requesting the version and that the
              // number is sane.
              assertThat(entry.getVersion())
                  .isAnyOf(startingVersion, getNextVersion(startingVersion));
              getCountDownLatch.countDown();
            }
          };
      pool1.execute(wrapper.wrap(r1));
      pool2.execute(wrapper.wrap(r2));
    }
    Random r = new Random(TestUtils.getRandomSeed());
    // Start a bunch of getBatch() calls while the node transitions from dirty to done and back.
    for (int i = 0; i < numBatchRequests; i++) {
      final List<SkyKey> batch = new ArrayList<>(numKeys);
      // Pseudorandomly uniformly sample the powerset of the keys.
      for (int j = 0; j < numKeys; j++) {
        if (r.nextBoolean()) {
          batch.add(key("foo" + j));
        }
      }
      makeBatchCountDownLatch.countDown();
      Runnable r3 =
          new Runnable() {
            @Override
            public void run() {
              try {
                makeBatchCountDownLatch.await();
              } catch (InterruptedException e) {
                throw new AssertionError(e);
              }
              Map<SkyKey, ? extends NodeEntry> batchMap = null;
              try {
                batchMap = graph.getBatch(null, Reason.OTHER, batch);
              } catch (InterruptedException e) {
                throw new IllegalStateException(e);
              }
              getBatchCountDownLatch.countDown();
              assertThat(batchMap).hasSize(batch.size());
              for (NodeEntry entry : batchMap.values()) {
                // Batch requests are made at the same time that the version increments from the
                // base. Check that there is no problem in requesting the version and that the
                // number is sane.
                assertThat(entry.getVersion())
                    .isAnyOf(startingVersion, getNextVersion(startingVersion));
              }
            }
          };
      pool3.execute(wrapper.wrap(r3));
    }
    wrapper.waitForTasksAndMaybeThrow();
    assertThat(ExecutorUtil.interruptibleShutdown(pool1)).isFalse();
    assertThat(ExecutorUtil.interruptibleShutdown(pool2)).isFalse();
    assertThat(ExecutorUtil.interruptibleShutdown(pool3)).isFalse();
    for (int i = 0; i < numKeys; i++) {
      NodeEntry entry = graph.get(null, Reason.OTHER, key("foo" + i));
      assertThat(entry.getValue()).isEqualTo(new StringValue("bar" + i));
      assertThat(entry.getVersion()).isEqualTo(getNextVersion(startingVersion));
      if (checkRdeps()) {
        for (SkyKey key : entry.getReverseDepsForDoneEntry()) {
          assertThat(key).isEqualTo(key("rdep"));
        }
      }
      for (SkyKey key : entry.getDirectDeps()) {
        assertThat(key).isEqualTo(dep);
      }
    }
  }

  private static DependencyState startEvaluation(NodeEntry entry) throws InterruptedException {
    return entry.addReverseDepAndCheckIfDone(null);
  }

  private static void addTemporaryDirectDep(NodeEntry entry, SkyKey key) {
    GroupedListHelper<SkyKey> helper = new GroupedListHelper<>();
    helper.add(key);
    entry.addTemporaryDirectDeps(helper);
  }
}
