// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.testutil.TestUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for MoreFutures
 */
@RunWith(JUnit4.class)
public class MoreFuturesTest {

  private ExecutorService executorService;

  @Before
  public final void createExecutor() throws Exception  {
    executorService = Executors.newFixedThreadPool(5);
  }

  @After
  public final void shutdownExecutor() throws Exception  {
    MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS,
        TimeUnit.SECONDS);

  }

  /** Test the normal path where everything is successful. */
  @Test
  public void allAsListOrCancelAllHappy() throws ExecutionException, InterruptedException {
    final List<DelayedFuture> futureList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      DelayedFuture future = new DelayedFuture(i);
      executorService.execute(future);
      futureList.add(future);
    }
    ListenableFuture<List<Object>> list = MoreFutures.allAsListOrCancelAll(futureList);
    List<Object> result = list.get();
    assertThat(result).hasSize(futureList.size());
    for (DelayedFuture delayedFuture : futureList) {
      assertThat(delayedFuture.wasCanceled).isFalse();
      assertThat(delayedFuture.wasInterrupted).isFalse();
      assertThat(delayedFuture.get()).isNotNull();
      assertThat(result).contains(delayedFuture.get());
    }
  }

  /** Test that if any of the futures in the list fails, we cancel all the futures immediately. */
  @Test
  public void allAsListOrCancelAllCancellation() throws InterruptedException {
    final List<DelayedFuture> futureList = new ArrayList<>();
    for (int i = 1; i < 6; i++) {
      DelayedFuture future = new DelayedFuture(i * 1000);
      executorService.execute(future);
      futureList.add(future);
    }
    DelayedFuture toFail = new DelayedFuture(1000);
    futureList.add(toFail);
    toFail.makeItFail();
    ListenableFuture<List<Object>> list = MoreFutures.allAsListOrCancelAll(futureList);

    try {
      list.get();
      fail("This should fail");
    } catch (InterruptedException | ExecutionException ignored) {
    }
    Thread.sleep(100);
    for (DelayedFuture delayedFuture : futureList) {
      assertThat(delayedFuture.wasCanceled || delayedFuture == toFail).isTrue();
      assertThat(delayedFuture.wasInterrupted).isFalse();
    }
  }

  @Test
  public void waitForAllInterruptiblyFailFast_AllSuccessful() throws Exception {
    List<DelayedFuture> futureList = new ArrayList<>();
    for (int i = 1; i < 6; i++) {
      DelayedFuture future = new DelayedFuture(i * 1000);
      executorService.execute(future);
      futureList.add(future);
    }
    MoreFutures.waitForAllInterruptiblyFailFast(futureList);
    for (DelayedFuture delayedFuture : futureList) {
      assertThat(delayedFuture.wasCanceled).isFalse();
      assertThat(delayedFuture.wasInterrupted).isFalse();
      assertThat(delayedFuture.get()).isNotNull();
    }
  }

  @Test
  public void waitForAllInterruptiblyFailFast_Interrupt() throws Exception {
    final List<DelayedFuture> futureList = new ArrayList<>();
    for (int i = 1; i < 6; i++) {
      // When we have a bunch of futures that never complete.
      DelayedFuture future = new DelayedFuture(Integer.MAX_VALUE);
      // And submit them to an Executor.
      executorService.execute(future);
      futureList.add(future);
    }
    final Thread testThread = Thread.currentThread();
    // And have a thread that interrupts the current thread (the one running the test) once all the
    // futures were polled at least once via Future#get(long, TimeUnit).
    Thread interruptThread = new Thread() {
      @Override
      public void run() {
        for (DelayedFuture delayedFuture : futureList) {
          try {
            delayedFuture.getLatch.await(
                TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
          } catch (InterruptedException ie) {
            throw new IllegalStateException(ie);
          }
        }
        testThread.interrupt();
      }
    };
    // And run this thread in the background.
    interruptThread.start();
    try {
      // And then wait for all the futures to complete, interruptibly.
      // Then, as expected, waitForAllInterruptiblyFailFast propagates the interrupt sent to the
      // main test thread by our background thread.
      assertThrows(
          InterruptedException.class,
          () -> MoreFutures.waitForAllInterruptiblyFailFast(futureList));
    } finally {
      // The @After-annotated shutdownExecutor method blocks on completion of all tasks. Since we
      // submitted a bunch of tasks that never complete, we need to explicitly cancel them.
      for (DelayedFuture delayedFuture : futureList) {
        delayedFuture.cancel(/*mayInterruptIfRunning=*/ true);
      }
      // If we're here and the test were to pass, then the background thread must have already
      // completed. Interrupt it unconditionally - if the test were to pass, this is benign. But if
      // the test were to fail then we'd have a rogue thread in the background which can be very
      // evil (e.g. can interfere with the execution of other test cases).
      interruptThread.interrupt();
    }
  }

  @Test
  public void waitForAllInterruptiblyFailFast_Failure() throws Exception {
    List<DelayedFuture> futureList = new ArrayList<>();
    for (int i = 1; i < 6; i++) {
      DelayedFuture future = new DelayedFuture(i * 1000);
      executorService.execute(future);
      futureList.add(future);
    }
    DelayedFuture toFail = new DelayedFuture(1000);
    futureList.add(toFail);
    toFail.makeItFail();
    ExecutionException ee =
        assertThrows(
            ExecutionException.class,
            () -> MoreFutures.waitForAllInterruptiblyFailFast(futureList));
    assertThat(ee).hasCauseThat().hasMessageThat().isEqualTo("I like to fail!!");
  }

  /**
   * A future that (if added to an executor) waits {@code delay} milliseconds before setting a
   * response.
   */
  private static class DelayedFuture extends AbstractFuture<Object> implements Runnable {

    private final int delay;
    private final CountDownLatch failOrInterruptLatch = new CountDownLatch(1);
    private final CountDownLatch getLatch = new CountDownLatch(1);
    private boolean wasCanceled;
    private boolean wasInterrupted;

    public DelayedFuture(int delay) {
      this.delay = delay;
    }

    @Override
    public void run() {
      try {
        wasCanceled = failOrInterruptLatch.await(delay, TimeUnit.MILLISECONDS);
        // Not canceled and not done (makeItFail sets the value, so in that case is done).
        if (!wasCanceled && !isDone()) {
          set(new Object());
        }
      } catch (InterruptedException e) {
        wasInterrupted = true;
      }
    }

    public void makeItFail() {
      setException(new RuntimeException("I like to fail!!"));
      failOrInterruptLatch.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return super.cancel(mayInterruptIfRunning);
    }

    @Override
    protected void interruptTask() {
      failOrInterruptLatch.countDown();
    }

    @Override
    public Object get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException, ExecutionException {
      getLatch.countDown();
      return super.get(timeout, unit);
    }
  }

}
