blob: 1433916d6af5879a8fb143f412e540759375a4bb [file]
// Copyright 2026 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.concurrent.RequestBatching.BatchExecutionStrategy;
import com.google.devtools.build.lib.concurrent.RequestBatching.CallbackMultiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.FutureMultiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.Multiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.Operation;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
/**
* A lock-based, asynchronous request batcher designed for eager execution.
*
* <p>This class batches unary requests and executes them together. It eagerly dispatches batches as
* soon as they reach {@code maxBatchSize} or when the number of active concurrent requests is below
* {@code targetConcurrentRequests}.
*
* <p>Submissions do not block waiting for batch execution or queue capacity. While the class uses
* lightweight synchronization (synchronized) to ensure thread-safe queue access, the lock is held
* only for brief in-memory updates, ensuring that calling threads never block on I/O or
* backpressure.
*
* <p><b>Locking Efficiency:</b> While the class uses a lock to ensure thread-safe queue operations,
* the lock hold time is extremely short. Under high load, the relatively more expensive {@link
* ThreadLocal} pool lookup is amortized over the batch size (occurring only once per {@code
* maxBatchSize} submissions), keeping the average lock hold time close to a few nanoseconds.
*
* <p><b>Virtual Thread Performance:</b> While this class will function correctly when called from
* virtual threads, performance will likely be very poor. It relies on {@link ThreadLocal} via
* {@link QueuePool} for optimization, which does not behave predictably or efficiently with
* short-lived virtual threads. Additionally, it uses monitor-based locks (synchronized) which can
* cause pinning of carrier threads.
*/
public final class EagerRequestBatcher<RequestT, ResponseT> {
private final Object lock = new Object();
@GuardedBy("lock")
private List<Operation<RequestT, ResponseT>> queue;
@GuardedBy("lock")
private int inFlightCount = 0;
private final int maxBatchSize;
private final int targetConcurrentRequests;
/** Executor used for batch completion work, which may include sending batches. */
private final Executor executor;
private final BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy;
private final QueuePool<RequestT, ResponseT> pool;
/** Creates a batcher with standard Multiplexer. */
public static <RequestT, ResponseT> EagerRequestBatcher<RequestT, ResponseT> create(
Multiplexer<RequestT, ResponseT> multiplexer,
Executor responseDistributionExecutor,
QueuePool<RequestT, ResponseT> pool,
int targetConcurrentRequests,
Executor executor) {
return new EagerRequestBatcher<>(
RequestBatching.createBatchExecutionStrategy(multiplexer, responseDistributionExecutor),
pool,
targetConcurrentRequests,
executor);
}
/** Creates a batcher with CallbackMultiplexer. */
public static <RequestT, ResponseT>
EagerRequestBatcher<RequestT, ResponseT> createWithCallbackMultiplexer(
CallbackMultiplexer<RequestT, ResponseT> multiplexer,
QueuePool<RequestT, ResponseT> pool,
int targetConcurrentRequests,
Executor executor) {
return new EagerRequestBatcher<>(
RequestBatching.createCallbackBatchExecutionStrategy(multiplexer),
pool,
targetConcurrentRequests,
executor);
}
/** Creates a batcher with FutureMultiplexer. */
public static <RequestT, ResponseT>
EagerRequestBatcher<RequestT, ResponseT> createWithFutureMultiplexer(
FutureMultiplexer<RequestT, ResponseT> multiplexer,
QueuePool<RequestT, ResponseT> pool,
int targetConcurrentRequests,
Executor executor) {
return new EagerRequestBatcher<>(
RequestBatching.createFutureBatchExecutionStrategy(multiplexer),
pool,
targetConcurrentRequests,
executor);
}
// Package-private constructor for testing and internal use
@VisibleForTesting
EagerRequestBatcher(
BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy,
QueuePool<RequestT, ResponseT> pool,
int targetConcurrentRequests,
Executor executor) {
this.batchExecutionStrategy = batchExecutionStrategy;
this.pool = pool;
this.maxBatchSize = pool.getMaxBatchSize();
checkArgument(targetConcurrentRequests >= 1, "targetConcurrentRequests must be >= 1");
this.targetConcurrentRequests = targetConcurrentRequests;
this.executor = executor;
this.queue = new ArrayList<>(maxBatchSize);
}
public ListenableFuture<ResponseT> submit(RequestT request) {
Operation<RequestT, ResponseT> operation = new Operation<>(request);
List<Operation<RequestT, ResponseT>> batch = null;
synchronized (lock) {
queue.add(operation);
// Rule 1 (Eager): Execute immediately if queue reaches maxBatchSize.
// Rule 2 (Target Concurrency): Execute immediately if in-flight count is below target.
if (queue.size() >= maxBatchSize || inFlightCount < targetConcurrentRequests) {
batch = swapQueue();
inFlightCount++;
}
}
if (batch != null) {
execute(copyAndRecycle(batch));
}
return operation;
}
private void onBatchComplete() {
List<Operation<RequestT, ResponseT>> batch = null;
synchronized (lock) {
if (!queue.isEmpty() && inFlightCount <= targetConcurrentRequests) {
batch = swapQueue();
// A batch has just completed, but the queue contents will be sent immediately so
// inFlightCount does not change.
} else {
inFlightCount--;
}
}
if (batch != null) {
execute(copyAndRecycle(batch));
}
}
/**
* Swaps the queue with a clean one from the pool.
*
* <p>IMPORTANT: after this swap, a batch must be recycled into {@link #pool} before any other
* calls to {@link QueuePool#getQueue()} from this thread.
*
* @return the queue at the moment this method was called
*/
@GuardedBy("lock")
private List<Operation<RequestT, ResponseT>> swapQueue() {
List<Operation<RequestT, ResponseT>> batch = queue;
queue = pool.getQueue();
return batch;
}
private ImmutableList<Operation<RequestT, ResponseT>> copyAndRecycle(
List<Operation<RequestT, ResponseT>> batch) {
ImmutableList<Operation<RequestT, ResponseT>> copy = ImmutableList.copyOf(batch);
pool.recycleQueue(batch);
return copy;
}
private void execute(ImmutableList<Operation<RequestT, ResponseT>> batch) {
ListenableFuture<?> batchFuture;
try {
batchFuture =
batchExecutionStrategy.executeBatch(Lists.transform(batch, Operation::request), batch);
} catch (Throwable t) {
handleSynchronousException(batch, t);
return;
}
batchFuture.addListener(this::onBatchComplete, executor);
}
private void handleSynchronousException(
ImmutableList<Operation<RequestT, ResponseT>> operations, Throwable t) {
synchronized (lock) {
inFlightCount--;
}
for (Operation<RequestT, ResponseT> operation : operations) {
operation.acceptFailure(t);
}
}
// Package-private for testing
@VisibleForTesting
int getInFlightCount() {
synchronized (lock) {
return inFlightCount;
}
}
@VisibleForTesting
int getQueueSize() {
synchronized (lock) {
return queue.size();
}
}
}