| // Copyright 2024 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.util; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.devtools.build.lib.clock.BlazeClock; |
| import com.google.devtools.build.lib.clock.Clock; |
| import com.google.devtools.build.lib.testutil.ManualClock; |
| import com.google.devtools.build.lib.util.ConcurrencyMeter.Ticket; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Future.State; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link ConcurrencyMeter}. */ |
| @RunWith(JUnit4.class) |
| public final class ConcurrencyMeterTest { |
| |
| private static void assertFutureIsSuccessful(Future<?> future) { |
| assertThat(future.state()).isEqualTo(State.SUCCESS); |
| } |
| |
| @Test |
| public void testGrant() throws Exception { |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance()); |
| AtomicBoolean isQueued = new AtomicBoolean(false); |
| |
| ListenableFuture<Ticket> req1 = scheduler.request(2, 0, () -> isQueued.set(true)); |
| assertFutureIsSuccessful(req1); |
| assertThat(scheduler.queueSize()).isEqualTo(0); |
| assertThat(isQueued.get()).isFalse(); |
| req1.get().done(); |
| |
| ListenableFuture<Ticket> req2 = scheduler.request(2, 0, () -> isQueued.set(true)); |
| assertFutureIsSuccessful(req2); |
| |
| ListenableFuture<Ticket> req3 = scheduler.request(1, 0, () -> isQueued.set(true)); |
| assertFutureIsSuccessful(req3); |
| assertThat(isQueued.get()).isFalse(); |
| assertThat(scheduler.queueSize()).isEqualTo(0); |
| } |
| |
| @Test |
| public void testBlock() throws Exception { |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance()); |
| AtomicBoolean isQueued = new AtomicBoolean(false); |
| |
| ListenableFuture<Ticket> req1 = scheduler.request(2, 0, () -> isQueued.set(true)); |
| assertFutureIsSuccessful(req1); |
| |
| ListenableFuture<Ticket> req2 = scheduler.request(2, 0, () -> isQueued.set(true)); |
| assertThat(req2.isDone()).isFalse(); |
| assertThat(scheduler.queueSize()).isEqualTo(1); |
| assertThat(isQueued.get()).isTrue(); |
| |
| req1.get().done(); |
| assertFutureIsSuccessful(req2); |
| assertThat(scheduler.queueSize()).isEqualTo(0); |
| } |
| |
| @Test |
| public void testGrantZero() { |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance()); |
| ListenableFuture<Ticket> req = scheduler.request(0, 0); |
| assertFutureIsSuccessful(req); |
| } |
| |
| @Test |
| public void testGrantFromZero() { |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance()); |
| |
| ListenableFuture<Ticket> req1 = scheduler.request(10, 0); |
| assertFutureIsSuccessful(req1); |
| |
| ListenableFuture<Ticket> req2 = scheduler.request(0, 0); |
| assertThat(req2.isDone()).isFalse(); |
| } |
| |
| @Test |
| public void testPriority() throws Exception { |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance()); |
| |
| ListenableFuture<Ticket> req1 = scheduler.request(2, 0); |
| assertFutureIsSuccessful(req1); |
| |
| ListenableFuture<Ticket> req2 = scheduler.request(2, 0); |
| assertThat(req2.isDone()).isFalse(); |
| |
| ListenableFuture<Ticket> req3 = scheduler.request(2, 1); |
| assertThat(req3.isDone()).isFalse(); |
| |
| req1.get().done(); |
| assertThat(req2.isDone()).isFalse(); |
| assertFutureIsSuccessful(req3); |
| |
| req3.get().done(); |
| assertFutureIsSuccessful(req2); |
| } |
| |
| @Test |
| public void testThreadSafety() throws Exception { |
| int requestsPerThread = 10; |
| int threads = 10; |
| Random r = new Random(); |
| |
| ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 100, BlazeClock.instance()); |
| ExecutorService exec = Executors.newFixedThreadPool(threads); |
| ExecutorService unboundedPool = Executors.newCachedThreadPool(); |
| List<Future<?>> results = new ArrayList<>(); |
| CountDownLatch allJobsDone = new CountDownLatch(threads * requestsPerThread); |
| |
| // For every thread, we'll ask for requestsPerThread resource bundles. For |
| // each of those, we'll set up a listener to release the resources after |
| // a small, but random amount of time. |
| for (int i = 0; i < threads; i++) { |
| results.add( |
| exec.submit( |
| () -> { |
| for (int j = 0; j < requestsPerThread; j++) { |
| int size = r.nextInt(20) + 3; |
| ListenableFuture<Ticket> req = scheduler.request(size, 0); |
| req.addListener( |
| () -> { |
| long sleepiness = r.nextInt(30); |
| try { |
| Thread.sleep(sleepiness); |
| req.get().done(); |
| allJobsDone.countDown(); |
| } catch (Exception e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| throw new IllegalStateException(e); |
| } |
| }, |
| unboundedPool); |
| } |
| })); |
| } |
| |
| exec.shutdown(); |
| exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); |
| |
| assertThat(results).hasSize(threads); |
| for (Future<?> result : results) { |
| assertFutureIsSuccessful(result); // Make sure nothing went wrong. |
| } |
| allJobsDone.await(); |
| |
| // Make sure nothing is left to be scheduled |
| assertFutureIsSuccessful(scheduler.request(0, 0)); |
| assertThat(scheduler.queueSize()).isEqualTo(0); |
| } |
| |
| @Test |
| public void cancelledRequest_releasedImmediately() throws Exception { |
| ConcurrencyMeter meter = new ConcurrencyMeter("meter", 1, BlazeClock.instance()); |
| Ticket ticket = meter.request(1, 1).get(); |
| ListenableFuture<Ticket> blockedRequest = meter.request(1, 1); |
| |
| blockedRequest.cancel(/* mayInterruptIfRunning= */ false); |
| assertThat(blockedRequest.isCancelled()).isTrue(); |
| |
| ticket.done(); |
| assertFutureIsSuccessful(meter.request(1, 1)); |
| } |
| |
| @Test |
| public void manyBlockedAllCancelled_noStackOverflow() throws Exception { |
| ConcurrencyMeter meter = new ConcurrencyMeter("meter", 1, BlazeClock.instance()); |
| Ticket liveTicket = meter.request(1, 1).get(); |
| |
| List<ListenableFuture<Ticket>> blockedRequests = new ArrayList<>(); |
| for (int i = 0; i < 100_000; i++) { |
| blockedRequests.add(meter.request(1, 1)); |
| } |
| for (ListenableFuture<Ticket> blockedRequest : blockedRequests) { |
| blockedRequest.cancel(/* mayInterruptIfRunning= */ true); |
| assertThat(blockedRequest.isCancelled()).isTrue(); |
| } |
| |
| liveTicket.done(); |
| } |
| |
| @Test |
| public void stats() throws Exception { |
| ManualClock clock = new ManualClock(); |
| ConcurrencyMeter meter = new ConcurrencyMeter("meter", 10, clock); |
| |
| Ticket ticket1 = meter.request(1, 1).get(); |
| Ticket ticket2 = meter.request(1, 1).get(); |
| clock.advance(Duration.ofMillis(1)); |
| |
| Instant timeOfMax = clock.now(); |
| meter.request(1, 1).get(); // Unreleased ticket. |
| ticket1.done(); |
| clock.advance(Duration.ofMillis(1)); |
| |
| ticket2.done(); |
| |
| assertThat(meter.getStats()) |
| .isEqualTo(new ConcurrencyMeter.Stats("meter", 10, 1, 3, timeOfMax.toEpochMilli())); |
| } |
| |
| @Test |
| public void stats_maxObservedMultipleTimes_maxLeasedTimeMsMatchesLastTime() throws Exception { |
| ManualClock clock = new ManualClock(); |
| ConcurrencyMeter meter = new ConcurrencyMeter("meter", 1, clock); |
| |
| Ticket ticket1 = meter.request(1, 1).get(); |
| ticket1.done(); |
| clock.advance(Duration.ofMillis(1)); |
| |
| Ticket ticket2 = meter.request(1, 1).get(); |
| ticket2.done(); |
| clock.advance(Duration.ofMillis(1)); |
| |
| Instant timeOfLastMax = clock.now(); |
| Ticket ticket3 = meter.request(1, 1).get(); |
| ticket3.done(); |
| clock.advance(Duration.ofMillis(1)); |
| |
| var stats = meter.getStats(); |
| assertThat(stats.maxLeasedTimeMs()).isEqualTo(timeOfLastMax.toEpochMilli()); |
| } |
| |
| @Test |
| public void stats_noPermitsLeased_noTimestamp() { |
| Clock throwingClock = |
| new Clock() { |
| @Override |
| public long currentTimeMillis() { |
| throw new UnsupportedOperationException("Should not need to get the current time"); |
| } |
| |
| @Override |
| public long nanoTime() { |
| throw new UnsupportedOperationException("Should not need to get the current time"); |
| } |
| }; |
| ConcurrencyMeter meter = new ConcurrencyMeter("meter", 1, throwingClock); |
| |
| var stats = meter.getStats(); |
| assertThat(stats.maxLeased()).isEqualTo(0); |
| assertThat(stats.maxLeasedTimeMs()).isEqualTo(0); |
| } |
| } |