blob: 327b4c521ca085ee405ccadc13b1525acf7420f4 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.testutil.TestThread;
import com.google.devtools.build.lib.testutil.TestUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link MultisetSemaphore}. */
@RunWith(JUnit4.class)
public class MultisetSemaphoreTest {
@Test
public void testSimple_Serial() throws Exception {
// When we have a MultisetSemaphore
MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
// with 3 max num unique values,
.maxNumUniqueValues(3)
.build();
// Then it initially has 0 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0);
// And then when we serially acquire permits for 3 unique values,
multisetSemaphore.acquireAll(ImmutableSet.of("a", "b", "c"));
// Then the MultisetSemaphore thinks it currently has 3 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3);
// And then when we attempt to acquire permits for 2 of those same unique values, we don't block
// forever,
multisetSemaphore.acquireAll(ImmutableSet.of("b", "c"));
// And the MultisetSemaphore still thinks it currently has 3 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3);
// And then when we release one of the permit for one of those unique values,
multisetSemaphore.releaseAll(ImmutableSet.of("c"));
// The MultisetSemaphore still thinks it currently has 3 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3);
// And then we release the final permit for that unique value,
multisetSemaphore.releaseAll(ImmutableSet.of("c"));
// The MultisetSemaphore thinks it currently has 2 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(2);
// And then we attempt to acquire a permit for a 4th unique value, we don't block forever,
multisetSemaphore.acquireAll(ImmutableSet.of("d"));
// And the MultisetSemaphore thinks it currently has 3 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3);
// And then we release one permit each for the remaining 3 that unique values,
multisetSemaphore.releaseAll(ImmutableSet.of("a", "b", "d"));
// The MultisetSemaphore thinks it currently has 1 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(1);
// And then we release the final permit for the remaining unique value,
multisetSemaphore.releaseAll(ImmutableSet.of("b"));
// The MultisetSemaphore thinks it currently has 0 unique values.
assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0);
}
@Test
public void testSimple_Concurrent() throws Exception {
// When we have N and M, with M > N and M|N.
final int n = 10;
int m = n * 2;
Preconditions.checkState(m > n && m % n == 0, "M=%d N=%d", m, n);
// When we have a MultisetSemaphore
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
// with N max num unique values,
.maxNumUniqueValues(n)
.build();
// And a ExecutorService with M threads,
ExecutorService executorService = Executors.newFixedThreadPool(m);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
new ThrowableRecordingRunnableWrapper("testSimple_Concurrent");
final AtomicInteger numThreadsJustAfterAcquireInFirstRound = new AtomicInteger(0);
final AtomicInteger numThreadsJustAfterAcquireInSecondRound = new AtomicInteger(0);
final AtomicInteger secondRoundCompleted = new AtomicInteger(0);
final int napTimeMs = 42;
for (int i = 0; i < m; i++) {
final String val = "val" + i;
// And we submit M Runnables, each of which
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executorService.submit(
wrapper.wrap(
new Runnable() {
@Override
public void run() {
try {
// Has two rounds
// Wherein the first round
// The Runnable acquire a permit for a unique value (among M values),
ImmutableSet<String> valSet = ImmutableSet.of(val);
multisetSemaphore.acquireAll(valSet);
assertThat(numThreadsJustAfterAcquireInFirstRound.getAndIncrement())
.isLessThan(n);
// And then sleeps,
Thread.sleep(napTimeMs);
numThreadsJustAfterAcquireInFirstRound.decrementAndGet();
multisetSemaphore.releaseAll(valSet);
// And wherein the second round
// The Runnable again acquires a permit for its unique value,
multisetSemaphore.acquireAll(valSet);
assertThat(numThreadsJustAfterAcquireInSecondRound.getAndIncrement())
.isLessThan(n);
// And then sleeps,
Thread.sleep(napTimeMs);
numThreadsJustAfterAcquireInSecondRound.decrementAndGet();
// And notes that it has completed the second round,
secondRoundCompleted.incrementAndGet();
multisetSemaphore.releaseAll(valSet);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}));
}
// And we wait for all M Runnables to complete (that is, none of them were deadlocked),
boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
// Then none of our Runnables threw any Exceptions.
assertThat(wrapper.getFirstThrownError()).isNull();
if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
// And the counters we used for sanity checks were correctly reset to 0.
assertThat(numThreadsJustAfterAcquireInFirstRound.get()).isEqualTo(0);
assertThat(numThreadsJustAfterAcquireInSecondRound.get()).isEqualTo(0);
// And all M Runnables completed the second round.
assertThat(secondRoundCompleted.get()).isEqualTo(m);
Set<String> newVals = new HashSet<>();
for (int i = 0; i < n; i++) {
newVals.add("newval" + i);
}
// And the main test thread is able to acquire permits for N new unique values (indirectly
// confirming that the MultisetSemaphore previously had no outstanding permits).
multisetSemaphore.acquireAll(newVals);
}
@Test
public void testConcurrentAtomicity() throws Exception {
int n = 100;
// When we have a MultisetSemaphore
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
// with 2 max num unique values,
.maxNumUniqueValues(2)
.build();
// And a ExecutorService with N threads,
ExecutorService executorService = Executors.newFixedThreadPool(n);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
new ThrowableRecordingRunnableWrapper("testConcurrentAtomicity");
final int napTimeMs = 42;
// And a done latch with initial count N,
final CountDownLatch allDoneLatch = new CountDownLatch(n);
final String sameVal = "same-val";
for (int i = 0; i < n; i++) {
final String differentVal = "different-val" + i;
// And we submit N Runnables, each of which
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executorService.submit(
wrapper.wrap(
new Runnable() {
@Override
public void run() {
try {
Set<String> vals = ImmutableSet.of(sameVal, differentVal);
// Tries to acquire permits for a set of two values, one of which is the
// same for all the N Runnables and one of which is unique across all N
// Runnables,
multisetSemaphore.acquireAll(vals);
// And then sleeps,
Thread.sleep(napTimeMs);
// And then releases its permits,
multisetSemaphore.releaseAll(vals);
// And then counts down the done latch,
allDoneLatch.countDown();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}));
}
// Then all of our Runnables completed (without deadlock!), as expected,
boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
// And thus were able to count down the done latch,
allDoneLatch.await();
// And also none of them threw any Exceptions.
assertThat(wrapper.getFirstThrownError()).isNull();
if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
}
@Test
public void testConcurrentRace_AllPermuations() throws Exception {
// When we have N values
int n = 6;
ArrayList<String> vals = new ArrayList<>();
for (int i = 0; i < n; i++) {
vals.add("val-" + i);
}
// And we have all permutations of these N values
Collection<List<String>> permutations = Collections2.orderedPermutations(vals);
int numPermutations = permutations.size();
// And we have a MultisetSemaphore
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
// with N max num unique values,
.maxNumUniqueValues(n)
.build();
// And a ExecutorService with N! threads,
ExecutorService executorService = Executors.newFixedThreadPool(numPermutations);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations");
for (List<String> orderedVals : permutations) {
final Set<String> orderedSet = new LinkedHashSet<>(orderedVals);
// And we submit N! Runnables, each of which
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executorService.submit(
wrapper.wrap(
new Runnable() {
@Override
public void run() {
try {
// Tries to acquire permits for the set of N values, with a unique
// iteration order (across all the N! different permutations),
multisetSemaphore.acquireAll(orderedSet);
// And then immediately releases the permits.
multisetSemaphore.releaseAll(orderedSet);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}));
}
// Then all of our Runnables completed (without deadlock!), as expected,
boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
// And also none of them threw any Exceptions.
assertThat(wrapper.getFirstThrownError()).isNull();
if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
}
@Test
public void testConcurrentRace_AllSameSizedCombinations() throws Exception {
// When we have n values
int n = 10;
ImmutableSet.Builder<String> valsBuilder = ImmutableSet.builder();
for (int i = 0; i < n; i++) {
valsBuilder.add("val-" + i);
}
ImmutableSet<String> vals = valsBuilder.build();
int k = 5;
// And we have all combinations of size k of these n values
Set<Set<String>> combinations = Sets.combinations(vals, k);
int numCombinations = combinations.size();
// And we have a MultisetSemaphore
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
// with K max num unique values,
.maxNumUniqueValues(k)
.build();
// And a ExecutorService with nCk threads,
ExecutorService executorService = Executors.newFixedThreadPool(numCombinations);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations");
// And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves,
ConcurrentHashMultiset<String> counts = ConcurrentHashMultiset.create();
for (Set<String> combination : combinations) {
// And, for each of the nCk combinations, we submit a Runnable, that
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executorService.submit(
wrapper.wrap(
new Runnable() {
@Override
public void run() {
try {
// Tries to acquire permits for its set of k values,
multisetSemaphore.acquireAll(combination);
// And then verifies that the multiplicities are as expected,
combination.forEach(counts::add);
assertThat(counts.entrySet().size()).isAtMost(k);
combination.forEach(counts::remove);
// And then releases the permits.
multisetSemaphore.releaseAll(combination);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}));
}
// Then all of our Runnables completed (without deadlock!), as expected,
boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
// And also none of them threw any Exceptions.
assertThat(wrapper.getFirstThrownError()).isNull();
if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
}
@Test
public void testSimpleDeadlock() throws Exception {
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
.maxNumUniqueValues(2)
.build();
CountDownLatch thread1AcquiredLatch = new CountDownLatch(1);
CountDownLatch thread2AboutToAcquireLatch = new CountDownLatch(1);
CountDownLatch thread3AboutToAcquireLatch = new CountDownLatch(1);
TestThread thread1 =
new TestThread(
() -> {
multisetSemaphore.acquireAll(ImmutableSet.of("a", "b"));
thread1AcquiredLatch.countDown();
thread2AboutToAcquireLatch.await(
TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
thread3AboutToAcquireLatch.await(
TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
Thread.sleep(1000);
multisetSemaphore.releaseAll(ImmutableSet.of("a", "b"));
});
thread1.setName("Thread1");
TestThread thread2 =
new TestThread(
() -> {
thread1AcquiredLatch.await(
TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
thread2AboutToAcquireLatch.countDown();
multisetSemaphore.acquireAll(ImmutableSet.of("b", "c"));
multisetSemaphore.releaseAll(ImmutableSet.of("b", "c"));
});
thread2.setName("Thread2");
TestThread thread3 =
new TestThread(
() -> {
thread2AboutToAcquireLatch.await(
TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
Thread.sleep(1000);
thread3AboutToAcquireLatch.countDown();
multisetSemaphore.acquireAll(ImmutableSet.of("a", "d"));
multisetSemaphore.releaseAll(ImmutableSet.of("a", "d"));
});
thread3.setName("Thread3");
thread1.start();
thread2.start();
thread3.start();
thread1.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
thread2.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
thread3.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
}
}