blob: 42104035474443431b429f569345f3ebab009a02 [file]
// Copyright 2026 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertThrows;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.RequestBatching.CallbackMultiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.FutureMultiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.Multiplexer;
import com.google.devtools.build.lib.concurrent.RequestBatching.Operation;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link EagerRequestBatcher}.
 *
 * <p>Most tests run the batcher with {@code directExecutor()} and a {@link SettableMultiplexer}.
 * This lets each batch handed to the multiplexer be inspected and completed deterministically from
 * the test thread. The contention tests instead use real thread pools and asynchronous
 * multiplexers.
 */
@RunWith(JUnit4.class)
public final class EagerRequestBatcherTest {

  /** A single request submitted through the {@code create} factory resolves to its response. */
  @Test
  public void simpleSubmit_executes() throws Exception {
    var batcher =
        EagerRequestBatcher.<Request, Response>create(
            requests -> immediateFuture(respondTo(requests)),
            directExecutor(),
            /* maxBatchSize= */ 10,
            /* targetConcurrentRequests= */ 1,
            directExecutor());
    ListenableFuture<Response> response = batcher.submit(new Request(1));
    assertThat(response.get()).isEqualTo(new Response(1));
  }

  /**
   * Walks through three phases in order: eager single-request sends, a saturation flush at
   * {@code maxBatchSize}, and the draining of queued work as in-flight batches complete.
   */
  @Test
  public void verifyEagerSendingBatchingAndCompletionFlows() throws Exception {
    var multiplexer = new SettableMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(
            strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 2, directExecutor());

    // Scenario A: Eager sending due to low concurrency.
    // State established:
    // - targetConcurrentRequests = 2, maxBatchSize = 10.
    // - R1 and R2 are submitted and eagerly executed immediately as single-item batches
    //   because inFlightCount < targetConcurrentRequests.
    // - Active batches: [R1], [R2] -> inFlightCount = 2.
    // - R3 and R4 are submitted but queued because inFlightCount (2)
    //   >= targetConcurrentRequests (2) and queue size (2) < maxBatchSize (10).
    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    BatchedOperations batch1 = multiplexer.queue.take();
    assertThat(batch1.requests()).containsExactly(new Request(1));

    ListenableFuture<Response> r2 = batcher.submit(new Request(2));
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    BatchedOperations batch2 = multiplexer.queue.take();
    assertThat(batch2.requests()).containsExactly(new Request(2));

    ListenableFuture<Response> r3 = batcher.submit(new Request(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(1);
    assertThat(multiplexer.queue).isEmpty();

    ListenableFuture<Response> r4 = batcher.submit(new Request(4));
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(2);
    assertThat(multiplexer.queue).isEmpty();

    // Scenario B: Batching due to high concurrency (Max Batch Size trigger).
    // State carried over from A:
    // - Active batches: [R1], [R2] -> inFlightCount = 2.
    // - Queued requests: [R3, R4] -> queueSize = 2.
    // Action: Submit 8 more requests (R5 to R12) to reach maxBatchSize (10).
    // State established:
    // - The queue reaches maxBatchSize (10) and is flushed immediately as a batch [R3-R12].
    // - Active batches: [R1], [R2], [R3-R12] -> inFlightCount = 3.
    // - R13 is submitted and queued because inFlightCount (3) >= targetConcurrentRequests (2)
    //   and queue size (1) < maxBatchSize (10).
    List<ListenableFuture<Response>> queuedResponses = new ArrayList<>();
    queuedResponses.add(r3);
    queuedResponses.add(r4);
    for (int i = 5; i <= 12; i++) {
      queuedResponses.add(batcher.submit(new Request(i)));
    }
    assertThat(batcher.getInFlightCount()).isEqualTo(3);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    BatchedOperations batch3 = multiplexer.queue.take();
    assertThat(batch3.requests()).hasSize(10);
    assertThat(batch3.requests().stream().map(Request::x).collect(toImmutableList()))
        .containsExactly(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
        .inOrder();

    ListenableFuture<Response> r13 = batcher.submit(new Request(13));
    assertThat(batcher.getInFlightCount()).isEqualTo(3);
    assertThat(batcher.getQueueSize()).isEqualTo(1);

    // Scenario C: Completion triggering queued work.
    // State carried over from B:
    // - Active batches: [R1], [R2], [R3-R12] -> inFlightCount = 3.
    // - Queued requests: [R13] -> queueSize = 1.
    // Action: Complete active batches and observe queue draining.
    // State transitions:
    // 1. Complete [R1] -> inFlightCount decrements to 2. R13 remains queued because
    //    inFlightCount (2) is not < targetConcurrentRequests (2).
    // 2. Complete [R2] -> inFlightCount decrements to 1. Since inFlightCount (1) <
    //    targetConcurrentRequests (2), the queued R13 is eagerly flushed and executed.
    //    - Active batches: [R3-R12], [R13] -> inFlightCount = 2.
    //    - Queued requests: none.
    batch1.setSimpleResponses();
    assertThat(r1.get()).isEqualTo(new Response(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(1);
    assertThat(multiplexer.queue).isEmpty();

    batch2.setSimpleResponses();
    assertThat(r2.get()).isEqualTo(new Response(2));
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    BatchedOperations batch4 = multiplexer.queue.take();
    assertThat(batch4.requests()).containsExactly(new Request(13));

    batch3.setSimpleResponses();
    batch4.setSimpleResponses();
    assertThat(r13.get()).isEqualTo(new Response(13));
    for (int i = 0; i < queuedResponses.size(); i++) {
      assertThat(queuedResponses.get(i).get()).isEqualTo(new Response(i + 3));
    }
  }

  /**
   * A multiplexer that throws synchronously fails the submitted future and releases the
   * in-flight slot it would have occupied.
   */
  @Test
  public void synchronousException_decrementsInFlightAndFailsFutures() throws Exception {
    var failure = new RuntimeException("Sync Failure");
    Multiplexer<Request, Response> faultyMultiplexer =
        requests -> {
          throw failure;
        };
    var strategy =
        RequestBatching.createBatchExecutionStrategy(faultyMultiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(
            strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, directExecutor());

    ListenableFuture<Response> response = batcher.submit(new Request(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
    var thrown = assertThrows(ExecutionException.class, response::get);
    assertThat(thrown).hasCauseThat().isEqualTo(failure);

    // Sanity check: an otherwise identical batcher backed by a working multiplexer submits and
    // completes normally. Recovery of the *same* batcher after a failure is covered by
    // exceptionHandling_preventsSlotLeaking.
    var multiplexer = new SettableMultiplexer();
    var goodStrategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var goodBatcher =
        new EagerRequestBatcher<>(
            goodStrategy,
            /* maxBatchSize= */ 10,
            /* targetConcurrentRequests= */ 1,
            directExecutor());
    ListenableFuture<Response> goodResponse = goodBatcher.submit(new Request(2));
    assertThat(goodBatcher.getInFlightCount()).isEqualTo(1);
    multiplexer.queue.take().setSimpleResponses();
    assertThat(goodResponse.get()).isEqualTo(new Response(2));
  }

  /**
   * The batch-completion handling that dispatches queued work runs on the executor injected into
   * the batcher, not on the thread that completed the batch.
   */
  @Test
  public void executor_runsCallbacksOnInjectedExecutor() throws Exception {
    var multiplexer = new SettableMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var executorThreads = new ConcurrentLinkedQueue<Thread>();
    // Runs each command on a fresh thread and records that thread before running the command.
    Executor recordingExecutor =
        command ->
            new Thread(
                    () -> {
                      executorThreads.add(Thread.currentThread());
                      command.run();
                    })
                .start();
    var batcher =
        new EagerRequestBatcher<>(
            strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, recordingExecutor);

    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    BatchedOperations batch1 = multiplexer.queue.take();

    // Queue a second request.
    ListenableFuture<Response> r2 = batcher.submit(new Request(2));
    assertThat(batcher.getQueueSize()).isEqualTo(1);

    // Complete the first batch. This should trigger onBatchComplete on the recordingExecutor.
    batch1.setSimpleResponses();
    r1.get(); // Wait for completion.

    // Wait for the second batch to be executed (it should be triggered by onBatchComplete).
    BatchedOperations batch2 = multiplexer.queue.take();
    assertThat(batch2.requests()).containsExactly(new Request(2));

    // Verify that onBatchComplete ran on a thread from recordingExecutor.
    assertThat(executorThreads).isNotEmpty();
    Thread callbackThread = executorThreads.peek();
    assertThat(callbackThread).isNotEqualTo(Thread.currentThread());

    batch2.setSimpleResponses();
    assertThat(r2.get()).isEqualTo(new Response(2));
  }

  /**
   * A request submitted from inside the multiplexer while a batch is being dispatched is queued
   * and later executed as its own batch.
   */
  @Test
  public void nestedSubmissions_workCorrectly() throws Exception {
    var multiplexer = new SettableMultiplexer();
    var batcherRef = new AtomicReference<EagerRequestBatcher<Request, Response>>();
    var nestedResponseRef = new AtomicReference<ListenableFuture<Response>>();
    var interceptingMultiplexer =
        new Multiplexer<Request, Response>() {
          // Only the first execute() call submits a nested request, so nesting happens once.
          private boolean submittedNested = false;

          @Override
          public ListenableFuture<List<Response>> execute(List<Request> requests) {
            if (!submittedNested) {
              submittedNested = true;
              // Submit a nested request. This will run on the same thread.
              nestedResponseRef.set(batcherRef.get().submit(new Request(99)));
            }
            return multiplexer.execute(requests);
          }
        };
    var goodStrategy =
        RequestBatching.createBatchExecutionStrategy(interceptingMultiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(
            goodStrategy,
            /* maxBatchSize= */ 10,
            /* targetConcurrentRequests= */ 1,
            directExecutor());
    batcherRef.set(batcher);

    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    // At this point, interceptingMultiplexer should have run, and submitted R99.
    // R1 triggered immediate execution, so it called execute().
    // Inside execute(), R99 was submitted.
    // Since targetConcurrentRequests is 1, and inFlightCount is 1 (for R1), R99 should be queued.
    assertThat(batcher.getQueueSize()).isEqualTo(1);
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    BatchedOperations batch1 = multiplexer.queue.take();
    assertThat(batch1.requests()).containsExactly(new Request(1));

    // Complete batch 1. This should trigger execution of R99.
    batch1.setSimpleResponses();
    r1.get();
    BatchedOperations batch2 = multiplexer.queue.take();
    assertThat(batch2.requests()).containsExactly(new Request(99));
    batch2.setSimpleResponses();
    assertThat(nestedResponseRef.get().get()).isEqualTo(new Response(99));
  }

  /**
   * A batcher built with {@code createWithCallbackMultiplexer} delivers responses via sinks and
   * runs the returned cleanup after execute.
   */
  @Test
  public void callbackMultiplexer_integration() throws Exception {
    var events = new LinkedBlockingQueue<String>();
    CallbackMultiplexer<Request, Response> callbackMultiplexer =
        (requests, sinks) -> {
          events.add("execute");
          for (int i = 0; i < requests.size(); i++) {
            sinks.get(i).acceptResponse(new Response(requests.get(i).x()));
          }
          return () -> events.add("cleanup");
        };
    var batcher =
        EagerRequestBatcher.<Request, Response>createWithCallbackMultiplexer(
            callbackMultiplexer,
            /* maxBatchSize= */ 2,
            /* targetConcurrentRequests= */ 1,
            directExecutor());
    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(r1.get()).isEqualTo(new Response(1));
    assertThat(events.take()).isEqualTo("execute");
    assertThat(events.take()).isEqualTo("cleanup");
  }

  /**
   * A batcher built with {@code createWithFutureMultiplexer} resolves each request from the
   * future handed to its sink.
   */
  @Test
  public void futureMultiplexer_integration() throws Exception {
    FutureMultiplexer<Request, Response> futureMultiplexer =
        (requests, sinks) -> {
          for (int i = 0; i < requests.size(); i++) {
            sinks.get(i).acceptFuture(immediateFuture(new Response(requests.get(i).x())));
          }
        };
    var batcher =
        EagerRequestBatcher.<Request, Response>createWithFutureMultiplexer(
            futureMultiplexer,
            /* maxBatchSize= */ 2,
            /* targetConcurrentRequests= */ 1,
            directExecutor());
    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(r1.get()).isEqualTo(new Response(1));
  }

  /**
   * Non-positive {@code maxBatchSize} or {@code targetConcurrentRequests} is rejected with
   * {@link IllegalArgumentException}.
   */
  @Test
  public void parameterValidation() {
    var strategy =
        RequestBatching.createBatchExecutionStrategy(new SettableMultiplexer(), directExecutor());

    // Validate maxBatchSize.
    assertThrows(
        IllegalArgumentException.class,
        () -> new EagerRequestBatcher<>(strategy, 0, 1, directExecutor()));
    assertThrows(
        IllegalArgumentException.class,
        () -> new EagerRequestBatcher<>(strategy, -1, 1, directExecutor()));

    // Validate targetConcurrentRequests.
    assertThrows(
        IllegalArgumentException.class,
        () -> new EagerRequestBatcher<>(strategy, 10, 0, directExecutor()));
    assertThrows(
        IllegalArgumentException.class,
        () -> new EagerRequestBatcher<>(strategy, 10, -1, directExecutor()));
  }

  /**
   * Multiplexer that parks every batch it receives on {@link #queue}, together with an unset
   * future. Tests take batches from the queue and complete them explicitly.
   */
  private static class SettableMultiplexer implements Multiplexer<Request, Response> {
    private final LinkedBlockingQueue<BatchedOperations> queue = new LinkedBlockingQueue<>();

    @Override
    public ListenableFuture<List<Response>> execute(List<Request> requests) {
      var responses = SettableFuture.<List<Response>>create();
      queue.add(new BatchedOperations(requests, responses));
      return responses;
    }
  }

  /**
   * A batch captured by {@link SettableMultiplexer}, plus the future that completes it.
   *
   * @param requests the requests in the batch, in the order handed to the multiplexer
   * @param responses the future returned to the batcher for this batch
   */
  private record BatchedOperations(
      List<Request> requests, SettableFuture<List<Response>> responses) {
    /** Completes the batch with one {@link Response} per request, echoing each request's value. */
    private void setSimpleResponses() {
      responses().set(respondTo(requests()));
    }
  }

  /**
   * Under many concurrent submitters, the number of overlapping multiplexer executions never
   * exceeds {@code targetConcurrentRequests} when {@code maxBatchSize} is never reached.
   */
  @Test
  public void targetConcurrencyStrictness_underHighContention() throws Exception {
    final int targetConcurrency = 4;
    final int maxBatchSize = 10_000;
    // With these parameters, there are at most 6_400 requests sent, which is less than the
    // maxBatchSize of 10_000 so concurrency should never exceed targetConcurrency.
    final int numThreads = 32;
    final int submissionsPerThread = 200;
    // Under high concurrent load, we need to artificially introduce execution latency
    // at the multiplexer level to "stretch" the execution window. This allows concurrent
    // executions to pile up to the targetConcurrency (4) so we can verify that the batcher
    // strictly bounds this overlap and does not exceed it.
    // (Note: The baseline concurrent execution of >= 2 is guaranteed by active coordination
    // inside LatencyMultiplexer, but this latency is still required to reach peak concurrency).
    final int latencyMs = 20;

    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    ExecutorService multiplexerExecutor = Executors.newCachedThreadPool();
    try {
      LatencyMultiplexer multiplexer = new LatencyMultiplexer(latencyMs, multiplexerExecutor);
      var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
      var batcher =
          new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

      // The barrier releases all submitters at once to maximize contention.
      CyclicBarrier barrier = new CyclicBarrier(numThreads);
      CountDownLatch latch = new CountDownLatch(numThreads);
      ConcurrentLinkedQueue<ListenableFuture<Response>> futures = new ConcurrentLinkedQueue<>();

      for (int i = 0; i < numThreads; i++) {
        final int threadId = i;
        executorService.execute(
            () -> {
              try {
                barrier.await();
                for (int j = 0; j < submissionsPerThread; j++) {
                  futures.add(batcher.submit(new Request(threadId * 1000 + j)));
                }
              } catch (InterruptedException e) {
                multiplexer.errors.add(e);
                Thread.currentThread().interrupt();
              } catch (Exception e) {
                multiplexer.errors.add(e);
              } finally {
                latch.countDown();
              }
            });
      }

      latch.await();
      for (ListenableFuture<Response> future : futures) {
        future.get();
      }

      assertThat(multiplexer.errors).isEmpty();
      assertThat(multiplexer.maxConcurrentExecutions.get()).isAtMost(targetConcurrency);
      // Under adverse scheduling delays we might not always reach the full target concurrency,
      // but we must at least observe concurrent execution (>= 2) without exceeding the target
      // concurrency limit.
      assertThat(multiplexer.maxConcurrentExecutions.get()).isAtLeast(2);
    } finally {
      executorService.shutdown();
      multiplexerExecutor.shutdown();
    }
  }

  /**
   * A buffer that reaches {@code maxBatchSize} is flushed even when in-flight work is already at
   * {@code targetConcurrentRequests}. Once capacity frees up, the partial remainder is flushed.
   */
  @Test
  public void saturationFlush_bypassesConcurrencyLimit() throws Exception {
    int targetConcurrency = 1;
    int maxBatchSize = 10;
    var multiplexer = new SettableMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

    List<ListenableFuture<Response>> futures = new ArrayList<>();
    for (int i = 1; i <= 50; i++) {
      futures.add(batcher.submit(new Request(i)));
    }

    // Expected: [1] eagerly, then four full batches [2-11], [12-21], [22-31], [32-41];
    // [42-50] (9 requests) stays buffered.
    assertThat(multiplexer.queue).hasSize(5);
    BatchedOperations b1 = multiplexer.queue.take();
    assertThat(b1.requests()).containsExactly(new Request(1));
    BatchedOperations b2 = multiplexer.queue.take();
    assertThat(b2.requests()).hasSize(10);
    assertThat(b2.requests().stream().map(Request::x).collect(toImmutableList()))
        .containsExactly(2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
    BatchedOperations b3 = multiplexer.queue.take();
    assertThat(b3.requests()).hasSize(10);
    BatchedOperations b4 = multiplexer.queue.take();
    assertThat(b4.requests()).hasSize(10);
    BatchedOperations b5 = multiplexer.queue.take();
    assertThat(b5.requests()).hasSize(10);
    assertThat(multiplexer.queue).isEmpty();

    assertThat(batcher.getQueueSize()).isEqualTo(9);
    assertThat(batcher.getInFlightCount()).isEqualTo(5);

    // Complete Batch 1. inFlightCount should go 5 -> 4.
    b1.setSimpleResponses();
    futures.get(0).get();
    assertThat(batcher.getInFlightCount()).isEqualTo(4);
    assertThat(batcher.getQueueSize()).isEqualTo(9);

    // Complete Batch 2. inFlightCount should go 4 -> 3.
    b2.setSimpleResponses();
    futures.get(1).get();
    assertThat(batcher.getInFlightCount()).isEqualTo(3);
    assertThat(batcher.getQueueSize()).isEqualTo(9);

    // Complete Batch 3. inFlightCount should go 3 -> 2.
    b3.setSimpleResponses();
    futures.get(11).get();
    assertThat(batcher.getInFlightCount()).isEqualTo(2);
    assertThat(batcher.getQueueSize()).isEqualTo(9);

    // Complete Batch 4. inFlightCount should go 2 -> 1.
    b4.setSimpleResponses();
    futures.get(21).get();
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(9);

    // Complete Batch 5. This reopens capacity (1 -> 0) and triggers a flush of the active
    // buffer (0 -> 1).
    b5.setSimpleResponses();
    futures.get(31).get();
    BatchedOperations b6 = multiplexer.queue.take();
    assertThat(b6.requests()).hasSize(9);
    assertThat(b6.requests().stream().map(Request::x).collect(toImmutableList()))
        .containsExactly(42, 43, 44, 45, 46, 47, 48, 49, 50);
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(0);

    b6.setSimpleResponses();
    for (ListenableFuture<Response> future : futures) {
      future.get();
    }
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
  }

  /**
   * Neither a synchronous throw nor an asynchronous failure from the multiplexer leaks an
   * in-flight slot. After each failure, subsequent requests are still dispatched.
   */
  @Test
  public void exceptionHandling_preventsSlotLeaking() throws Exception {
    int targetConcurrency = 1;
    int maxBatchSize = 10;
    var multiplexer = new MockExceptionMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

    // First execute() throws synchronously.
    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
    var thrownSync = assertThrows(ExecutionException.class, r1::get);
    assertThat(thrownSync).hasCauseThat().isEqualTo(multiplexer.syncException);

    ListenableFuture<Response> r2 = batcher.submit(new Request(2));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(multiplexer.futures).hasSize(1);
    SettableFuture<List<Response>> f2 = multiplexer.futures.get(0);

    ListenableFuture<Response> r3 = batcher.submit(new Request(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(1);

    // Failing R2's batch asynchronously must free its slot so the queued R3 is dispatched.
    var asyncException = new RuntimeException("Async Failure");
    f2.setException(asyncException);
    var thrownAsync = assertThrows(ExecutionException.class, r2::get);
    assertThat(thrownAsync).hasCauseThat().isEqualTo(asyncException);
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    assertThat(multiplexer.futures).hasSize(2);

    SettableFuture<List<Response>> f3 = multiplexer.futures.get(1);
    f3.set(respondTo(ImmutableList.of(new Request(3))));
    assertThat(r3.get()).isEqualTo(new Response(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
  }

  /**
   * A writer stalled between reserving a slot and publishing its element holds back only its own
   * buffer. Later buffers keep flushing. Once the stalled writer finishes, its buffer executes
   * with all of its elements.
   */
  @Test
  public void testStalledWriterProgressGracefulDegradation() throws Exception {
    int targetConcurrency = 1;
    int maxBatchSize = 3;
    var multiplexer = new SettableMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    BatchedOperations batch1 = multiplexer.queue.take();
    assertThat(batch1.requests()).containsExactly(new Request(1));

    // Get the active buffer which is currently empty (size=0, refs=1).
    Object bStalled = batcher.getActiveBufferForTesting();
    VarHandle sizeAndRefsVarHandle = EagerRequestBatcher.getBufferSizeAndRefsVarHandleForTesting();
    // Check the handle before dereferencing it so a missing handle fails with a clear message.
    assertThat(sizeAndRefsVarHandle).isNotNull();
    long initial = (long) sizeAndRefsVarHandle.get(bStalled);

    // Simulates a "stalled writer" thread (Thread S).
    // We set size=1, refs=2.
    // This simulates that Thread S has successfully reserved slot 0 (incrementing size to 1 and
    // refs to 2), but it has got stalled *before* writing the element to the array, and *before*
    // decrementing refs back. So the buffer is locked in a state where it has 1 element (not yet
    // written) and 1 active writer (plus 1 barrier).
    long stalledValue = 0x0000_0001_0000_0002L;
    boolean success = sizeAndRefsVarHandle.compareAndSet(bStalled, initial, stalledValue);
    assertThat(success).isTrue();

    // Submit a normal request. It will see size=1, reserve slot 1, write elements[1], and decrement
    // refs. Buffer state becomes size=2, refs=2 (1 for stalled thread, 1 for barrier).
    ListenableFuture<Response> r2 = batcher.submit(new Request(2));
    assertThat(batcher.getInFlightCount())
        .isEqualTo(1); // Still 1 because r2 is not sent (over capacity).

    // Submit another request. It will see size=2 (which is maxBatchSize-1).
    // This triggers Rule R2 (Saturation Flush).
    // It detaches the buffer (allocating a new active buffer with size=0, refs=1), writes
    // Request(3) to elements[2], and decrements refs (size becomes 3, refs becomes 1). Since refs
    // is 1 (stalled thread still has a reference), it does NOT execute the buffer. The buffer
    // remains detached but unexecuted.
    ListenableFuture<Response> r3 = batcher.submit(new Request(3));
    assertThat(batcher.getInFlightCount())
        .isEqualTo(2); // inFlight count increases because we detached the buffer.
    assertThat(batcher.getQueueSize()).isEqualTo(0); // Active buffer is now a fresh, empty one.
    assertThat(multiplexer.queue).isEmpty(); // bStalled is NOT sent yet.

    // Submit more requests to the NEW active buffer.
    // Since targetConcurrency=1 and inFlight=2, eager sending is disabled.
    // However, saturation flush (Rule R2) is still active.
    // Submitting 3 more requests will fill the new buffer (size 3) and trigger a saturation flush,
    // proving GRACEFUL DEGRADATION: the stalled thread in the first buffer does NOT block
    // progress of subsequent buffers.
    ListenableFuture<Response> r4 = batcher.submit(new Request(4));
    ListenableFuture<Response> r5 = batcher.submit(new Request(5));
    ListenableFuture<Response> r6 = batcher.submit(new Request(6));
    assertThat(batcher.getInFlightCount())
        .isEqualTo(3); // inFlight increases to 3 (batch1, bStalled, and batchNext).
    BatchedOperations batchNext = multiplexer.queue.take();
    assertThat(batchNext.requests())
        .containsExactly(new Request(4), new Request(5), new Request(6));

    // Complete the executed batches to clean up capacity.
    batch1.setSimpleResponses();
    batchNext.setSimpleResponses();
    r1.get();
    r4.get();
    r5.get();
    r6.get();
    assertThat(batcher.getInFlightCount()).isEqualTo(1); // Only bStalled remains in-flight.

    // Now simulate the stalled thread resuming.
    // 1. It finally writes its element to slot 0.
    var stalledOp = new Operation<Request, Response>(new Request(99));
    Field elementsField = bStalled.getClass().getDeclaredField("elements");
    elementsField.setAccessible(true);
    Object[] elements = (Object[]) elementsField.get(bStalled);
    elements[0] = stalledOp;

    // 2. It decrements its reference count (size remains 3, refs becomes 0).
    long postDecrement = (long) sizeAndRefsVarHandle.getAndAdd(bStalled, -1L) - 1L;
    assertThat((int) postDecrement).isEqualTo(0); // We are the last reference!

    // 3. Since we are the last reference (refs=0), we trigger execution.
    // In production, the thread doing the decrement would call execute(buffer).
    // Here we invoke it manually via reflection to complete the simulation.
    Method executeMethod = batcher.getClass().getDeclaredMethod("execute", bStalled.getClass());
    executeMethod.setAccessible(true);
    executeMethod.invoke(batcher, bStalled);

    // Verify that the stalled batch is successfully executed and contains all elements in slot
    // order.
    BatchedOperations batchStalled = multiplexer.queue.take();
    assertThat(batchStalled.requests())
        .containsExactly(new Request(99), new Request(2), new Request(3))
        .inOrder();
    batchStalled.setSimpleResponses();
    stalledOp.get();
    assertThat(r2.get()).isEqualTo(new Response(2));
    assertThat(r3.get()).isEqualTo(new Response(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
  }

  /**
   * Under heavy concurrent submission, every request is handed to the multiplexer exactly once
   * and the batcher ends with no queued or in-flight work.
   */
  @Test
  public void highConcurrency_exactlyOnceDispatch() throws Exception {
    int targetConcurrency = 8;
    int maxBatchSize = 50;
    int numThreads = 32;
    int submissionsPerThread = 5000;
    ExecutorService submitExecutor = Executors.newFixedThreadPool(numThreads);
    ExecutorService multiplexerExecutor = Executors.newCachedThreadPool();
    try {
      AsyncCountingMultiplexer multiplexer = new AsyncCountingMultiplexer(multiplexerExecutor);
      var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
      var batcher =
          new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

      CyclicBarrier barrier = new CyclicBarrier(numThreads);
      CountDownLatch latch = new CountDownLatch(numThreads);
      ConcurrentLinkedQueue<ListenableFuture<Response>> futures = new ConcurrentLinkedQueue<>();
      // Submitter failures are recorded and asserted on. If they were swallowed, they would only
      // surface later as an unexplained execution-count mismatch.
      ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();

      for (int i = 0; i < numThreads; i++) {
        final int threadId = i;
        submitExecutor.execute(
            () -> {
              try {
                barrier.await();
                for (int j = 0; j < submissionsPerThread; j++) {
                  futures.add(batcher.submit(new Request(threadId * 100000 + j)));
                }
              } catch (InterruptedException e) {
                errors.add(e);
                Thread.currentThread().interrupt();
              } catch (Exception e) {
                errors.add(e);
              } finally {
                latch.countDown();
              }
            });
      }

      latch.await();
      assertThat(errors).isEmpty();
      for (ListenableFuture<Response> future : futures) {
        future.get();
      }

      int totalExpected = numThreads * submissionsPerThread;
      assertThat(multiplexer.totalExecutions.get()).isEqualTo(totalExpected);
      assertThat(multiplexer.executionCounts).hasSize(totalExpected);
      for (AtomicInteger count : multiplexer.executionCounts.values()) {
        assertThat(count.get()).isEqualTo(1);
      }
      assertThat(batcher.getInFlightCount()).isEqualTo(0);
      assertThat(batcher.getQueueSize()).isEqualTo(0);
    } finally {
      submitExecutor.shutdown();
      multiplexerExecutor.shutdown();
    }
  }

  /**
   * Requests that accumulate while the concurrency target is saturated are sent together as soon
   * as an in-flight batch completes.
   */
  @Test
  public void capacityRecoveryEagerlySendsAccumulatedRequests() throws Exception {
    int targetConcurrency = 1;
    int maxBatchSize = 10;
    var multiplexer = new SettableMultiplexer();
    var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor());
    var batcher =
        new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor());

    ListenableFuture<Response> r1 = batcher.submit(new Request(1));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(0);
    BatchedOperations batch1 = multiplexer.queue.take();
    assertThat(batch1.requests()).containsExactly(new Request(1));

    ListenableFuture<Response> r2 = batcher.submit(new Request(2));
    ListenableFuture<Response> r3 = batcher.submit(new Request(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(2);
    assertThat(multiplexer.queue).isEmpty();

    batch1.setSimpleResponses();
    r1.get();

    BatchedOperations batch2 = multiplexer.queue.take();
    assertThat(batch2.requests()).containsExactly(new Request(2), new Request(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(1);
    assertThat(batcher.getQueueSize()).isEqualTo(0);

    batch2.setSimpleResponses();
    assertThat(r2.get()).isEqualTo(new Response(2));
    assertThat(r3.get()).isEqualTo(new Response(3));
    assertThat(batcher.getInFlightCount()).isEqualTo(0);
  }

  /**
   * Multiplexer that completes each batch on {@code executor} after sleeping {@code latencyMs}.
   * It records the peak number of overlapping executions in {@link #maxConcurrentExecutions}.
   *
   * <p>The first execution to start waits (up to 500ms) for a second one to overlap with it, so
   * that a correctly working batcher is observed at a concurrency of at least 2.
   */
  private static class LatencyMultiplexer implements Multiplexer<Request, Response> {
    private final int latencyMs;
    private final Executor executor;
    private final AtomicInteger activeExecutions = new AtomicInteger(0);
    private final AtomicInteger maxConcurrentExecutions = new AtomicInteger(0);
    private final ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
    // Coordinated latch to force overlap.
    private final CountDownLatch concurrencyLatch = new CountDownLatch(1);
    private final AtomicBoolean coordinationFailed = new AtomicBoolean(false);

    private LatencyMultiplexer(int latencyMs, Executor executor) {
      this.latencyMs = latencyMs;
      this.executor = executor;
    }

    @Override
    public ListenableFuture<List<Response>> execute(List<Request> requests) {
      SettableFuture<List<Response>> future = SettableFuture.create();
      executor.execute(
          () -> {
            int active = activeExecutions.incrementAndGet();
            maxConcurrentExecutions.accumulateAndGet(active, Math::max);
            if (active >= 2) {
              // Signal that we have reached concurrency >= 2.
              concurrencyLatch.countDown();
            } else {
              // We are the first one. If we haven't failed coordination yet,
              // wait for a second one to join to guarantee overlap.
              if (!coordinationFailed.get()) {
                try {
                  // Wait up to 500ms. If no other task joins, we time out and proceed.
                  // This avoids deadlock if the batcher is broken and limits concurrency to 1.
                  if (!concurrencyLatch.await(500, MILLISECONDS)) {
                    // Mark coordination as failed so subsequent tasks don't keep waiting.
                    coordinationFailed.set(true);
                  }
                } catch (InterruptedException e) {
                  errors.add(e);
                  future.setException(e);
                  Thread.currentThread().interrupt();
                  activeExecutions.decrementAndGet();
                  return;
                }
              }
            }
            try {
              Thread.sleep(latencyMs);
            } catch (InterruptedException e) {
              errors.add(e);
              future.setException(e);
              Thread.currentThread().interrupt();
              activeExecutions.decrementAndGet();
              return;
            }
            activeExecutions.decrementAndGet();
            future.set(respondTo(requests));
          });
      return future;
    }
  }

  /**
   * Multiplexer whose first {@code execute} call throws {@link #syncException}. Every later call
   * returns a fresh unset future, appended to {@link #futures}.
   *
   * <p>Uses unsynchronized state; presumably only safe with {@code directExecutor()} on a single
   * test thread.
   */
  private static class MockExceptionMultiplexer implements Multiplexer<Request, Response> {
    private final List<SettableFuture<List<Response>>> futures = new ArrayList<>();
    private int executeCount = 0;
    private final RuntimeException syncException = new RuntimeException("Sync Failure");

    @Override
    public ListenableFuture<List<Response>> execute(List<Request> requests) {
      executeCount++;
      if (executeCount == 1) {
        throw syncException;
      }
      var future = SettableFuture.<List<Response>>create();
      futures.add(future);
      return future;
    }
  }

  /**
   * Multiplexer that completes each batch on {@code executor}. It counts how many times each
   * request is executed and the total number of requests executed.
   */
  private static class AsyncCountingMultiplexer implements Multiplexer<Request, Response> {
    private final ConcurrentHashMap<Request, AtomicInteger> executionCounts =
        new ConcurrentHashMap<>();
    private final AtomicInteger totalExecutions = new AtomicInteger(0);
    private final Executor executor;

    private AsyncCountingMultiplexer(Executor executor) {
      this.executor = executor;
    }

    @Override
    public ListenableFuture<List<Response>> execute(List<Request> requests) {
      SettableFuture<List<Response>> future = SettableFuture.create();
      executor.execute(
          () -> {
            totalExecutions.addAndGet(requests.size());
            for (Request request : requests) {
              executionCounts.computeIfAbsent(request, r -> new AtomicInteger(0)).incrementAndGet();
            }
            future.set(respondTo(requests));
          });
      return future;
    }
  }

  /** Test request carrying a single integer payload. */
  private record Request(int x) {}

  /** Test response carrying a single integer payload. */
  private record Response(int x) {}

  /**
   * Returns one {@link Response} per request, each echoing its request's value. The result is a
   * lazy {@link Lists#transform} view of {@code requests}.
   */
  private static List<Response> respondTo(List<Request> requests) {
    return Lists.transform(requests, request -> new Response(request.x()));
  }
}