blob: 27e23108c1854d2b12864a84d3a69fbef8b61fd0 [file] [log] [blame]
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.clock.Clock;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Instant;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A dispenser for up to 'total' simultaneous units of some resource. The resource itself is not
* accessed through this object; this is basically an asynchronous interface to a counting
* semaphore.
*/
@ThreadSafe
public final class ConcurrencyMeter {
private final String name;
private final long total;
@GuardedBy("this")
private final Clock clock;
@GuardedBy("this")
private long leased = 0;
@GuardedBy("this")
private final Queue<PendingJob> queue = new PriorityQueue<>();
@GuardedBy("this")
private long maxLeased = 0;
@GuardedBy("this")
private Instant maxLeasedTimestamp;
/**
* Creates a meter with the given number of units.
*
* @param name an identifier for this meter, for use in {@link #getStats}
* @param total total number of permits that may be dispensed
* @param clock provides the current time for {@link Stats#maxLeasedTimeMs()}
*/
public ConcurrencyMeter(String name, long total, Clock clock) {
this.name = checkNotNull(name);
this.total = total;
this.clock = checkNotNull(clock);
}
@VisibleForTesting
synchronized int queueSize() {
return queue.size();
}
/**
* Enqueues a request for {@code quantity} units of the resource managed by this meter. When the
* request is filled, the result becomes available.
*
* <p>The resource must be released either by cancelling the future or by calling {@link
* Ticket#done} on the ticket after the future completes.
*
* @param quantity number of units of resources to acquire
* @param priority requests with greater priority complete earlier
* @return a future which grants resources only when it completes successfully
*/
public ListenableFuture<Ticket> request(long quantity, long priority) {
checkArgument(quantity >= 0);
PendingJob job = new PendingJob(quantity, priority);
synchronized (this) {
queue.add(job);
}
schedule();
return job.futureTicket;
}
/** Statistics about a ConcurrencyMeter. */
public record Stats(
String name, long total, long leased, long maxLeased, long maxLeasedTimeMs) {}
public synchronized Stats getStats() {
return new Stats(
name, total, leased, maxLeased, maxLeased > 0 ? maxLeasedTimestamp.toEpochMilli() : 0);
}
private synchronized void release(long quantity) {
checkState(leased >= quantity, "quantity (%s) > leased (%s)", quantity, leased);
leased -= quantity;
}
private void releaseAndSchedule(long quantity) {
release(quantity);
schedule();
}
private void schedule() {
while (true) {
PendingJob job;
synchronized (this) {
job = queue.peek();
if (job == null || (leased + job.quantity > total && leased > 0)) {
return;
}
queue.remove();
leased += job.quantity;
if (leased >= maxLeased) {
maxLeased = leased;
maxLeasedTimestamp = clock.now();
}
}
// Set the future outside synchronized block to avoid holding the lock when executing future's
// callbacks which may hold other locks and call into ConcurrencyMeter causing deadlocks.
// See: b/319411390
if (!job.futureTicket.set(new ReleasingTicket(job.quantity))) {
// The future may have been cancelled. Release immediately. If the build was interrupted, we
// may encounter a long chain of cancelled tickets - avoid calling ticket.done() or
// releaseAndSchedule() which would process them recursively.
release(job.quantity);
}
}
}
private final class ReleasingTicket implements Ticket {
private final long quantity;
private final AtomicBoolean released = new AtomicBoolean(false);
ReleasingTicket(long quantity) {
this.quantity = quantity;
}
@Override
public void done() {
boolean alreadyReleased = released.getAndSet(true);
checkState(!alreadyReleased, "Already released %s units", quantity);
releaseAndSchedule(quantity);
}
}
private static final class PendingJob implements Comparable<PendingJob> {
private final SettableFuture<Ticket> futureTicket = SettableFuture.create();
private final long quantity;
private final long priority;
PendingJob(long quantity, long priority) {
this.quantity = quantity;
this.priority = priority;
}
@Override
public int compareTo(PendingJob o) {
return Long.compare(o.priority, priority);
}
}
/** A ticket denoting resource acquisition. */
public interface Ticket {
/** Releases the associated resources. Must be called exactly once. */
void done();
}
}