Make the checking of available resources of a WorkerKeyPool atomic (less worry for race conditions) and re-order logic in WorkerKeyPool#invalidateWorker to make it more understandable.
PiperOrigin-RevId: 627316722
Change-Id: I458d275f692292d04ae6c1ebc14189d3d3ebaf5e
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
index 9b544a3..b164bb6 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
@@ -626,12 +626,8 @@
// by the release() method.
WorkerKey workerKey = resources.getWorkerKey();
- if (workerKey != null) {
- int availableWorkers = this.workerPool.getMaxTotalPerKey(workerKey);
- int activeWorkers = this.workerPool.getNumActive(workerKey);
- if (activeWorkers >= availableWorkers) {
- return false;
- }
+ if (workerKey != null && !this.workerPool.hasAvailableQuota(workerKey)) {
+ return false;
}
// We test for tracking of extra resources whenever acquired and throw an
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
index cc16ee9..a5cb04d 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
@@ -121,6 +121,10 @@
return getMaxTotalPerKey() - shrunkBy.getOrDefault(key, 0);
}
+ public synchronized boolean hasAvailableQuota(WorkerKey key) {
+ return getMaxTotalPerKey(key) - getNumActive(key) > 0;
+ }
+
private synchronized void updateShrunkBy(WorkerKey workerKey, int workerId) {
int currentValue = shrunkBy.getOrDefault(workerKey, 0);
if (getMaxTotalPerKey() - currentValue > 1) {
@@ -135,6 +139,7 @@
shrunkBy = new HashMap<>();
}
+
/**
* Our own configuration class for the {@code SimpleWorkerPool} that correctly implements {@code
* equals()} and {@code hashCode()}.
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
index ceaffb8..e4ea0a3 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
@@ -43,6 +43,17 @@
int getNumActive(WorkerKey key);
/**
+ * Returns whether there is quota available to create or use an existing worker.
+ *
+ * <p>It is essentially #getMaxTotalPerKey() - #getNumActive() > 0, but meant to be handled
+ * atomically to prevent internal race conditions.
+ *
+ * @param key the worker key.
+ * @return whether there is quota available to either get an existing or create a new worker.
+ */
+ boolean hasAvailableQuota(WorkerKey key);
+
+ /**
* Evicts specified workers from the pool, destroying them.
*
* <p>It is possible that not all specified workers get evicted if they become active.
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
index a08ef53..02ce010 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.worker;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import com.google.common.collect.ImmutableMap;
@@ -95,7 +94,7 @@
@Override
public int getMaxTotalPerKey(WorkerKey key) {
- return getPool(key).getAvailableQuota();
+ return getPool(key).getEffectiveMax();
}
@Override
@@ -104,6 +103,11 @@
}
@Override
+ public boolean hasAvailableQuota(WorkerKey key) {
+ return getPool(key).hasAvailableQuota();
+ }
+
+ @Override
public ImmutableSet<Integer> evictWorkers(ImmutableSet<Integer> workerIdsToEvict)
throws InterruptedException {
// TODO: Without having the Worker objects themselves, we can't directly pass the worker to the
@@ -178,30 +182,50 @@
/**
* Actual pool implementation that handles the borrowing, returning and invalidation of workers of
* a single worker key.
+ *
+ * <p>The following describes how the key features of the pool and how they work in tandem with
+ * each other:
+ *
+ * <ul>
+ * <li>Borrowing a worker: If quota is available, the pool returns an already existing idle
+ * worker or creates a new worker. If quota is not available, it creates a {@code
+ * PendingWorkerRequest} in the waiting queue and waits on it.
+ * <li>Returning worker: If there are pending requests in the waiting queue, directly hand the
+ * worker over to that request, signalling to the waiting thread to proceed. Otherwise,
+ * returns the worker back to the pool.
+ * <li>Invalidating worker: Destroys this worker and removes it from the pool. The pool is
+ * optionally shrunk, which reduces the maximum number of workers that can be in the pool
+ * (to a minimum of 1). If the pool is not shrunk, the destruction of this worker represents
+ * a freeing up of quota, in this case it signals for any pending request to continue and
+ * effectively taking over this quota.
+ * </ul>
*/
private class WorkerKeyPool {
private final WorkerKey key;
private final int max;
- // We maintain this as a separate counter from the activeSet so that we can create workers
- // without locking the pool, while maintaining atomicity on
+ // The number of workers in use.
private final AtomicInteger acquired = new AtomicInteger(0);
+ // The number by which the overall quota is shrunk by.
private final AtomicInteger shrunk = new AtomicInteger(0);
- private final BlockingDeque<Worker> idleQueue = new LinkedBlockingDeque<>();
+ private final BlockingDeque<Worker> idleWorkers = new LinkedBlockingDeque<>();
/**
- * The waiting queue is meant to block borrowers when there are no workers available. With
- * workers as a resource, workers are only borrowed when they get they are available, so this
+ * The waiting queue is meant to provide fairness in borrowing from the pool (first come first
+ * serve), any freeing up of quota (either through returning or invalidating a worker) will
+ * service requests this queue first.
+ *
+ * <p>With workers as a resource, workers are only borrowed when they are available, so this
* doesn't get used, i.e. there shouldn't be any borrowers waiting here, where the {@code
* ResourceManager} handles the proper synchronization to ensure that workers are borrowed
* together with its allocated resources.
*
- * <p>Regardless, this implementation is still included to ensure correctness such that that
- * multiple threads can still borrow concurrently, without needing to check how many workers are
- * actually available (blocking if unavailable).
+ * <p>Regardless, this implementation is still included to ensure correctness such that multiple
+ * threads can still borrow concurrently, without needing to check how many workers are actually
+ * available (blocking if unavailable).
*/
- private final BlockingDeque<WorkerLatch> waitingQueue = new LinkedBlockingDeque<>();
+ private final BlockingDeque<PendingWorkerRequest> waitingQueue = new LinkedBlockingDeque<>();
private final Set<Worker> activeSet = new HashSet<>();
@@ -212,14 +236,14 @@
private synchronized Set<Integer> evictWorkers(Set<Integer> workerIdsToEvict) {
Set<Integer> evictedWorkerIds = new HashSet<>();
- for (Worker worker : idleQueue) {
+ for (Worker worker : idleWorkers) {
if (workerIdsToEvict.contains(worker.getWorkerId())) {
evictedWorkerIds.add(worker.getWorkerId());
worker.getStatus().maybeUpdateStatus(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
// Currently when evicting idle workers, we do not shrink the pool. The pool is only
// shrunk when we have to postpone invalidation of the worker.
invalidateWorker(worker, /* shouldShrinkPool= */ false);
- idleQueue.remove(worker);
+ idleWorkers.remove(worker);
logger.atInfo().log(
"Evicted %s worker (id %d, key hash %d).",
worker.getWorkerKey().getMnemonic(),
@@ -237,20 +261,26 @@
return acquired.get();
}
- private synchronized int getAvailableQuota() {
+ private synchronized int getEffectiveMax() {
return max - shrunk.get();
}
+ private synchronized boolean hasAvailableQuota() {
+ return getEffectiveMax() - getNumActive() > 0;
+ }
+
// Callers should atomically check to confirm that workers are available before calling this
// method or risk being blocked waiting for a worker to be available.
private Worker borrowWorker() throws IOException, InterruptedException {
Worker worker = null;
- WorkerLatch latch = null;
+ PendingWorkerRequest pendingReq = null;
// We don't want to hold the lock on the pool while creating or waiting for a worker or quota
// to be available.
synchronized (this) {
- while (!idleQueue.isEmpty()) {
- worker = idleQueue.takeLast();
+ while (!idleWorkers.isEmpty()) {
+ // LIFO: It's better to re-use a worker as often as possible and keep it hot, in order to
+ // profit from JIT optimizations as much as possible.
+ worker = idleWorkers.takeLast();
if (factory.validateWorker(worker.getWorkerKey(), worker)) {
acquired.incrementAndGet();
break;
@@ -261,21 +291,21 @@
if (worker == null) {
// If we were unable to get an idle worker, then either create or wait for one.
- if (getAvailableQuota() - getNumActive() > 0) {
+ if (hasAvailableQuota()) {
// No idle workers, but we have space to create another.
acquired.incrementAndGet();
} else {
- latch = new WorkerLatch();
- waitingQueue.add(latch);
+ pendingReq = new PendingWorkerRequest();
+ waitingQueue.add(pendingReq);
}
}
}
- if (latch != null) {
- // Wait until the resources are available. We cannot do this why synchronized because that
+ if (pendingReq != null) {
+ // Wait until the resources are available. We cannot do this while synchronized because that
// would deadlock by blocking other threads from returning and thus freeing up quota for
// this to proceed.
- worker = latch.await();
+ worker = pendingReq.await();
}
if (worker == null) {
@@ -283,11 +313,6 @@
}
activeSet.add(worker);
-
- checkArgument(
- getAvailableQuota() - getNumActive() >= 0,
- "Worker pool (mnemonic %s) does not have space to create another worker.",
- key.getMnemonic());
return worker;
}
@@ -299,44 +324,45 @@
activeSet.remove(worker);
- WorkerLatch latch = waitingQueue.poll();
- if (latch != null) {
+ PendingWorkerRequest pendingReq = waitingQueue.poll();
+ if (pendingReq != null) {
// Pass the worker directly to the waiting thread.
- latch.countDown(worker);
+ pendingReq.signal(worker);
} else {
acquired.decrementAndGet();
- idleQueue.addLast(worker);
+ idleWorkers.addLast(worker);
}
}
private synchronized void invalidateWorker(Worker worker, boolean shouldShrinkPool) {
factory.destroyWorker(worker.getWorkerKey(), worker);
- if (activeSet.remove(worker)) {
- acquired.decrementAndGet();
- } else {
- idleQueue.remove(worker);
+ if (idleWorkers.contains(worker)) {
+ idleWorkers.remove(worker);
return;
}
+ // If it isn't idle, then we're destroying an active worker.
+ activeSet.remove(worker);
+
// We don't want to shrink the pool to 0.
- if (shouldShrinkPool && getAvailableQuota() > 1) {
+ if (shouldShrinkPool && getEffectiveMax() > 1) {
+ // When shrinking, there is no effective change in the availability, so there is no need to
+ // signal a waiting thread to proceed.
+ acquired.decrementAndGet();
shrunk.incrementAndGet();
return;
}
- // If invalidating the worker has resulted in the freeing up of effective quota for waiting
- // threads (also taking into account whether it was shrunk), then we signal for the next
- // waiting thread proceed.
- WorkerLatch latch = waitingQueue.poll();
- if (latch != null) {
- // We signal to the waiting thread that it can proceed, but it has to create a worker
- // for itself.
- latch.countDown(null);
- // We need to increment while synchronized here, so that we don't race with another
- // thread that might borrow (thus taking up the quota) before the waiting thread actually
- // manages to proceed (and increment on its own).
- acquired.getAndIncrement();
+ PendingWorkerRequest pendingReq = waitingQueue.poll();
+ if (pendingReq == null) {
+ // Since there is no pending request, we free up this quota.
+ acquired.decrementAndGet();
+ } else {
+ // Since there is a pending request, hold onto this quota (and do not decrement acquired) so
+ // that other threads aren't able to borrow before this pending request (thus creating a
+ // race condition).
+ pendingReq.signal(null);
}
}
@@ -347,7 +373,7 @@
* previously idle worker.
*/
public Set<Integer> getIdleWorkers() {
- return idleQueue.stream().map(Worker::getWorkerId).collect(toImmutableSet());
+ return idleWorkers.stream().map(Worker::getWorkerId).collect(toImmutableSet());
}
private void reset() {
@@ -359,7 +385,7 @@
// Destroys all workers created in this pool.
private synchronized void close() {
- for (Worker worker : idleQueue) {
+ for (Worker worker : idleWorkers) {
factory.destroyWorker(worker.getWorkerKey(), worker);
}
for (Worker worker : activeSet) {
@@ -375,7 +401,7 @@
* Used to pass workers from threads that are returning the worker to the pool, bypassing the
* queue.
*/
- private static class WorkerLatch {
+ private static class PendingWorkerRequest {
final CountDownLatch latch = new CountDownLatch(1);
@Nullable volatile Worker worker = null;
@@ -386,7 +412,12 @@
return worker;
}
- public void countDown(@Nullable Worker worker) {
+ /**
+ * Signals to the thread #await(ing) to proceed. When calling this, the {@code
+ * WorkerKeyPool.acquired} quota associated to this worker should not be released because that
+ * allows for race conditions with other threads attempting to borrow from the pool.
+ */
+ public void signal(@Nullable Worker worker) {
this.worker = worker;
latch.countDown();
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
index 59eef40..9c89653 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
@@ -128,6 +128,11 @@
return getPool(key).getNumActive(key);
}
+ @Override
+ public synchronized boolean hasAvailableQuota(WorkerKey key) {
+ return getPool(key).hasAvailableQuota(key);
+ }
+
public void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy) throws InterruptedException {
for (SimpleWorkerPool pool : workerPools.values()) {
evictWithPolicy(evictionPolicy, pool);
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
index a31802a..2b7c8d5 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
@@ -334,6 +334,11 @@
}
@Override
+ public boolean hasAvailableQuota(WorkerKey key) {
+ return true;
+ }
+
+ @Override
public ImmutableSet<Integer> evictWorkers(ImmutableSet<Integer> workerIdsToEvict)
throws InterruptedException {
return ImmutableSet.of();