| // Copyright 2014 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.concurrent; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| import static com.google.common.truth.Truth.assertWithMessage; |
| import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; |
| import com.google.devtools.build.lib.testutil.TestThread; |
| import com.google.devtools.build.lib.testutil.TestUtils; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** |
| * Tests for AbstractQueueVisitor. |
| */ |
| @RunWith(JUnit4.class) |
| public class AbstractQueueVisitorTest { |
| |
| private static final RuntimeException THROWABLE = new RuntimeException(); |
| |
| @Test |
| public void simpleCounter() throws Exception { |
| CountingQueueVisitor counter = new CountingQueueVisitor(); |
| counter.enqueue(); |
| counter.awaitQuiescence(/*interruptWorkers=*/ false); |
| assertThat(counter.getCount()).isSameInstanceAs(10); |
| } |
| |
| @Test |
| public void externalDep() throws Exception { |
| SettableFuture<Object> future = SettableFuture.create(); |
| AbstractQueueVisitor counter = |
| new AbstractQueueVisitor( |
| /*parallelism=*/ 2, |
| /* keepAliveTime= */ 3L, |
| TimeUnit.SECONDS, |
| /* failFastOnException= */ true, |
| "FOO-BAR", |
| ErrorClassifier.DEFAULT); |
| counter.dependOnFuture(future); |
| new Thread( |
| () -> { |
| try { |
| Thread.sleep(5); |
| future.set(new Object()); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| }) |
| .start(); |
| counter.awaitQuiescence(/*interruptWorkers=*/ false); |
| } |
| |
| @Test |
| public void externalDepWithInterrupt() throws Exception { |
| SettableFuture<Object> future = SettableFuture.create(); |
| AbstractQueueVisitor counter = |
| new AbstractQueueVisitor( |
| /*parallelism=*/ 2, |
| /* keepAliveTime= */ 3L, |
| TimeUnit.SECONDS, |
| /* failFastOnException= */ true, |
| "FOO-BAR", |
| ErrorClassifier.DEFAULT); |
| counter.dependOnFuture(future); |
| Thread.currentThread().interrupt(); |
| assertThrows( |
| InterruptedException.class, () -> counter.awaitQuiescence(/*interruptWorkers=*/ true)); |
| assertThat(future.isCancelled()).isTrue(); |
| } |
| |
| @Test |
| public void callerOwnedPool() throws Exception { |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| assertThat(executor.getActiveCount()).isSameInstanceAs(0); |
| |
| CountingQueueVisitor counter = new CountingQueueVisitor(executor); |
| counter.enqueue(); |
| counter.awaitQuiescence(/*interruptWorkers=*/ false); |
| assertThat(counter.getCount()).isSameInstanceAs(10); |
| |
| executor.shutdown(); |
| assertThat(executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) |
| .isTrue(); |
| } |
| |
| @Test |
| public void doubleCounter() throws Exception { |
| CountingQueueVisitor counter = new CountingQueueVisitor(); |
| counter.enqueue(); |
| counter.enqueue(); |
| counter.awaitQuiescence(/*interruptWorkers=*/ false); |
| assertThat(counter.getCount()).isSameInstanceAs(10); |
| } |
| |
| @Test |
| public void exceptionFromWorkerThread() { |
| final RuntimeException myException = new IllegalStateException(); |
| ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); |
| visitor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| throw myException; |
| } |
| }); |
| |
| // The exception from the worker thread should be re-thrown from the main thread. |
| Exception e = |
| assertThrows(Exception.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ false)); |
| assertThat(e).isSameInstanceAs(myException); |
| } |
| |
| // Regression test for "AbstractQueueVisitor loses track of jobs if thread allocation fails". |
| @Test |
| public void threadPoolThrowsSometimes() throws Exception { |
| // In certain cases (for example, if the address space is almost entirely consumed by a huge |
| // JVM heap), thread allocation can fail with an OutOfMemoryError. If the queue visitor |
| // does not handle this gracefully, we lose track of tasks and hang the visitor indefinitely. |
| |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()) { |
| private final AtomicLong count = new AtomicLong(); |
| |
| @Override |
| public void execute(Runnable command) { |
| long count = this.count.incrementAndGet(); |
| if (count == 6) { |
| throw new Error("Could not create thread (fakeout)"); |
| } |
| super.execute(command); |
| } |
| }; |
| |
| CountingQueueVisitor counter = new CountingQueueVisitor(executor); |
| counter.enqueue(); |
| Error expected = |
| assertThrows(Error.class, () -> counter.awaitQuiescence(/*interruptWorkers=*/ false)); |
| assertThat(expected).hasMessageThat().isEqualTo("Could not create thread (fakeout)"); |
| assertThat(counter.getCount()).isSameInstanceAs(5); |
| |
| executor.shutdown(); |
| assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); |
| } |
| |
| // Regression test to make sure that AbstractQueueVisitor doesn't swallow unchecked exceptions if |
| // it is interrupted concurrently with the unchecked exception being thrown. |
| @Test |
| public void interruptAndThrownIsInterruptedAndThrown() throws Exception { |
| final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); |
| // Use a latch to make sure the thread gets a chance to start. |
| final CountDownLatch threadStarted = new CountDownLatch(1); |
| visitor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| threadStarted.countDown(); |
| assertThat( |
| Uninterruptibles.awaitUninterruptibly( |
| visitor.getInterruptionLatchForTestingOnly(), 2, TimeUnit.SECONDS)) |
| .isTrue(); |
| throw THROWABLE; |
| } |
| }); |
| assertThat(threadStarted.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue(); |
| // Interrupt will not be processed until work starts. |
| Thread.currentThread().interrupt(); |
| Exception e = |
| assertThrows(Exception.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ true)); |
| assertThat(e).isEqualTo(THROWABLE); |
| assertThat(Thread.interrupted()).isTrue(); |
| } |
| |
| @Test |
| public void interruptionWithoutInterruptingWorkers() throws Exception { |
| final Thread mainThread = Thread.currentThread(); |
| final CountDownLatch latch1 = new CountDownLatch(1); |
| final CountDownLatch latch2 = new CountDownLatch(1); |
| final boolean[] workerThreadCompleted = { false }; |
| final ConcreteQueueVisitor visitor = new ConcreteQueueVisitor(); |
| |
| visitor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| latch1.countDown(); |
| latch2.await(); |
| workerThreadCompleted[0] = true; |
| } catch (InterruptedException e) { |
| // Do not set workerThreadCompleted to true |
| } |
| } |
| }); |
| |
| TestThread interrupterThread = |
| new TestThread( |
| () -> { |
| latch1.await(); |
| mainThread.interrupt(); |
| assertThat( |
| visitor |
| .getInterruptionLatchForTestingOnly() |
| .await(TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)) |
| .isTrue(); |
| latch2.countDown(); |
| }); |
| interrupterThread.start(); |
| |
| assertThrows( |
| InterruptedException.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ false)); |
| |
| interrupterThread.joinAndAssertState(400); |
| assertThat(workerThreadCompleted[0]).isTrue(); |
| } |
| |
| @Test |
| public void interruptionWithInterruptingWorkers() throws Exception { |
| assertInterruptWorkers(null); |
| |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| assertInterruptWorkers(executor); |
| executor.shutdown(); |
| executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| } |
| |
| private static void assertInterruptWorkers(ThreadPoolExecutor executor) throws Exception { |
| final CountDownLatch latch1 = new CountDownLatch(1); |
| final CountDownLatch latch2 = new CountDownLatch(1); |
| final boolean[] workerThreadInterrupted = { false }; |
| ConcreteQueueVisitor visitor = (executor == null) |
| ? new ConcreteQueueVisitor() |
| : new ConcreteQueueVisitor(executor, true); |
| |
| visitor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| latch1.countDown(); |
| latch2.await(); |
| } catch (InterruptedException e) { |
| workerThreadInterrupted[0] = true; |
| } |
| } |
| }); |
| |
| latch1.await(); |
| Thread.currentThread().interrupt(); |
| |
| assertThrows( |
| InterruptedException.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ true)); |
| |
| assertThat(workerThreadInterrupted[0]).isTrue(); |
| } |
| |
| @Test |
| public void failFast() throws Exception { |
| // In failFast mode, we only run actions queued before the exception. |
| assertFailFast(null, true, false, "a", "b"); |
| |
| // In !failFast mode, we complete all queued actions. |
| assertFailFast(null, false, false, "a", "b", "1", "2"); |
| |
| // Now check fail-fast on interrupt: |
| assertFailFast(null, false, true, "a", "b"); |
| } |
| |
| @Test |
| public void failFastNoShutdown() throws Exception { |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| // In failFast mode, we only run actions queued before the exception. |
| assertFailFast(executor, true, false, "a", "b"); |
| |
| // In !failFast mode, we complete all queued actions. |
| assertFailFast(executor, false, false, "a", "b", "1", "2"); |
| |
| // Now check fail-fast on interrupt: |
| assertFailFast(executor, false, true, "a", "b"); |
| |
| executor.shutdown(); |
| assertThat(executor.awaitTermination(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) |
| .isTrue(); |
| } |
| |
| private static void assertFailFast( |
| ThreadPoolExecutor executor, |
| boolean failFastOnException, |
| boolean interrupt, |
| String... expectedVisited) |
| throws Exception { |
| assertThat(executor == null || !executor.isShutdown()).isTrue(); |
| AbstractQueueVisitor visitor = |
| (executor == null) |
| ? new ConcreteQueueVisitor(failFastOnException) |
| : new ConcreteQueueVisitor(executor, failFastOnException); |
| |
| List<String> visitedList = Collections.synchronizedList(Lists.<String>newArrayList()); |
| |
| // Runnable "ra" will await the uncaught exception from |
| // "throwingRunnable", then add "a" to the list and |
| // enqueue "r1". Runnable "r1" should be |
| // executed iff !failFast. |
| |
| CountDownLatch latchA = new CountDownLatch(1); |
| CountDownLatch latchB = new CountDownLatch(1); |
| |
| Runnable r1 = awaitAddAndEnqueueRunnable(interrupt, visitor, null, visitedList, "1", null); |
| Runnable r2 = awaitAddAndEnqueueRunnable(interrupt, visitor, null, visitedList, "2", null); |
| Runnable ra = awaitAddAndEnqueueRunnable(interrupt, visitor, latchA, visitedList, "a", r1); |
| Runnable rb = awaitAddAndEnqueueRunnable(interrupt, visitor, latchB, visitedList, "b", r2); |
| |
| visitor.execute(ra); |
| visitor.execute(rb); |
| latchA.await(); |
| latchB.await(); |
| visitor.execute(interrupt ? interruptingRunnable(Thread.currentThread()) : throwingRunnable()); |
| |
| Exception e = |
| assertThrows(Exception.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ false)); |
| if (interrupt) { |
| assertThat(e).isInstanceOf(InterruptedException.class); |
| } else { |
| assertThat(e).isSameInstanceAs(THROWABLE); |
| } |
| assertWithMessage("got: " + visitedList + "\nwant: " + Arrays.toString(expectedVisited)) |
| .that(Sets.newHashSet(visitedList)) |
| .isEqualTo(Sets.newHashSet(expectedVisited)); |
| |
| if (executor != null) { |
| assertThat(executor.isShutdown()).isFalse(); |
| assertThat(visitor.getTaskCount()).isEqualTo(0); |
| } |
| } |
| |
| @Test |
| public void jobIsInterruptedWhenOtherFails() throws Exception { |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| |
| final AbstractQueueVisitor visitor = |
| createQueueVisitorWithConstantErrorClassification(executor, ErrorClassification.CRITICAL); |
| final CountDownLatch latch1 = new CountDownLatch(1); |
| final AtomicBoolean wasInterrupted = new AtomicBoolean(false); |
| |
| Runnable r1 = new Runnable() { |
| |
| @Override |
| public void run() { |
| latch1.countDown(); |
| try { |
| // Interruption is expected during a sleep. There is no sense in fail or assert call |
| // because exception is going to be swallowed inside AbstractQueueVisitor. |
| // We are using wasInterrupted flag to assert in the end of test. |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| wasInterrupted.set(true); |
| } |
| } |
| }; |
| |
| visitor.execute(r1); |
| latch1.await(); |
| visitor.execute(throwingRunnable()); |
| CountDownLatch exnLatch = visitor.getExceptionLatchForTestingOnly(); |
| |
| Exception e = |
| assertThrows(Exception.class, () -> visitor.awaitQuiescence(/*interruptWorkers=*/ true)); |
| assertThat(e).isSameInstanceAs(THROWABLE); |
| |
| assertThat(wasInterrupted.get()).isTrue(); |
| assertThat(executor.isShutdown()).isTrue(); |
| assertThat(exnLatch.await(0, TimeUnit.MILLISECONDS)).isTrue(); |
| } |
| |
| @Test |
| public void javaErrorConsideredCriticalNoMatterWhat() throws Exception { |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| final Error error = new Error("bad!"); |
| AbstractQueueVisitor visitor = |
| createQueueVisitorWithConstantErrorClassification( |
| executor, ErrorClassification.NOT_CRITICAL); |
| final CountDownLatch latch = new CountDownLatch(1); |
| final AtomicBoolean sleepFinished = new AtomicBoolean(false); |
| final AtomicBoolean sleepInterrupted = new AtomicBoolean(false); |
| Runnable errorRunnable = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| latch.await(TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException expected) { |
| // Should only happen if the test itself is interrupted. |
| } |
| throw error; |
| } |
| }; |
| Runnable sleepRunnable = new Runnable() { |
| @Override |
| public void run() { |
| latch.countDown(); |
| try { |
| Thread.sleep(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| sleepFinished.set(true); |
| } catch (InterruptedException unexpected) { |
| sleepInterrupted.set(true); |
| } |
| } |
| }; |
| CountDownLatch exnLatch = visitor.getExceptionLatchForTestingOnly(); |
| visitor.execute(errorRunnable); |
| visitor.execute(sleepRunnable); |
| Error thrownError = null; |
| // Interrupt workers on a critical error. That way we can test that visitor.work doesn't wait |
| // for all workers to finish if one of them already had a critical error. |
| try { |
| visitor.awaitQuiescence(/*interruptWorkers=*/ true); |
| } catch (Error e) { |
| thrownError = e; |
| } |
| assertThat(sleepInterrupted.get()).isTrue(); |
| assertThat(sleepFinished.get()).isFalse(); |
| assertThat(thrownError).isEqualTo(error); |
| assertThat(exnLatch.await(0, TimeUnit.MILLISECONDS)).isTrue(); |
| } |
| |
| private static class ClassifiedException extends RuntimeException { |
| private final ErrorClassification classification; |
| |
| private ClassifiedException(ErrorClassification classification) { |
| this.classification = classification; |
| } |
| } |
| |
| @Test |
| public void mostSevereErrorPropagated() throws Exception { |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| final ClassifiedException criticalException = |
| new ClassifiedException(ErrorClassification.CRITICAL); |
| final ClassifiedException criticalAndLogException = |
| new ClassifiedException(ErrorClassification.CRITICAL_AND_LOG); |
| final ErrorClassifier errorClassifier = new ErrorClassifier() { |
| @Override |
| protected ErrorClassification classifyException(Exception e) { |
| return (e instanceof ClassifiedException) |
| ? ((ClassifiedException) e).classification |
| : ErrorClassification.NOT_CRITICAL; |
| } |
| }; |
| AbstractQueueVisitor visitor = |
| new AbstractQueueVisitor( |
| executor, |
| /*shutdownOnCompletion=*/ true, |
| /*failFastOnException=*/ false, |
| errorClassifier); |
| final CountDownLatch exnLatch = visitor.getExceptionLatchForTestingOnly(); |
| Runnable criticalExceptionRunnable = new Runnable() { |
| @Override |
| public void run() { |
| throw criticalException; |
| } |
| }; |
| Runnable criticalAndLogExceptionRunnable = new Runnable() { |
| @Override |
| public void run() { |
| // Wait for the critical exception to be thrown. There's a benign race between our 'await' |
| // call completing because the exception latch was counted down, and our thread being |
| // interrupted by AbstractQueueVisitor because the critical error was encountered. This is |
| // completely fine; all that matters is that we have a chance to throw our error _after_ |
| // the previous one was thrown by the other Runnable. |
| try { |
| exnLatch.await(); |
| } catch (InterruptedException e) { |
| // Ignored. |
| } |
| throw criticalAndLogException; |
| } |
| }; |
| visitor.execute(criticalExceptionRunnable); |
| visitor.execute(criticalAndLogExceptionRunnable); |
| ClassifiedException exn = null; |
| try { |
| visitor.awaitQuiescence(/*interruptWorkers=*/ true); |
| } catch (ClassifiedException e) { |
| exn = e; |
| } |
| assertThat(exn).isEqualTo(criticalAndLogException); |
| } |
| |
| private static Runnable throwingRunnable() { |
| return new Runnable() { |
| @Override |
| public void run() { |
| throw THROWABLE; |
| } |
| }; |
| } |
| |
| private static Runnable interruptingRunnable(final Thread thread) { |
| return new Runnable() { |
| @Override |
| public void run() { |
| thread.interrupt(); |
| } |
| }; |
| } |
| |
| private static Runnable awaitAddAndEnqueueRunnable(final boolean interrupt, |
| final AbstractQueueVisitor visitor, |
| final CountDownLatch started, |
| final List<String> list, |
| final String toAdd, |
| final Runnable toEnqueue) { |
| return new Runnable() { |
| @Override |
| public void run() { |
| if (started != null) { |
| started.countDown(); |
| } |
| |
| try { |
| assertThat( |
| interrupt |
| ? visitor.getInterruptionLatchForTestingOnly().await(1, TimeUnit.MINUTES) |
| : visitor.getExceptionLatchForTestingOnly().await(1, TimeUnit.MINUTES)) |
| .isTrue(); |
| } catch (InterruptedException e) { |
| // Unexpected. |
| throw new RuntimeException(e); |
| } |
| list.add(toAdd); |
| if (toEnqueue != null) { |
| visitor.execute(toEnqueue); |
| } |
| } |
| }; |
| } |
| |
| private static class CountingQueueVisitor extends AbstractQueueVisitor { |
| |
| private final static String THREAD_NAME = "BlazeTest CountingQueueVisitor"; |
| |
| private int theInt = 0; |
| private final Object lock = new Object(); |
| |
| public CountingQueueVisitor() { |
| super( |
| /*parallelism=*/ 5, |
| /* keepAliveTime= */ 3L, |
| TimeUnit.SECONDS, |
| /* failFastOnException= */ false, |
| THREAD_NAME, |
| ErrorClassifier.DEFAULT); |
| } |
| |
| CountingQueueVisitor(ThreadPoolExecutor executor) { |
| super(executor, false, true, ErrorClassifier.DEFAULT); |
| } |
| |
| public void enqueue() { |
| super.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| synchronized (lock) { |
| if (theInt < 10) { |
| theInt++; |
| enqueue(); |
| } |
| } |
| } |
| }); |
| } |
| |
| public int getCount() { |
| return theInt; |
| } |
| } |
| |
| private static class ConcreteQueueVisitor extends AbstractQueueVisitor { |
| |
| private final static String THREAD_NAME = "BlazeTest ConcreteQueueVisitor"; |
| |
| ConcreteQueueVisitor() { |
| super( |
| 5, |
| 3L, |
| TimeUnit.SECONDS, |
| /* failFastOnException= */ false, |
| THREAD_NAME, |
| ErrorClassifier.DEFAULT); |
| } |
| |
| ConcreteQueueVisitor(boolean failFast) { |
| super( |
| 5, |
| 3L, |
| TimeUnit.SECONDS, |
| failFast, |
| THREAD_NAME, |
| ErrorClassifier.DEFAULT); |
| } |
| |
| ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast) { |
| super(executor, /*shutdownOnCompletion=*/ false, failFast, ErrorClassifier.DEFAULT); |
| } |
| } |
| |
| private static AbstractQueueVisitor createQueueVisitorWithConstantErrorClassification( |
| ThreadPoolExecutor executor, final ErrorClassification classification) { |
| return new AbstractQueueVisitor( |
| executor, |
| /*shutdownOnCompletion=*/ true, |
| /*failFastOnException=*/ false, |
| new ErrorClassifier() { |
| @Override |
| protected ErrorClassification classifyException(Exception e) { |
| return classification; |
| } |
| }); |
| } |
| } |