// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.skyframe.serialization;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.skyframe.serialization.NotNestedSet.NestedArrayCodec;
import com.google.devtools.build.lib.skyframe.serialization.NotNestedSet.NotNestedSetCodec;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public final class SharedValueSerializationContextTest {
// Parallelism of `executor`; also used as the number of shared arrays and of concurrently
// serialized subjects in concurrentSharing_waitsForCompleteBytes.
private static final int CONCURRENCY = 20;
// Pool on which serializeWithExecutor runs serialization tasks, off the test thread.
private final ForkJoinPool executor = new ForkJoinPool(CONCURRENCY);
// Seeded with a constant so the random sequence fed to createRandomLeafArray is the same on
// every run.
private final Random rng = new Random(0);
/**
 * A {@link FingerprintValueStore} that never completes a {@code put} on its own.
 *
 * <p>Every {@code put} returns a fresh, unset {@link SettableFuture} and records it in {@link
 * #putResponses} so that the test decides when each write "completes". {@code get} is
 * unsupported.
 */
private static final class PutRecordingStore implements FingerprintValueStore {
  /** Responses handed out by {@code put}, in call order. Mutations synchronize on the list. */
  private final ArrayList<SettableFuture<Void>> putResponses = new ArrayList<>();

  /** Records and returns a new pending response; the arguments are ignored. */
  @Override
  public ListenableFuture<Void> put(ByteString fingerprint, byte[] serializedBytes) {
    var response = SettableFuture.<Void>create();
    synchronized (putResponses) {
      putResponses.add(response);
    }
    return response;
  }

  /** Not supported by this store. */
  @Override
  public ListenableFuture<byte[]> get(ByteString fingerprint) {
    throw new UnsupportedOperationException();
  }

  /** Completes every response recorded so far. */
  private void completeAllResponses() {
    // Takes the snapshot under the same lock that put() uses so that a concurrent put cannot
    // corrupt the iteration. The futures are set outside the lock because setting a future runs
    // its listeners, which should not execute while the lock is held.
    ArrayList<SettableFuture<Void>> snapshot;
    synchronized (putResponses) {
      snapshot = new ArrayList<>(putResponses);
    }
    for (SettableFuture<Void> response : snapshot) {
      response.set(null);
    }
  }
}
@Test
public void resultDoesNotBlockOnPut() throws Exception {
  // Serialization hands back its result before any put has completed, and the result's write
  // status future completes only once every put has completed.

  // Builds a diamond in which `leaf` is reachable through both `left` and `right`.
  //      top
  //     /   \
  //  left   right
  //     \   /
  //      leaf
  Object[] leaf = createRandomLeafArray();
  Object[] right = {leaf};
  Object[] left = {leaf};
  Object[] top = {left, right};

  var store = new PutRecordingStore();
  var service = FingerprintValueService.createForTesting(store);
  SerializationResult<ByteString> result =
      createObjectCodecs().serializeMemoizedAndBlocking(service, new NotNestedSet(top));

  // Exactly 4 arrays were written: `leaf` is memoized via the cache, so it is written once even
  // though the traversal reaches it twice.
  List<SettableFuture<Void>> pendingPuts = store.putResponses;
  assertThat(pendingPuts).hasSize(4);

  ListenableFuture<Void> writeStatus = result.getFutureToBlockWritesOn();
  assertThat(writeStatus).isNotNull();
  assertThat(writeStatus.isDone()).isFalse();

  // Completes the first two puts only; the write status must remain pending.
  int next = 0;
  while (next < 2) {
    pendingPuts.get(next).set(null);
    next++;
  }
  assertThat(writeStatus.isDone()).isFalse(); // not yet done

  // Completes the rest; the write status must now be complete.
  while (next < pendingPuts.size()) {
    pendingPuts.get(next).set(null);
    next++;
  }
  assertThat(writeStatus.isDone()).isTrue(); // write status future completes
}
@Test
public void writeStatusPropagatesToSecondCaller() throws Exception {
  // A value shared by two serialized subjects is written to the store only once, yet the future
  // returned by getFutureToBlockWritesOn for the second subject still waits for that write.
  var store = new PutRecordingStore();
  var service = FingerprintValueService.createForTesting(store);
  ObjectCodecs codecs = createObjectCodecs();

  Object[] shared = createRandomLeafArray();
  NotNestedSet firstSet = new NotNestedSet(shared);
  NotNestedSet secondSet = new NotNestedSet(shared);

  // First caller: triggers the single put, which stays pending.
  ListenableFuture<Void> firstStatus =
      codecs.serializeMemoizedAndBlocking(service, firstSet).getFutureToBlockWritesOn();
  assertThat(firstStatus.isDone()).isFalse();
  assertThat(store.putResponses).hasSize(1);

  // Second caller: reuses the shared value, but must still observe the pending write.
  ListenableFuture<Void> secondStatus =
      codecs.serializeMemoizedAndBlocking(service, secondSet).getFutureToBlockWritesOn();
  assertThat(secondStatus.isDone()).isFalse();
  // Still only 1 put, because the array is shared between the two sets.
  assertThat(store.putResponses).hasSize(1);

  // Completing that one put completes both write statuses.
  store.completeAllResponses();
  assertThat(firstStatus.isDone()).isTrue();
  assertThat(secondStatus.isDone()).isTrue();
}
@Test
public void multipleSharedValues_requestedInParallel() throws Exception {
// Serialization does not block on blocked fingerprint computations in another thread: while one
// thread is stuck serializing `sharedArray`, a second thread whose subject also contains
// `sharedArray` must still be able to start serializing its own `myArray`.
// The codec is constructed explicitly here (instead of via createObjectCodecs) so that the test
// keeps a reference to it for injecting serialization delays.
NestedArrayCodec arrayCodec = new NestedArrayCodec();
ObjectCodecs codecs =
new ObjectCodecs(
AutoRegistry.get().getBuilder().add(new NotNestedSetCodec(arrayCodec)).build());
FingerprintValueService fingerprintValueService = FingerprintValueService.createForTesting();
Object[] sharedArray = createRandomLeafArray();
// `sharedEntered` signals that serialization of `sharedArray` has started; `sharedBlocker` holds
// it there until the test counts it down.
CountDownLatch sharedEntered = new CountDownLatch(1);
CountDownLatch sharedBlocker = new CountDownLatch(1);
arrayCodec.injectSerializeDelay(sharedArray, sharedEntered, sharedBlocker);
// Serializes `sharedArray`, which is registered to block on `sharedBlocker`.
ListenableFuture<SerializationResult<ByteString>> first =
serializeWithExecutor(codecs, fingerprintValueService, new NotNestedSet(sharedArray));
sharedEntered.await(); // Waits for the above thread to take ownership of `sharedArray`.
Object[] myArray = createRandomLeafArray();
CountDownLatch myArrayEntered = new CountDownLatch(1);
// Does not block serialization of `myArray` (the blocker latch is already at 0), but uses
// `myArrayEntered` to determine that serialization of `myArray` has started.
arrayCodec.injectSerializeDelay(myArray, myArrayEntered, new CountDownLatch(0));
// The second subject contains both the blocked `sharedArray` and the unblocked `myArray`.
ListenableFuture<SerializationResult<ByteString>> second =
serializeWithExecutor(
codecs, fingerprintValueService, new NotNestedSet(new Object[] {sharedArray, myArray}));
// Completing the line below means that the serialization of `myArray` can start even though
// serialization of `sharedArray` is blocked.
myArrayEntered.await();
// Neither is done due to being blocked by `sharedBlocker`.
assertThat(first.isDone()).isFalse();
assertThat(second.isDone()).isFalse();
sharedBlocker.countDown(); // unblocks serialization of `sharedArray`
// Serialization succeeds now that it is unblocked. The results are not inspected; get() is
// called only because it throws if the corresponding serialization task failed.
SerializationResult<ByteString> unusedFirstResult = first.get();
SerializationResult<ByteString> unusedSecondResult = second.get();
}
@Test
public void concurrentSharing_waitsForCompleteBytes() throws Exception {
  // Under parallel sharing, serialization blocks until all fingerprints are computed.

  // Counts down every FingerprintValueStore.put.
  CountDownLatch arrived = new CountDownLatch(CONCURRENCY);
  // For each FingerprintValueStore.put, a CountDownLatch(1) is added to the end of this queue.
  // The put thread blocks, awaiting its associated latch.
  ConcurrentLinkedDeque<CountDownLatch> putPermits = new ConcurrentLinkedDeque<>();
  // Responses returned by the FingerprintValueStore.
  ArrayList<SettableFuture<Void>> putResponses = new ArrayList<>();

  var blockingStore =
      new FingerprintValueStore() {
        @Override
        public ListenableFuture<Void> put(ByteString fingerprint, byte[] serializedBytes) {
          var response = SettableFuture.<Void>create();
          synchronized (putResponses) {
            putResponses.add(response);
          }
          CountDownLatch permit = new CountDownLatch(1);
          // The permit is published before `arrived` is counted down, so once the test thread
          // returns from `arrived.await()`, a permit exists for every counted put.
          putPermits.offerLast(permit);
          arrived.countDown();
          try {
            permit.await();
          } catch (InterruptedException e) {
            // Restores the interrupt flag before failing so that the pool thread's interrupted
            // state is not silently lost.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
          }
          return response;
        }

        @Override
        public ListenableFuture<byte[]> get(ByteString fingerprint) {
          throw new UnsupportedOperationException();
        }
      };

  FingerprintValueService fingerprintValueService =
      FingerprintValueService.createForTesting(blockingStore);

  ArrayList<Object[]> sharedArrays = new ArrayList<>(CONCURRENCY);
  for (int i = 0; i < CONCURRENCY; i++) {
    sharedArrays.add(createRandomLeafArray());
  }

  ObjectCodecs codecs = createObjectCodecs();

  ArrayList<ListenableFuture<SerializationResult<ByteString>>> results =
      new ArrayList<>(CONCURRENCY);
  for (int i = 0; i < CONCURRENCY; i++) {
    // Every subject contains all of `sharedArrays`, rotated by `i` so that each subject starts
    // with a different element.
    Object[] arrays = new Object[CONCURRENCY];
    for (int j = 0; j < CONCURRENCY; j++) {
      arrays[(i + j) % CONCURRENCY] = sharedArrays.get(j);
    }
    NotNestedSet set = new NotNestedSet(arrays);
    // Each thread will acquire ownership of a unique `sharedArrays` element then block when it
    // hits the `putPermits`.
    results.add(serializeWithExecutor(codecs, fingerprintValueService, set));
  }

  // When the following await has succeeded, each thread has acquired ownership of one of the
  // `sharedArrays`.
  arrived.await();

  // Verifies that all SerializationResults are blocked (due to incomplete fingerprints).
  for (ListenableFuture<SerializationResult<ByteString>> result : results) {
    assertThat(result.isDone()).isFalse();
  }

  // Unblocks all but 1 of the threads.
  for (int i = 0; i < CONCURRENCY - 1; i++) {
    putPermits.pollFirst().countDown();
  }

  // Since the permits are ordered, the first element of the queue is the one associated with a
  // put of one of the `sharedArrays`. It doesn't happen in the current implementation, but more
  // permits could be added by the unblocking above, which is why this distinction matters.
  CountDownLatch lastSharedPut = putPermits.pollFirst();
  assertThat(lastSharedPut).isNotNull();

  // Even with all but 1 of the shared puts complete, all results are still blocked since all
  // threads require all fingerprints.
  for (ListenableFuture<SerializationResult<ByteString>> result : results) {
    assertThat(result.isDone()).isFalse();
  }

  lastSharedPut.countDown(); // Releases the remaining put.

  // Releasing the putPermits above unblocks additional serialization work for the top-level
  // nested arrays. Unblocks the additional resulting puts.
  for (int i = 0; i < CONCURRENCY; i++) {
    waitForLastPermit(putPermits).countDown();
  }

  // Everything succeeds once all the threads wake up from being blocked and complete.
  // allAsList is used rather than successfulAsList: if any serialization fails, get() throws
  // with the real cause, instead of yielding a null entry that would only surface later as a
  // NullPointerException.
  List<SerializationResult<ByteString>> resultList = Futures.allAsList(results).get();

  // Even with all the results available, the write status futures are still not done because
  // `putResponses` have not been set.
  for (SerializationResult<ByteString> result : resultList) {
    assertThat(result.getFutureToBlockWritesOn().isDone()).isFalse();
  }

  // There are 2 for each subject: its top-level array and its owned element of `sharedArrays`.
  assertThat(putResponses).hasSize(CONCURRENCY * 2);

  // Setting all the responses completes the result write status futures.
  for (SettableFuture<Void> response : putResponses) {
    response.set(null);
  }
  for (SerializationResult<ByteString> result : resultList) {
    assertThat(result.getFutureToBlockWritesOn().isDone()).isTrue();
  }
}
/**
 * Starts serializing {@code subject} on {@link #executor} and returns immediately.
 *
 * @return a future for the result of {@code codecs.serializeMemoizedAndBlocking}
 */
private ListenableFuture<SerializationResult<ByteString>> serializeWithExecutor(
    ObjectCodecs codecs, FingerprintValueService fingerprintValueService, Object subject) {
  ListenableFutureTask<SerializationResult<ByteString>> serialization =
      ListenableFutureTask.create(
          () -> codecs.serializeMemoizedAndBlocking(fingerprintValueService, subject));
  // The task doubles as the returned future, so callers observe completion or failure on it.
  executor.execute(serialization);
  return serialization;
}
/** Returns a new array from {@code NotNestedSet.createRandomLeafArray}, drawing on {@link #rng}. */
private Object[] createRandomLeafArray() {
  Object[] leaves = NotNestedSet.createRandomLeafArray(rng);
  return leaves;
}
/** Time, in milliseconds, to sleep between polls of an empty deque. */
private static final long POLL_MS = 100;

/**
 * Removes and returns the last latch in {@code deque}, polling every {@link #POLL_MS}
 * milliseconds for as long as the deque is empty.
 *
 * @throws InterruptedException if the calling thread is interrupted while sleeping
 */
private static CountDownLatch waitForLastPermit(ConcurrentLinkedDeque<CountDownLatch> deque)
    throws InterruptedException {
  for (;;) {
    CountDownLatch last = deque.pollLast();
    if (last != null) {
      return last;
    }
    Thread.sleep(POLL_MS);
  }
}
/**
 * Creates {@link ObjectCodecs} from the {@link AutoRegistry} builder with a {@link
 * NotNestedSetCodec}, backed by a fresh {@link NestedArrayCodec}, added.
 */
private static ObjectCodecs createObjectCodecs() {
  var setCodec = new NotNestedSetCodec(new NestedArrayCodec());
  var registry = AutoRegistry.get().getBuilder().add(setCodec).build();
  return new ObjectCodecs(registry);
}
}