blob: db7591093755452721305080c066ad3bbb4ca342 [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.testutil.TestUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for MoreFutures
*/
@RunWith(JUnit4.class)
public class MoreFuturesTest {
private ExecutorService executorService;
@Before
public final void createExecutor() throws Exception {
executorService = Executors.newFixedThreadPool(5);
}
@After
public final void shutdownExecutor() throws Exception {
MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
}
/** Test the normal path where everything is successful. */
@Test
public void allAsListOrCancelAllHappy() throws ExecutionException, InterruptedException {
final List<DelayedFuture> futureList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
DelayedFuture future = new DelayedFuture(i);
executorService.execute(future);
futureList.add(future);
}
ListenableFuture<List<Object>> list = MoreFutures.allAsListOrCancelAll(futureList);
List<Object> result = list.get();
assertThat(result).hasSize(futureList.size());
for (DelayedFuture delayedFuture : futureList) {
assertThat(delayedFuture.wasCanceled).isFalse();
assertThat(delayedFuture.wasInterrupted).isFalse();
assertThat(delayedFuture.get()).isNotNull();
assertThat(result).contains(delayedFuture.get());
}
}
/** Test that if any of the futures in the list fails, we cancel all the futures immediately. */
@Test
public void allAsListOrCancelAllCancellation() throws InterruptedException {
final List<DelayedFuture> futureList = new ArrayList<>();
for (int i = 1; i < 6; i++) {
DelayedFuture future = new DelayedFuture(i * 1000);
executorService.execute(future);
futureList.add(future);
}
DelayedFuture toFail = new DelayedFuture(1000);
futureList.add(toFail);
toFail.makeItFail();
ListenableFuture<List<Object>> list = MoreFutures.allAsListOrCancelAll(futureList);
try {
list.get();
fail("This should fail");
} catch (InterruptedException | ExecutionException ignored) {
}
Thread.sleep(100);
for (DelayedFuture delayedFuture : futureList) {
assertThat(delayedFuture.wasCanceled || delayedFuture == toFail).isTrue();
assertThat(delayedFuture.wasInterrupted).isFalse();
}
}
@Test
public void waitForAllInterruptiblyFailFast_AllSuccessful() throws Exception {
List<DelayedFuture> futureList = new ArrayList<>();
for (int i = 1; i < 6; i++) {
DelayedFuture future = new DelayedFuture(i * 1000);
executorService.execute(future);
futureList.add(future);
}
MoreFutures.waitForAllInterruptiblyFailFast(futureList);
for (DelayedFuture delayedFuture : futureList) {
assertThat(delayedFuture.wasCanceled).isFalse();
assertThat(delayedFuture.wasInterrupted).isFalse();
assertThat(delayedFuture.get()).isNotNull();
}
}
@Test
public void waitForAllInterruptiblyFailFast_Interrupt() throws Exception {
final List<DelayedFuture> futureList = new ArrayList<>();
for (int i = 1; i < 6; i++) {
// When we have a bunch of futures that never complete.
DelayedFuture future = new DelayedFuture(Integer.MAX_VALUE);
// And submit them to an Executor.
executorService.execute(future);
futureList.add(future);
}
final Thread testThread = Thread.currentThread();
// And have a thread that interrupts the current thread (the one running the test) once all the
// futures were polled at least once via Future#get(long, TimeUnit).
Thread interruptThread = new Thread() {
@Override
public void run() {
for (DelayedFuture delayedFuture : futureList) {
try {
delayedFuture.getLatch.await(
TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
throw new IllegalStateException(ie);
}
}
testThread.interrupt();
}
};
// And run this thread in the background.
interruptThread.start();
try {
// And then wait for all the futures to complete, interruptibly.
// Then, as expected, waitForAllInterruptiblyFailFast propagates the interrupt sent to the
// main test thread by our background thread.
assertThrows(
InterruptedException.class,
() -> MoreFutures.waitForAllInterruptiblyFailFast(futureList));
} finally {
// The @After-annotated shutdownExecutor method blocks on completion of all tasks. Since we
// submitted a bunch of tasks that never complete, we need to explicitly cancel them.
for (DelayedFuture delayedFuture : futureList) {
delayedFuture.cancel(/*mayInterruptIfRunning=*/ true);
}
// If we're here and the test were to pass, then the background thread must have already
// completed. Interrupt it unconditionally - if the test were to pass, this is benign. But if
// the test were to fail then we'd have a rogue thread in the background which can be very
// evil (e.g. can interfere with the execution of other test cases).
interruptThread.interrupt();
}
}
@Test
public void waitForAllInterruptiblyFailFast_Failure() throws Exception {
List<DelayedFuture> futureList = new ArrayList<>();
for (int i = 1; i < 6; i++) {
DelayedFuture future = new DelayedFuture(i * 1000);
executorService.execute(future);
futureList.add(future);
}
DelayedFuture toFail = new DelayedFuture(1000);
futureList.add(toFail);
toFail.makeItFail();
ExecutionException ee =
assertThrows(
ExecutionException.class,
() -> MoreFutures.waitForAllInterruptiblyFailFast(futureList));
assertThat(ee).hasCauseThat().hasMessageThat().isEqualTo("I like to fail!!");
}
/**
* A future that (if added to an executor) waits {@code delay} milliseconds before setting a
* response.
*/
private static class DelayedFuture extends AbstractFuture<Object> implements Runnable {
private final int delay;
private final CountDownLatch failOrInterruptLatch = new CountDownLatch(1);
private final CountDownLatch getLatch = new CountDownLatch(1);
private boolean wasCanceled;
private boolean wasInterrupted;
public DelayedFuture(int delay) {
this.delay = delay;
}
@Override
public void run() {
try {
wasCanceled = failOrInterruptLatch.await(delay, TimeUnit.MILLISECONDS);
// Not canceled and not done (makeItFail sets the value, so in that case is done).
if (!wasCanceled && !isDone()) {
set(new Object());
}
} catch (InterruptedException e) {
wasInterrupted = true;
}
}
public void makeItFail() {
setException(new RuntimeException("I like to fail!!"));
failOrInterruptLatch.countDown();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
@Override
protected void interruptTask() {
failOrInterruptLatch.countDown();
}
@Override
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {
getLatch.countDown();
return super.get(timeout, unit);
}
}
}