| // Copyright 2022 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.skyframe; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.truth.Truth.assertThat; |
| import static com.google.devtools.build.skyframe.EvaluationResultSubjectFactory.assertThatEvaluationResult; |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| import static org.junit.Assert.assertThrows; |
| import static org.junit.Assert.fail; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; |
| import com.google.devtools.build.lib.events.StoredEventHandler; |
| import com.google.devtools.build.skyframe.EvaluationContext.UnnecessaryTemporaryStateDropperReceiver; |
| import com.google.devtools.build.skyframe.GraphTester.StringValue; |
| import com.google.devtools.build.skyframe.SkyFunction.Environment; |
| import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState; |
| import com.google.devtools.build.skyframe.SkyFunction.LookupEnvironment; |
| import com.google.devtools.build.skyframe.state.Driver; |
| import com.google.devtools.build.skyframe.state.StateMachine; |
| import com.google.devtools.build.skyframe.state.StateMachineEvaluatorForTesting; |
| import com.google.devtools.build.skyframe.state.ValueOrException2Producer; |
| import com.google.devtools.build.skyframe.state.ValueOrException3Producer; |
| import com.google.devtools.build.skyframe.state.ValueOrExceptionProducer; |
| import com.google.testing.junit.testparameterinjector.TestParameter; |
| import com.google.testing.junit.testparameterinjector.TestParameterInjector; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| import javax.annotation.Nullable; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| |
| @RunWith(TestParameterInjector.class) |
| public final class StateMachineTest { |
| private static final int TEST_PARALLELISM = 5; |
| |
| private final ProcessableGraph graph = new InMemoryGraphImpl(); |
| private final GraphTester tester = new GraphTester(); |
| |
| private final StoredEventHandler reportedEvents = new StoredEventHandler(); |
| private final DirtyAndInflightTrackingProgressReceiver revalidationReceiver = |
| new DirtyAndInflightTrackingProgressReceiver(EvaluationProgressReceiver.NULL); |
| |
| private static final Version VERSION = IntVersion.of(0); |
| |
| // TODO(shahan): consider factoring this boilerplate out to a common location. |
| private <T extends SkyValue> EvaluationResult<T> eval(SkyKey root, boolean keepGoing) |
| throws InterruptedException { |
| return new ParallelEvaluator( |
| graph, |
| VERSION, |
| Version.minimal(), |
| tester.getSkyFunctionMap(), |
| reportedEvents, |
| new EmittedEventState(), |
| EventFilter.FULL_STORAGE, |
| ErrorInfoManager.UseChildErrorInfoIfNecessary.INSTANCE, |
| keepGoing, |
| revalidationReceiver, |
| GraphInconsistencyReceiver.THROWING, |
| AbstractQueueVisitor.create( |
| "test-pool", TEST_PARALLELISM, ParallelEvaluatorErrorClassifier.instance()), |
| new SimpleCycleDetector(), |
| UnnecessaryTemporaryStateDropperReceiver.NULL) |
| .eval(ImmutableList.of(root)); |
| } |
| |
| private static final SkyKey KEY_A1 = GraphTester.skyKey("A1"); |
| private static final SkyValue VALUE_A1 = new StringValue("A1"); |
| private static final SkyKey KEY_A2 = GraphTester.skyKey("A2"); |
| private static final SkyValue VALUE_A2 = new StringValue("A2"); |
| private static final SkyKey KEY_A3 = GraphTester.skyKey("A3"); |
| private static final SkyValue VALUE_A3 = new StringValue("A3"); |
| private static final SkyKey KEY_B1 = GraphTester.skyKey("B1"); |
| private static final SkyValue VALUE_B1 = new StringValue("B1"); |
| private static final SkyKey KEY_B2 = GraphTester.skyKey("B2"); |
| private static final SkyValue VALUE_B2 = new StringValue("B2"); |
| private static final SkyKey KEY_B3 = GraphTester.skyKey("B3"); |
| private static final SkyValue VALUE_B3 = new StringValue("B3"); |
| |
| @TestParameter private boolean rootKeySkipsBatchPrefetch; |
| |
| private SkyKey rootKey; |
| |
| private static final SkyValue DONE_VALUE = new StringValue("DONE"); |
| private static final StringValue SUCCESS_VALUE = new StringValue("SUCCESS"); |
| |
| @Before |
| public void predefineCommonEntries() { |
| tester.getOrCreate(KEY_A1).setConstantValue(VALUE_A1); |
| tester.getOrCreate(KEY_A2).setConstantValue(VALUE_A2); |
| tester.getOrCreate(KEY_A3).setConstantValue(VALUE_A3); |
| tester.getOrCreate(KEY_B1).setConstantValue(VALUE_B1); |
| tester.getOrCreate(KEY_B2).setConstantValue(VALUE_B2); |
| tester.getOrCreate(KEY_B3).setConstantValue(VALUE_B3); |
| rootKey = |
| rootKeySkipsBatchPrefetch |
| ? GraphTester.skipBatchPrefetchKey("root") |
| : GraphTester.skyKey("root"); |
| } |
| |
| private static class StateMachineWrapper implements SkyKeyComputeState { |
| private final Driver driver; |
| |
| private StateMachineWrapper(StateMachine machine) { |
| this.driver = new Driver(machine); |
| } |
| |
| private boolean drive(Environment env) throws InterruptedException { |
| return driver.drive(env); |
| } |
| } |
| |
| /** |
| * Defines a {@link SkyFunction} that executes the gives state machine. |
| * |
| * <p>The function always has key {@link rootKey} and value {@link DONE_VALUE}. State machine |
| * internals can be observed with consumers. |
| * |
| * @return a counter that stores the restart count. |
| */ |
| private AtomicInteger defineRootMachine(Supplier<StateMachine> rootMachineSupplier) { |
| var restartCount = new AtomicInteger(); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| if (!env.getState(() -> new StateMachineWrapper(rootMachineSupplier.get())) |
| .drive(env)) { |
| restartCount.getAndIncrement(); |
| return null; |
| } |
| return DONE_VALUE; |
| }); |
| return restartCount; |
| } |
| |
| private int evalMachine(Supplier<StateMachine> rootMachineSupplier) throws InterruptedException { |
| var restartCount = defineRootMachine(rootMachineSupplier); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| return restartCount.get(); |
| } |
| |
| private boolean runMachine(StateMachine root) throws InterruptedException { |
| return !StateMachineEvaluatorForTesting.run( |
| root, |
| new InMemoryMemoizingEvaluator( |
| tester.getSkyFunctionMap(), new SequencedRecordingDifferencer()), |
| EvaluationContext.newBuilder() |
| .setKeepGoing(true) |
| .setParallelism(TEST_PARALLELISM) |
| .setEventHandler(reportedEvents) |
| .build()) |
| .hasError(); |
| } |
| |
| /** |
| * A simple machine having two states, fetching one value from each. |
| * |
| * <p>This machine causes two restarts, one for each of the lookups from the two states. |
| */ |
| private static class TwoStepMachine implements StateMachine { |
| private final Consumer<SkyValue> sink1; |
| private final Consumer<SkyValue> sink2; |
| |
| private TwoStepMachine(Consumer<SkyValue> sink1, Consumer<SkyValue> sink2) { |
| this.sink1 = sink1; |
| this.sink2 = sink2; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(KEY_A1, sink1); |
| return this::step2; |
| } |
| |
| public StateMachine step2(Tasks tasks) { |
| tasks.lookUp(KEY_A2, sink2); |
| return DONE; |
| } |
| } |
| |
| @Test |
| public void smoke(@TestParameter boolean useTestingEvaluator) throws InterruptedException { |
| var v1Sink = new SkyValueSink(); |
| var v2Sink = new SkyValueSink(); |
| Supplier<StateMachine> factory = () -> new TwoStepMachine(v1Sink, v2Sink); |
| if (useTestingEvaluator) { |
| assertThat(runMachine(factory.get())).isTrue(); |
| } else { |
| assertThat(evalMachine(factory)).isEqualTo(2); |
| } |
| assertThat(v1Sink.get()).isEqualTo(VALUE_A1); |
| assertThat(v2Sink.get()).isEqualTo(VALUE_A2); |
| } |
| |
| /** Example modeled after the one described in the documentation of {@link StateMachine}. */ |
| private static class ExampleWithSubmachines implements StateMachine, SkyKeyComputeState { |
| private final Consumer<SkyValue> sinkA1; |
| private final Consumer<SkyValue> sinkA2; |
| private final Consumer<SkyValue> sinkA3; |
| private final Consumer<SkyValue> sinkB1; |
| private final Consumer<SkyValue> sinkB2; |
| private final Consumer<SkyValue> sinkB3; |
| |
| private ExampleWithSubmachines( |
| Consumer<SkyValue> sinkA1, |
| Consumer<SkyValue> sinkA2, |
| Consumer<SkyValue> sinkA3, |
| Consumer<SkyValue> sinkB1, |
| Consumer<SkyValue> sinkB2, |
| Consumer<SkyValue> sinkB3) { |
| this.sinkA1 = sinkA1; |
| this.sinkA2 = sinkA2; |
| this.sinkA3 = sinkA3; |
| this.sinkB1 = sinkB1; |
| this.sinkB2 = sinkB2; |
| this.sinkB3 = sinkB3; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| // Starts submachines in parallel. |
| tasks.enqueue(this::stepA1); |
| tasks.enqueue(this::stepB1); |
| return DONE; |
| } |
| |
| private StateMachine stepA1(Tasks tasks) { |
| tasks.lookUp(KEY_A1, sinkA1); |
| return this::stepA2; |
| } |
| |
| private StateMachine stepA2(Tasks tasks) { |
| tasks.lookUp(KEY_A2, sinkA2); |
| return this::stepA3; |
| } |
| |
| private StateMachine stepA3(Tasks tasks) { |
| tasks.lookUp(KEY_A3, sinkA3); |
| return DONE; |
| } |
| |
| private StateMachine stepB1(Tasks tasks) { |
| tasks.lookUp(KEY_B1, sinkB1); |
| return this::stepB2; |
| } |
| |
| private StateMachine stepB2(Tasks tasks) { |
| tasks.lookUp(KEY_B2, sinkB2); |
| return this::stepB3; |
| } |
| |
| private StateMachine stepB3(Tasks tasks) { |
| tasks.lookUp(KEY_B3, sinkB3); |
| return DONE; |
| } |
| } |
| |
| @Test |
| public void parallelSubmachines_runInParallel(@TestParameter boolean useTestingEvaluator) |
| throws InterruptedException { |
| var a1Sink = new SkyValueSink(); |
| var a2Sink = new SkyValueSink(); |
| var a3Sink = new SkyValueSink(); |
| var b1Sink = new SkyValueSink(); |
| var b2Sink = new SkyValueSink(); |
| var b3Sink = new SkyValueSink(); |
| |
| Supplier<StateMachine> factory = |
| () -> new ExampleWithSubmachines(a1Sink, a2Sink, a3Sink, b1Sink, b2Sink, b3Sink); |
| if (useTestingEvaluator) { |
| assertThat(runMachine(factory.get())).isTrue(); |
| } else { |
| assertThat(evalMachine(factory)).isEqualTo(3); |
| } |
| |
| assertThat(a1Sink.get()).isEqualTo(VALUE_A1); |
| assertThat(a2Sink.get()).isEqualTo(VALUE_A2); |
| assertThat(a3Sink.get()).isEqualTo(VALUE_A3); |
| assertThat(b1Sink.get()).isEqualTo(VALUE_B1); |
| assertThat(b2Sink.get()).isEqualTo(VALUE_B2); |
| assertThat(b3Sink.get()).isEqualTo(VALUE_B3); |
| } |
| |
| @Test |
| public void parallelSubmachines_shorteningBothPathsReducesRestarts() throws InterruptedException { |
| var a1Sink = new SkyValueSink(); |
| var a2Sink = new SkyValueSink(); |
| var a3Sink = new SkyValueSink(); |
| var b1Sink = new SkyValueSink(); |
| var b2Sink = new SkyValueSink(); |
| var b3Sink = new SkyValueSink(); |
| |
| // Shortens both paths by 1, but at different execution steps. |
| assertThat(eval(KEY_A1, /* keepGoing= */ false).get(KEY_A1)).isEqualTo(VALUE_A1); |
| assertThat(eval(KEY_B3, /* keepGoing= */ false).get(KEY_B3)).isEqualTo(VALUE_B3); |
| |
| assertThat( |
| evalMachine( |
| () -> new ExampleWithSubmachines(a1Sink, a2Sink, a3Sink, b1Sink, b2Sink, b3Sink))) |
| .isEqualTo(2); |
| |
| assertThat(a1Sink.get()).isEqualTo(VALUE_A1); |
| assertThat(a2Sink.get()).isEqualTo(VALUE_A2); |
| assertThat(a3Sink.get()).isEqualTo(VALUE_A3); |
| assertThat(b1Sink.get()).isEqualTo(VALUE_B1); |
| assertThat(b2Sink.get()).isEqualTo(VALUE_B2); |
| assertThat(b3Sink.get()).isEqualTo(VALUE_B3); |
| } |
| |
| @Test |
| public void unhandledException(@TestParameter boolean keepGoing) throws InterruptedException { |
| var a1Sink = new SkyValueSink(); |
| var a2Sink = new SkyValueSink(); |
| var a3Sink = new SkyValueSink(); |
| var b1Sink = new SkyValueSink(); |
| var b2Sink = new SkyValueSink(); |
| var b3Sink = new SkyValueSink(); |
| |
| tester.getOrCreate(KEY_A1).unsetConstantValue().setHasError(true); |
| |
| AtomicInteger instantiationCount = new AtomicInteger(); |
| var restartCount = |
| defineRootMachine( |
| () -> { |
| instantiationCount.getAndIncrement(); |
| return new ExampleWithSubmachines(a1Sink, a2Sink, a3Sink, b1Sink, b2Sink, b3Sink); |
| }); |
| assertThat(eval(rootKey, keepGoing).getError(rootKey)).isNotNull(); |
| |
| assertThat(restartCount.get()).isEqualTo(2); |
| assertThat(a1Sink.get()).isNull(); |
| if (keepGoing) { |
| // On restart, all values are processed before failing, so B1 is observed after restarting and |
| // after A1's unhandled error. |
| assertThat(b1Sink.get()).isEqualTo(VALUE_B1); |
| } |
| // In noKeepGoing, error bubbling resets the state cache and B1 is sometimes observed on the |
| // first pass by a re-instantiated state machine. However, B1 can be slow and there is no |
| // guarantee that it is available. |
| |
| assertThat(b2Sink.get()).isNull(); |
| |
| if (keepGoing) { |
| assertThat(instantiationCount.get()).isEqualTo(1); |
| } else { |
| // The state cache is dropped in noKeepGoing during error bubbling, resulting in a new |
| // instantiation of the state machine. |
| assertThat(instantiationCount.get()).isEqualTo(2); |
| } |
| } |
| |
| @Test |
| public void handledException(@TestParameter boolean keepGoing) throws InterruptedException { |
| tester.getOrCreate(KEY_A1).unsetConstantValue().setHasError(true); |
| |
| var a1Sink = new SkyValueSink(); |
| var errorSink = new AtomicReference<SomeErrorException>(); |
| var restartCount = |
| defineRootMachine( |
| () -> |
| tasks -> { |
| // Fully swallows the error. |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (v != null) { |
| a1Sink.accept(v); |
| return; |
| } |
| errorSink.set(e); |
| }); |
| return StateMachine.DONE; |
| }); |
| var result = eval(rootKey, keepGoing); |
| if (keepGoing) { |
| // In keepGoing mode, the swallowed error vanishes. |
| assertThat(result.get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(result.hasError()).isFalse(); |
| } else { |
| // In nokeepGoing mode, the error is processed in error bubbling, but the function does not |
| // complete and the error is still propagated to the top level. |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| } |
| assertThat(restartCount.get()).isEqualTo(1); |
| assertThat(a1Sink.get()).isNull(); |
| assertThat(errorSink.get()).isNotNull(); |
| } |
| |
| private static class StringOrExceptionProducer |
| extends ValueOrExceptionProducer<StringValue, SomeErrorException> |
| implements SkyKeyComputeState { |
| // Static boolean isProcessValueOrExceptionCalled is added to verify StateMachine chained after |
| // `step()` is invoked regardless of KEY_A1 looks up ends with a value or an exception. |
| // See b/290998109#comment6. |
| public static boolean isProcessValueOrExceptionCalled = false; |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (v != null) { |
| setValue((StringValue) v); |
| return; |
| } |
| setException(e); |
| }); |
| return t -> { |
| isProcessValueOrExceptionCalled = true; |
| return DONE; |
| }; |
| } |
| } |
| |
| @Test |
| public void valueOrExceptionProducer_propagatesValues() throws InterruptedException { |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrExceptionProducer::new); |
| |
| SkyValue value; |
| try { |
| if ((value = producer.tryProduceValue(env)) == null) { |
| return null; |
| } |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return DONE_VALUE; |
| }); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(StringOrExceptionProducer.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| @Test |
| public void valueOrExceptionProducer_propagatesExceptions(@TestParameter boolean keepGoing) |
| throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| tester.getOrCreate(KEY_A1).unsetConstantValue().setHasError(true); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrExceptionProducer::new); |
| if (!hasRestarted.getAndSet(true)) { |
| try { |
| // The first call returns null because a restart is needed to compute the |
| // requested key. |
| assertThat(producer.tryProduceValue(env)).isNull(); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return null; |
| } |
| assertThrows(SomeErrorException.class, () -> producer.tryProduceValue(env)); |
| return DONE_VALUE; |
| }); |
| var result = eval(rootKey, keepGoing); |
| if (keepGoing) { |
| assertThat(result.get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(result.hasError()).isFalse(); |
| } else { |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| } |
| assertThat(StringOrExceptionProducer.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| /** |
| * This producer performs two concurrent lookups. |
| * |
| * <p>It is used to test the case where one of the two lookups succeeds with exception but the |
| * other value is not available. The expected result is the exception propagates. |
| * |
| * <p>This scenario may occur during error bubbling. |
| */ |
| private static class TwoLookupProducer |
| extends ValueOrExceptionProducer<StringValue, SomeErrorException> |
| implements SkyKeyComputeState { |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(KEY_A1, unusedValue -> fail("should not be reachable")); |
| tasks.lookUp( |
| KEY_A2, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (v != null) { |
| setValue((StringValue) v); |
| return; |
| } |
| setException(e); |
| }); |
| return DONE; |
| } |
| } |
| |
| @Test |
| public void valueOrExceptionProducer_throwsExceptionsEvenWithIncompleteDeps() |
| throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| var gotError = new AtomicBoolean(false); |
| tester.getOrCreate(KEY_A2).unsetConstantValue().setHasError(true); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (unusedKey, env) -> { |
| // Primes KEY_A2, making the error available. |
| if (!hasRestarted.getAndSet(true)) { |
| assertThat(env.getValue(KEY_A2)).isNull(); |
| return null; |
| } |
| var producer = env.getState(TwoLookupProducer::new); |
| // At this point, KEY_A2 is available but KEY_A1 is not. The state machine is in an |
| // incomplete state, but throws the exception anyway. |
| var error = |
| assertThrows(SomeErrorException.class, () -> producer.tryProduceValue(env)); |
| gotError.set(true); |
| throw new GenericFunctionException(error); |
| }); |
| // keepGoing must be false below, otherwise the state machine will be run a second time when |
| // KEY_A1 becomes available. |
| var result = eval(rootKey, /* keepGoing= */ false); |
| assertThat(gotError.get()).isTrue(); |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A2); |
| } |
| |
| private static class SomeErrorException1 extends SomeErrorException { |
| public SomeErrorException1(String msg) { |
| super(msg); |
| } |
| } |
| |
| private static class SomeErrorException2 extends SomeErrorException { |
| public SomeErrorException2(String msg) { |
| super(msg); |
| } |
| } |
| |
| private static class SomeErrorException3 extends SomeErrorException { |
| public SomeErrorException3(String msg) { |
| super(msg); |
| } |
| } |
| |
| private static class StringOrException2Producer |
| extends ValueOrException2Producer<StringValue, SomeErrorException1, SomeErrorException2> |
| implements SkyKeyComputeState { |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (e != null) { |
| setException1(new SomeErrorException1(e.getMessage())); |
| } |
| }); |
| tasks.lookUp( |
| KEY_B1, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (e != null) { |
| setException2(new SomeErrorException2(e.getMessage())); |
| } |
| }); |
| return t -> { |
| if (getException1() == null && getException2() == null) { |
| setValue(SUCCESS_VALUE); |
| } |
| return DONE; |
| }; |
| } |
| } |
| |
| @Test |
| public void valueOrException2Producer_propagatesValues() throws InterruptedException { |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException2Producer::new); |
| SkyValue value; |
| try { |
| if ((value = producer.tryProduceValue(env)) == null) { |
| return null; |
| } |
| assertThat(value).isEqualTo(SUCCESS_VALUE); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return DONE_VALUE; |
| }); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| } |
| |
| @Test |
| public void valueOrException2Producer_propagatesExceptions( |
| @TestParameter boolean trueForException1, @TestParameter boolean keepGoing) |
| throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| SkyKey errorKey = trueForException1 ? KEY_A1 : KEY_B1; |
| tester.getOrCreate(errorKey).unsetConstantValue().setHasError(true); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException2Producer::new); |
| if (!hasRestarted.getAndSet(true)) { |
| try { |
| assertThat(producer.tryProduceValue(env)).isNull(); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return null; |
| } |
| if (trueForException1) { |
| assertThrows(SomeErrorException1.class, () -> producer.tryProduceValue(env)); |
| } else { |
| assertThrows(SomeErrorException2.class, () -> producer.tryProduceValue(env)); |
| } |
| return DONE_VALUE; |
| }); |
| var result = eval(rootKey, keepGoing); |
| if (keepGoing) { |
| assertThat(result.get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(result.hasError()).isFalse(); |
| } else { |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(errorKey); |
| } |
| } |
| |
| /** |
| * {@link #valueOrException2Producer_singleLookup_propagatesValuesAndInvokesRunAfter} and {@link |
| * #valueOrException2Producer_singleLookup_propagatesExceptionsAndInvokesRunAfter} are added in |
| * order to verify that if looking up the SkyKey throws an exception, the runAfter {@link |
| * StateMachine} defined as the return of {@link StringOrException2ProducerWithSingleLookup#step} |
| * is invoked. |
| * |
| * <p>These tests are designed not to be integrated into {@link |
| * #valueOrException2Producer_propagatesValues} and {@link |
| * #valueOrException2Producer_propagatesExceptions}. The reason is that only when {@link |
| * Driver#drive} looks up **one** newly added {@link SkyKey}, will {@link Lookup#doLookup} be |
| * called. And these tests aim at covering calling this method. |
| * |
| * <p>Similar tests for {@link ValueOrException3Producer} are also added below. |
| * |
| * <p>See b/290998109#comment6 for more details. |
| */ |
| private static class StringOrException2ProducerWithSingleLookup |
| extends ValueOrException2Producer<StringValue, SomeErrorException1, SomeErrorException2> |
| implements SkyKeyComputeState { |
| public static boolean isProcessValueOrExceptionCalled = false; |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException1.class, |
| SomeErrorException2.class, |
| (v, e1, e2) -> { |
| if (v != null) { |
| setValue((StringValue) v); |
| } |
| if (e1 != null) { |
| setException1(new SomeErrorException1(e1.getMessage())); |
| } |
| if (e2 != null) { |
| setException2(new SomeErrorException2(e2.getMessage())); |
| } |
| }); |
| return t -> { |
| if (getException1() == null && getException2() == null) { |
| setValue(SUCCESS_VALUE); |
| } |
| isProcessValueOrExceptionCalled = true; |
| return DONE; |
| }; |
| } |
| } |
| |
| @Test |
| public void valueOrException2Producer_singleLookup_propagatesValuesAndInvokesRunAfter() |
| throws InterruptedException { |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException2ProducerWithSingleLookup::new); |
| SkyValue value; |
| try { |
| if ((value = producer.tryProduceValue(env)) == null) { |
| return null; |
| } |
| assertThat(value).isEqualTo(SUCCESS_VALUE); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return DONE_VALUE; |
| }); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(StringOrException2ProducerWithSingleLookup.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| @Test |
| public void valueOrException2Producer_singleLookup_propagatesExceptionsAndInvokesRunAfter( |
| @TestParameter boolean trueForException1) throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| tester |
| .getOrCreate(KEY_A1) |
| .unsetConstantValue() |
| .setBuilder( |
| (k, env) -> { |
| throw new ExceptionWrapper( |
| trueForException1 |
| ? new SomeErrorException1("Exception 1") |
| : new SomeErrorException2("Exception 2")); |
| }); |
| |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException2ProducerWithSingleLookup::new); |
| if (!hasRestarted.getAndSet(true)) { |
| try { |
| assertThat(producer.tryProduceValue(env)).isNull(); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return null; |
| } |
| if (trueForException1) { |
| assertThrows(SomeErrorException1.class, () -> producer.tryProduceValue(env)); |
| } else { |
| assertThrows(SomeErrorException2.class, () -> producer.tryProduceValue(env)); |
| } |
| return DONE_VALUE; |
| }); |
| |
| var result = eval(rootKey, /* keepGoing= */ false); |
| |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| assertThat(StringOrException2ProducerWithSingleLookup.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| private static class StringOrException3Producer |
| extends ValueOrException3Producer< |
| StringValue, SomeErrorException1, SomeErrorException2, SomeErrorException3> |
| implements SkyKeyComputeState { |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (e != null) { |
| setException1(new SomeErrorException1(e.getMessage())); |
| } |
| }); |
| tasks.lookUp( |
| KEY_A2, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (e != null) { |
| setException2(new SomeErrorException2(e.getMessage())); |
| } |
| }); |
| tasks.lookUp( |
| KEY_A3, |
| SomeErrorException.class, |
| (v, e) -> { |
| if (e != null) { |
| setException3(new SomeErrorException3(e.getMessage())); |
| } |
| }); |
| return t -> { |
| if (getException1() == null && getException2() == null && getException3() == null) { |
| setValue(SUCCESS_VALUE); |
| } |
| return DONE; |
| }; |
| } |
| } |
| |
| @Test |
| public void valueOrException3Producer_propagatesValues() throws InterruptedException { |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException3Producer::new); |
| SkyValue value; |
| try { |
| if ((value = producer.tryProduceValue(env)) == null) { |
| return null; |
| } |
| assertThat(value).isEqualTo(SUCCESS_VALUE); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return DONE_VALUE; |
| }); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| } |
| |
| enum ValueOrException3ExceptionCase { |
| ONE { |
| @Override |
| SkyKey errorKey() { |
| return KEY_A1; |
| } |
| }, |
| TWO { |
| @Override |
| SkyKey errorKey() { |
| return KEY_A2; |
| } |
| }, |
| THREE { |
| @Override |
| SkyKey errorKey() { |
| return KEY_A3; |
| } |
| }; |
| |
| abstract SkyKey errorKey(); |
| } |
| |
| @Test |
| public void valueOrException3Producer_propagatesExceptions( |
| @TestParameter ValueOrException3ExceptionCase exceptionCase, @TestParameter boolean keepGoing) |
| throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| SkyKey errorKey = exceptionCase.errorKey(); |
| tester.getOrCreate(errorKey).unsetConstantValue().setHasError(true); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException3Producer::new); |
| if (!hasRestarted.getAndSet(true)) { |
| try { |
| assertThat(producer.tryProduceValue(env)).isNull(); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return null; |
| } |
| switch (exceptionCase) { |
| case ONE: |
| assertThrows(SomeErrorException1.class, () -> producer.tryProduceValue(env)); |
| break; |
| case TWO: |
| assertThrows(SomeErrorException2.class, () -> producer.tryProduceValue(env)); |
| break; |
| case THREE: |
| assertThrows(SomeErrorException3.class, () -> producer.tryProduceValue(env)); |
| break; |
| } |
| return DONE_VALUE; |
| }); |
| var result = eval(rootKey, keepGoing); |
| if (keepGoing) { |
| assertThat(result.get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(result.hasError()).isFalse(); |
| } else { |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(errorKey); |
| } |
| } |
| |
| /** See the comments above {@link StringOrException2ProducerWithSingleLookup} for more details. */ |
| private static class StringOrException3ProducerWithSingleLookup |
| extends ValueOrException3Producer< |
| StringValue, SomeErrorException1, SomeErrorException2, SomeErrorException3> |
| implements SkyKeyComputeState { |
| public static boolean isProcessValueOrExceptionCalled = false; |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp( |
| KEY_A1, |
| SomeErrorException1.class, |
| SomeErrorException2.class, |
| SomeErrorException3.class, |
| (v, e1, e2, e3) -> { |
| if (v != null) { |
| setValue((StringValue) v); |
| } |
| if (e1 != null) { |
| setException1(new SomeErrorException1(e1.getMessage())); |
| } |
| if (e2 != null) { |
| setException2(new SomeErrorException2(e2.getMessage())); |
| } |
| if (e3 != null) { |
| setException3(new SomeErrorException3(e3.getMessage())); |
| } |
| }); |
| return t -> { |
| if (getException1() == null && getException2() == null && getException3() == null) { |
| setValue(SUCCESS_VALUE); |
| } |
| isProcessValueOrExceptionCalled = true; |
| return DONE; |
| }; |
| } |
| } |
| |
| @Test |
| public void valueOrException3Producer_singleLookup_propagatesValues() |
| throws InterruptedException { |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException3ProducerWithSingleLookup::new); |
| SkyValue value; |
| try { |
| if ((value = producer.tryProduceValue(env)) == null) { |
| return null; |
| } |
| assertThat(value).isEqualTo(SUCCESS_VALUE); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return DONE_VALUE; |
| }); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| assertThat(StringOrException3ProducerWithSingleLookup.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| @Test |
| public void valueOrException3Producer_singleLookup_propagatesExceptionsAndExecuteRunAfter( |
| @TestParameter ValueOrException3ExceptionCase exceptionCase) throws InterruptedException { |
| var hasRestarted = new AtomicBoolean(false); |
| tester |
| .getOrCreate(KEY_A1) |
| .unsetConstantValue() |
| .setBuilder( |
| (k, env) -> { |
| Exception exception = null; |
| switch (exceptionCase) { |
| case ONE: |
| exception = new SomeErrorException1("Exception 1"); |
| break; |
| case TWO: |
| exception = new SomeErrorException2("Exception 2"); |
| break; |
| case THREE: |
| exception = new SomeErrorException3("Exception 3"); |
| break; |
| } |
| throw new ExceptionWrapper(exception); |
| }); |
| |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| var producer = env.getState(StringOrException3ProducerWithSingleLookup::new); |
| if (!hasRestarted.getAndSet(true)) { |
| try { |
| assertThat(producer.tryProduceValue(env)).isNull(); |
| } catch (SomeErrorException e) { |
| fail("Unexpecteded exception: " + e); |
| } |
| return null; |
| } |
| switch (exceptionCase) { |
| case ONE: |
| assertThrows(SomeErrorException1.class, () -> producer.tryProduceValue(env)); |
| break; |
| case TWO: |
| assertThrows(SomeErrorException2.class, () -> producer.tryProduceValue(env)); |
| break; |
| case THREE: |
| assertThrows(SomeErrorException3.class, () -> producer.tryProduceValue(env)); |
| break; |
| } |
| return DONE_VALUE; |
| }); |
| |
| var result = eval(rootKey, /* keepGoing= */ false); |
| |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| assertThat(StringOrException3ProducerWithSingleLookup.isProcessValueOrExceptionCalled).isTrue(); |
| } |
| |
| @Test |
| public void lookupValue_matrix( |
| @TestParameter LookupType lookupType, |
| @TestParameter boolean useBatch, |
| @TestParameter boolean useTestingEvaluator) |
| throws InterruptedException { |
| var sink = new OmniSink(); |
| Supplier<StateMachine> rootSupplier = |
| () -> { |
| var lookup = lookupType.newLookup(KEY_A1, sink); |
| if (!useBatch) { |
| return lookup; |
| } |
| return new BatchPair(lookup); |
| }; |
| if (useTestingEvaluator) { |
| assertThat(runMachine(rootSupplier.get())).isTrue(); |
| } else { |
| var unused = defineRootMachine(rootSupplier); |
| // There are no errors in this test so the keepGoing value is arbitrary. |
| assertThat(eval(rootKey, /* keepGoing= */ true).get(rootKey)).isEqualTo(DONE_VALUE); |
| } |
| assertThat(sink.value).isEqualTo(VALUE_A1); |
| assertThat(sink.exception).isNull(); |
| } |
| |
| enum EvaluationMode { |
| NO_KEEP_GOING, |
| KEEP_GOING, |
| TEST_EVALUATOR, |
| } |
| |
| @Test |
| public void lookupErrors_matrix( |
| @TestParameter LookupType lookupType, |
| @TestParameter ExceptionCase exceptionCase, |
| @TestParameter boolean useBatch, |
| @TestParameter EvaluationMode evaluationMode) |
| throws InterruptedException { |
| var exception = exceptionCase.getException(); |
| tester |
| .getOrCreate(KEY_A1) |
| .unsetConstantValue() |
| .setBuilder( |
| (k, env) -> { |
| throw new ExceptionWrapper(exception); |
| }); |
| var sink = new OmniSink(); |
| Supplier<StateMachine> rootSupplier = |
| () -> { |
| var lookup = lookupType.newLookup(KEY_A1, sink); |
| if (!useBatch) { |
| return lookup; |
| } |
| return new BatchPair(lookup); |
| }; |
| |
| boolean keepGoing = false; |
| switch (evaluationMode) { |
| case TEST_EVALUATOR: |
| assertThat(runMachine(rootSupplier.get())).isFalse(); |
| if (exceptionCase.exceptionOrdinal() > lookupType.exceptionCount()) { |
| // Undeclared exception is not handled. |
| assertThat(sink.exception).isNull(); |
| } else { |
| // Declared exception is captured. |
| assertThat(sink.exception).isEqualTo(exception); |
| } |
| return; |
| case KEEP_GOING: |
| keepGoing = true; |
| break; |
| case NO_KEEP_GOING: |
| break; |
| } |
| |
| var unused = defineRootMachine(rootSupplier); |
| var result = eval(rootKey, keepGoing); |
| assertThat(sink.value).isNull(); |
| if (exceptionCase.exceptionOrdinal() > lookupType.exceptionCount()) { |
| // The exception was not handled. |
| assertThat(sink.exception).isNull(); |
| assertThat(result.get(rootKey)).isNull(); |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| return; |
| } |
| assertThat(sink.exception).isEqualTo(exception); |
| if (keepGoing) { |
| // The error is completely handled. |
| assertThat(result.get(rootKey)).isEqualTo(DONE_VALUE); |
| return; |
| } |
| assertThatEvaluationResult(result).hasSingletonErrorThat(KEY_A1); |
| assertThat(result.get(rootKey)).isNull(); |
| } |
| |
| /** |
| * Sink for {@link SkyValue}s. |
| * |
| * <p>Verifies that the value is set no more than once. |
| */ |
| private static class SkyValueSink implements Consumer<SkyValue> { |
| private SkyValue value; |
| |
| @Override |
| public void accept(SkyValue value) { |
| assertThat(this.value).isNull(); |
| this.value = value; |
| } |
| |
| @Nullable |
| private SkyValue get() { |
| return value; |
| } |
| } |
| |
| // -------------------- Helpers for lookupErrors_matrix -------------------- |
| private static class Exception1 extends Exception {} |
| |
| private static class Exception2 extends Exception {} |
| |
| private static class Exception3 extends Exception {} |
| |
| private static class Exception4 extends Exception {} |
| |
| private static class ExceptionWrapper extends SkyFunctionException { |
| private ExceptionWrapper(Exception e) { |
| super(e, Transience.PERSISTENT); |
| } |
| } |
| |
| /** |
| * Adds a secondary lookup in parallel with a given {@link StateMachine}. |
| * |
| * <p>This causes the {@link Environment#getValuesAndExceptions} codepath in {@link Driver#drive} |
| * to be used instead of the {@link Lookup#doLookup} when there is a single lookup. |
| */ |
| private static class BatchPair implements StateMachine { |
| private final StateMachine other; |
| |
| private BatchPair(StateMachine other) { |
| this.other = other; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.enqueue(other); |
| tasks.lookUp(KEY_B1, v -> assertThat(v).isEqualTo(VALUE_B1)); |
| return DONE; |
| } |
| } |
| |
| private static class Lookup0 implements StateMachine { |
| private final SkyKey key; |
| private final Consumer<SkyValue> sink; |
| |
| private Lookup0(SkyKey key, Consumer<SkyValue> sink) { |
| this.key = key; |
| this.sink = sink; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(key, sink); |
| return DONE; |
| } |
| } |
| |
| private static class Lookup1 implements StateMachine { |
| private final SkyKey key; |
| private final ValueOrExceptionSink<Exception1> sink; |
| |
| private Lookup1(SkyKey key, ValueOrExceptionSink<Exception1> sink) { |
| this.key = key; |
| this.sink = sink; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(key, Exception1.class, sink); |
| return DONE; |
| } |
| } |
| |
| private static class Lookup2 implements StateMachine { |
| private final SkyKey key; |
| private final ValueOrException2Sink<Exception1, Exception2> sink; |
| |
| private Lookup2(SkyKey key, ValueOrException2Sink<Exception1, Exception2> sink) { |
| this.key = key; |
| this.sink = sink; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(key, Exception1.class, Exception2.class, sink); |
| return DONE; |
| } |
| } |
| |
| private static class Lookup3 implements StateMachine { |
| private final SkyKey key; |
| private final ValueOrException3Sink<Exception1, Exception2, Exception3> sink; |
| |
| private Lookup3(SkyKey key, ValueOrException3Sink<Exception1, Exception2, Exception3> sink) { |
| this.key = key; |
| this.sink = sink; |
| } |
| |
| @Override |
| public StateMachine step(Tasks tasks) { |
| tasks.lookUp(key, Exception1.class, Exception2.class, Exception3.class, sink); |
| return DONE; |
| } |
| } |
| |
| private static class OmniSink |
| implements Consumer<SkyValue>, |
| StateMachine.ValueOrExceptionSink<Exception1>, |
| StateMachine.ValueOrException2Sink<Exception1, Exception2>, |
| StateMachine.ValueOrException3Sink<Exception1, Exception2, Exception3> { |
| private SkyValue value; |
| private Exception exception; |
| |
| @Override |
| public void accept(SkyValue value) { |
| checkState(this.value == null && exception == null); |
| this.value = checkNotNull(value); |
| } |
| |
| @Override |
| public void acceptValueOrException(@Nullable SkyValue value, @Nullable Exception1 exception1) { |
| checkState(this.value == null && exception == null); |
| if (value != null) { |
| this.value = value; |
| return; |
| } |
| if (exception1 != null) { |
| checkState(value == null); |
| this.exception = exception1; |
| } |
| } |
| |
| @Override |
| public void acceptValueOrException2( |
| @Nullable SkyValue value, |
| @Nullable Exception1 exception1, |
| @Nullable Exception2 exception2) { |
| checkState(this.value == null && exception == null); |
| if (value != null) { |
| checkState(exception1 == null && exception2 == null); |
| this.value = value; |
| return; |
| } |
| if (exception1 != null) { |
| checkState(value == null && exception2 == null); |
| this.exception = exception1; |
| return; |
| } |
| if (exception2 != null) { |
| checkState(value == null && exception1 == null); |
| this.exception = exception2; |
| } |
| } |
| |
| @Override |
| public void acceptValueOrException3( |
| @Nullable SkyValue value, |
| @Nullable Exception1 exception1, |
| @Nullable Exception2 exception2, |
| @Nullable Exception3 exception3) { |
| checkState(this.value == null && exception == null); |
| if (value != null) { |
| checkState(exception1 == null && exception2 == null && exception3 == null); |
| this.value = value; |
| return; |
| } |
| if (exception1 != null) { |
| checkState(value == null && exception2 == null && exception3 == null); |
| this.exception = exception1; |
| return; |
| } |
| if (exception2 != null) { |
| checkState(value == null && exception1 == null && exception3 == null); |
| this.exception = exception2; |
| return; |
| } |
| if (exception3 != null) { |
| checkState(value == null && exception1 == null && exception2 == null); |
| this.exception = exception3; |
| } |
| } |
| } |
| |
| private enum LookupType { |
| LOOKUP0 { |
| @Override |
| StateMachine newLookup(SkyKey key, OmniSink sink) { |
| return new Lookup0(key, sink); |
| } |
| |
| @Override |
| int exceptionCount() { |
| return 0; |
| } |
| }, |
| LOOKUP1 { |
| @Override |
| StateMachine newLookup(SkyKey key, OmniSink sink) { |
| return new Lookup1(key, sink); |
| } |
| |
| @Override |
| int exceptionCount() { |
| return 1; |
| } |
| }, |
| LOOKUP2 { |
| @Override |
| StateMachine newLookup(SkyKey key, OmniSink sink) { |
| return new Lookup2(key, sink); |
| } |
| |
| @Override |
| int exceptionCount() { |
| return 2; |
| } |
| }, |
| LOOKUP3 { |
| @Override |
| StateMachine newLookup(SkyKey key, OmniSink sink) { |
| return new Lookup3(key, sink); |
| } |
| |
| @Override |
| int exceptionCount() { |
| return 3; |
| } |
| }; |
| |
| abstract StateMachine newLookup(SkyKey key, OmniSink sink); |
| |
| abstract int exceptionCount(); |
| } |
| |
| private enum ExceptionCase { |
| EXCEPTION1 { |
| @Override |
| Exception getException() { |
| return new Exception1(); |
| } |
| |
| @Override |
| int exceptionOrdinal() { |
| return 1; |
| } |
| }, |
| EXCEPTION2 { |
| @Override |
| Exception getException() { |
| return new Exception2(); |
| } |
| |
| @Override |
| int exceptionOrdinal() { |
| return 2; |
| } |
| }, |
| EXCEPTION3 { |
| @Override |
| Exception getException() { |
| return new Exception3(); |
| } |
| |
| @Override |
| int exceptionOrdinal() { |
| return 3; |
| } |
| }, |
| EXCEPTION4 { |
| @Override |
| Exception getException() { |
| return new Exception4(); |
| } |
| |
| @Override |
| int exceptionOrdinal() { |
| return 4; |
| } |
| }; |
| |
| abstract Exception getException(); |
| |
| abstract int exceptionOrdinal(); |
| } |
| |
| private static class StateMachineWithMultipleConcurrentDriverWrapper |
| implements SkyKeyComputeState { |
| private final List<Driver> drivers = new ArrayList<>(); |
| |
| private StateMachineWithMultipleConcurrentDriverWrapper(List<StateMachine> stateMachines) { |
| for (StateMachine stateMachine : stateMachines) { |
| drivers.add(new Driver(stateMachine)); |
| } |
| } |
| |
| private boolean drive(LookupEnvironment env) throws InterruptedException { |
| ExecutorService executor = Executors.newFixedThreadPool(4); |
| AtomicBoolean allCompletes = new AtomicBoolean(true); |
| ConcurrentSkyFunctionEnvironment concurrentEnvironment = |
| new ConcurrentSkyFunctionEnvironment((SkyFunctionEnvironment) env); |
| for (Driver driver : drivers) { |
| var unused = |
| executor.submit( |
| () -> { |
| try { |
| if (!driver.drive(concurrentEnvironment)) { |
| allCompletes.set(false); |
| } |
| } catch (InterruptedException e) { |
| throw new AssertionError("No exception is expected to be thrown", e); |
| } |
| }); |
| } |
| |
| executor.shutdown(); |
| executor.awaitTermination(Long.MAX_VALUE, NANOSECONDS); |
| return allCompletes.get(); |
| } |
| } |
| |
| private AtomicInteger defineRootMachineWithMultipleDriver( |
| Supplier<List<StateMachine>> rootMachineSupplier) { |
| AtomicInteger restartCount = new AtomicInteger(); |
| tester |
| .getOrCreate(rootKey) |
| .setBuilder( |
| (k, env) -> { |
| if (!env.getState( |
| () -> |
| new StateMachineWithMultipleConcurrentDriverWrapper( |
| rootMachineSupplier.get())) |
| .drive(env)) { |
| restartCount.getAndIncrement(); |
| return null; |
| } |
| return DONE_VALUE; |
| }); |
| return restartCount; |
| } |
| |
| private int evalMachineWithMultipleDrivers(Supplier<List<StateMachine>> rootMachineSupplier) |
| throws InterruptedException { |
| AtomicInteger restartCount = defineRootMachineWithMultipleDriver(rootMachineSupplier); |
| assertThat(eval(rootKey, /* keepGoing= */ false).get(rootKey)).isEqualTo(DONE_VALUE); |
| return restartCount.get(); |
| } |
| |
| @Test |
| public void test_multipleStateMachinesInParallelDriver() throws InterruptedException { |
| for (int i = 0; i < 100; ++i) { |
| graph.remove(rootKey); |
| graph.remove(KEY_A1); |
| graph.remove(KEY_A2); |
| var v1Sink = new SkyValueSink(); |
| var v2Sink = new SkyValueSink(); |
| var v3Sink = new SkyValueSink(); |
| var v4Sink = new SkyValueSink(); |
| var v5Sink = new SkyValueSink(); |
| var v6Sink = new SkyValueSink(); |
| Supplier<List<StateMachine>> factory = |
| () -> |
| Arrays.asList( |
| new TwoStepMachine(v1Sink, v2Sink), |
| new TwoStepMachine(v3Sink, v4Sink), |
| new TwoStepMachine(v5Sink, v6Sink)); |
| assertThat(evalMachineWithMultipleDrivers(factory)).isEqualTo(2); |
| } |
| } |
| } |