blob: 56a3cef2a4b51ea00d4af7af85db4080d2e4e61a [file] [log] [blame]
// Copyright 2015 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.actions;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.hash.HashCode;
import com.google.devtools.build.lib.actions.ExecutionRequirements.WorkerProtocolFormat;
import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority;
import com.google.devtools.build.lib.analysis.platform.PlatformInfo;
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.Order;
import com.google.devtools.build.lib.testutil.TestThread;
import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.build.lib.worker.Worker;
import com.google.devtools.build.lib.worker.WorkerFactory;
import com.google.devtools.build.lib.worker.WorkerKey;
import com.google.devtools.build.lib.worker.WorkerPool;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.pool2.PooledObject;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link ResourceManager}. */
@RunWith(JUnit4.class)
public final class ResourceManagerTest {
private final FileSystem fs = new InMemoryFileSystem(DigestHashFunction.SHA256);
private final ActionExecutionMetadata resourceOwner = new ResourceOwnerStub();
private final ResourceManager rm = ResourceManager.instanceForTestingOnly();
private Worker worker;
private AtomicInteger counter;
CyclicBarrier sync;
CyclicBarrier sync2;
@Before
public void configureResourceManager() throws Exception {
rm.setAvailableResources(
ResourceSet.create(
/* memoryMb= */ 1000,
/* cpuUsage= */ 1,
/* extraResourceUsage= */ ImmutableMap.of(
"gpu", 2.0f,
"fancyresource", 1.5f),
/* localTestCount= */ 2));
counter = new AtomicInteger(0);
sync = new CyclicBarrier(2);
sync2 = new CyclicBarrier(2);
rm.resetResourceUsage();
rm.setPrioritizeLocalActions(true);
worker = mock(Worker.class);
rm.setWorkerPool(createWorkerPool());
}
private WorkerPool createWorkerPool() {
return new WorkerPool(
new WorkerPool.WorkerPoolConfig(
new WorkerFactory(fs.getPath("/workerBase")) {
@Override
public Worker create(WorkerKey key) {
return worker;
}
@Override
public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
return true;
}
},
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of()));
}
private ResourceHandle acquire(double ram, double cpu, int tests, ResourcePriority priority)
throws InterruptedException, IOException {
return rm.acquireResources(resourceOwner, ResourceSet.create(ram, cpu, tests), priority);
}
private ResourceHandle acquire(double ram, double cpu, int tests)
throws InterruptedException, IOException {
return acquire(ram, cpu, tests, ResourcePriority.LOCAL);
}
private ResourceHandle acquire(double ram, double cpu, int tests, String mnemonic)
throws InterruptedException, IOException {
return rm.acquireResources(
resourceOwner,
ResourceSet.createWithWorkerKey(ram, cpu, tests, createWorkerKey(mnemonic)),
ResourcePriority.LOCAL);
}
@CanIgnoreReturnValue
private ResourceHandle acquire(
double ram,
double cpu,
ImmutableMap<String, Float> extraResources,
int tests,
ResourcePriority priority)
throws InterruptedException, IOException, NoSuchElementException {
return rm.acquireResources(
resourceOwner, ResourceSet.create(ram, cpu, extraResources, tests), priority);
}
@CanIgnoreReturnValue
private ResourceHandle acquire(
double ram, double cpu, ImmutableMap<String, Float> extraResources, int tests)
throws InterruptedException, IOException, NoSuchElementException {
return acquire(ram, cpu, extraResources, tests, ResourcePriority.LOCAL);
}
private void release(double ram, double cpu, int tests) throws IOException, InterruptedException {
rm.releaseResources(resourceOwner, ResourceSet.create(ram, cpu, tests), /* worker= */ null);
}
private void release(
double ram, double cpu, ImmutableMap<String, Float> extraResources, int tests)
throws InterruptedException, IOException {
rm.releaseResources(
resourceOwner, ResourceSet.create(ram, cpu, extraResources, tests), /* worker= */ null);
}
private void validate(int count) {
assertThat(counter.incrementAndGet()).isEqualTo(count);
}
private WorkerKey createWorkerKey(String mnemonic) {
return new WorkerKey(
/* args= */ ImmutableList.of(),
/* env= */ ImmutableMap.of(),
/* execRoot= */ fs.getPath("/outputbase/execroot/workspace"),
/* mnemonic= */ mnemonic,
/* workerFilesCombinedHash= */ HashCode.fromInt(0),
/* workerFilesWithDigests= */ ImmutableSortedMap.of(),
/* sandboxed= */ false,
/* multiplex= */ false,
/* cancellable= */ false,
WorkerProtocolFormat.PROTO);
}
@Test
public void testOverBudgetRequests() throws Exception {
assertThat(rm.inUse()).isFalse();
// When nothing is consuming RAM,
// Then Resource Manager will successfully acquire an over-budget request for RAM:
double bigRam = 10000.0;
acquire(bigRam, 0, 0);
// When RAM is consumed,
// Then Resource Manager will be "in use":
assertThat(rm.inUse()).isTrue();
release(bigRam, 0, 0);
// When that RAM is released,
// Then Resource Manager will not be "in use":
assertThat(rm.inUse()).isFalse();
// Ditto, for CPU:
double bigCpu = 10.0;
acquire(0, bigCpu, 0);
assertThat(rm.inUse()).isTrue();
release(0, bigCpu, 0);
assertThat(rm.inUse()).isFalse();
// Ditto, for tests:
int bigTests = 10;
acquire(0, 0, bigTests);
assertThat(rm.inUse()).isTrue();
release(0, 0, bigTests);
assertThat(rm.inUse()).isFalse();
// Ditto, for extra resources:
ImmutableMap<String, Float> bigExtraResources =
ImmutableMap.of("gpu", 10.0f, "fancyresource", 10.0f);
acquire(0, 0, bigExtraResources, 0);
assertThat(rm.inUse()).isTrue();
release(0, 0, bigExtraResources, 0);
assertThat(rm.inUse()).isFalse();
}
@Test
public void testThatCpuCanBeOverallocated() throws Exception {
assertThat(rm.inUse()).isFalse();
// Given CPU is partially acquired:
acquire(0, 0.5, 0);
// When a request for CPU is made that would slightly overallocate CPU,
// Then the request succeeds:
TestThread thread1 = new TestThread(() -> assertThat(acquire(0, 0.6, 0)).isNotNull());
thread1.start();
thread1.joinAndAssertState(10000);
}
@Test
public void testThatCpuAllocationIsNoncommutative() throws Exception {
assertThat(rm.inUse()).isFalse();
// Given that CPU has a small initial allocation:
acquire(0, 0.099, 0);
// When a request for a large CPU allocation is made,
// Then the request succeeds:
TestThread thread1 =
new TestThread(
() -> {
acquire(0, 0.99, 0);
// Cleanup
release(0, 0.99, 0);
});
thread1.start();
thread1.joinAndAssertState(10000);
// Cleanup
release(0, 0.099, 0);
assertThat(rm.inUse()).isFalse();
// Given that CPU has a large initial allocation:
acquire(0, 0.99, 0);
// When a request for a small CPU allocation is made,
// Then the request fails:
TestThread thread2 = new TestThread(() -> acquire(0, 0.099, 0));
thread2.start();
AssertionError e = assertThrows(AssertionError.class, () -> thread2.joinAndAssertState(1000));
assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive");
// Note that this behavior is surprising and probably not intended.
}
@Test
public void testThatRamCannotBeOverallocated() throws Exception {
assertThat(rm.inUse()).isFalse();
// Given RAM is partially acquired:
acquire(500, 0, 0);
// When a request for RAM is made that would slightly overallocate RAM,
// Then the request fails (got timeout):
TestThread thread1 = new TestThread(() -> acquire(600, 0, 0));
thread1.start();
AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000));
assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive");
}
@Test
public void testThatTestsCannotBeOverallocated() throws Exception {
assertThat(rm.inUse()).isFalse();
// Given test count is partially acquired:
acquire(0, 0, 1);
// When a request for tests is made that would slightly overallocate tests,
// Then the request fails:
TestThread thread1 = new TestThread(() -> acquire(0, 0, 2));
thread1.start();
AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000));
assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive");
}
@Test
public void testThatExtraResourcesCannotBeOverallocated() throws Exception {
assertThat(rm.inUse()).isFalse();
// Given a partially acquired extra resources:
acquire(0, 0, ImmutableMap.of("gpu", 1.0f), 1);
// When a request for extra resources is made that would overallocate,
// Then the request fails:
TestThread thread1 = new TestThread(() -> acquire(0, 0, ImmutableMap.of("gpu", 1.1f), 0));
thread1.start();
AssertionError e = assertThrows(AssertionError.class, () -> thread1.joinAndAssertState(1000));
assertThat(e).hasCauseThat().hasMessageThat().contains("is still alive");
}
@Test
public void testHasResources() throws Exception {
assertThat(rm.inUse()).isFalse();
assertThat(rm.threadHasResources()).isFalse();
acquire(1, 0.1, ImmutableMap.of("gpu", 1.0f), 1);
assertThat(rm.threadHasResources()).isTrue();
// We have resources in this thread - make sure other threads
// are not affected.
TestThread thread1 =
new TestThread(
() -> {
assertThat(rm.threadHasResources()).isFalse();
acquire(1, 0, 0);
assertThat(rm.threadHasResources()).isTrue();
release(1, 0, 0);
assertThat(rm.threadHasResources()).isFalse();
acquire(0, 0.1, 0);
assertThat(rm.threadHasResources()).isTrue();
release(0, 0.1, 0);
assertThat(rm.threadHasResources()).isFalse();
acquire(0, 0, 1);
assertThat(rm.threadHasResources()).isTrue();
release(0, 0, 1);
assertThat(rm.threadHasResources()).isFalse();
acquire(0, 0, ImmutableMap.of("gpu", 1.0f), 0);
assertThat(rm.threadHasResources()).isTrue();
release(0, 0, ImmutableMap.of("gpu", 1.0f), 0);
assertThat(rm.threadHasResources()).isFalse();
});
thread1.start();
thread1.joinAndAssertState(10000);
release(1, 0.1, ImmutableMap.of("gpu", 1.0f), 1);
assertThat(rm.threadHasResources()).isFalse();
assertThat(rm.inUse()).isFalse();
}
@Test
@SuppressWarnings("ThreadPriorityCheck")
public void testConcurrentLargeRequests() throws Exception {
assertThat(rm.inUse()).isFalse();
TestThread thread1 =
new TestThread(
() -> {
acquire(2000, 2, 0);
sync.await();
validate(1);
sync.await();
// Wait till other thread will be locked.
while (rm.getWaitCount() == 0) {
Thread.yield();
}
release(2000, 2, 0);
assertThat(rm.getWaitCount()).isEqualTo(0);
acquire(2000, 2, 0); // Will be blocked by the thread2.
validate(3);
release(2000, 2, 0);
});
TestThread thread2 =
new TestThread(
() -> {
sync2.await();
assertThat(isAvailable(rm, 2000, 2, 0)).isFalse();
acquire(2000, 2, 0); // Will be blocked by the thread1.
validate(2);
sync2.await();
// Wait till other thread will be locked.
while (rm.getWaitCount() == 0) {
Thread.yield();
}
release(2000, 2, 0);
});
thread1.start();
thread2.start();
sync.await(1, TimeUnit.SECONDS);
assertThat(rm.inUse()).isTrue();
assertThat(rm.getWaitCount()).isEqualTo(0);
sync2.await(1, TimeUnit.SECONDS);
sync.await(1, TimeUnit.SECONDS);
sync2.await(1, TimeUnit.SECONDS);
thread1.joinAndAssertState(1000);
thread2.joinAndAssertState(1000);
assertThat(rm.inUse()).isFalse();
}
@Test
public void testInterruptedAcquisitionClearsResources() throws Exception {
assertThat(rm.inUse()).isFalse();
// Acquire a small amount of resources so that future requests can block (the initial request
// always succeeds even if it's for too much).
TestThread smallThread = new TestThread(() -> acquire(1, 0, 0));
smallThread.start();
smallThread.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
TestThread thread1 =
new TestThread(
() -> {
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, () -> acquire(1999, 0, 0));
});
thread1.start();
thread1.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
// This should process the queue. If the request from above is still present, it will take all
// the available memory. But it shouldn't.
rm.setAvailableResources(
ResourceSet.create(/*memoryMb=*/ 2000, /*cpuUsage=*/ 1, /* localTestCount= */ 2));
TestThread thread2 =
new TestThread(
() -> {
acquire(1999, 0, 0);
release(1999, 0, 0);
});
thread2.start();
thread2.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
}
@Test
@SuppressWarnings("ThreadPriorityCheck")
public void testOutOfOrderAllocation() throws Exception {
final CyclicBarrier sync3 = new CyclicBarrier(2);
final CyclicBarrier sync4 = new CyclicBarrier(2);
assertThat(rm.inUse()).isFalse();
TestThread thread1 =
new TestThread(
() -> {
sync.await();
acquire(900, 0.5, 0); // Will be blocked by the main thread.
validate(5);
release(900, 0.5, 0);
sync.await();
});
TestThread thread2 =
new TestThread(
() -> {
// Wait till other thread will be locked
while (rm.getWaitCount() == 0) {
Thread.yield();
}
acquire(100, 0.1, 0);
validate(2);
release(100, 0.1, 0);
sync2.await();
acquire(200, 0.5, 0);
validate(4);
sync2.await();
release(200, 0.5, 0);
});
TestThread thread3 =
new TestThread(
() -> {
acquire(100, 0.4, 0);
sync3.await();
sync3.await();
release(100, 0.4, 0);
});
TestThread thread4 =
new TestThread(
() -> {
acquire(750, 0.3, 0);
sync4.await();
sync4.await();
release(750, 0.3, 0);
});
// Lock 900 MB, 0.9 CPU in total (spread over three threads so that we can individually release
// parts of it).
acquire(50, 0.2, 0);
thread3.start();
thread4.start();
sync3.await(1, TimeUnit.SECONDS);
sync4.await(1, TimeUnit.SECONDS);
validate(1);
// Start thread1, which will try to acquire 900 MB, 0.5 CPU, but can't, so it has to wait.
thread1.start();
sync.await(1, TimeUnit.SECONDS);
// Start thread2, which will successfully acquire and release 100 MB, 0.1 CPU.
thread2.start();
// Signal thread2 to acquire 200 MB and 0.5 CPU, which will block.
sync2.await(1, TimeUnit.SECONDS);
// Waiting till both threads are locked.
while (rm.getWaitCount() < 2) {
Thread.yield();
}
validate(3); // Thread1 is now first in the queue and Thread2 is second.
// Release 100 MB, 0.4 CPU. This allows Thread2 to continue out of order.
sync3.await(1, TimeUnit.SECONDS);
sync2.await(1, TimeUnit.SECONDS);
// Release 750 MB, 0.3 CPU. At this point thread1 will finally acquire resources.
sync4.await(1, TimeUnit.SECONDS);
sync.await(1, TimeUnit.SECONDS);
// Release all remaining resources.
release(50, 0.2, 0);
thread1.join();
thread2.join();
thread3.join();
thread4.join();
assertThat(rm.inUse()).isFalse();
}
@Test
@SuppressWarnings("ThreadPriorityCheck")
public void testRelease_noPriority() throws Exception {
rm.setPrioritizeLocalActions(false);
assertThat(rm.inUse()).isFalse();
TestThread thread1 =
new TestThread(
() -> {
acquire(700, 0, 0);
sync.await();
sync2.await();
release(700, 0, 0);
});
thread1.start();
// Wait for thread1 to have acquired its RAM
sync.await(1, TimeUnit.SECONDS);
// Set up threads that compete for resources
CyclicBarrier syncDynamicStandalone =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE);
while (rm.getWaitCount() < 1) {
Thread.yield();
}
CyclicBarrier syncDynamicWorker = startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER);
while (rm.getWaitCount() < 2) {
Thread.yield();
}
CyclicBarrier syncLocal = startAcquireReleaseThread(ResourcePriority.LOCAL);
while (rm.getWaitCount() < 3) {
Thread.yield();
}
sync2.await();
while (syncLocal.getNumberWaiting()
+ syncDynamicWorker.getNumberWaiting()
+ syncDynamicStandalone.getNumberWaiting()
== 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(2);
assertThat(syncDynamicStandalone.getNumberWaiting()).isEqualTo(1);
syncDynamicStandalone.await(1, TimeUnit.SECONDS);
while (syncDynamicWorker.getNumberWaiting() + syncLocal.getNumberWaiting() == 0) {
Thread.yield();
}
assertThat(syncDynamicWorker.getNumberWaiting()).isEqualTo(1);
assertThat(rm.getWaitCount()).isEqualTo(1);
syncDynamicWorker.await(1, TimeUnit.SECONDS);
while (syncLocal.getNumberWaiting() == 0) {
Thread.yield();
}
assertThat(syncLocal.getNumberWaiting()).isEqualTo(1);
assertThat(rm.getWaitCount()).isEqualTo(0);
syncLocal.await(1, TimeUnit.SECONDS);
}
@Test
@SuppressWarnings("ThreadPriorityCheck")
public void testRelease_highPriorityFirst() throws Exception {
assertThat(rm.inUse()).isFalse();
TestThread thread1 =
new TestThread(
() -> {
acquire(700, 0, 0);
sync.await();
sync2.await();
release(700, 0, 0);
});
thread1.start();
// Wait for thread1 to have acquired its RAM
sync.await(1, TimeUnit.SECONDS);
// Set up threads that compete for resources
CyclicBarrier syncDynamicStandalone =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE);
while (rm.getWaitCount() < 1) {
Thread.yield();
}
CyclicBarrier syncDynamicWorker = startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER);
while (rm.getWaitCount() < 2) {
Thread.yield();
}
CyclicBarrier syncLocal = startAcquireReleaseThread(ResourcePriority.LOCAL);
while (rm.getWaitCount() < 3) {
Thread.yield();
}
sync2.await();
while (syncLocal.getNumberWaiting()
+ syncDynamicWorker.getNumberWaiting()
+ syncDynamicStandalone.getNumberWaiting()
== 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(2);
assertThat(syncLocal.getNumberWaiting()).isEqualTo(1);
syncLocal.await(1, TimeUnit.SECONDS);
while (syncDynamicWorker.getNumberWaiting() + syncDynamicStandalone.getNumberWaiting() == 0) {
Thread.yield();
}
assertThat(syncDynamicWorker.getNumberWaiting()).isEqualTo(1);
assertThat(rm.getWaitCount()).isEqualTo(1);
syncDynamicWorker.await(1, TimeUnit.SECONDS);
while (syncDynamicStandalone.getNumberWaiting() == 0) {
Thread.yield();
}
assertThat(syncDynamicStandalone.getNumberWaiting()).isEqualTo(1);
assertThat(rm.getWaitCount()).isEqualTo(0);
syncDynamicStandalone.await(1, TimeUnit.SECONDS);
}
@Test
@SuppressWarnings("ThreadPriorityCheck")
public void testRelease_dynamicLifo() throws Exception {
assertThat(rm.inUse()).isFalse();
TestThread thread1 =
new TestThread(
() -> {
acquire(700, 0, 0);
sync.await();
sync2.await();
release(700, 0, 0);
});
thread1.start();
// Wait for thread1 to have acquired enough RAM to block the other threads.
sync.await(1, TimeUnit.SECONDS);
// Set up threads that compete for resources
final CyclicBarrier syncDynamicStandalone1 =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE);
while (rm.getWaitCount() < 1) {
Thread.yield();
}
final CyclicBarrier syncDynamicWorker1 =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER);
while (rm.getWaitCount() < 2) {
Thread.yield();
}
final CyclicBarrier syncDynamicStandalone2 =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_STANDALONE);
while (rm.getWaitCount() < 3) {
Thread.yield();
}
final CyclicBarrier syncDynamicWorker2 =
startAcquireReleaseThread(ResourcePriority.DYNAMIC_WORKER);
while (rm.getWaitCount() < 4) {
Thread.yield();
}
// Wewease the kwaken!
sync2.await();
while (syncDynamicStandalone1.getNumberWaiting()
+ syncDynamicStandalone2.getNumberWaiting()
+ syncDynamicWorker1.getNumberWaiting()
+ syncDynamicWorker2.getNumberWaiting()
== 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(3);
assertThat(syncDynamicWorker2.getNumberWaiting()).isEqualTo(1);
syncDynamicWorker2.await(1, TimeUnit.SECONDS);
while (syncDynamicStandalone1.getNumberWaiting()
+ syncDynamicStandalone2.getNumberWaiting()
+ syncDynamicWorker1.getNumberWaiting()
== 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(2);
assertThat(syncDynamicWorker1.getNumberWaiting()).isEqualTo(1);
syncDynamicWorker1.await(1, TimeUnit.SECONDS);
while (syncDynamicStandalone1.getNumberWaiting() + syncDynamicStandalone2.getNumberWaiting()
== 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(1);
assertThat(syncDynamicStandalone2.getNumberWaiting()).isEqualTo(1);
syncDynamicStandalone2.await(1, TimeUnit.SECONDS);
while (syncDynamicStandalone1.getNumberWaiting() == 0) {
Thread.yield();
}
assertThat(rm.getWaitCount()).isEqualTo(0);
assertThat(syncDynamicStandalone1.getNumberWaiting()).isEqualTo(1);
syncDynamicStandalone1.await(1, TimeUnit.SECONDS);
}
private CyclicBarrier startAcquireReleaseThread(ResourcePriority priority) {
final CyclicBarrier sync = new CyclicBarrier(2);
TestThread thread =
new TestThread(
() -> {
acquire(700, 0, 0, priority);
sync.await();
release(700, 0, 0);
});
thread.start();
return sync;
}
@Test
public void testNonexistingResource() throws Exception {
// If we try to use nonexisting resource we should return an error
TestThread thread1 =
new TestThread(
() -> {
assertThrows(
NoSuchElementException.class,
() -> acquire(0, 0, ImmutableMap.of("nonexisting", 1.0f), 0));
});
thread1.start();
thread1.joinAndAssertState(1000);
}
@Test
public void testAcquireWithWorker_acquireAndRelease() throws Exception {
int memory = 100;
when(worker.getWorkerKey()).thenReturn(createWorkerKey("dummy"));
assertThat(rm.inUse()).isFalse();
ResourceHandle handle = acquire(memory, 1, 0, "dummy");
assertThat(rm.inUse()).isTrue();
assertThat(handle.getWorker().getWorkerKey().getMnemonic()).isEqualTo("dummy");
release(memory, 1, 0);
// When that RAM is released,
// Then Resource Manager will not be "in use":
assertThat(rm.inUse()).isFalse();
}
@Test
public void testReleaseWorker_highPriorityWorker() throws Exception {
String slowMenmonic = "SLOW";
String fastMenmonic = "FAST";
Worker slowWorker1 = mock(Worker.class);
Worker slowWorker2 = mock(Worker.class);
Worker fastWorker = mock(Worker.class);
WorkerKey slowWorkerKey = createWorkerKey(slowMenmonic);
WorkerKey fastWorkerKey = createWorkerKey(fastMenmonic);
when(slowWorker1.getWorkerKey()).thenReturn(slowWorkerKey);
when(slowWorker2.getWorkerKey()).thenReturn(slowWorkerKey);
when(fastWorker.getWorkerKey()).thenReturn(fastWorkerKey);
CountDownLatch slowLatch = new CountDownLatch(2);
CountDownLatch fastLatch = new CountDownLatch(1);
WorkerPool workerPool =
new WorkerPool(
new WorkerPool.WorkerPoolConfig(
new WorkerFactory(fs.getPath("/workerBase")) {
int numOfSlowWorkers = 0;
@Override
public Worker create(WorkerKey key) {
assertThat(key.getMnemonic()).isAnyOf(slowMenmonic, fastMenmonic);
if (key.getMnemonic().equals(fastMenmonic)) {
return fastWorker;
}
assertThat(numOfSlowWorkers).isLessThan(2);
if (numOfSlowWorkers == 0) {
numOfSlowWorkers++;
return slowWorker1;
}
numOfSlowWorkers++;
return slowWorker2;
}
@Override
public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
return true;
}
},
ImmutableList.of(),
ImmutableList.of(),
/* highPriorityWorkers= */ ImmutableList.of(slowMenmonic)));
rm.setWorkerPool(workerPool);
TestThread slowThread1 =
new TestThread(
() -> {
ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic);
slowLatch.countDown();
fastLatch.await();
// release resources
handle.close();
});
TestThread slowThread2 =
new TestThread(
() -> {
ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic);
slowLatch.countDown();
fastLatch.await();
// release resources
handle.close();
});
TestThread fastThread =
new TestThread(
() -> {
slowLatch.await();
assertThat(isAvailable(rm, 100, 0.1, 0, createWorkerKey(fastMenmonic))).isFalse();
fastLatch.countDown();
ResourceHandle handle = acquire(100, 0.1, 0, fastMenmonic);
// release resources
handle.close();
});
slowThread1.start();
slowThread2.start();
fastThread.start();
slowThread1.joinAndAssertState(Duration.ofSeconds(10).toMillis());
slowThread2.joinAndAssertState(Duration.ofSeconds(10).toMillis());
fastThread.joinAndAssertState(Duration.ofSeconds(10).toMillis());
}
synchronized boolean isAvailable(ResourceManager rm, double ram, double cpu, int localTestCount) {
return rm.areResourcesAvailable(ResourceSet.create(ram, cpu, localTestCount));
}
synchronized boolean isAvailable(
ResourceManager rm, double ram, double cpu, int localTestCount, WorkerKey workerKey) {
return rm.areResourcesAvailable(
ResourceSet.createWithWorkerKey(ram, cpu, localTestCount, workerKey));
}
private static class ResourceOwnerStub implements ActionExecutionMetadata {
@Override
@Nullable
public String getProgressMessage() {
throw new IllegalStateException();
}
@Override
public ActionOwner getOwner() {
throw new IllegalStateException();
}
@Override
public boolean isShareable() {
throw new IllegalStateException();
}
@Override
public String prettyPrint() {
throw new IllegalStateException();
}
@Override
public String getMnemonic() {
throw new IllegalStateException();
}
@Override
public boolean inputsDiscovered() {
throw new IllegalStateException();
}
@Override
public boolean discoversInputs() {
throw new IllegalStateException();
}
@Override
public NestedSet<Artifact> getTools() {
throw new IllegalStateException();
}
@Override
public NestedSet<Artifact> getInputs() {
throw new IllegalStateException();
}
@Override
public Collection<String> getClientEnvironmentVariables() {
throw new IllegalStateException();
}
@Override
public RunfilesSupplier getRunfilesSupplier() {
throw new IllegalStateException();
}
@Override
public ImmutableSet<Artifact> getOutputs() {
throw new IllegalStateException();
}
@Override
public Artifact getPrimaryInput() {
throw new IllegalStateException();
}
@Override
public Artifact getPrimaryOutput() {
throw new IllegalStateException();
}
@Override
public NestedSet<Artifact> getMandatoryInputs() {
throw new IllegalStateException();
}
@Override
public NestedSet<Artifact> getInputFilesForExtraAction(
ActionExecutionContext actionExecutionContext) {
return NestedSetBuilder.emptySet(Order.STABLE_ORDER);
}
@Override
public String getKey(
ActionKeyContext actionKeyContext, @Nullable Artifact.ArtifactExpander artifactExpander) {
throw new IllegalStateException();
}
@Override
@Nullable
public String describeKey() {
throw new IllegalStateException();
}
@Override
public String describe() {
return "ResourceOwnerStubAction";
}
@Override
public ImmutableSet<Artifact> getMandatoryOutputs() {
return ImmutableSet.of();
}
@Override
public boolean shouldReportPathPrefixConflict(ActionAnalysisMetadata action) {
throw new IllegalStateException();
}
@Override
public MiddlemanType getActionType() {
throw new IllegalStateException();
}
@Override
public ImmutableMap<String, String> getExecProperties() {
throw new IllegalStateException();
}
@Nullable
@Override
public PlatformInfo getExecutionPlatform() {
throw new IllegalStateException();
}
}
}