blob: 3222b69ca9f803dc09e58d564629ad3ade114f70 [file] [log] [blame]
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.skyframe.serialization;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.skyframe.serialization.FutureHelpers.waitForDeserializationFuture;
import static com.google.devtools.build.lib.unsafe.UnsafeProvider.unsafe;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableClassToInstanceMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.skyframe.serialization.DeferredObjectCodec.DeferredValue;
import com.google.devtools.build.lib.skyframe.serialization.FingerprintValueStore.MissingFingerprintValueException;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
/** Implementation that supports sharing of sub-objects between objects. */
final class SharedValueDeserializationContext extends MemoizingDeserializationContext {
private final FingerprintValueService fingerprintValueService;
@VisibleForTesting // private
static SharedValueDeserializationContext createForTesting(
ObjectCodecRegistry codecRegistry,
ImmutableClassToInstanceMap<Object> dependencies,
FingerprintValueService fingerprintValueService) {
return new SharedValueDeserializationContext(
codecRegistry, dependencies, fingerprintValueService);
}
/**
* List of futures that must be resolved before this value is completely deserialized.
*
* <p>The synchronous {@link #deserialize(CodedInputStream)} overload includes waiting for these
* futures to to be resolved. The top-level deserialization call typically uses that overload
* while codec implementations use the ones in {@link AsyncDeserializationContext}.
*/
@Nullable // Initialized lazily.
private ArrayList<ListenableFuture<?>> readStatusFutures;
/**
* Tracks which {@link #readStatusFutures} were added, transitively, while deserializing a value.
*
* <p>When deserializing a value, {@link MemoizingDeserializationContext} calls {@link
* #deserializeAndMaybeHandleDeferredValues}, then {@link #combineValueWithReadFutures}.
*
* <p>{@link #deserializeAndMaybeHandleDeferredValues} performs the following steps.
*
* <ul>
* <li>It notes the size of {@link #readStatusFutures} when it begins.
* <li>It initiates deserialization of the next value with a given {@link ObjectCodec}.
* <li>After that deserialization invocation completes, it sets {@code lastStartingReadCount} to
* the size it noted when it started.
* </ul>
*
* <p>Consequently, the futures in {@link #readStatusFutures} with index greater than {@code
* lastStartingReadCount} are ones added (transitively) by deserialization.
*
* <p>Next, {@link #combineValueWithReadFutures} uses {@code lastStartingReadCount} to determine
* what futures were added and need to be complete before deserialization of the value is
* complete.
*/
private int lastStartingReadCount;
private SharedValueDeserializationContext(
ObjectCodecRegistry codecRegistry,
ImmutableClassToInstanceMap<Object> dependencies,
FingerprintValueService fingerprintValueService) {
super(codecRegistry, dependencies);
this.fingerprintValueService = fingerprintValueService;
}
static Object deserializeWithSharedValues(
ObjectCodecRegistry codecRegistry,
ImmutableClassToInstanceMap<Object> dependencies,
FingerprintValueService fingerprintValueService,
ByteString bytes)
throws SerializationException {
try {
return ObjectCodecs.deserializeStreamFully(
bytes.newCodedInput(),
new SharedValueDeserializationContext(
codecRegistry, dependencies, fingerprintValueService));
} catch (SerializationException e) {
Throwable cause = e.getCause();
if (cause instanceof MissingFingerprintValueException) {
// TODO: b/297857068 - eventually, callers of this should handle this by falling back on
// local recomputation.
throw new IllegalStateException("Not yet supported.", cause);
}
throw e;
}
}
@Override
public void deserialize(CodedInputStream codedIn, Object parent, long offset)
throws IOException, SerializationException {
Object result = processTagAndDeserialize(codedIn);
if (result == null) {
return;
}
if (!(result instanceof ListenableFuture)) {
unsafe().putObject(parent, offset, result);
return;
}
addReadStatusFuture(
Futures.transform(
(ListenableFuture<?>) result,
value -> {
unsafe().putObject(parent, offset, value);
return null;
},
directExecutor()));
}
@Override
public <T> void deserialize(CodedInputStream codedIn, T parent, FieldSetter<? super T> setter)
throws IOException, SerializationException {
Object result = processTagAndDeserialize(codedIn);
if (result == null) {
return;
}
if (!(result instanceof ListenableFuture)) {
setter.set(parent, result);
return;
}
addReadStatusFuture(
Futures.transformAsync(
(ListenableFuture<?>) result,
value -> {
try {
setter.set(parent, value);
} catch (SerializationException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor()));
}
@Override
public void deserialize(CodedInputStream codedIn, Object parent, long offset, Runnable done)
throws IOException, SerializationException {
Object result = processTagAndDeserialize(codedIn);
if (result == null) {
done.run();
return;
}
if (!(result instanceof ListenableFuture)) {
unsafe().putObject(parent, offset, result);
done.run();
return;
}
addReadStatusFuture(
Futures.transform(
(ListenableFuture<?>) result,
value -> {
unsafe().putObject(parent, offset, value);
done.run();
return null;
},
directExecutor()));
}
@Override
public <T> void getSharedValue(
CodedInputStream codedIn,
@Nullable Object distinguisher,
DeferredObjectCodec<?> codec,
T parent,
FieldSetter<? super T> setter)
throws IOException, SerializationException {
ByteString fingerprint =
ByteString.copyFrom(codedIn.readRawBytes(fingerprintValueService.fingerprintLength()));
SettableFuture<Object> getOperation = SettableFuture.create();
Object previous =
fingerprintValueService.getOrClaimGetOperation(fingerprint, distinguisher, getOperation);
if (previous != null) {
// This object was previously requested. Discards `getOperation`.
if (previous instanceof ListenableFuture<?> previousValue) {
addReadStatusFuture(
Futures.transformAsync(
previousValue,
value -> {
try {
setter.set(parent, value);
} catch (SerializationException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor()));
return;
}
setter.set(parent, previous);
return;
}
// There is no previous result. Fetches the remote bytes and deserializes them.
addReadStatusFuture(readValueForFingerprint(fingerprint, codec, parent, setter, getOperation));
}
private <T> ListenableFuture<Object> readValueForFingerprint(
ByteString fingerprint,
DeferredObjectCodec<?> codec,
T parent,
FieldSetter<? super T> setter,
SettableFuture<Object> getOperation)
throws IOException {
try {
ListenableFuture<Object> futureResult =
Futures.transformAsync(
fingerprintValueService.get(fingerprint),
bytes -> {
SharedValueDeserializationContext innerContext = getFreshContext();
try {
DeferredValue<?> deferred =
codec.deserializeDeferred(innerContext, CodedInputStream.newInstance(bytes));
List<ListenableFuture<?>> innerReadStatusFutures = innerContext.readStatusFutures;
if (innerReadStatusFutures == null || innerReadStatusFutures.isEmpty()) {
Object result = deferred.call();
setter.set(parent, result);
return immediateFuture(result);
}
return Futures.whenAllSucceed(innerReadStatusFutures)
.call(
() -> {
Object result = deferred.call();
setter.set(parent, result);
return result;
},
directExecutor());
} catch (SerializationException | IOException e) {
return immediateFailedFuture(e);
}
},
// Switches to another executor to avoid performing serialization work on an an RPC
// executor thread.
fingerprintValueService.getExecutor());
getOperation.setFuture(futureResult);
return futureResult;
} catch (IOException e) {
getOperation.setException(e);
throw e;
} catch (RuntimeException | Error e) {
// Avoids causing SettableFuture consumers to hang if when there are unexpected exceptions.
getOperation.setException(e);
throw e;
}
}
@Override
public SharedValueDeserializationContext getFreshContext() {
return new SharedValueDeserializationContext(
getRegistry(), getDependencies(), fingerprintValueService);
}
@Override
Object makeSynchronous(Object obj) throws SerializationException {
if (obj instanceof ListenableFuture<?> future) {
return waitForDeserializationFuture(future);
}
return obj;
}
@Override
Object deserializeAndMaybeHandleDeferredValues(ObjectCodec<?> codec, CodedInputStream codedIn)
throws SerializationException, IOException {
int startingReadCount = readStatusFutures == null ? 0 : readStatusFutures.size();
Object value;
if (codec instanceof DeferredObjectCodec<?> deferredCodec) {
// On other analogous codepaths, `ObjectCodec.safeCast' is applied to the resulting value.
// Not all codecs have this property, notably DynamicCodec, but DeferredObjectCodec's type
// parameters guarantee type of the deserialized value.
value = deferredCodec.deserializeDeferred(this, codedIn);
} else {
value = codec.safeCast(codec.deserialize(this, codedIn));
}
this.lastStartingReadCount = startingReadCount;
return value;
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
Object combineValueWithReadFutures(Object value) {
if (readStatusFutures == null) {
return unwrapIfDeferredValue(value);
}
int length = readStatusFutures.size();
if (length <= lastStartingReadCount) {
return unwrapIfDeferredValue(value);
}
List<ListenableFuture<?>> futures = readStatusFutures.subList(lastStartingReadCount, length);
Futures.FutureCombiner<?> combiner = Futures.whenAllSucceed(futures);
futures.clear(); // clears this sublist from from `readStatusFutures`
ListenableFuture<Object> futureValue;
if (value instanceof DeferredValue) {
@SuppressWarnings("unchecked")
DeferredValue<Object> castValue = (DeferredValue<Object>) value;
futureValue = combiner.call(castValue, directExecutor());
} else {
futureValue = combiner.call(() -> value, directExecutor());
}
readStatusFutures.add(futureValue);
return futureValue;
}
private void addReadStatusFuture(ListenableFuture<?> readStatus) {
if (readStatusFutures == null) {
readStatusFutures = new ArrayList<>();
}
readStatusFutures.add(readStatus);
}
private static Object unwrapIfDeferredValue(Object value) {
if (value instanceof DeferredValue) {
@SuppressWarnings("unchecked")
DeferredValue<Object> castValue = (DeferredValue<Object>) value;
return castValue.call();
}
return value;
}
}