| // Copyright 2015 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.actions; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| import static org.junit.Assert.assertThrows; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.ImmutableSortedMap; |
| import com.google.common.hash.HashCode; |
| import com.google.devtools.build.lib.actions.ExecutionRequirements.WorkerProtocolFormat; |
| import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle; |
| import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority; |
| import com.google.devtools.build.lib.analysis.platform.PlatformInfo; |
| import com.google.devtools.build.lib.collect.nestedset.NestedSet; |
| import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; |
| import com.google.devtools.build.lib.collect.nestedset.Order; |
| import com.google.devtools.build.lib.testutil.TestThread; |
| import com.google.devtools.build.lib.testutil.TestUtils; |
| import com.google.devtools.build.lib.vfs.DigestHashFunction; |
| import com.google.devtools.build.lib.vfs.FileSystem; |
| import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; |
| import com.google.devtools.build.lib.worker.Worker; |
| import com.google.devtools.build.lib.worker.WorkerFactory; |
| import com.google.devtools.build.lib.worker.WorkerKey; |
| import com.google.devtools.build.lib.worker.WorkerPool; |
| import com.google.errorprone.annotations.CanIgnoreReturnValue; |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.Collection; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import javax.annotation.Nullable; |
| import org.apache.commons.pool2.PooledObject; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link ResourceManager}. */ |
| @RunWith(JUnit4.class) |
| public final class ResourceManagerTest { |
| |
| private final FileSystem fs = new InMemoryFileSystem(DigestHashFunction.SHA256); |
| private final ActionExecutionMetadata resourceOwner = new ResourceOwnerStub(); |
| private final ResourceManager rm = ResourceManager.instanceForTestingOnly(); |
| private Worker worker; |
| private AtomicInteger counter; |
| CyclicBarrier sync; |
| CyclicBarrier sync2; |
| |
| @Before |
| public void configureResourceManager() throws Exception { |
| rm.setAvailableResources( |
| ResourceSet.create( |
| /* memoryMb= */ 1000, |
| /* cpuUsage= */ 1, |
| /* extraResourceUsage= */ ImmutableMap.of( |
| "gpu", 2.0f, |
| "fancyresource", 1.5f), |
| /* localTestCount= */ 2)); |
| counter = new AtomicInteger(0); |
| sync = new CyclicBarrier(2); |
| sync2 = new CyclicBarrier(2); |
| rm.resetResourceUsage(); |
| rm.setPrioritizeLocalActions(true); |
| worker = mock(Worker.class); |
| rm.setWorkerPool(createWorkerPool()); |
| } |
| |
| private WorkerPool createWorkerPool() { |
| return new WorkerPool( |
| new WorkerPool.WorkerPoolConfig( |
| new WorkerFactory(fs.getPath("/workerBase")) { |
| @Override |
| public Worker create(WorkerKey key) { |
| return worker; |
| } |
| |
| @Override |
| public boolean validateObject(WorkerKey key, PooledObject<Worker> p) { |
| return true; |
| } |
| }, |
| ImmutableList.of(), |
| ImmutableList.of(), |
| ImmutableList.of())); |
| } |
| |
| private ResourceHandle acquire(double ram, double cpu, int tests, ResourcePriority priority) |
| throws InterruptedException, IOException { |
| return rm.acquireResources(resourceOwner, ResourceSet.create(ram, cpu, tests), priority); |
| } |
| |
| private ResourceHandle acquire(double ram, double cpu, int tests) |
| throws InterruptedException, IOException { |
| return acquire(ram, cpu, tests, ResourcePriority.LOCAL); |
| } |
| |
| private ResourceHandle acquire(double ram, double cpu, int tests, String mnemonic) |
| throws InterruptedException, IOException { |
| |
| return rm.acquireResources( |
| resourceOwner, |
| ResourceSet.createWithWorkerKey(ram, cpu, tests, createWorkerKey(mnemonic)), |
| ResourcePriority.LOCAL); |
| } |
| |
| @CanIgnoreReturnValue |
| private ResourceHandle acquire( |
| double ram, |
| double cpu, |
| ImmutableMap<String, Float> extraResources, |
| int tests, |
| ResourcePriority priority) |
| throws InterruptedException, IOException, NoSuchElementException { |
| return rm.acquireResources( |
| resourceOwner, ResourceSet.create(ram, cpu, extraResources, tests), priority); |
| } |
| |
| @CanIgnoreReturnValue |
| private ResourceHandle acquire( |
| double ram, double cpu, ImmutableMap<String, Float> extraResources, int tests) |
| throws InterruptedException, IOException, NoSuchElementException { |
| return acquire(ram, cpu, extraResources, tests, ResourcePriority.LOCAL); |
| } |
| |
| private void release(double ram, double cpu, int tests) throws IOException, InterruptedException { |
| rm.releaseResources(resourceOwner, ResourceSet.create(ram, cpu, tests), /* worker= */ null); |
| } |
| |
| private void release( |
| double ram, double cpu, ImmutableMap<String, Float> extraResources, int tests) |
| throws InterruptedException, IOException { |
| rm.releaseResources( |
| resourceOwner, ResourceSet.create(ram, cpu, extraResources, tests), /* worker= */ null); |
| } |
| |
| private void validate(int count) { |
| assertThat(counter.incrementAndGet()).isEqualTo(count); |
| } |
| |
| private WorkerKey createWorkerKey(String mnemonic) { |
| return new WorkerKey( |
| /* args= */ ImmutableList.of(), |
| /* env= */ ImmutableMap.of(), |
| /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), |
| /* mnemonic= */ mnemonic, |
| /* workerFilesCombinedHash= */ HashCode.fromInt(0), |
| /* workerFilesWithDigests= */ ImmutableSortedMap.of(), |
| /* sandboxed= */ false, |
| /* multiplex= */ false, |
| /* cancellable= */ false, |
| WorkerProtocolFormat.PROTO); |
| } |
| |
| @Test |
| public void testOverBudgetRequests() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // When nothing is consuming RAM, |
| // Then Resource Manager will successfully acquire an over-budget request for RAM: |
| double bigRam = 10000.0; |
| acquire(bigRam, 0, 0); |
| // When RAM is consumed, |
| // Then Resource Manager will be "in use": |
| assertThat(rm.inUse()).isTrue(); |
| release(bigRam, 0, 0); |
| // When that RAM is released, |
| // Then Resource Manager will not be "in use": |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Ditto, for CPU: |
| double bigCpu = 10.0; |
| acquire(0, bigCpu, 0); |
| assertThat(rm.inUse()).isTrue(); |
| release(0, bigCpu, 0); |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Ditto, for tests: |
| int bigTests = 10; |
| acquire(0, 0, bigTests); |
| assertThat(rm.inUse()).isTrue(); |
| release(0, 0, bigTests); |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Ditto, for extra resources: |
| ImmutableMap<String, Float> bigExtraResources = |
| ImmutableMap.of("gpu", 10.0f, "fancyresource", 10.0f); |
| acquire(0, 0, bigExtraResources, 0); |
| assertThat(rm.inUse()).isTrue(); |
| release(0, 0, bigExtraResources, 0); |
| assertThat(rm.inUse()).isFalse(); |
| } |
| |
| @Test |
| public void testThatCpuCanBeOverallocated() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given CPU is partially acquired: |
| acquire(0, 0.5, 0); |
| |
| // When a request for CPU is made that would slightly overallocate CPU, |
| // Then the request succeeds: |
| TestThread thread1 = new TestThread(() -> assertThat(acquire(0, 0.6, 0)).isNotNull()); |
| thread1.start(); |
| thread1.joinAndAssertState(10000); |
| } |
| |
| @Test |
| public void testThatCpuAllocationIsNoncommutative() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given that CPU has a small initial allocation: |
| acquire(0, 0.099, 0); |
| |
| // When a request for a large CPU allocation is made, |
| // Then the request succeeds: |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| acquire(0, 0.99, 0); |
| // Cleanup |
| release(0, 0.99, 0); |
| }); |
| thread1.start(); |
| thread1.joinAndAssertState(10000); |
| |
| // Cleanup |
| release(0, 0.099, 0); |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given that CPU has a large initial allocation: |
| acquire(0, 0.99, 0); |
| |
| // When a request for a small CPU allocation is made, |
| // Then the request fails: |
| TestThread thread2 = new TestThread(() -> acquire(0, 0.099, 0)); |
| thread2.start(); |
| AssertionError e = assertThrows(AssertionError.class, () -> thread2.joinAndAssertState(1000)); |
| assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive"); |
| // Note that this behavior is surprising and probably not intended. |
| } |
| |
| @Test |
| public void testThatRamCannotBeOverallocated() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given RAM is partially acquired: |
| acquire(500, 0, 0); |
| |
| // When a request for RAM is made that would slightly overallocate RAM, |
| // Then the request fails (got timeout): |
| TestThread thread1 = new TestThread(() -> acquire(600, 0, 0)); |
| thread1.start(); |
| AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000)); |
| assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive"); |
| } |
| |
| @Test |
| public void testThatTestsCannotBeOverallocated() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given test count is partially acquired: |
| acquire(0, 0, 1); |
| |
| // When a request for tests is made that would slightly overallocate tests, |
| // Then the request fails: |
| TestThread thread1 = new TestThread(() -> acquire(0, 0, 2)); |
| thread1.start(); |
| AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000)); |
| assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive"); |
| } |
| |
| @Test |
| public void testThatExtraResourcesCannotBeOverallocated() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| // Given a partially acquired extra resources: |
| acquire(0, 0, ImmutableMap.of("gpu", 1.0f), 1); |
| |
| // When a request for extra resources is made that would overallocate, |
| // Then the request fails: |
| TestThread thread1 = new TestThread(() -> acquire(0, 0, ImmutableMap.of("gpu", 1.1f), 0)); |
| thread1.start(); |
| AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000)); |
| assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive"); |
| } |
| |
| @Test |
| public void testHasResources() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| assertThat(rm.threadHasResources()).isFalse(); |
| acquire(1, 0.1, ImmutableMap.of("gpu", 1.0f), 1); |
| assertThat(rm.threadHasResources()).isTrue(); |
| |
| // We have resources in this thread - make sure other threads |
| // are not affected. |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| assertThat(rm.threadHasResources()).isFalse(); |
| acquire(1, 0, 0); |
| assertThat(rm.threadHasResources()).isTrue(); |
| release(1, 0, 0); |
| assertThat(rm.threadHasResources()).isFalse(); |
| acquire(0, 0.1, 0); |
| assertThat(rm.threadHasResources()).isTrue(); |
| release(0, 0.1, 0); |
| assertThat(rm.threadHasResources()).isFalse(); |
| acquire(0, 0, 1); |
| assertThat(rm.threadHasResources()).isTrue(); |
| release(0, 0, 1); |
| assertThat(rm.threadHasResources()).isFalse(); |
| acquire(0, 0, ImmutableMap.of("gpu", 1.0f), 0); |
| assertThat(rm.threadHasResources()).isTrue(); |
| release(0, 0, ImmutableMap.of("gpu", 1.0f), 0); |
| assertThat(rm.threadHasResources()).isFalse(); |
| }); |
| thread1.start(); |
| thread1.joinAndAssertState(10000); |
| |
| release(1, 0.1, ImmutableMap.of("gpu", 1.0f), 1); |
| assertThat(rm.threadHasResources()).isFalse(); |
| assertThat(rm.inUse()).isFalse(); |
| } |
| |
| @Test |
| @SuppressWarnings("ThreadPriorityCheck") |
| public void testConcurrentLargeRequests() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| acquire(2000, 2, 0); |
| sync.await(); |
| validate(1); |
| sync.await(); |
| // Wait till other thread will be locked. |
| while (rm.getWaitCount() == 0) { |
| Thread.yield(); |
| } |
| release(2000, 2, 0); |
| assertThat(rm.getWaitCount()).isEqualTo(0); |
| acquire(2000, 2, 0); // Will be blocked by the thread2. |
| validate(3); |
| release(2000, 2, 0); |
| }); |
| TestThread thread2 = |
| new TestThread( |
| () -> { |
| sync2.await(); |
| assertThat(isAvailable(rm, 2000, 2, 0)).isFalse(); |
| acquire(2000, 2, 0); // Will be blocked by the thread1. |
| validate(2); |
| sync2.await(); |
| // Wait till other thread will be locked. |
| while (rm.getWaitCount() == 0) { |
| Thread.yield(); |
| } |
| release(2000, 2, 0); |
| }); |
| |
| thread1.start(); |
| thread2.start(); |
| sync.await(1, TimeUnit.SECONDS); |
| assertThat(rm.inUse()).isTrue(); |
| assertThat(rm.getWaitCount()).isEqualTo(0); |
| sync2.await(1, TimeUnit.SECONDS); |
| sync.await(1, TimeUnit.SECONDS); |
| sync2.await(1, TimeUnit.SECONDS); |
| thread1.joinAndAssertState(1000); |
| thread2.joinAndAssertState(1000); |
| assertThat(rm.inUse()).isFalse(); |
| } |
| |
| @Test |
| public void testInterruptedAcquisitionClearsResources() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| // Acquire a small amount of resources so that future requests can block (the initial request |
| // always succeeds even if it's for too much). |
| TestThread smallThread = new TestThread(() -> acquire(1, 0, 0)); |
| smallThread.start(); |
| smallThread.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| Thread.currentThread().interrupt(); |
| assertThrows(InterruptedException.class, () -> acquire(1999, 0, 0)); |
| }); |
| thread1.start(); |
| thread1.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| // This should process the queue. If the request from above is still present, it will take all |
| // the available memory. But it shouldn't. |
| rm.setAvailableResources( |
| ResourceSet.create(/*memoryMb=*/ 2000, /*cpuUsage=*/ 1, /* localTestCount= */ 2)); |
| TestThread thread2 = |
| new TestThread( |
| () -> { |
| acquire(1999, 0, 0); |
| release(1999, 0, 0); |
| }); |
| thread2.start(); |
| thread2.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); |
| } |
| |
| @Test |
| @SuppressWarnings("ThreadPriorityCheck") |
| public void testOutOfOrderAllocation() throws Exception { |
| final CyclicBarrier sync3 = new CyclicBarrier(2); |
| final CyclicBarrier sync4 = new CyclicBarrier(2); |
| |
| assertThat(rm.inUse()).isFalse(); |
| |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| sync.await(); |
| acquire(900, 0.5, 0); // Will be blocked by the main thread. |
| validate(5); |
| release(900, 0.5, 0); |
| sync.await(); |
| }); |
| |
| TestThread thread2 = |
| new TestThread( |
| () -> { |
| // Wait till other thread will be locked |
| while (rm.getWaitCount() == 0) { |
| Thread.yield(); |
| } |
| acquire(100, 0.1, 0); |
| validate(2); |
| release(100, 0.1, 0); |
| sync2.await(); |
| acquire(200, 0.5, 0); |
| validate(4); |
| sync2.await(); |
| release(200, 0.5, 0); |
| }); |
| |
| TestThread thread3 = |
| new TestThread( |
| () -> { |
| acquire(100, 0.4, 0); |
| sync3.await(); |
| sync3.await(); |
| release(100, 0.4, 0); |
| }); |
| |
| TestThread thread4 = |
| new TestThread( |
| () -> { |
| acquire(750, 0.3, 0); |
| sync4.await(); |
| sync4.await(); |
| release(750, 0.3, 0); |
| }); |
| |
| // Lock 900 MB, 0.9 CPU in total (spread over three threads so that we can individually release |
| // parts of it). |
| acquire(50, 0.2, 0); |
| thread3.start(); |
| thread4.start(); |
| sync3.await(1, TimeUnit.SECONDS); |
| sync4.await(1, TimeUnit.SECONDS); |
| validate(1); |
| |
| // Start thread1, which will try to acquire 900 MB, 0.5 CPU, but can't, so it has to wait. |
| thread1.start(); |
| sync.await(1, TimeUnit.SECONDS); |
| |
| // Start thread2, which will successfully acquire and release 100 MB, 0.1 CPU. |
| thread2.start(); |
| // Signal thread2 to acquire 200 MB and 0.5 CPU, which will block. |
| sync2.await(1, TimeUnit.SECONDS); |
| |
| // Waiting till both threads are locked. |
| while (rm.getWaitCount() < 2) { |
| Thread.yield(); |
| } |
| |
| validate(3); // Thread1 is now first in the queue and Thread2 is second. |
| |
| // Release 100 MB, 0.4 CPU. This allows Thread2 to continue out of order. |
| sync3.await(1, TimeUnit.SECONDS); |
| sync2.await(1, TimeUnit.SECONDS); |
| |
| // Release 750 MB, 0.3 CPU. At this point thread1 will finally acquire resources. |
| sync4.await(1, TimeUnit.SECONDS); |
| sync.await(1, TimeUnit.SECONDS); |
| |
| // Release all remaining resources. |
| release(50, 0.2, 0); |
| thread1.join(); |
| thread2.join(); |
| thread3.join(); |
| thread4.join(); |
| |
| assertThat(rm.inUse()).isFalse(); |
| } |
| |
| @Test |
| @SuppressWarnings("ThreadPriorityCheck") |
| public void testRelease_noPriority() throws Exception { |
| rm.setPrioritizeLocalActions(false); |
| assertThat(rm.inUse()).isFalse(); |
| |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| acquire(700, 0, 0); |
| sync.await(); |
| sync2.await(); |
| release(700, 0, 0); |
| }); |
| thread1.start(); |
| // Wait for thread1 to have acquired its RAM |
| sync.await(1, TimeUnit.SECONDS); |
| |
| // Set up threads that compete for resources |
| CyclicBarrier syncDynamicStandalone = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE); |
| while (rm.getWaitCount() < 1) { |
| Thread.yield(); |
| } |
| CyclicBarrier syncDynamicWorker = startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER); |
| while (rm.getWaitCount() < 2) { |
| Thread.yield(); |
| } |
| CyclicBarrier syncLocal = startAcquireReleaseThread(ResourcePriority.LOCAL); |
| while (rm.getWaitCount() < 3) { |
| Thread.yield(); |
| } |
| |
| sync2.await(); |
| |
| while (syncLocal.getNumberWaiting() |
| + syncDynamicWorker.getNumberWaiting() |
| + syncDynamicStandalone.getNumberWaiting() |
| == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(2); |
| assertThat(syncDynamicStandalone.getNumberWaiting()).isEqualTo(1); |
| syncDynamicStandalone.await(1, TimeUnit.SECONDS); |
| |
| while (syncDynamicWorker.getNumberWaiting() + syncLocal.getNumberWaiting() == 0) { |
| Thread.yield(); |
| } |
| assertThat(syncDynamicWorker.getNumberWaiting()).isEqualTo(1); |
| assertThat(rm.getWaitCount()).isEqualTo(1); |
| |
| syncDynamicWorker.await(1, TimeUnit.SECONDS); |
| while (syncLocal.getNumberWaiting() == 0) { |
| Thread.yield(); |
| } |
| assertThat(syncLocal.getNumberWaiting()).isEqualTo(1); |
| assertThat(rm.getWaitCount()).isEqualTo(0); |
| syncLocal.await(1, TimeUnit.SECONDS); |
| } |
| |
| @Test |
| @SuppressWarnings("ThreadPriorityCheck") |
| public void testRelease_highPriorityFirst() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| acquire(700, 0, 0); |
| sync.await(); |
| sync2.await(); |
| release(700, 0, 0); |
| }); |
| thread1.start(); |
| // Wait for thread1 to have acquired its RAM |
| sync.await(1, TimeUnit.SECONDS); |
| |
| // Set up threads that compete for resources |
| CyclicBarrier syncDynamicStandalone = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE); |
| while (rm.getWaitCount() < 1) { |
| Thread.yield(); |
| } |
| CyclicBarrier syncDynamicWorker = startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER); |
| while (rm.getWaitCount() < 2) { |
| Thread.yield(); |
| } |
| CyclicBarrier syncLocal = startAcquireReleaseThread(ResourcePriority.LOCAL); |
| while (rm.getWaitCount() < 3) { |
| Thread.yield(); |
| } |
| |
| sync2.await(); |
| |
| while (syncLocal.getNumberWaiting() |
| + syncDynamicWorker.getNumberWaiting() |
| + syncDynamicStandalone.getNumberWaiting() |
| == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(2); |
| assertThat(syncLocal.getNumberWaiting()).isEqualTo(1); |
| syncLocal.await(1, TimeUnit.SECONDS); |
| |
| while (syncDynamicWorker.getNumberWaiting() + syncDynamicStandalone.getNumberWaiting() == 0) { |
| Thread.yield(); |
| } |
| assertThat(syncDynamicWorker.getNumberWaiting()).isEqualTo(1); |
| assertThat(rm.getWaitCount()).isEqualTo(1); |
| |
| syncDynamicWorker.await(1, TimeUnit.SECONDS); |
| while (syncDynamicStandalone.getNumberWaiting() == 0) { |
| Thread.yield(); |
| } |
| assertThat(syncDynamicStandalone.getNumberWaiting()).isEqualTo(1); |
| assertThat(rm.getWaitCount()).isEqualTo(0); |
| syncDynamicStandalone.await(1, TimeUnit.SECONDS); |
| } |
| |
| @Test |
| @SuppressWarnings("ThreadPriorityCheck") |
| public void testRelease_dynamicLifo() throws Exception { |
| assertThat(rm.inUse()).isFalse(); |
| |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| acquire(700, 0, 0); |
| sync.await(); |
| sync2.await(); |
| release(700, 0, 0); |
| }); |
| thread1.start(); |
| // Wait for thread1 to have acquired enough RAM to block the other threads. |
| sync.await(1, TimeUnit.SECONDS); |
| |
| // Set up threads that compete for resources |
| final CyclicBarrier syncDynamicStandalone1 = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE); |
| while (rm.getWaitCount() < 1) { |
| Thread.yield(); |
| } |
| final CyclicBarrier syncDynamicWorker1 = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER); |
| while (rm.getWaitCount() < 2) { |
| Thread.yield(); |
| } |
| final CyclicBarrier syncDynamicStandalone2 = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE); |
| while (rm.getWaitCount() < 3) { |
| Thread.yield(); |
| } |
| final CyclicBarrier syncDynamicWorker2 = |
| startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER); |
| while (rm.getWaitCount() < 4) { |
| Thread.yield(); |
| } |
| |
| // Wewease the kwaken! |
| sync2.await(); |
| |
| while (syncDynamicStandalone1.getNumberWaiting() |
| + syncDynamicStandalone2.getNumberWaiting() |
| + syncDynamicWorker1.getNumberWaiting() |
| + syncDynamicWorker2.getNumberWaiting() |
| == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(3); |
| assertThat(syncDynamicWorker2.getNumberWaiting()).isEqualTo(1); |
| syncDynamicWorker2.await(1, TimeUnit.SECONDS); |
| |
| while (syncDynamicStandalone1.getNumberWaiting() |
| + syncDynamicStandalone2.getNumberWaiting() |
| + syncDynamicWorker1.getNumberWaiting() |
| == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(2); |
| assertThat(syncDynamicWorker1.getNumberWaiting()).isEqualTo(1); |
| syncDynamicWorker1.await(1, TimeUnit.SECONDS); |
| |
| while (syncDynamicStandalone1.getNumberWaiting() + syncDynamicStandalone2.getNumberWaiting() |
| == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(1); |
| assertThat(syncDynamicStandalone2.getNumberWaiting()).isEqualTo(1); |
| syncDynamicStandalone2.await(1, TimeUnit.SECONDS); |
| |
| while (syncDynamicStandalone1.getNumberWaiting() == 0) { |
| Thread.yield(); |
| } |
| assertThat(rm.getWaitCount()).isEqualTo(0); |
| assertThat(syncDynamicStandalone1.getNumberWaiting()).isEqualTo(1); |
| syncDynamicStandalone1.await(1, TimeUnit.SECONDS); |
| } |
| |
| private CyclicBarrier startAcquireReleaseThread(ResourcePriority priority) { |
| final CyclicBarrier sync = new CyclicBarrier(2); |
| TestThread thread = |
| new TestThread( |
| () -> { |
| acquire(700, 0, 0, priority); |
| sync.await(); |
| release(700, 0, 0); |
| }); |
| thread.start(); |
| return sync; |
| } |
| |
| @Test |
| public void testNonexistingResource() throws Exception { |
| // If we try to use nonexisting resource we should return an error |
| TestThread thread1 = |
| new TestThread( |
| () -> { |
| assertThrows( |
| NoSuchElementException.class, |
| () -> acquire(0, 0, ImmutableMap.of("nonexisting", 1.0f), 0)); |
| }); |
| thread1.start(); |
| thread1.joinAndAssertState(1000); |
| } |
| |
| @Test |
| public void testAcquireWithWorker_acquireAndRelease() throws Exception { |
| int memory = 100; |
| when(worker.getWorkerKey()).thenReturn(createWorkerKey("dummy")); |
| |
| assertThat(rm.inUse()).isFalse(); |
| ResourceHandle handle = acquire(memory, 1, 0, "dummy"); |
| assertThat(rm.inUse()).isTrue(); |
| |
| assertThat(handle.getWorker().getWorkerKey().getMnemonic()).isEqualTo("dummy"); |
| release(memory, 1, 0); |
| // When that RAM is released, |
| // Then Resource Manager will not be "in use": |
| assertThat(rm.inUse()).isFalse(); |
| } |
| |
| @Test |
| public void testReleaseWorker_highPriorityWorker() throws Exception { |
| |
| String slowMenmonic = "SLOW"; |
| String fastMenmonic = "FAST"; |
| |
| Worker slowWorker1 = mock(Worker.class); |
| Worker slowWorker2 = mock(Worker.class); |
| Worker fastWorker = mock(Worker.class); |
| |
| WorkerKey slowWorkerKey = createWorkerKey(slowMenmonic); |
| WorkerKey fastWorkerKey = createWorkerKey(fastMenmonic); |
| |
| when(slowWorker1.getWorkerKey()).thenReturn(slowWorkerKey); |
| when(slowWorker2.getWorkerKey()).thenReturn(slowWorkerKey); |
| when(fastWorker.getWorkerKey()).thenReturn(fastWorkerKey); |
| |
| CountDownLatch slowLatch = new CountDownLatch(2); |
| CountDownLatch fastLatch = new CountDownLatch(1); |
| |
| WorkerPool workerPool = |
| new WorkerPool( |
| new WorkerPool.WorkerPoolConfig( |
| new WorkerFactory(fs.getPath("/workerBase")) { |
| int numOfSlowWorkers = 0; |
| |
| @Override |
| public Worker create(WorkerKey key) { |
| assertThat(key.getMnemonic()).isAnyOf(slowMenmonic, fastMenmonic); |
| |
| if (key.getMnemonic().equals(fastMenmonic)) { |
| return fastWorker; |
| } |
| |
| assertThat(numOfSlowWorkers).isLessThan(2); |
| |
| if (numOfSlowWorkers == 0) { |
| numOfSlowWorkers++; |
| return slowWorker1; |
| } |
| |
| numOfSlowWorkers++; |
| return slowWorker2; |
| } |
| |
| @Override |
| public boolean validateObject(WorkerKey key, PooledObject<Worker> p) { |
| return true; |
| } |
| }, |
| ImmutableList.of(), |
| ImmutableList.of(), |
| /* highPriorityWorkers= */ ImmutableList.of(slowMenmonic))); |
| rm.setWorkerPool(workerPool); |
| |
| TestThread slowThread1 = |
| new TestThread( |
| () -> { |
| ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic); |
| slowLatch.countDown(); |
| fastLatch.await(); |
| // release resources |
| handle.close(); |
| }); |
| |
| TestThread slowThread2 = |
| new TestThread( |
| () -> { |
| ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic); |
| slowLatch.countDown(); |
| fastLatch.await(); |
| // release resources |
| handle.close(); |
| }); |
| |
| TestThread fastThread = |
| new TestThread( |
| () -> { |
| slowLatch.await(); |
| assertThat(isAvailable(rm, 100, 0.1, 0, createWorkerKey(fastMenmonic))).isFalse(); |
| fastLatch.countDown(); |
| ResourceHandle handle = acquire(100, 0.1, 0, fastMenmonic); |
| // release resources |
| handle.close(); |
| }); |
| |
| slowThread1.start(); |
| slowThread2.start(); |
| fastThread.start(); |
| |
| slowThread1.joinAndAssertState(Duration.ofSeconds(10).toMillis()); |
| slowThread2.joinAndAssertState(Duration.ofSeconds(10).toMillis()); |
| fastThread.joinAndAssertState(Duration.ofSeconds(10).toMillis()); |
| } |
| |
| synchronized boolean isAvailable(ResourceManager rm, double ram, double cpu, int localTestCount) { |
| return rm.areResourcesAvailable(ResourceSet.create(ram, cpu, localTestCount)); |
| } |
| |
| synchronized boolean isAvailable( |
| ResourceManager rm, double ram, double cpu, int localTestCount, WorkerKey workerKey) { |
| return rm.areResourcesAvailable( |
| ResourceSet.createWithWorkerKey(ram, cpu, localTestCount, workerKey)); |
| } |
| |
| private static class ResourceOwnerStub implements ActionExecutionMetadata { |
| |
| @Override |
| @Nullable |
| public String getProgressMessage() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public ActionOwner getOwner() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public boolean isShareable() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public String prettyPrint() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public String getMnemonic() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public boolean inputsDiscovered() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public boolean discoversInputs() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public NestedSet<Artifact> getTools() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public NestedSet<Artifact> getInputs() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public Collection<String> getClientEnvironmentVariables() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public RunfilesSupplier getRunfilesSupplier() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public ImmutableSet<Artifact> getOutputs() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public Artifact getPrimaryInput() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public Artifact getPrimaryOutput() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public NestedSet<Artifact> getMandatoryInputs() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public NestedSet<Artifact> getInputFilesForExtraAction( |
| ActionExecutionContext actionExecutionContext) { |
| return NestedSetBuilder.emptySet(Order.STABLE_ORDER); |
| } |
| |
| @Override |
| public String getKey( |
| ActionKeyContext actionKeyContext, @Nullable Artifact.ArtifactExpander artifactExpander) { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| @Nullable |
| public String describeKey() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public String describe() { |
| return "ResourceOwnerStubAction"; |
| } |
| |
| @Override |
| public ImmutableSet<Artifact> getMandatoryOutputs() { |
| return ImmutableSet.of(); |
| } |
| |
| @Override |
| public boolean shouldReportPathPrefixConflict(ActionAnalysisMetadata action) { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public MiddlemanType getActionType() { |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public ImmutableMap<String, String> getExecProperties() { |
| throw new IllegalStateException(); |
| } |
| |
| @Nullable |
| @Override |
| public PlatformInfo getExecutionPlatform() { |
| throw new IllegalStateException(); |
| } |
| } |
| } |