| // Copyright 2016 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.concurrent; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Collections2; |
| import com.google.common.collect.ConcurrentHashMultiset; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Sets; |
| import com.google.devtools.build.lib.testutil.TestThread; |
| import com.google.devtools.build.lib.testutil.TestUtils; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link MultisetSemaphore}. */ |
| @RunWith(JUnit4.class) |
| public class MultisetSemaphoreTest { |
| |
| @Test |
| public void testSimple_Serial() throws Exception { |
| // When we have a MultisetSemaphore |
| MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| // with 3 max num unique values, |
| .maxNumUniqueValues(3) |
| .build(); |
| |
| // Then it initially has 0 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0); |
| |
| // And then when we serially acquire permits for 3 unique values, |
| multisetSemaphore.acquireAll(ImmutableSet.of("a", "b", "c")); |
| // Then the MultisetSemaphore thinks it currently has 3 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); |
| |
| // And then when we attempt to acquire permits for 2 of those same unique values, we don't block |
| // forever, |
| multisetSemaphore.acquireAll(ImmutableSet.of("b", "c")); |
| // And the MultisetSemaphore still thinks it currently has 3 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); |
| |
| // And then when we release one of the permit for one of those unique values, |
| multisetSemaphore.releaseAll(ImmutableSet.of("c")); |
| // The MultisetSemaphore still thinks it currently has 3 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); |
| |
| // And then we release the final permit for that unique value, |
| multisetSemaphore.releaseAll(ImmutableSet.of("c")); |
| // The MultisetSemaphore thinks it currently has 2 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(2); |
| |
| // And then we attempt to acquire a permit for a 4th unique value, we don't block forever, |
| multisetSemaphore.acquireAll(ImmutableSet.of("d")); |
| // And the MultisetSemaphore thinks it currently has 3 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); |
| |
| // And then we release one permit each for the remaining 3 that unique values, |
| multisetSemaphore.releaseAll(ImmutableSet.of("a", "b", "d")); |
| // The MultisetSemaphore thinks it currently has 1 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(1); |
| |
| // And then we release the final permit for the remaining unique value, |
| multisetSemaphore.releaseAll(ImmutableSet.of("b")); |
| // The MultisetSemaphore thinks it currently has 0 unique values. |
| assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0); |
| } |
| |
| @Test |
| public void testSimple_Concurrent() throws Exception { |
| // When we have N and M, with M > N and M|N. |
| final int n = 10; |
| int m = n * 2; |
| Preconditions.checkState(m > n && m % n == 0, "M=%d N=%d", m, n); |
| // When we have a MultisetSemaphore |
| final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| // with N max num unique values, |
| .maxNumUniqueValues(n) |
| .build(); |
| |
| // And a ExecutorService with M threads, |
| ExecutorService executorService = Executors.newFixedThreadPool(m); |
| // And a recorder for thrown exceptions, |
| ThrowableRecordingRunnableWrapper wrapper = |
| new ThrowableRecordingRunnableWrapper("testSimple_Concurrent"); |
| final AtomicInteger numThreadsJustAfterAcquireInFirstRound = new AtomicInteger(0); |
| final AtomicInteger numThreadsJustAfterAcquireInSecondRound = new AtomicInteger(0); |
| final AtomicInteger secondRoundCompleted = new AtomicInteger(0); |
| final int napTimeMs = 42; |
| for (int i = 0; i < m; i++) { |
| final String val = "val" + i; |
| // And we submit M Runnables, each of which |
| @SuppressWarnings("unused") |
| Future<?> possiblyIgnoredError = |
| executorService.submit( |
| wrapper.wrap( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // Has two rounds |
| |
| // Wherein the first round |
| // The Runnable acquire a permit for a unique value (among M values), |
| ImmutableSet<String> valSet = ImmutableSet.of(val); |
| multisetSemaphore.acquireAll(valSet); |
| assertThat(numThreadsJustAfterAcquireInFirstRound.getAndIncrement()) |
| .isLessThan(n); |
| // And then sleeps, |
| Thread.sleep(napTimeMs); |
| numThreadsJustAfterAcquireInFirstRound.decrementAndGet(); |
| multisetSemaphore.releaseAll(valSet); |
| |
| // And wherein the second round |
| // The Runnable again acquires a permit for its unique value, |
| multisetSemaphore.acquireAll(valSet); |
| assertThat(numThreadsJustAfterAcquireInSecondRound.getAndIncrement()) |
| .isLessThan(n); |
| // And then sleeps, |
| Thread.sleep(napTimeMs); |
| numThreadsJustAfterAcquireInSecondRound.decrementAndGet(); |
| // And notes that it has completed the second round, |
| secondRoundCompleted.incrementAndGet(); |
| multisetSemaphore.releaseAll(valSet); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| })); |
| } |
| // And we wait for all M Runnables to complete (that is, none of them were deadlocked), |
| boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); |
| // Then none of our Runnables threw any Exceptions. |
| assertThat(wrapper.getFirstThrownError()).isNull(); |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| throw new InterruptedException(); |
| } |
| // And the counters we used for sanity checks were correctly reset to 0. |
| assertThat(numThreadsJustAfterAcquireInFirstRound.get()).isEqualTo(0); |
| assertThat(numThreadsJustAfterAcquireInSecondRound.get()).isEqualTo(0); |
| // And all M Runnables completed the second round. |
| assertThat(secondRoundCompleted.get()).isEqualTo(m); |
| Set<String> newVals = new HashSet<>(); |
| for (int i = 0; i < n; i++) { |
| newVals.add("newval" + i); |
| } |
| // And the main test thread is able to acquire permits for N new unique values (indirectly |
| // confirming that the MultisetSemaphore previously had no outstanding permits). |
| multisetSemaphore.acquireAll(newVals); |
| } |
| |
| @Test |
| public void testConcurrentAtomicity() throws Exception { |
| int n = 100; |
| // When we have a MultisetSemaphore |
| final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| // with 2 max num unique values, |
| .maxNumUniqueValues(2) |
| .build(); |
| // And a ExecutorService with N threads, |
| ExecutorService executorService = Executors.newFixedThreadPool(n); |
| // And a recorder for thrown exceptions, |
| ThrowableRecordingRunnableWrapper wrapper = |
| new ThrowableRecordingRunnableWrapper("testConcurrentAtomicity"); |
| final int napTimeMs = 42; |
| // And a done latch with initial count N, |
| final CountDownLatch allDoneLatch = new CountDownLatch(n); |
| final String sameVal = "same-val"; |
| for (int i = 0; i < n; i++) { |
| final String differentVal = "different-val" + i; |
| // And we submit N Runnables, each of which |
| @SuppressWarnings("unused") |
| Future<?> possiblyIgnoredError = |
| executorService.submit( |
| wrapper.wrap( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Set<String> vals = ImmutableSet.of(sameVal, differentVal); |
| // Tries to acquire permits for a set of two values, one of which is the |
| // same for all the N Runnables and one of which is unique across all N |
| // Runnables, |
| multisetSemaphore.acquireAll(vals); |
| // And then sleeps, |
| Thread.sleep(napTimeMs); |
| // And then releases its permits, |
| multisetSemaphore.releaseAll(vals); |
| // And then counts down the done latch, |
| allDoneLatch.countDown(); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| })); |
| } |
| // Then all of our Runnables completed (without deadlock!), as expected, |
| boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); |
| // And thus were able to count down the done latch, |
| allDoneLatch.await(); |
| // And also none of them threw any Exceptions. |
| assertThat(wrapper.getFirstThrownError()).isNull(); |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| throw new InterruptedException(); |
| } |
| } |
| |
| @Test |
| public void testConcurrentRace_AllPermuations() throws Exception { |
| // When we have N values |
| int n = 6; |
| ArrayList<String> vals = new ArrayList<>(); |
| for (int i = 0; i < n; i++) { |
| vals.add("val-" + i); |
| } |
| // And we have all permutations of these N values |
| Collection<List<String>> permutations = Collections2.orderedPermutations(vals); |
| int numPermutations = permutations.size(); |
| // And we have a MultisetSemaphore |
| final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| // with N max num unique values, |
| .maxNumUniqueValues(n) |
| .build(); |
| // And a ExecutorService with N! threads, |
| ExecutorService executorService = Executors.newFixedThreadPool(numPermutations); |
| // And a recorder for thrown exceptions, |
| ThrowableRecordingRunnableWrapper wrapper = |
| new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations"); |
| for (List<String> orderedVals : permutations) { |
| final Set<String> orderedSet = new LinkedHashSet<>(orderedVals); |
| // And we submit N! Runnables, each of which |
| @SuppressWarnings("unused") |
| Future<?> possiblyIgnoredError = |
| executorService.submit( |
| wrapper.wrap( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // Tries to acquire permits for the set of N values, with a unique |
| // iteration order (across all the N! different permutations), |
| multisetSemaphore.acquireAll(orderedSet); |
| // And then immediately releases the permits. |
| multisetSemaphore.releaseAll(orderedSet); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| })); |
| } |
| // Then all of our Runnables completed (without deadlock!), as expected, |
| boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); |
| // And also none of them threw any Exceptions. |
| assertThat(wrapper.getFirstThrownError()).isNull(); |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| throw new InterruptedException(); |
| } |
| } |
| |
| @Test |
| public void testConcurrentRace_AllSameSizedCombinations() throws Exception { |
| // When we have n values |
| int n = 10; |
| ImmutableSet.Builder<String> valsBuilder = ImmutableSet.builder(); |
| for (int i = 0; i < n; i++) { |
| valsBuilder.add("val-" + i); |
| } |
| ImmutableSet<String> vals = valsBuilder.build(); |
| int k = 5; |
| // And we have all combinations of size k of these n values |
| Set<Set<String>> combinations = Sets.combinations(vals, k); |
| int numCombinations = combinations.size(); |
| // And we have a MultisetSemaphore |
| final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| // with K max num unique values, |
| .maxNumUniqueValues(k) |
| .build(); |
| // And a ExecutorService with nCk threads, |
| ExecutorService executorService = Executors.newFixedThreadPool(numCombinations); |
| // And a recorder for thrown exceptions, |
| ThrowableRecordingRunnableWrapper wrapper = |
| new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations"); |
| // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves, |
| ConcurrentHashMultiset<String> counts = ConcurrentHashMultiset.create(); |
| for (Set<String> combination : combinations) { |
| // And, for each of the nCk combinations, we submit a Runnable, that |
| @SuppressWarnings("unused") |
| Future<?> possiblyIgnoredError = |
| executorService.submit( |
| wrapper.wrap( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // Tries to acquire permits for its set of k values, |
| multisetSemaphore.acquireAll(combination); |
| // And then verifies that the multiplicities are as expected, |
| combination.forEach(counts::add); |
| assertThat(counts.entrySet().size()).isAtMost(k); |
| combination.forEach(counts::remove); |
| // And then releases the permits. |
| multisetSemaphore.releaseAll(combination); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| })); |
| } |
| // Then all of our Runnables completed (without deadlock!), as expected, |
| boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); |
| // And also none of them threw any Exceptions. |
| assertThat(wrapper.getFirstThrownError()).isNull(); |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| throw new InterruptedException(); |
| } |
| } |
| |
| @Test |
| public void testSimpleDeadlock() throws Exception { |
| final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() |
| .maxNumUniqueValues(2) |
| .build(); |
| |
| CountDownLatch thread1AcquiredLatch = new CountDownLatch(1); |
| CountDownLatch thread2AboutToAcquireLatch = new CountDownLatch(1); |
| CountDownLatch thread3AboutToAcquireLatch = new CountDownLatch(1); |
| |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| multisetSemaphore.acquireAll(ImmutableSet.of("a", "b")); |
| thread1AcquiredLatch.countDown(); |
| thread2AboutToAcquireLatch.await( |
| TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); |
| thread3AboutToAcquireLatch.await( |
| TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); |
| Thread.sleep(1000); |
| multisetSemaphore.releaseAll(ImmutableSet.of("a", "b")); |
| }); |
| thread1.setName("Thread1"); |
| |
| TestThread thread2 = |
| new TestThread( |
| () -> { |
| thread1AcquiredLatch.await( |
| TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); |
| thread2AboutToAcquireLatch.countDown(); |
| multisetSemaphore.acquireAll(ImmutableSet.of("b", "c")); |
| multisetSemaphore.releaseAll(ImmutableSet.of("b", "c")); |
| }); |
| thread2.setName("Thread2"); |
| |
| TestThread thread3 = |
| new TestThread( |
| () -> { |
| thread2AboutToAcquireLatch.await( |
| TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); |
| Thread.sleep(1000); |
| thread3AboutToAcquireLatch.countDown(); |
| multisetSemaphore.acquireAll(ImmutableSet.of("a", "d")); |
| multisetSemaphore.releaseAll(ImmutableSet.of("a", "d")); |
| }); |
| thread3.setName("Thread3"); |
| |
| thread1.start(); |
| thread2.start(); |
| thread3.start(); |
| |
| thread1.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| thread2.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| thread3.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| } |
| } |
| |