// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
import static com.google.devtools.build.lib.concurrent.RequestBatcher.createBatchExecutionStrategy;
import static com.google.devtools.build.lib.concurrent.RequestBatcher.createCallbackBatchExecutionStrategy;
import static com.google.devtools.build.lib.concurrent.RequestBatcher.createPerResponseBatchExecutionStrategy;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static org.junit.Assert.assertThrows;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.RequestBatcher.Operation;
import com.google.devtools.build.lib.concurrent.RequestBatcher.ResponseSink;
import com.google.devtools.build.lib.unsafe.UnsafeProvider;
import java.lang.ref.Cleaner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import sun.misc.Unsafe;
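/**
 * Tests for {@link RequestBatcher}.
 *
 * <p>The common flow exercised below, in sketch form (the two integer arguments are
 * maxBatchSize and maxConcurrentRequests):
 *
 * <pre>{@code
 * var batcher =
 *     RequestBatcher.<Request, Response>create(
 *         createBatchExecutionStrategy(
 *             requests -> immediateFuture(respondTo(requests)), commonPool()),
 *         255,
 *         1);
 * ListenableFuture<Response> response = batcher.submit(new Request(1));
 * Response result = response.get(); // completes once a worker executes the batch
 * }</pre>
 */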
@RunWith(JUnit4.class)
@SuppressWarnings("SunApi") // TODO: b/359688989 - clean this up
public final class RequestBatcherTest {
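// Frees each test batcher's natively allocated memory once the batcher becomes unreachable.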
private static final Cleaner cleaner = Cleaner.create();
@Test
public void simpleSubmit_executes() throws Exception {
// This covers Step 1A in the documentation.
var batcher =
RequestBatcher.<Request, Response>create(
createBatchExecutionStrategy(
requests -> immediateFuture(respondTo(requests)), commonPool()),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1);
ListenableFuture<Response> response = batcher.submit(new Request(1));
assertThat(response.get()).isEqualTo(new Response(1));
}
@Test
public void queueOverflow_sleeps() throws Exception {
// This covers the overflow case of Step 1B in the documentation.
int batchSize = 256;
var multiplexer = new SettableMultiplexer();
var batcher =
RequestBatcher.<Request, Response>create(
createBatchExecutionStrategy(multiplexer, commonPool()),
/* maxBatchSize= */ batchSize - 1,
/* maxConcurrentRequests= */ 1);
ListenableFuture<Response> response0 = batcher.submit(new Request(0));
BatchedOperations requestResponses0 = multiplexer.queue.take();
// The first worker stays busy until requestResponses0 is populated.
var responses = new ArrayList<ListenableFuture<Response>>();
// With the single available worker being busy, we can completely fill the queue.
for (int i = 0; i < ConcurrentFifo.MAX_ELEMENTS; i++) {
responses.add(batcher.submit(new Request(i + 1)));
}
// The next request triggers a queue overflow. Since this ends up blocking, we do it in another
// thread.
var overflowStarting = new CountDownLatch(1);
var overflowAdded = new CountDownLatch(1);
// A new thread must be used here instead of commonPool because some test environments give
// commonPool only a single thread.
new Thread(
() -> {
overflowStarting.countDown();
responses.add(batcher.submit(new Request(ConcurrentFifo.MAX_ELEMENTS + 1)));
overflowAdded.countDown();
})
.start();
// The following assertion will occasionally fail if the overflow submit above does not block.
overflowStarting.await();
assertThat(responses).hasSize(ConcurrentFifo.MAX_ELEMENTS);
// Responding to the first batch enables the overflowing element to enter the queue.
requestResponses0.setSimpleResponses();
assertThat(response0.get()).isEqualTo(new Response(0));
overflowAdded.await();
assertThat(responses).hasSize(ConcurrentFifo.MAX_ELEMENTS + 1);
// Responds to all remaining batches.
int batchCount = responses.size() / batchSize;
assertThat(responses).hasSize(batchCount * batchSize);
for (int i = 0; i < batchCount; i++) {
multiplexer.queue.take().setSimpleResponses();
}
for (int i = 0; i < ConcurrentFifo.MAX_ELEMENTS + 1; i++) {
assertThat(responses.get(i).get()).isEqualTo(new Response(i + 1));
}
}
@Test
public void submitWithWorkersFull_enqueuesThenExecutes() throws Exception {
// This covers Step 1B, Step 2A and Step 3 of the documentation.
var multiplexer = new SettableMultiplexer();
var batcher =
RequestBatcher.<Request, Response>create(
createBatchExecutionStrategy(multiplexer, commonPool()),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1);
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
BatchedOperations requestResponses1 = multiplexer.queue.take();
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
// The first batch is not yet complete, so the 2nd request waits in an internal queue. Ideally we
// would assert something stronger here: that the 2nd batch executes only after the first one is
// done.
assertThat(multiplexer.queue).isEmpty();
requestResponses1.setSimpleResponses();
// With the first batch done, the worker picks up the enqueued 2nd request and executes it.
BatchedOperations requestResponses2 = multiplexer.queue.take();
requestResponses2.setSimpleResponses();
assertThat(response1.get()).isEqualTo(new Response(1));
assertThat(response2.get()).isEqualTo(new Response(2));
}
// TODO: b/386384684 - remove Unsafe usage
@Test
public void concurrentWorkCompletion_startsNewWorker() throws Exception {
// This covers Step 1B and Step 2B of the documentation.
// This test uses fakes to achieve the narrow set of conditions needed to reach this code path.
long baseAddress = createPaddedBaseAddress(4);
var queueDrainingExecutor = new FakeExecutor();
var fifo =
new FakeConcurrentFifo(
getAlignedAddress(baseAddress, /* offset= */ 1),
getAlignedAddress(baseAddress, /* offset= */ 2),
getAlignedAddress(baseAddress, /* offset= */ 3));
long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
var batcher =
new RequestBatcher<Request, Response>(
/* queueDrainingExecutor= */ queueDrainingExecutor,
createBatchExecutionStrategy(
requests -> immediateFuture(respondTo(requests)), commonPool()),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1,
countersAddress,
fifo);
cleaner.register(batcher, new AddressFreer(baseAddress));
// Submits a request. This starts a worker to run the batch, but it gets blocked on
// `queueDrainingExecutor` and can't continue.
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
// Submits a 2nd request. This request observes that there are already enough active workers, so
// it tries to enqueue an element. It gets blocked at the queue.
@SuppressWarnings("FutureReturnValueIgnored")
var response2 = SettableFuture.<ListenableFuture<Response>>create();
commonPool().execute(() -> response2.set(batcher.submit(new Request(2))));
// Waits until the 2nd request starts enqueuing.
fifo.tryAppendTokens.acquireUninterruptibly();
// Allows the 1st worker to continue. This runs the enqueued `continueToNextBatchOrBecomeIdle`
// invocation, which causes the 1st worker to go idle.
queueDrainingExecutor.queue.take().run();
assertThat(response1.get()).isEqualTo(new Response(1));
// Verifies that there is nothing in flight in the batcher.
assertThat(UNSAFE.getIntVolatile(null, countersAddress)).isEqualTo(0);
// Allows the 2nd request to enqueue and complete processing.
fifo.appendPermits.release();
queueDrainingExecutor.queue.take().run();
assertThat(response2.get().get()).isEqualTo(new Response(2));
}
@Test
public void randomRaces_executeCorrectly() throws Exception {
var batcher =
RequestBatcher.<Request, Response>create(
createBatchExecutionStrategy(
requests -> immediateFuture(respondTo(requests)), commonPool()),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 4);
var results = new ConcurrentLinkedQueue<ListenableFuture<Void>>();
final int requestCount = 4_000_000;
var allStarted = new CountDownLatch(requestCount);
for (int i = 0; i < requestCount; i++) {
final int iForCapture = i;
commonPool()
.execute(
() -> {
results.add(
Futures.transformAsync(
batcher.submit(new Request(iForCapture)),
response ->
response.x() == iForCapture
? immediateVoidFuture()
: immediateFailedFuture(
new AssertionError(
String.format(
"expected %d got %s", iForCapture, response))),
directExecutor()));
allStarted.countDown();
});
}
allStarted.await();
// Throws ExecutionException if there are any errors.
var unused = Futures.whenAllSucceed(results).call(() -> null, directExecutor()).get();
}
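/**
 * An executor that merely queues its tasks, letting the test decide exactly when each queued
 * runnable runs.
 */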
private static class FakeExecutor implements Executor {
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override
public void execute(Runnable runnable) {
queue.add(runnable);
}
}
@Test
public void perResponseMultiplexer_simpleSubmit_executes() throws Exception {
var batcher =
RequestBatcher.<Request, Response>create(
createPerResponseBatchExecutionStrategy(
(requests, sinks) -> {
assertThat(requests).hasSize(1);
assertThat(sinks).hasSize(1);
sinks.get(0).acceptFutureResponse(immediateFuture(new Response(1)));
}),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1);
ListenableFuture<Response> response = batcher.submit(new Request(1));
assertThat(response.get()).isEqualTo(new Response(1));
}
@Test
public void perResponseMultiplexer_batching_succeeds() throws Exception {
var multiplexer = new PerResponseSettableMultiplexer();
var batcher =
RequestBatcher.<Request, Response>create(
createPerResponseBatchExecutionStrategy(multiplexer),
/* maxBatchSize= */ 1, // actual batch size is 2
/* maxConcurrentRequests= */ 1);
// Blocks the first worker to allow a batch to form.
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
BatchedPerResponseRequests batch1 = multiplexer.queue.take();
assertThat(batch1.requests()).hasSize(1);
// These will get enqueued because the worker is busy.
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
ListenableFuture<Response> response3 = batcher.submit(new Request(3));
ListenableFuture<Response> response4 = batcher.submit(new Request(4));
// Unblocks the first worker, allowing the next batch to be processed.
batch1.setSimpleSuccessResponses();
assertThat(response1.get()).isEqualTo(new Response(1));
// The next batch should contain requests 2 and 3.
BatchedPerResponseRequests batch2 = multiplexer.queue.take();
assertThat(batch2.requests()).hasSize(2);
assertThat(batch2.requests().stream().map(Request::x)).containsExactly(2, 3).inOrder();
batch2.setSimpleSuccessResponses();
// The final batch should contain request 4.
BatchedPerResponseRequests batch3 = multiplexer.queue.take();
assertThat(batch3.requests()).hasSize(1);
assertThat(batch3.requests().get(0).x()).isEqualTo(4);
batch3.setSimpleSuccessResponses();
// Verifies all responses.
assertThat(response2.get()).isEqualTo(new Response(2));
assertThat(response3.get()).isEqualTo(new Response(3));
assertThat(response4.get()).isEqualTo(new Response(4));
}
@Test
public void perResponseMultiplexer_individualResponseFailure_isIsolated() throws Exception {
var multiplexer = new PerResponseSettableMultiplexer();
var batcher =
RequestBatcher.<Request, Response>create(
createPerResponseBatchExecutionStrategy(multiplexer),
/* maxBatchSize= */ 1, // actual batch size is 2
/* maxConcurrentRequests= */ 1);
// Blocks the first worker to allow a batch to form.
ListenableFuture<Response> response0 = batcher.submit(new Request(0));
BatchedPerResponseRequests batch0 = multiplexer.queue.take();
assertThat(batch0.requests()).hasSize(1);
// These two will be enqueued and batched together because the worker is busy.
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
// Unblocks the first worker, allowing the next batch to be processed.
batch0.setSimpleSuccessResponses();
assertThat(response0.get()).isEqualTo(new Response(0));
// Waits for the batch to be sent to the multiplexer.
BatchedPerResponseRequests batch = multiplexer.queue.take();
assertThat(batch.requests()).hasSize(2);
// Fulfills the first future with success and the second with failure.
var failure = new IllegalStateException("Individual failure");
batch.settableFutures().get(0).set(new Response(1));
batch.settableFutures().get(1).setException(failure);
// Asserts that the first future succeeded and the second failed with the expected cause.
assertThat(response1.get()).isEqualTo(new Response(1));
var thrown = assertThrows(ExecutionException.class, response2::get);
assertThat(thrown).hasCauseThat().isEqualTo(failure);
}
@Test
public void perResponseMultiplexer_missingFuture_throwsIllegalState() throws Exception {
var futureResponses = new LinkedBlockingQueue<SettableFuture<Response>>();
var multiplexer =
new RequestBatcher.PerResponseMultiplexer<Request, Response>() {
@Override
public void execute(
List<Request> requests,
ImmutableList<? extends RequestBatcher.FutureResponseSink<Response>> sinks) {
// Faulty implementation: only sets the first future in the batch, and "forgets" to set
// the rest.
var futureResponse = SettableFuture.<Response>create();
futureResponses.add(futureResponse);
sinks.get(0).acceptFutureResponse(futureResponse);
}
};
var batcher =
RequestBatcher.<Request, Response>create(
createPerResponseBatchExecutionStrategy(multiplexer),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1);
// Blocks the first worker to allow a batch to form.
ListenableFuture<Response> response0 = batcher.submit(new Request(0));
SettableFuture<Response> responseSetter0 = futureResponses.take();
// These two will be batched together.
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
// Unblocks the first worker, allowing the next batch to be processed.
responseSetter0.set(new Response(0));
assertThat(response0.get()).isEqualTo(new Response(0));
// The multiplexer will set the future for request 1, but not for 2.
futureResponses.take().set(new Response(1));
assertThat(response1.get()).isEqualTo(new Response(1));
assertThat(futureResponses).isEmpty();
var thrown = assertThrows(ExecutionException.class, response2::get);
assertThat(thrown).hasCauseThat().isInstanceOf(IllegalStateException.class);
assertThat(thrown)
.hasCauseThat()
.hasMessageThat()
.contains("Future for Request[x=2] is unexpectedly not set");
}
@Test
public void cancelledRequest_doesNotCrash() throws Exception {
var uncaughtException = new AtomicReference<Throwable>();
var crashDetectingExecutor =
new Executor() {
@Override
public void execute(Runnable command) {
try {
command.run();
} catch (Throwable t) {
uncaughtException.set(t);
}
}
};
var multiplexer = new SettableMultiplexer();
var batcher =
RequestBatcher.<Request, Response>create(
createBatchExecutionStrategy(multiplexer, crashDetectingExecutor),
/* maxBatchSize= */ 255,
/* maxConcurrentRequests= */ 1);
ListenableFuture<Response> response = batcher.submit(new Request(1));
response.cancel(true);
BatchedOperations requestResponses = multiplexer.queue.take();
requestResponses.setSimpleResponses();
assertThat(response.isCancelled()).isTrue();
assertThat(uncaughtException.get()).isNull();
}
@Test
public void callbackMultiplexer_allSucceed() throws Exception {
var events = new LinkedBlockingQueue<BatcherEvent>();
RequestBatcher<Request, Response> batcher = createCallbackMultiplexerBatcher(events);
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
// A batch containing the 1st request begins executing immediately.
var firstBatch = (BatchOperation) events.poll();
// The next 2 requests are enqueued.
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
ListenableFuture<Response> response3 = batcher.submit(new Request(3));
// No other batches have started and no callbacks have been called.
assertThat(events.poll()).isNull();
firstBatch.defaultReplyAll();
assertThat(response1.get()).isEqualTo(new Response(1));
// The done callback is always called before the next batch starts.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
var secondBatch = (BatchOperation) events.take();
secondBatch.defaultReplyAll();
assertThat(response2.get()).isEqualTo(new Response(2));
assertThat(response3.get()).isEqualTo(new Response(3));
// The done callback is called after the 2nd batch completes.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
}
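/** An event observed by the callback-multiplexer tests, recorded in arrival order. */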
private sealed interface BatcherEvent {}
private enum DoneCallbackCalled implements BatcherEvent {
INSTANCE
}
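/**
 * A captured batch of requests and their response sinks. {@link #defaultReplyAll} echoes each
 * request's value back as its response; {@link #failAll} fails every sink with the given error.
 */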
private record BatchOperation(
List<Request> requests, ImmutableList<? extends ResponseSink<Response>> sinks)
implements BatcherEvent {
private void defaultReplyAll() {
for (int i = 0; i < requests.size(); i++) {
sinks.get(i).acceptResponse(new Response(requests.get(i).x()));
}
}
private void failAll(Throwable t) {
for (ResponseSink<Response> sink : sinks) {
sink.acceptFailure(t);
}
}
}
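/**
 * Creates a batcher whose multiplexer records each batch as a {@link BatchOperation} and records
 * a {@link DoneCallbackCalled} event when the batcher runs the returned done callback.
 */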
private static RequestBatcher<Request, Response> createCallbackMultiplexerBatcher(
LinkedBlockingQueue<BatcherEvent> events) {
var multiplexer =
new RequestBatcher.CallbackMultiplexer<Request, Response>() {
@Override
public Runnable execute(
List<Request> requests, ImmutableList<? extends ResponseSink<Response>> sinks) {
assertThat(requests).hasSize(sinks.size());
events.offer(new BatchOperation(requests, sinks));
return () -> events.offer(DoneCallbackCalled.INSTANCE);
}
};
return RequestBatcher.<Request, Response>create(
createCallbackBatchExecutionStrategy(multiplexer),
/* maxBatchSize= */ 1,
/* maxConcurrentRequests= */ 1);
}
@Test
public void callbackMultiplexer_allFail() throws Exception {
var events = new LinkedBlockingQueue<BatcherEvent>();
RequestBatcher<Request, Response> batcher = createCallbackMultiplexerBatcher(events);
var failure = new RuntimeException("Test Failure");
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
// A batch containing the 1st request begins executing immediately.
var firstBatch = (BatchOperation) events.poll();
// The next 2 requests are enqueued.
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
ListenableFuture<Response> response3 = batcher.submit(new Request(3));
assertThat(events.poll()).isNull(); // No new events have occurred.
firstBatch.failAll(failure);
var e1 = assertThrows(ExecutionException.class, response1::get);
assertThat(e1).hasCauseThat().isEqualTo(failure);
// The done callback is always called before the next batch starts.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
var secondBatch = (BatchOperation) events.take();
secondBatch.failAll(failure);
var e2 = assertThrows(ExecutionException.class, response2::get);
assertThat(e2).hasCauseThat().isEqualTo(failure);
var e3 = assertThrows(ExecutionException.class, response3::get);
assertThat(e3).hasCauseThat().isEqualTo(failure);
// The done callback is called after the 2nd batch completes.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
}
@Test
public void callbackMultiplexer_mixedSuccessFailure() throws Exception {
var events = new LinkedBlockingQueue<BatcherEvent>();
RequestBatcher<Request, Response> batcher = createCallbackMultiplexerBatcher(events);
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
// A batch containing the 1st request begins executing immediately.
var firstBatch = (BatchOperation) events.poll();
// The next 2 requests are enqueued.
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
ListenableFuture<Response> response3 = batcher.submit(new Request(3));
assertThat(events.poll()).isNull(); // No new events have occurred.
firstBatch.defaultReplyAll();
assertThat(response1.get()).isEqualTo(new Response(1));
// The done callback is always called before the next batch starts.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
var secondBatch = (BatchOperation) events.take();
var failure = new IllegalArgumentException("Bad Request");
assertThat(secondBatch.sinks).hasSize(2);
secondBatch.sinks.get(0).acceptResponse(new Response(secondBatch.requests.get(0).x()));
secondBatch.sinks.get(1).acceptFailure(failure);
assertThat(response2.get()).isEqualTo(new Response(2));
var e3 = assertThrows(ExecutionException.class, response3::get);
assertThat(e3).hasCauseThat().isEqualTo(failure);
// The done callback is called after the 2nd batch completes.
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
}
@Test
public void callbackMultiplexer_nullResponse() throws Exception {
var events = new LinkedBlockingQueue<BatcherEvent>();
RequestBatcher<Request, Response> batcher = createCallbackMultiplexerBatcher(events);
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
var batch = (BatchOperation) events.poll();
batch.sinks.get(0).acceptResponse(null);
assertThat(response1.get()).isNull();
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
}
@Test
public void callbackMultiplexer_batching() throws Exception {
var events = new LinkedBlockingQueue<BatcherEvent>();
RequestBatcher<Request, Response> batcher = createCallbackMultiplexerBatcher(events);
// These should form three batches: (1), (2, 3), (4, 5).
ListenableFuture<Response> response1 = batcher.submit(new Request(1));
ListenableFuture<Response> response2 = batcher.submit(new Request(2));
ListenableFuture<Response> response3 = batcher.submit(new Request(3));
ListenableFuture<Response> response4 = batcher.submit(new Request(4));
ListenableFuture<Response> response5 = batcher.submit(new Request(5));
var batch1 = (BatchOperation) events.take();
assertThat(batch1.requests()).containsExactly(new Request(1));
batch1.defaultReplyAll();
assertThat(response1.get()).isEqualTo(new Response(1));
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
var batch2 = (BatchOperation) events.take();
assertThat(batch2.requests()).containsExactly(new Request(2), new Request(3)).inOrder();
batch2.defaultReplyAll();
assertThat(response2.get()).isEqualTo(new Response(2));
assertThat(response3.get()).isEqualTo(new Response(3));
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
var batch3 = (BatchOperation) events.take();
assertThat(batch3.requests()).containsExactly(new Request(4), new Request(5)).inOrder();
batch3.defaultReplyAll();
assertThat(response4.get()).isEqualTo(new Response(4));
assertThat(response5.get()).isEqualTo(new Response(5));
assertThat(events.take()).isEqualTo(DoneCallbackCalled.INSTANCE);
}
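/**
 * A FIFO whose {@link #tryAppend} releases a token on entry, then blocks until the test releases
 * an append permit, letting tests pause a submitter mid-enqueue.
 */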
private static class FakeConcurrentFifo extends ConcurrentFifo<Operation<Request, Response>> {
private final ConcurrentLinkedQueue<Operation<Request, Response>> queue =
new ConcurrentLinkedQueue<>();
private final Semaphore tryAppendTokens = new Semaphore(0);
private final Semaphore appendPermits = new Semaphore(0);
private FakeConcurrentFifo(long sizeAddress, long appendIndexAddress, long takeIndexAddress) {
super(Operation.class, sizeAddress, appendIndexAddress, takeIndexAddress);
}
@Override
boolean tryAppend(Operation<Request, Response> task) {
tryAppendTokens.release();
appendPermits.acquireUninterruptibly();
queue.add(task);
return true;
}
@Override
Operation<Request, Response> take() {
return queue.poll();
}
}
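/**
 * A multiplexer that captures each batch together with a settable response future, so tests
 * control exactly when, and with what, each batch completes.
 */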
private static class SettableMultiplexer
implements RequestBatcher.Multiplexer<Request, Response> {
private final LinkedBlockingQueue<BatchedOperations> queue = new LinkedBlockingQueue<>();
@Override
public ListenableFuture<List<Response>> execute(List<Request> requests) {
var responses = SettableFuture.<List<Response>>create();
queue.add(new BatchedOperations(requests, responses));
return responses;
}
}
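/** A captured batch and the future that answers it. */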
private record BatchedOperations(
List<Request> requests, SettableFuture<List<Response>> responses) {
private void setSimpleResponses() {
responses().set(respondTo(requests()));
}
}
private record Request(int x) {}
private record Response(int x) {}
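/** Echoes each request's value back as a {@code Response}. */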
private static List<Response> respondTo(List<Request> requests) {
return Lists.transform(requests, request -> new Response(request.x()));
}
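/**
 * A per-response multiplexer that hands each sink its own settable future and captures the
 * batch, so tests can fulfill or fail individual responses independently.
 */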
private static class PerResponseSettableMultiplexer
implements RequestBatcher.PerResponseMultiplexer<Request, Response> {
private final LinkedBlockingQueue<BatchedPerResponseRequests> queue =
new LinkedBlockingQueue<>();
@Override
public void execute(
List<Request> requests,
ImmutableList<? extends RequestBatcher.FutureResponseSink<Response>> sinks) {
assertThat(requests).hasSize(sinks.size());
List<SettableFuture<Response>> settableFutures = new ArrayList<>();
for (int i = 0; i < sinks.size(); i++) {
SettableFuture<Response> settableFuture = SettableFuture.create();
settableFutures.add(settableFuture);
// This links the batcher's internal future to our controllable future.
sinks.get(i).acceptFutureResponse(settableFuture);
}
queue.add(new BatchedPerResponseRequests(requests, settableFutures));
}
}
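/** A captured per-response batch with the settable future backing each response. */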
private record BatchedPerResponseRequests(
List<Request> requests, List<SettableFuture<Response>> settableFutures) {
private void setSimpleSuccessResponses() {
assertThat(requests).hasSize(settableFutures.size());
for (int i = 0; i < requests.size(); i++) {
settableFutures.get(i).set(new Response(requests.get(i).x()));
}
}
}
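// Used to read the batcher's native counters directly in assertions.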
private static final Unsafe UNSAFE = UnsafeProvider.unsafe();
}