blob: 08e3c7da2026f9d58f44c423ae65261c7e7dfe3f [file] [log] [blame]
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
import static java.util.concurrent.ForkJoinPool.commonPool;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.RequestBatcher.RequestResponse;
import com.google.devtools.build.lib.unsafe.UnsafeProvider;
import java.lang.ref.Cleaner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import sun.misc.Unsafe;
@RunWith(JUnit4.class)
@SuppressWarnings("SunApi") // TODO: b/359688989 - clean this up
public final class RequestBatcherTest {
  // Frees the natively allocated, padded address blocks (via AddressFreer) once a test-local
  // batcher becomes unreachable, so tests don't leak off-heap memory.
  private static final Cleaner cleaner = Cleaner.create();

  /** Submitting a single request with an idle worker executes it immediately. */
  @Test
  public void simpleSubmit_executes() throws Exception {
    // This covers Step 1A in the documentation.
    var batcher =
        RequestBatcher.<Request, Response>create(
            commonPool(),
            requests -> immediateFuture(respondTo(requests)),
            /* targetWorkerCount= */ 1);
    ListenableFuture<Response> response = batcher.submit(new Request(1));
    assertThat(response.get()).isEqualTo(new Response(1));
  }

  /**
   * Fills the internal FIFO to capacity while the only worker is blocked, then verifies that one
   * more {@code submit} blocks until the in-flight batch completes.
   */
  @Test
  public void queueOverflow_sleeps() throws Exception {
    // This covers the overflow case of Step 1B in the documentation.
    var multiplexer = new SettableMultiplexer();
    var batcher =
        RequestBatcher.<Request, Response>create(
            commonPool(), multiplexer, /* targetWorkerCount= */ 1);
    ListenableFuture<Response> response0 = batcher.submit(new Request(0));
    BatchedRequestResponses requestResponses0 = multiplexer.queue.take();
    // The first worker is busy until requestResponse0 is populated.
    var responses = new ArrayList<ListenableFuture<Response>>();
    // With the single available worker being busy, we can completely fill the queue.
    for (int i = 0; i < ConcurrentFifo.MAX_ELEMENTS; i++) {
      responses.add(batcher.submit(new Request(i + 1)));
    }
    // The next request triggers a queue overflow. Since this ends up blocking, we do it in another
    // thread.
    var overflowStarting = new CountDownLatch(1);
    var overflowAdded = new CountDownLatch(1);
    // A new thread must be used here instead of commonPool because there are test environments
    // where commonPool has only a single thread.
    new Thread(
            () -> {
              overflowStarting.countDown();
              responses.add(batcher.submit(new Request(ConcurrentFifo.MAX_ELEMENTS + 1)));
              overflowAdded.countDown();
            })
        .start();
    // The following assertion will occasionally fail if the overflow submit above does not block.
    // NOTE(review): `responses` is a plain ArrayList touched by two threads; the latches provide
    // the ordering here (the CountDownLatch await/countDown pair establishes happens-before), but
    // the size check below is inherently a probabilistic detector of non-blocking submits.
    overflowStarting.await();
    assertThat(responses).hasSize(ConcurrentFifo.MAX_ELEMENTS);
    // Responding to the first batch enables the overflowing element to enter the queue.
    requestResponses0.setSimpleResponses();
    assertThat(response0.get()).isEqualTo(new Response(0));
    overflowAdded.await();
    assertThat(responses).hasSize(ConcurrentFifo.MAX_ELEMENTS + 1);
    // Responds to all remaining batches.
    // NOTE(review): assumes every batch covers BATCH_SIZE + 1 requests — presumably one request
    // taken directly by the worker plus BATCH_SIZE drained from the queue; confirm against
    // RequestBatcher's batching logic.
    int batchSize = RequestBatcher.BATCH_SIZE + 1;
    int batchCount = responses.size() / batchSize;
    // Sanity check: the remaining requests divide evenly into batches.
    assertThat(responses).hasSize(batchCount * batchSize);
    for (int i = 0; i < batchCount; i++) {
      multiplexer.queue.take().setSimpleResponses();
    }
    for (int i = 0; i < ConcurrentFifo.MAX_ELEMENTS + 1; i++) {
      assertThat(responses.get(i).get()).isEqualTo(new Response(i + 1));
    }
  }

  /**
   * With all workers busy, a new submission is enqueued rather than executed, and runs once the
   * busy worker finishes its current batch.
   */
  @Test
  public void submitWithWorkersFull_enqueuesThenExecutes() throws Exception {
    // This covers Step 1B, Step 2A and Step 3 of the documentation.
    var multiplexer = new SettableMultiplexer();
    var batcher =
        RequestBatcher.<Request, Response>create(
            commonPool(), multiplexer, /* targetWorkerCount= */ 1);
    ListenableFuture<Response> response1 = batcher.submit(new Request(1));
    BatchedRequestResponses requestResponses1 = multiplexer.queue.take();
    ListenableFuture<Response> response2 = batcher.submit(new Request(2));
    // The first batch is not yet complete. The 2nd request waits in an internal queue. Ideally, we
    // could make a stronger assertion here, that the 2nd batch executes only after the first one is
    // done.
    assertThat(multiplexer.queue).isEmpty();
    requestResponses1.setSimpleResponses();
    // With the first batch done, the worker picks up the enqueued 2nd request and executes it.
    BatchedRequestResponses requestResponses2 = multiplexer.queue.take();
    requestResponses2.setSimpleResponses();
    assertThat(response1.get()).isEqualTo(new Response(1));
    assertThat(response2.get()).isEqualTo(new Response(2));
  }

  /**
   * Exercises the race where a worker completes while another thread is mid-enqueue, which must
   * start a fresh worker for the enqueued element.
   *
   * <p>Uses {@link FakeExecutor} and {@link FakeConcurrentFifo} to pause the batcher at the exact
   * interleaving points this code path requires.
   */
  @Test
  public void concurrentWorkCompletion_startsNewWorker() throws Exception {
    // This covers Step 1B and Step 2B of the documentation.
    // This test uses fakes to achieve the narrow set of conditions needed to reach this code path.
    long baseAddress = createPaddedBaseAddress(4);
    var executor = new FakeExecutor();
    var fifo =
        new FakeConcurrentFifo(
            getAlignedAddress(baseAddress, /* offset= */ 1),
            getAlignedAddress(baseAddress, /* offset= */ 2),
            getAlignedAddress(baseAddress, /* offset= */ 3));
    long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
    var batcher =
        new RequestBatcher<Request, Response>(
            executor,
            requests -> immediateFuture(respondTo(requests)),
            /* targetWorkerCount= */ 1,
            countersAddress,
            fifo);
    // Ensures the native memory backing the batcher outlives it and is then released.
    cleaner.register(batcher, new AddressFreer(baseAddress));
    // Submits a request. This starts a worker to run the batch, but it gets blocked on the executor
    // and can't continue.
    ListenableFuture<Response> response1 = batcher.submit(new Request(1));
    // Submits a 2nd request. This request observes that there are enough active workers so it tries
    // to enqueue an element. It gets blocked at the queue.
    @SuppressWarnings("FutureReturnValueIgnored")
    var response2 = SettableFuture.<ListenableFuture<Response>>create();
    commonPool().execute(() -> response2.set(batcher.submit(new Request(2))));
    // Waits until the 2nd request starts enqueuing.
    fifo.tryAppendTokens.acquireUninterruptibly();
    // Allows the 1st worker to continue. It will go idle. There are two Runnables, one to run the
    // continuation logic and one that sets the response callbacks. Runs both.
    executor.queue.take().run();
    executor.queue.take().run();
    assertThat(response1.get()).isEqualTo(new Response(1));
    // Verifies that there's absolutely nothing inflight in the batcher.
    // NOTE(review): reads the batcher's counters word directly from native memory; 0 presumably
    // encodes "no active workers and an empty queue" — confirm against RequestBatcher's counter
    // layout.
    assertThat(UNSAFE.getIntVolatile(null, countersAddress)).isEqualTo(0);
    // Allows the 2nd request to enqueue and complete processing.
    fifo.appendPermits.release();
    executor.queue.take().run();
    executor.queue.take().run();
    assertThat(response2.get().get()).isEqualTo(new Response(2));
  }

  /**
   * Stress test: hammers the batcher with 4M concurrent submissions across 4 workers and verifies
   * every response matches its request.
   */
  @Test
  public void randomRaces_executeCorrectly() throws Exception {
    var batcher =
        RequestBatcher.<Request, Response>create(
            commonPool(),
            requests -> immediateFuture(respondTo(requests)),
            /* targetWorkerCount= */ 4);
    var results = new ConcurrentLinkedQueue<ListenableFuture<Void>>();
    final int requestCount = 4_000_000;
    var allStarted = new CountDownLatch(requestCount);
    for (int i = 0; i < requestCount; i++) {
      final int iForCapture = i;
      commonPool()
          .execute(
              () -> {
                results.add(
                    Futures.transformAsync(
                        batcher.submit(new Request(iForCapture)),
                        response ->
                            response.x() == iForCapture
                                ? immediateVoidFuture()
                                : immediateFailedFuture(
                                    new AssertionError(
                                        String.format(
                                            "expected %d got %s", iForCapture, response))),
                        directExecutor()));
                allStarted.countDown();
              });
    }
    allStarted.await();
    // Throws ExecutionException if there are any errors.
    var unused = Futures.whenAllSucceed(results).call(() -> null, directExecutor()).get();
  }

  /**
   * Executor that captures submitted Runnables instead of running them, letting the test drive the
   * batcher's continuations one step at a time.
   */
  private static class FakeExecutor implements Executor {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable runnable) {
      queue.add(runnable);
    }
  }

  /**
   * FIFO whose {@code tryAppend} signals the test ({@code tryAppendTokens}) and then blocks until
   * the test permits it ({@code appendPermits}), so a producer can be frozen mid-enqueue.
   */
  private static class FakeConcurrentFifo
      extends ConcurrentFifo<RequestResponse<Request, Response>> {
    private final ConcurrentLinkedQueue<RequestResponse<Request, Response>> queue =
        new ConcurrentLinkedQueue<>();
    // Released once per tryAppend call; the test acquires it to learn an append has started.
    private final Semaphore tryAppendTokens = new Semaphore(0);
    // Acquired by tryAppend before completing; the test releases it to let the append finish.
    private final Semaphore appendPermits = new Semaphore(0);

    private FakeConcurrentFifo(long sizeAddress, long appendIndexAddress, long takeIndexAddress) {
      super(RequestResponse.class, sizeAddress, appendIndexAddress, takeIndexAddress);
    }

    @Override
    boolean tryAppend(RequestResponse<Request, Response> task) {
      tryAppendTokens.release();
      appendPermits.acquireUninterruptibly();
      queue.add(task);
      return true;
    }

    @Override
    RequestResponse<Request, Response> take() {
      return queue.poll();
    }
  }

  /**
   * Multiplexer that defers each batch's responses behind a {@link SettableFuture}, exposing the
   * pending batches on a queue so tests control exactly when each batch completes.
   */
  private static class SettableMultiplexer
      implements RequestBatcher.Multiplexer<Request, Response> {
    private final LinkedBlockingQueue<BatchedRequestResponses> queue = new LinkedBlockingQueue<>();

    @Override
    public ListenableFuture<List<Response>> execute(List<Request> requests) {
      var responses = SettableFuture.<List<Response>>create();
      queue.add(new BatchedRequestResponses(requests, responses));
      return responses;
    }
  }

  /** A pending batch: its requests and the future the test completes to answer them. */
  private record BatchedRequestResponses(
      List<Request> requests, SettableFuture<List<Response>> responses) {
    /** Completes the batch with the identity response for every request. */
    private void setSimpleResponses() {
      responses().set(respondTo(requests()));
    }
  }

  private record Request(int x) {}

  private record Response(int x) {}

  /** Maps each {@code Request(x)} to {@code Response(x)} (lazy Guava view). */
  private static List<Response> respondTo(List<Request> requests) {
    return Lists.transform(requests, request -> new Response(request.x()));
  }

  private static final Unsafe UNSAFE = UnsafeProvider.unsafe();
}