// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
public RemoteExecutionCache(
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
super(protocolImpl, options, digestUtil);
}
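
// Illustrative sketch of how this class is used (hypothetical variable names; real call sites are
// the remote execution code paths that build the input Merkle tree):
//
//   RemoteExecutionCache remoteCache =
//       new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
//   remoteCache.ensureInputsPresent(
//       context, merkleTree, additionalInputs, /* force= */ false);
//
// ensureInputsPresent() blocks until every referenced blob is known to exist remotely, issuing a
// single batched findMissingDigests call and uploading only the blobs reported as missing.
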
/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
* machine given the root digest.
*
* <p>The cache may check whether files or parts of the tree structure are already present and
* need not be uploaded again.
*
* <p>Note that this method is only required for remote execution, not for caching itself.
* However, remote execution uses a cache to store input files, and that may be a separate
* end-point from the executor itself, so the functionality lives here.
*/
public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force)
throws IOException, InterruptedException {
ImmutableSet<Digest> allDigests =
ImmutableSet.<Digest>builder()
.addAll(merkleTree.getAllDigests())
.addAll(additionalInputs.keySet())
.build();
if (allDigests.isEmpty()) {
return;
}
MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
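// Build one upload per digest. Each upload is deferred and only transfers the blob if the single
// batched findMissingDigests request (issued by missingDigestFinder) reports the digest as
// missing; casUploadCache deduplicates concurrent uploads of the same digest.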
Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
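// Wait for all transfers to finish. RxJava surfaces checked exceptions wrapped in a
// RuntimeException, so unwrap the interesting causes below.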
try {
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
Throwables.throwIfInstanceOf(cause, IOException.class);
}
throw e;
}
}
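
/**
* Returns a {@link Single} that, when subscribed, uploads the blob for {@code digest} if the
* batched {@code findMissingDigests} request reports it as missing.
*
* <p>Uploads are funneled through {@code casUploadCache} so that concurrent requests for the same
* digest share one upload; when the upload is skipped, {@code missingDigestFinder.count()} is
* still invoked via the {@code onIgnored} callback. The {@code force} flag is forwarded to
* {@code casUploadCache.execute}.
*/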
private Single<TransferResult> uploadBlobIfMissing(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() ->
// Only reached if casUploadCache has no existing upload of this digest to reuse; whether
// the digest is actually missing is determined below via findMissingDigests.
missingDigestFinder
.registerAndCount(digest)
.flatMapCompletable(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
directExecutor());
} else {
return Completable.complete();
}
})),
/* onIgnored= */ missingDigestFinder::count,
force);
return toTransferResult(upload);
}
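
/**
* Uploads the blob for {@code digest}, resolving it first as a {@link Directory} proto from the
* Merkle tree, then as a file (inline bytes or a path on disk), and finally as one of the
* additional input messages. Fails with an {@link IOException} if the digest matches none of
* these, i.e. the server reported a digest missing that was never requested.
*/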
private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
}
PathOrBytes file = merkleTree.getFileByDigest(digest);
if (file != null) {
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());
}
Message message = additionalInputs.get(digest);
if (message != null) {
return cacheProtocol.uploadBlob(context, digest, message.toByteString());
}
return Futures.immediateFailedFuture(
new IOException(
format(
"findMissingDigests returned a missing digest that has not been requested: %s",
digest)));
}

/**
* Batches the {@code findMissingDigests} call for all inputs: the request is only issued once the
* internal counter reaches the expected count, i.e. once every digest has either registered for
* upload or been counted as already handled. The result is shared by all registered digests.
*/
class MissingDigestFinder {
private final int expectedCount;
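// Emits the snapshot of registered digests once the counter reaches expectedCount, then completes.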
private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
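// Shared view of the findMissingDigests result: the call is made at most once and its result is
// replayed to every subscriber.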
private final Single<ImmutableSet<Digest>> resultSingle;
@GuardedBy("this")
private final Set<Digest> digests;
@GuardedBy("this")
private int currentCount = 0;
MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();
AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because of a potential race: the downstream may not yet have been
// added to the subscription list (to receive the upstream result) by the time the
// upstream completes.
.replay(1)
.refCount());
}

/**
* Registers the {@code digest} and increases the counter.
*
* <p>The returned Single must not be subscribed to more than once.
*
* @return a Single that emits the result of the {@code findMissingDigests} request.
*/
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
AtomicBoolean subscribed = new AtomicBoolean(false);
// count() can trigger the findMissingDigests call. Registering and counting before returning
// the Single could introduce a race in which the result of findMissingDigests becomes
// available before the consumer has subscribed to the returned Single. The consumer would
// then subscribe after the upstream has completed, resulting in a re-run of
// findMissingDigests (due to refCount()).
//
// Calling count() inside doOnSubscribe ensures the consumer has already subscribed to the
// returned Single, avoiding a re-execution of findMissingDigests.
return resultSingle.doOnSubscribe(
d -> {
boolean wasSubscribed = subscribed.getAndSet(true);
checkState(!wasSubscribed, "Single is subscribed more than once");
synchronized (this) {
digests.add(digest);
}
count();
});
}

/**
* Increases the counter; once it reaches the expected count, publishes the collected digests,
* which triggers the {@code findMissingDigests} request.
*/
void count() {
ImmutableSet<Digest> digestsResult = null;
synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}
if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
}
}
}