Make the checking of available resources of a WorkerKeyPool atomic (less worry for race conditions) and re-order logic in WorkerKeyPool#invalidateWorker to make it more understandable.

PiperOrigin-RevId: 627316722
Change-Id: I458d275f692292d04ae6c1ebc14189d3d3ebaf5e
diff --git a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
index 9b544a3..b164bb6 100644
--- a/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
+++ b/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
@@ -626,12 +626,8 @@
     // by the release() method.
 
     WorkerKey workerKey = resources.getWorkerKey();
-    if (workerKey != null) {
-      int availableWorkers = this.workerPool.getMaxTotalPerKey(workerKey);
-      int activeWorkers = this.workerPool.getNumActive(workerKey);
-      if (activeWorkers >= availableWorkers) {
-        return false;
-      }
+    if (workerKey != null && !this.workerPool.hasAvailableQuota(workerKey)) {
+      return false;
     }
 
     // We test for tracking of extra resources whenever acquired and throw an
diff --git a/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
index cc16ee9..a5cb04d 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java
@@ -121,6 +121,10 @@
     return getMaxTotalPerKey() - shrunkBy.getOrDefault(key, 0);
   }
 
+  public synchronized boolean hasAvailableQuota(WorkerKey key) {
+    return getMaxTotalPerKey(key) - getNumActive(key) > 0;
+  }
+
   private synchronized void updateShrunkBy(WorkerKey workerKey, int workerId) {
     int currentValue = shrunkBy.getOrDefault(workerKey, 0);
     if (getMaxTotalPerKey() - currentValue > 1) {
@@ -135,6 +139,7 @@
     shrunkBy = new HashMap<>();
   }
 
+
   /**
    * Our own configuration class for the {@code SimpleWorkerPool} that correctly implements {@code
    * equals()} and {@code hashCode()}.
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
index ceaffb8..e4ea0a3 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
@@ -43,6 +43,17 @@
   int getNumActive(WorkerKey key);
 
   /**
+   * Returns whether there is quota available to create or use an existing worker.
+   *
+   * <p>It is essentially #getMaxTotalPerKey() - #getNumActive() > 0, but meant to be handled
+   * atomically to prevent internal race conditions.
+   *
+   * @param key the worker key.
+   * @return whether there is quota available to either get an existing or create a new worker.
+   */
+  boolean hasAvailableQuota(WorkerKey key);
+
+  /**
    * Evicts specified workers from the pool, destroying them.
    *
    * <p>It is possible that not all specified workers get evicted if they become active.
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
index a08ef53..02ce010 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImpl.java
@@ -14,7 +14,6 @@
 
 package com.google.devtools.build.lib.worker;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 import com.google.common.collect.ImmutableMap;
@@ -95,7 +94,7 @@
 
   @Override
   public int getMaxTotalPerKey(WorkerKey key) {
-    return getPool(key).getAvailableQuota();
+    return getPool(key).getEffectiveMax();
   }
 
   @Override
@@ -104,6 +103,11 @@
   }
 
   @Override
+  public boolean hasAvailableQuota(WorkerKey key) {
+    return getPool(key).hasAvailableQuota();
+  }
+
+  @Override
   public ImmutableSet<Integer> evictWorkers(ImmutableSet<Integer> workerIdsToEvict)
       throws InterruptedException {
     // TODO: Without having the Worker objects themselves, we can't directly pass the worker to the
@@ -178,30 +182,50 @@
   /**
    * Actual pool implementation that handles the borrowing, returning and invalidation of workers of
    * a single worker key.
+   *
+   * <p>The following describes how the key features of the pool and how they work in tandem with
+   * each other:
+   *
+   * <ul>
+   *   <li>Borrowing a worker: If quota is available, the pool returns an already existing idle
+   *       worker or creates a new worker. If quota is not available, it creates a {@code
+   *       PendingWorkerRequest} in the waiting queue and waits on it.
+   *   <li>Returning worker: If there are pending requests in the waiting queue, directly hand the
+   *       worker over to that request, signalling to the waiting thread to proceed. Otherwise,
+   *       returns the worker back to the pool.
+   *   <li>Invalidating worker: Destroys this worker and removes it from the pool. The pool is
+   *       optionally shrunk, which reduces the maximum number of workers that can be in the pool
+   *       (to a minimum of 1). If the pool is not shrunk, the destruction of this worker represents
+   *       a freeing up of quota, in this case it signals for any pending request to continue and
+   *       effectively taking over this quota.
+   * </ul>
    */
   private class WorkerKeyPool {
 
     private final WorkerKey key;
     private final int max;
-    // We maintain this as a separate counter from the activeSet so that we can create workers
-    // without locking the pool, while maintaining atomicity on
+    // The number of workers in use.
     private final AtomicInteger acquired = new AtomicInteger(0);
+    // The number by which the overall quota is shrunk by.
     private final AtomicInteger shrunk = new AtomicInteger(0);
 
-    private final BlockingDeque<Worker> idleQueue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<Worker> idleWorkers = new LinkedBlockingDeque<>();
 
     /**
-     * The waiting queue is meant to block borrowers when there are no workers available. With
-     * workers as a resource, workers are only borrowed when they get they are available, so this
+     * The waiting queue is meant to provide fairness in borrowing from the pool (first come first
+     * serve), any freeing up of quota (either through returning or invalidating a worker) will
+     * service requests this queue first.
+     *
+     * <p>With workers as a resource, workers are only borrowed when they are available, so this
      * doesn't get used, i.e. there shouldn't be any borrowers waiting here, where the {@code
      * ResourceManager} handles the proper synchronization to ensure that workers are borrowed
      * together with its allocated resources.
      *
-     * <p>Regardless, this implementation is still included to ensure correctness such that that
-     * multiple threads can still borrow concurrently, without needing to check how many workers are
-     * actually available (blocking if unavailable).
+     * <p>Regardless, this implementation is still included to ensure correctness such that multiple
+     * threads can still borrow concurrently, without needing to check how many workers are actually
+     * available (blocking if unavailable).
      */
-    private final BlockingDeque<WorkerLatch> waitingQueue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<PendingWorkerRequest> waitingQueue = new LinkedBlockingDeque<>();
 
     private final Set<Worker> activeSet = new HashSet<>();
 
@@ -212,14 +236,14 @@
 
     private synchronized Set<Integer> evictWorkers(Set<Integer> workerIdsToEvict) {
       Set<Integer> evictedWorkerIds = new HashSet<>();
-      for (Worker worker : idleQueue) {
+      for (Worker worker : idleWorkers) {
         if (workerIdsToEvict.contains(worker.getWorkerId())) {
           evictedWorkerIds.add(worker.getWorkerId());
           worker.getStatus().maybeUpdateStatus(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);
           // Currently when evicting idle workers, we do not shrink the pool. The pool is only
           // shrunk when we have to postpone invalidation of the worker.
           invalidateWorker(worker, /* shouldShrinkPool= */ false);
-          idleQueue.remove(worker);
+          idleWorkers.remove(worker);
           logger.atInfo().log(
               "Evicted %s worker (id %d, key hash %d).",
               worker.getWorkerKey().getMnemonic(),
@@ -237,20 +261,26 @@
       return acquired.get();
     }
 
-    private synchronized int getAvailableQuota() {
+    private synchronized int getEffectiveMax() {
       return max - shrunk.get();
     }
 
+    private synchronized boolean hasAvailableQuota() {
+      return getEffectiveMax() - getNumActive() > 0;
+    }
+
     // Callers should atomically check to confirm that workers are available before calling this
     // method or risk being blocked waiting for a worker to be available.
     private Worker borrowWorker() throws IOException, InterruptedException {
       Worker worker = null;
-      WorkerLatch latch = null;
+      PendingWorkerRequest pendingReq = null;
       // We don't want to hold the lock on the pool while creating or waiting for a worker or quota
       // to be available.
       synchronized (this) {
-        while (!idleQueue.isEmpty()) {
-          worker = idleQueue.takeLast();
+        while (!idleWorkers.isEmpty()) {
+          // LIFO: It's better to re-use a worker as often as possible and keep it hot, in order to
+          // profit from JIT optimizations as much as possible.
+          worker = idleWorkers.takeLast();
           if (factory.validateWorker(worker.getWorkerKey(), worker)) {
             acquired.incrementAndGet();
             break;
@@ -261,21 +291,21 @@
 
         if (worker == null) {
           // If we were unable to get an idle worker, then either create or wait for one.
-          if (getAvailableQuota() - getNumActive() > 0) {
+          if (hasAvailableQuota()) {
             // No idle workers, but we have space to create another.
             acquired.incrementAndGet();
           } else {
-            latch = new WorkerLatch();
-            waitingQueue.add(latch);
+            pendingReq = new PendingWorkerRequest();
+            waitingQueue.add(pendingReq);
           }
         }
       }
 
-      if (latch != null) {
-        // Wait until the resources are available. We cannot do this why synchronized because that
+      if (pendingReq != null) {
+        // Wait until the resources are available. We cannot do this while synchronized because that
         // would deadlock by blocking other threads from returning and thus freeing up quota for
         // this to proceed.
-        worker = latch.await();
+        worker = pendingReq.await();
       }
 
       if (worker == null) {
@@ -283,11 +313,6 @@
       }
 
       activeSet.add(worker);
-
-      checkArgument(
-          getAvailableQuota() - getNumActive() >= 0,
-          "Worker pool (mnemonic %s) does not have space to create another worker.",
-          key.getMnemonic());
       return worker;
     }
 
@@ -299,44 +324,45 @@
 
       activeSet.remove(worker);
 
-      WorkerLatch latch = waitingQueue.poll();
-      if (latch != null) {
+      PendingWorkerRequest pendingReq = waitingQueue.poll();
+      if (pendingReq != null) {
         // Pass the worker directly to the waiting thread.
-        latch.countDown(worker);
+        pendingReq.signal(worker);
       } else {
         acquired.decrementAndGet();
-        idleQueue.addLast(worker);
+        idleWorkers.addLast(worker);
       }
     }
 
     private synchronized void invalidateWorker(Worker worker, boolean shouldShrinkPool) {
       factory.destroyWorker(worker.getWorkerKey(), worker);
 
-      if (activeSet.remove(worker)) {
-        acquired.decrementAndGet();
-      } else {
-        idleQueue.remove(worker);
+      if (idleWorkers.contains(worker)) {
+        idleWorkers.remove(worker);
         return;
       }
 
+      // If it isn't idle, then we're destroying an active worker.
+      activeSet.remove(worker);
+
       // We don't want to shrink the pool to 0.
-      if (shouldShrinkPool && getAvailableQuota() > 1) {
+      if (shouldShrinkPool && getEffectiveMax() > 1) {
+        // When shrinking, there is no effective change in the availability, so there is no need to
+        // signal a waiting thread to proceed.
+        acquired.decrementAndGet();
         shrunk.incrementAndGet();
         return;
       }
 
-      // If invalidating the worker has resulted in the freeing up of effective quota for waiting
-      // threads (also taking into account whether it was shrunk), then we signal for the next
-      // waiting thread proceed.
-      WorkerLatch latch = waitingQueue.poll();
-      if (latch != null) {
-        // We signal to the waiting thread that it can proceed, but it has to create a worker
-        // for itself.
-        latch.countDown(null);
-        // We need to increment while synchronized here, so that we don't race with another
-        // thread that might borrow (thus taking up the quota) before the waiting thread actually
-        // manages to proceed (and increment on its own).
-        acquired.getAndIncrement();
+      PendingWorkerRequest pendingReq = waitingQueue.poll();
+      if (pendingReq == null) {
+        // Since there is no pending request, we free up this quota.
+        acquired.decrementAndGet();
+      } else {
+        // Since there is a pending request, hold onto this quota (and do not decrement acquired) so
+        // that other threads aren't able to borrow before this pending request (thus creating a
+        // race condition).
+        pendingReq.signal(null);
       }
     }
 
@@ -347,7 +373,7 @@
      * previously idle worker.
      */
     public Set<Integer> getIdleWorkers() {
-      return idleQueue.stream().map(Worker::getWorkerId).collect(toImmutableSet());
+      return idleWorkers.stream().map(Worker::getWorkerId).collect(toImmutableSet());
     }
 
     private void reset() {
@@ -359,7 +385,7 @@
 
     // Destroys all workers created in this pool.
     private synchronized void close() {
-      for (Worker worker : idleQueue) {
+      for (Worker worker : idleWorkers) {
         factory.destroyWorker(worker.getWorkerKey(), worker);
       }
       for (Worker worker : activeSet) {
@@ -375,7 +401,7 @@
    * Used to pass workers from threads that are returning the worker to the pool, bypassing the
    * queue.
    */
-  private static class WorkerLatch {
+  private static class PendingWorkerRequest {
     final CountDownLatch latch = new CountDownLatch(1);
     @Nullable volatile Worker worker = null;
 
@@ -386,7 +412,12 @@
       return worker;
     }
 
-    public void countDown(@Nullable Worker worker) {
+    /**
+     * Signals to the thread #await(ing) to proceed. When calling this, the {@code
+     * WorkerKeyPool.acquired} quota associated to this worker should not be released because that
+     * allows for race conditions with other threads attempting to borrow from the pool.
+     */
+    public void signal(@Nullable Worker worker) {
       this.worker = worker;
       latch.countDown();
     }
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
index 59eef40..9c89653 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPoolImplLegacy.java
@@ -128,6 +128,11 @@
     return getPool(key).getNumActive(key);
   }
 
+  @Override
+  public synchronized boolean hasAvailableQuota(WorkerKey key) {
+    return getPool(key).hasAvailableQuota(key);
+  }
+
   public void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy) throws InterruptedException {
     for (SimpleWorkerPool pool : workerPools.values()) {
       evictWithPolicy(evictionPolicy, pool);
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
index a31802a..2b7c8d5 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTestUtils.java
@@ -334,6 +334,11 @@
       }
 
       @Override
+      public boolean hasAvailableQuota(WorkerKey key) {
+        return true;
+      }
+
+      @Override
       public ImmutableSet<Integer> evictWorkers(ImmutableSet<Integer> workerIdsToEvict)
           throws InterruptedException {
         return ImmutableSet.of();