// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
import static java.lang.Math.min;
import static java.util.concurrent.Executors.newFixedThreadPool;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.unsafe.UnsafeProvider;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.lang.ref.Cleaner;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import sun.misc.Unsafe;
/**
* Provides a unary request-response interface but implements batching.
*
* <p>Clients should provide a {@link Multiplexer}, {@link CallbackMultiplexer}, or {@link
* PerResponseMultiplexer} implementation that performs the actual batched operations.
*
* <p>This class is thread-safe.
*
* <p>Non-final for mockability.
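*
* <p>Example usage (a minimal sketch; {@code LookupRequest}, {@code LookupResponse}, {@code
* backend}, and {@code responseExecutor} are hypothetical):
*
* <pre>{@code
* RequestBatcher<LookupRequest, LookupResponse> batcher =
*     RequestBatcher.create(
*         RequestBatcher.createBatchExecutionStrategy(
*             requests -> backend.lookupAll(requests), // responses aligned with requests
*             responseExecutor),
*         255, // maxBatchSize
*         4); // maxConcurrentRequests
* ListenableFuture<LookupResponse> response = batcher.submit(new LookupRequest("key"));
* }</pre>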
*/
@SuppressWarnings("SunApi") // TODO: b/359688989 - clean this up
public class RequestBatcher<RequestT, ResponseT> {
/* This class employs concurrent workers that perform the following cycle:
*
* 1. Collect all available request-response pairs from the queue up to `maxBatchSize`.
* 2. Execute the collected pairs as a batch.
*
* We guarantee that every submitted request is handled. The following traces all possible paths a
* request-response pair can take through the batcher to demonstrate this guarantee.
*
* Possible Paths:
*
* 1. The pair is present in some `submit` call.
* 2. The pair is enqueued, but not yet reflected in the request-responses count.
* 3. The pair is enqueued, and request-responses count has been incremented.
*
* Step 1: Initial part of `submit`
*
* A. We check the active-workers count. If it's less than `maxConcurrentRequests`, a new worker
* is started and the pair is directly assigned to it.
*
* B. Otherwise, we enqueue the pair. When the queue is full, we sleep and try again until
* enqueuing succeeds. After enqueuing, we proceed to Step 2.
*
* Case A bypasses Step 2, and the pair is immediately assigned a worker.
*
* Step 2: Request-response Enqueued
*
* Step 2 is not atomic with Step 1, so the counters might have changed. We re-check
* active-workers count.
*
* A. If it's already at `maxConcurrentRequests`, we attempt to increment request-responses count
* atomically, ensuring active-workers count remains unchanged during the increment. Success
* leads to Step 3.
*
* B. If active-workers count is below the target (due to concurrent activity), we start a new
* worker like in Step 1, and dequeue an arbitrary element to assign to it. This maintains
* consistency between queue size and request-responses count. The new worker guarantees
* processing of all enqueued request-responses (including the one we just added), even if that
* specific request ends up handled by a different worker.
*
* Step 3: Request-response Enqueued and request-responses count Incremented
*
* The atomic request-responses count increment only happens in Step 2 if active-workers count is
* already at the target. Workers only stop if request-responses count is 0. Since
* `maxConcurrentRequests` > 0, there's always at least one active worker to handle the
* request-response.
*/
/**
* A common cleaner shared by all instances.
*
* <p>Used to free memory allocated by {@link PaddedAddresses}.
*/
private static final Cleaner cleaner = Cleaner.create();
private static final long QUEUE_FULL_SLEEP_MS = 100;
/**
* Executor dedicated to draining the queue, specifically the {@link
* #continueToNextBatchOrBecomeIdle} method.
*
* <p><b>Purpose of Isolation:</b> This executor is isolated to prevent potential deadlocks. The
* {@link #submit} method can block if the task queue is full. If all threads in the client's
* executor become blocked waiting to submit tasks, only {@link #continueToNextBatchOrBecomeIdle}
* can free up space in the queue. Scheduling this continuation logic on the same, potentially
* blocked, client executor would lead to a deadlock.
*
* <p><b>Deadlock Avoidance:</b> As long as {@link #continueToNextBatchOrBecomeIdle} does not
* contain blocking operations (which is true in the current implementation), using a separate
* executor is sufficient to prevent this specific deadlock scenario.
*/
private final Executor queueDrainingExecutor;
private final BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy;
/**
* Reads this many at a time when constructing a batch.
*
* <p>Note that since {@link #populateBatch} always begins with 1 pair, the resulting batch size
* can be up to one more than this.
*/
private final int maxBatchSize;
/** Number of active workers to target. */
private final int maxConcurrentRequests;
/**
* Address of an integer containing two counters.
*
* <p>Having two counters in the same integer enables simultaneous, atomic updates of both values.
*
* <ul>
* <li><b>request-responses count</b>: the lower 20-bits (occupying the bits of {@link
* #REQUEST_COUNT_MASK}) contain a lower bound of request-responses in {@link #queue}. This
* is incremented after successful enqueuing and decremented before dequeuing. This counter
* value is never more than the size of the queue so it can be used to guarantee that the
* number of calls to {@link ConcurrentFifo#take} does not exceed the number of successful
* {@link ConcurrentFifo#tryAppend} calls.
* <li><b>active-workers count</b>: the upper 12-bits (starting from {@link
* #ACTIVE_WORKERS_COUNT_BIT_OFFSET}) contain the number of active workers.
* </ul>
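*
* <p>For example, {@link #submit} decodes a snapshot of this integer as follows:
*
* <pre>{@code
* int snapshot = UNSAFE.getIntVolatile(null, countersAddress);
* int activeWorkers = snapshot >>> ACTIVE_WORKERS_COUNT_BIT_OFFSET; // upper 12 bits
* int requestCount = snapshot & REQUEST_COUNT_MASK; // lower 20 bits
* }</pre>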
*/
private final long countersAddress;
private final ConcurrentFifo<Operation<RequestT, ResponseT>> queue;
/**
* Injectable batching strategy where a single batch request returns a single batch future
* response.
*/
public interface Multiplexer<RequestT, ResponseT> {
/**
* Evaluates {@code requests} as a batch.
*
* @return a future containing a list of responses, positionally aligned with {@code requests}
*/
ListenableFuture<List<ResponseT>> execute(List<RequestT> requests);
}
/**
* A callback for a single request within a batch, which must be completed exactly once.
*
* <p>Used with {@link CallbackMultiplexer}.
*/
public interface ResponseSink<ResponseT> {
/**
* Fulfills the corresponding request with a successful response.
*
* @param response the result of the operation. A {@code null} value is permitted and will be
* forwarded to the original caller as a successful result.
*/
void acceptResponse(@Nullable ResponseT response);
/**
* Fails the corresponding request with the given {@link Throwable}.
*
* <p>A sink should only be completed once. Subsequent calls to this method after the sink has
* already been completed will be ignored.
*/
void acceptFailure(Throwable t);
}
/**
* A batching strategy where the implementation provides concrete response values asynchronously
* via callbacks.
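*
* <p>A minimal sketch of an implementation as a lambda (the {@code backend} service, its
* asynchronous {@code fetchAsync} call, and {@code MyResponse} are hypothetical):
*
* <pre>{@code
* (requests, sinks) -> {
*   for (int i = 0; i < requests.size(); i++) {
*     ResponseSink<MyResponse> sink = sinks.get(i);
*     backend.fetchAsync(requests.get(i))
*         .whenComplete(
*             (response, error) -> {
*               if (error == null) {
*                 sink.acceptResponse(response);
*               } else {
*                 sink.acceptFailure(error);
*               }
*             });
*   }
*   return () -> {}; // runs after every sink in this batch has been completed
* }
* }</pre>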
*/
public interface CallbackMultiplexer<RequestT, ResponseT> {
/**
* Executes the batch of {@code requests}, pushing results directly to the corresponding {@link
* ResponseSink} instances in the {@code sinks} list.
*
* <p>The supplied {@code sinks} list is co-indexed with the {@code requests} list. The
* implementation of this method <strong>must</strong> ensure that for each request, the
* corresponding sink is completed exactly once by calling either {@link
* ResponseSink#acceptResponse} on success or {@link ResponseSink#acceptFailure} on failure.
*
* <p>The {@link RequestBatcher} internally monitors the completion of all sink operations for
* the batch.
*
* @return A non-null {@link Runnable} that the {@code RequestBatcher} will execute on behalf of
* the client. The {@code RequestBatcher} guarantees it will run this callback after all
* sinks for this specific batch have been completed, but <strong>before</strong> this
* batch's concurrency slot is released. This provides a reliable mechanism for performing
* batch-specific resource cleanup. For instance, if recycling identifiers used in the
* requests, this guarantee ensures the identifiers are made available before a subsequent
* batch could possibly use them. The callback should be lightweight.
*/
Runnable execute(
List<RequestT> requests, ImmutableList<? extends ResponseSink<ResponseT>> sinks);
}
/**
* Accepts a future response value.
*
* <p>Used with {@link PerResponseMultiplexer}.
*/
public interface FutureResponseSink<ResponseT> {
void acceptFutureResponse(ListenableFuture<ResponseT> futureResponse);
}
/** Batching strategy where a single batch request returns a future response per request. */
public interface PerResponseMultiplexer<RequestT, ResponseT> {
/** Executes {@code requests} in a batch and populates corresponding {@code responses}. */
void execute(
List<RequestT> requests, ImmutableList<? extends FutureResponseSink<ResponseT>> responses);
}
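/**
* Creates a strategy backed by a {@link Multiplexer}.
*
* @param multiplexer performs the batched operations
* @param responseDistributionExecutor executor on which individual responses from the batched
*     response are distributed to callers
*/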
public static <RequestT, ResponseT>
BatchExecutionStrategy<RequestT, ResponseT> createBatchExecutionStrategy(
Multiplexer<RequestT, ResponseT> multiplexer, Executor responseDistributionExecutor) {
return new MultiplexerAdapter<>(multiplexer, responseDistributionExecutor);
}
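/** Creates a strategy backed by a {@link CallbackMultiplexer}. */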
public static <RequestT, ResponseT>
BatchExecutionStrategy<RequestT, ResponseT> createCallbackBatchExecutionStrategy(
CallbackMultiplexer<RequestT, ResponseT> multiplexer) {
return new CallbackMultiplexerAdapter<>(multiplexer);
}
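/** Creates a strategy backed by a {@link PerResponseMultiplexer}. */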
public static <RequestT, ResponseT>
BatchExecutionStrategy<RequestT, ResponseT> createPerResponseBatchExecutionStrategy(
PerResponseMultiplexer<RequestT, ResponseT> multiplexer) {
return new PerResponseMultiplexerAdapter<>(multiplexer);
}
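/**
* Internal form to which the public multiplexer variants are adapted.
*
* <p>The returned future completes when the batch is done and is used to schedule the worker's
* follow-up work.
*/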
private interface BatchExecutionStrategy<RequestT, ResponseT> {
ListenableFuture<?> executeBatch(
List<RequestT> requests, ImmutableList<Operation<RequestT, ResponseT>> operations);
}
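/**
* Creates a {@link RequestBatcher} that groups up to {@code maxBatchSize + 1} requests per batch
* and runs at most {@code maxConcurrentRequests} batches concurrently.
*
* <p>Off-heap memory backing the counters and queue is freed by a {@link Cleaner} once the
* returned instance becomes unreachable.
*/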
public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(
BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy,
int maxBatchSize,
int maxConcurrentRequests) {
long baseAddress = createPaddedBaseAddress(4);
long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
var queue =
new ConcurrentFifo<Operation<RequestT, ResponseT>>(
Operation.class,
/* sizeAddress= */ getAlignedAddress(baseAddress, /* offset= */ 1),
/* appendIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 2),
/* takeIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 3));
var batcher =
new RequestBatcher<RequestT, ResponseT>(
// `maxConcurrentRequests` is the maximum level of invocation concurrency possible for
// the `queueDrainingExecutor`. It is possible for this to overrun, but the work is
// relatively lightweight and the batch round trip latency is expected to dominate.
/* queueDrainingExecutor= */ newFixedThreadPool(maxConcurrentRequests),
batchExecutionStrategy,
maxBatchSize,
maxConcurrentRequests,
countersAddress,
queue);
cleaner.register(batcher, new AddressFreer(baseAddress));
return batcher;
}
/**
* Low-level constructor.
*
* <p>Caller owns memory addresses used by {@code queue} and cleanup of memory at {@code
* countersAddress}.
*/
// TODO: b/386384684 - remove Unsafe usage
@VisibleForTesting
RequestBatcher(
Executor queueDrainingExecutor,
BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy,
int maxBatchSize,
int maxConcurrentRequests,
long countersAddress,
ConcurrentFifo<Operation<RequestT, ResponseT>> queue) {
checkArgument(maxConcurrentRequests > 0, "maxConcurrentRequests=%s < 1", maxConcurrentRequests);
checkArgument(
maxConcurrentRequests <= ACTIVE_WORKERS_COUNT_MAX,
"maxConcurrentRequests=%s > %s",
maxConcurrentRequests,
ACTIVE_WORKERS_COUNT_MAX);
checkArgument(maxBatchSize > 0, "maxBatchSize=%s < 1", maxBatchSize);
this.queueDrainingExecutor = queueDrainingExecutor;
this.batchExecutionStrategy = batchExecutionStrategy;
this.maxBatchSize = maxBatchSize;
this.maxConcurrentRequests = maxConcurrentRequests;
this.countersAddress = countersAddress;
this.queue = queue;
// Initializes memory at countersAddress.
UNSAFE.putInt(null, countersAddress, 0);
}
/**
* Submits a request, subject to batching.
*
* <p>This method <em>blocks</em> when the queue is full.
*
* <p>If processing the response is expensive, callers should consider doing so on a different
* executor to avoid delaying the delivery of other responses in the batch.
*/
// TODO: b/386384684 - remove Unsafe usage
public ListenableFuture<ResponseT> submit(RequestT request) {
var requestResponse = new Operation<RequestT, ResponseT>(request);
// Tries to start a worker as long as the active worker count is less than
// `maxConcurrentRequests`.
while (true) {
int snapshot = UNSAFE.getIntVolatile(null, countersAddress);
int activeWorkers = snapshot >>> ACTIVE_WORKERS_COUNT_BIT_OFFSET;
if (activeWorkers >= maxConcurrentRequests) {
break;
}
if (UNSAFE.compareAndSwapInt(null, countersAddress, snapshot, snapshot + ONE_ACTIVE_WORKER)) {
// An active worker was reserved. Starts the worker by executing a batch.
executeBatch(requestResponse);
return requestResponse;
}
}
while (!queue.tryAppend(requestResponse)) {
// As of 09/11/2024, this class is only used for remote cache interactions (see
// b/358347099#comment18). Here, the queue filling up is primarily caused by insufficient
// network bandwidth. Experiments show that sleeping here improves overall system throughput,
// even more than increasing the buffer size.
try {
Thread.sleep(QUEUE_FULL_SLEEP_MS);
} catch (InterruptedException e) {
return immediateFailedFuture(e);
}
}
// Enqueuing succeeded.
while (true) {
int snapshot = UNSAFE.getIntVolatile(null, countersAddress); // pessimistic read
int activeWorkers = snapshot >>> ACTIVE_WORKERS_COUNT_BIT_OFFSET;
if (activeWorkers >= maxConcurrentRequests) {
// Increments the request-responses count.
if (UNSAFE.compareAndSwapInt(null, countersAddress, snapshot, snapshot + ONE_REQUEST)) {
// This must not be reached if `activeWorkers` is 0. Guaranteed by the enclosing check.
return requestResponse;
}
} else {
// This is a less common case where the task was enqueued, but the number of active workers
// immediately dipped below `maxConcurrentRequests`. Starts a worker.
if (UNSAFE.compareAndSwapInt(
null, countersAddress, snapshot, snapshot + ONE_ACTIVE_WORKER)) {
// Usually, decrementing the request-responses count must precede taking from the queue.
// Here, a request-response was just enqueued and the count has not yet been incremented.
executeBatch(queue.take());
return requestResponse;
}
}
}
}
// TODO: b/386384684 - remove Unsafe usage
@Override
public String toString() {
int snapshot = UNSAFE.getIntVolatile(null, countersAddress);
return String.format(
"activeWorkers=%d, requestCount=%d\nqueue=%s\n",
snapshot >>> ACTIVE_WORKERS_COUNT_BIT_OFFSET, snapshot & REQUEST_COUNT_MASK, queue);
}
/**
* Constructs a batch by polling up to {@link #maxBatchSize} additional elements from the queue,
* then executes it.
*
* <p>After the batch is executed, arranges follow-up work by calling {@code
* #continueToNextBatchOrBecomeIdle}.
*
* @param requestResponse a single element to be included in the batch. This ensures the batch is
* non-empty.
*/
private void executeBatch(Operation<RequestT, ResponseT> requestResponse) {
ImmutableList<Operation<RequestT, ResponseT>> batch = populateBatch(requestResponse);
batchExecutionStrategy
.executeBatch(Lists.transform(batch, Operation::request), batch)
.addListener(this::continueToNextBatchOrBecomeIdle, queueDrainingExecutor);
}
/**
* Polls at most {@link #maxBatchSize} elements from the {@link #queue} and creates a batch.
*
* @param requestResponse an element to add to the batch.
*/
// TODO: b/386384684 - remove Unsafe usage
private ImmutableList<Operation<RequestT, ResponseT>> populateBatch(
Operation<RequestT, ResponseT> requestResponse) {
var accumulator = ImmutableList.<Operation<RequestT, ResponseT>>builder().add(requestResponse);
while (true) {
int snapshot = UNSAFE.getIntVolatile(null, countersAddress);
int requestCount = snapshot & REQUEST_COUNT_MASK;
if (requestCount == 0) {
break;
}
int toRead = min(maxBatchSize, requestCount);
if (!UNSAFE.compareAndSwapInt(null, countersAddress, snapshot, snapshot - toRead)) {
continue;
}
for (int i = 0; i < toRead; i++) {
accumulator.add(queue.take());
}
break;
}
return accumulator.build();
}
/**
* Either processes the next batch or releases the reserved active-worker slot.
*
* <p>Tries to process the next batch if enqueued requests are available. Otherwise, stops working
* and decrements the active worker count.
*/
// TODO: b/386384684 - remove Unsafe usage
private void continueToNextBatchOrBecomeIdle() {
while (true) {
int snapshot = UNSAFE.getIntVolatile(null, countersAddress);
if ((snapshot & REQUEST_COUNT_MASK) == 0) {
// There are no enqueued requests. Tries to become idle.
if (UNSAFE.compareAndSwapInt(
null, countersAddress, snapshot, snapshot - ONE_ACTIVE_WORKER)) {
return;
}
} else {
// Tries to reserve an enqueued request-response to begin another batch.
if (UNSAFE.compareAndSwapInt(null, countersAddress, snapshot, snapshot - ONE_REQUEST)) {
executeBatch(queue.take());
return;
}
}
}
}
@VisibleForTesting
static final class Operation<RequestT, ResponseT> extends AbstractFuture<ResponseT>
implements ResponseSink<ResponseT>, FutureResponseSink<ResponseT> {
private final RequestT request;
private boolean isFutureSet = false;
private Operation(RequestT request) {
this.request = request;
}
private RequestT request() {
return request;
}
private void setResponse(@Nullable ResponseT response) {
// It's possible for the future to be cancelled by an external event (e.g., an interrupt).
// `set` will return false if the future has already been completed or cancelled.
// If `set` fails, we verify that the future was cancelled. This distinguishes
// graceful cancellation from a bug where we try to set the response more than once.
if (!set(response)) {
checkState(
isCancelled(),
"response already set for request=%s, %s while trying to set future response %s",
request,
this,
response);
}
}
@Override
public void acceptResponse(@Nullable ResponseT response) {
setResponse(response);
}
@Override
public void acceptFailure(Throwable t) {
setException(t);
}
@Override
public void acceptFutureResponse(ListenableFuture<ResponseT> futureResponse) {
setFuture(futureResponse);
isFutureSet = true;
}
private void errorIfFutureUnset() {
if (!isFutureSet) {
setException(
new IllegalStateException(
String.format(
"Future for %s is unexpectedly not set. It should have been set by the"
+ " PerResponseMultiplexer.execute implementation",
request)));
}
}
@Override
@CanIgnoreReturnValue
protected boolean setException(Throwable t) {
return super.setException(t);
}
}
private static final class MultiplexerAdapter<RequestT, ResponseT>
implements BatchExecutionStrategy<RequestT, ResponseT> {
private final Multiplexer<RequestT, ResponseT> multiplexer;
/**
* Executor provided by the client to invoke callbacks for individual responses within a batched
* response.
*
* <p><b>Important:</b> For each batch, response callbacks are executed sequentially on a single
* thread. If a callback involves significant processing, the client should offload the work to
* separate threads to prevent delays in processing subsequent responses.
*/
private final Executor responseDistributionExecutor;
private MultiplexerAdapter(
Multiplexer<RequestT, ResponseT> multiplexer, Executor responseDistributionExecutor) {
this.multiplexer = multiplexer;
this.responseDistributionExecutor = responseDistributionExecutor;
}
@Override
public ListenableFuture<?> executeBatch(
List<RequestT> requests, ImmutableList<Operation<RequestT, ResponseT>> operations) {
ListenableFuture<List<ResponseT>> futureResponses =
multiplexer.execute(Lists.transform(operations, Operation::request));
addCallback(
futureResponses,
new FutureCallback<List<ResponseT>>() {
@Override
public void onFailure(Throwable t) {
for (Operation<RequestT, ResponseT> operation : operations) {
operation.setException(t);
}
}
@Override
public void onSuccess(List<ResponseT> responses) {
if (responses.size() != operations.size()) {
onFailure(
new AssertionError(
"RequestBatcher expected operations.size()="
+ operations.size()
+ " responses, but responses.size()="
+ responses.size()));
return;
}
for (int i = 0; i < responses.size(); ++i) {
operations.get(i).setResponse(responses.get(i));
}
}
},
responseDistributionExecutor);
return futureResponses;
}
}
private static final class CallbackMultiplexerAdapter<RequestT, ResponseT>
implements BatchExecutionStrategy<RequestT, ResponseT> {
private final CallbackMultiplexer<RequestT, ResponseT> multiplexer;
private CallbackMultiplexerAdapter(CallbackMultiplexer<RequestT, ResponseT> multiplexer) {
this.multiplexer = multiplexer;
}
@Override
public ListenableFuture<?> executeBatch(
List<RequestT> requests, ImmutableList<Operation<RequestT, ResponseT>> operations) {
Runnable batchCompleteCallback =
multiplexer.execute(Lists.transform(operations, Operation::request), operations);
return Futures.whenAllComplete(operations).run(batchCompleteCallback, directExecutor());
}
}
private static final class PerResponseMultiplexerAdapter<RequestT, ResponseT>
implements BatchExecutionStrategy<RequestT, ResponseT> {
private final PerResponseMultiplexer<RequestT, ResponseT> multiplexer;
private PerResponseMultiplexerAdapter(PerResponseMultiplexer<RequestT, ResponseT> multiplexer) {
this.multiplexer = multiplexer;
}
@Override
public ListenableFuture<?> executeBatch(
List<RequestT> requests, ImmutableList<Operation<RequestT, ResponseT>> operations) {
multiplexer.execute(Lists.transform(operations, Operation::request), operations);
for (Operation<RequestT, ResponseT> operation : operations) {
operation.errorIfFutureUnset();
}
return Futures.whenAllComplete(operations).run(() -> {}, directExecutor());
}
}
private static final int REQUEST_COUNT_MASK = 0x000F_FFFF;
private static final int ONE_REQUEST = 1;
private static final int ACTIVE_WORKERS_COUNT_BIT_OFFSET = 20;
private static final int ONE_ACTIVE_WORKER = 1 << ACTIVE_WORKERS_COUNT_BIT_OFFSET;
private static final int ACTIVE_WORKERS_COUNT_MAX = 0x0000_0FFF;
static {
checkState(
REQUEST_COUNT_MASK == ConcurrentFifo.CAPACITY_MASK,
"Request Count Constants inconsistent with ConcurrentFifo");
checkState(
ONE_REQUEST == (REQUEST_COUNT_MASK & -REQUEST_COUNT_MASK),
"Inconsistent Request Count Constants");
}
private static final Unsafe UNSAFE = UnsafeProvider.unsafe();
}