Refactor remote package to only have one remote cache impl.
Historically the remote package shipped two implementations
for remote caching: GrpcRemoteCache and SimpleBlobStoreActionCache.
Today there is no good reason to keep this structure and the
duplication of code and tests that comes with it. This is the
final refactoring of a long series:
* 5fc91a7
* 12ebb84
* 8ac6bdc
* 9fb83b4
* 36611c3
* a6b5b05
* d40933d
* 797f292
This change makes it so that there is one RemoteCache that
has a RemoteCacheClient. The RemoteCacheClient has one
implementation per caching protocol:
* DiskCacheClient (formerly OnDiskBlobStore)
* GrpcCacheClient (formerly GrpcRemoteCache)
* HttpCacheClient (formerly HttpBlobStore)
* CombinedDiskHttpCacheClient (formerly CombinedDiskHttpBlobStore)
A single RemoteCache type for all protocols will allow
composition of caching protocols. In particular, grpc and disk
caching. Additionally, this change will allow us to fix a longstanding
bug where for the HTTP
Closes #10200.
PiperOrigin-RevId: 279723411
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 4b6264d..31d074e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -7,7 +7,6 @@
srcs = glob(["**"]) + [
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
- "//src/main/java/com/google/devtools/build/lib/remote/blobstore:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/http:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/logging:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/options:srcs",
@@ -40,7 +39,6 @@
"//src/main/java/com/google/devtools/build/lib/buildeventstream",
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/profiler",
- "//src/main/java/com/google/devtools/build/lib/remote/blobstore",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/http",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index b312d57..107023c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -28,6 +28,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.collect.ImmutableIterable;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
index e6b6cad..978c10f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
similarity index 76%
rename from src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
rename to src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index c8709b3..4d694f0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -15,15 +15,14 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.lang.String.format;
import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
+import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub;
import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
@@ -37,7 +36,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
import com.google.common.util.concurrent.FutureCallback;
@@ -48,26 +46,24 @@
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
-import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
-import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
+import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.Status;
+import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -76,9 +72,11 @@
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
-public class GrpcRemoteCache extends AbstractRemoteActionCache {
+public class GrpcCacheClient implements RemoteCacheClient, MissingDigestsFinder {
private final CallCredentials credentials;
private final ReferenceCountedChannel channel;
+ private final RemoteOptions options;
+ private final DigestUtil digestUtil;
private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
private final int maxMissingBlobsDigestsPerMessage;
@@ -86,16 +84,17 @@
private AtomicBoolean closed = new AtomicBoolean();
@VisibleForTesting
- public GrpcRemoteCache(
+ public GrpcCacheClient(
ReferenceCountedChannel channel,
CallCredentials credentials,
RemoteOptions options,
RemoteRetrier retrier,
DigestUtil digestUtil,
ByteStreamUploader uploader) {
- super(options, digestUtil);
this.credentials = credentials;
this.channel = channel;
+ this.options = options;
+ this.digestUtil = digestUtil;
this.retrier = retrier;
this.uploader = uploader;
maxMissingBlobsDigestsPerMessage = computeMaxMissingBlobsDigestsPerMessage();
@@ -141,6 +140,13 @@
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
+ private ActionCacheFutureStub acFutureStub() {
+ return ActionCacheGrpc.newFutureStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+ .withCallCredentials(credentials)
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+
@Override
public void close() {
if (closed.getAndSet(true)) {
@@ -199,60 +205,28 @@
return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
}
- /**
- * Ensures that the tree structure of the inputs, the input files themselves, and the command are
- * available in the remote cache, such that the tree can be reassembled and executed on another
- * machine given the root digest.
- *
- * <p>The cache may check whether files or parts of the tree structure are already present, and do
- * not need to be uploaded again.
- *
- * <p>Note that this method is only required for remote execution, not for caching itself.
- * However, remote execution uses a cache to store input files, and that may be a separate
- * end-point from the executor itself, so the functionality lives here.
- */
- public void ensureInputsPresent(
- MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
- throws IOException, InterruptedException {
- Iterable<Digest> allDigests =
- Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
- ImmutableSet<Digest> missingDigests = Utils.getFromFuture(findMissingDigests(allDigests));
- Map<HashCode, Chunker> inputsToUpload = Maps.newHashMapWithExpectedSize(missingDigests.size());
- for (Digest missingDigest : missingDigests) {
- Directory node = merkleTree.getDirectoryByDigest(missingDigest);
- HashCode hash = HashCode.fromString(missingDigest.getHash());
- if (node != null) {
- Chunker c = Chunker.builder().setInput(node.toByteArray()).build();
- inputsToUpload.put(hash, c);
- continue;
- }
+ private ListenableFuture<ActionResult> handleStatus(ListenableFuture<ActionResult> download) {
+ return Futures.catchingAsync(
+ download,
+ StatusRuntimeException.class,
+ (sre) ->
+ sre.getStatus().getCode() == Code.NOT_FOUND
+ // Return null to indicate that it was a cache miss.
+ ? Futures.immediateFuture(null)
+ : Futures.immediateFailedFuture(new IOException(sre)),
+ MoreExecutors.directExecutor());
+ }
- PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
- if (file != null) {
- final Chunker c;
- if (file.getPath() != null) {
- c = Chunker.builder().setInput(missingDigest.getSizeBytes(), file.getPath()).build();
- } else {
- c = Chunker.builder().setInput(file.getBytes().toByteArray()).build();
- }
- inputsToUpload.put(hash, c);
- continue;
- }
-
- Message message = additionalInputs.get(missingDigest);
- if (message != null) {
- Chunker c = Chunker.builder().setInput(message.toByteArray()).build();
- inputsToUpload.put(hash, c);
- continue;
- }
-
- throw new IOException(
- format(
- "findMissingDigests returned a missing digest that has not been requested: %s",
- missingDigest));
- }
-
- uploader.uploadBlobs(inputsToUpload, /* forceUpload= */ true);
+ @Override
+ public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+ GetActionResultRequest request =
+ GetActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .build();
+ Context ctx = Context.current();
+ return retrier.executeAsync(
+ () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
}
private static String digestToString(Digest digest) {
@@ -260,7 +234,25 @@
}
@Override
- protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
+ throws IOException, InterruptedException {
+ try {
+ retrier.execute(
+ () ->
+ acBlockingStub()
+ .updateActionResult(
+ UpdateActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .setActionResult(actionResult)
+ .build()));
+ } catch (StatusRuntimeException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
@@ -363,7 +355,7 @@
public void onCompleted() {
try {
if (hashSupplier != null) {
- verifyContents(
+ Utils.verifyBlobContents(
digest.getHash(), DigestUtil.hashCodeToString(hashSupplier.get()));
}
out.flush();
@@ -377,7 +369,7 @@
}
@Override
- protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
+ public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
return uploader.uploadBlobAsync(
HashCode.fromString(digest.getHash()),
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
@@ -385,51 +377,10 @@
}
@Override
- protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+ public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
return uploader.uploadBlobAsync(
HashCode.fromString(digest.getHash()),
Chunker.builder().setInput(data.toByteArray()).build(),
/* forceUpload= */ true);
}
-
- // Execution Cache API
-
- @Override
- public ActionResult getCachedActionResult(ActionKey actionKey)
- throws IOException, InterruptedException {
- try {
- return retrier.execute(
- () ->
- acBlockingStub()
- .getActionResult(
- GetActionResultRequest.newBuilder()
- .setInstanceName(options.remoteInstanceName)
- .setActionDigest(actionKey.getDigest())
- .build()));
- } catch (StatusRuntimeException e) {
- if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
- // Return null to indicate that it was a cache miss.
- return null;
- }
- throw new IOException(e);
- }
- }
-
- @Override
- protected void setCachedActionResult(ActionKey actionKey, ActionResult result)
- throws IOException, InterruptedException {
- try {
- retrier.execute(
- () ->
- acBlockingStub()
- .updateActionResult(
- UpdateActionResultRequest.newBuilder()
- .setInstanceName(options.remoteInstanceName)
- .setActionDigest(actionKey.getDigest())
- .setActionResult(result)
- .build()));
- } catch (StatusRuntimeException e) {
- throw new IOException(e);
- }
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 535bba1..a929254 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -43,7 +43,7 @@
*/
final class RemoteActionContextProvider extends ActionContextProvider {
private final CommandEnvironment env;
- private final AbstractRemoteActionCache cache;
+ private final RemoteCache cache;
@Nullable private final GrpcRemoteExecutor executor;
private final RemoteRetrier retrier;
private final DigestUtil digestUtil;
@@ -53,7 +53,7 @@
private RemoteActionContextProvider(
CommandEnvironment env,
- AbstractRemoteActionCache cache,
+ RemoteCache cache,
@Nullable GrpcRemoteExecutor executor,
RemoteRetrier retrier,
DigestUtil digestUtil,
@@ -67,17 +67,14 @@
}
public static RemoteActionContextProvider createForRemoteCaching(
- CommandEnvironment env,
- AbstractRemoteActionCache cache,
- RemoteRetrier retrier,
- DigestUtil digestUtil) {
+ CommandEnvironment env, RemoteCache cache, RemoteRetrier retrier, DigestUtil digestUtil) {
return new RemoteActionContextProvider(
env, cache, /*executor=*/ null, retrier, digestUtil, /*logDir=*/ null);
}
public static RemoteActionContextProvider createForRemoteExecution(
CommandEnvironment env,
- GrpcRemoteCache cache,
+ RemoteExecutionCache cache,
GrpcRemoteExecutor executor,
RemoteRetrier retrier,
DigestUtil digestUtil,
@@ -116,7 +113,7 @@
env.getReporter(),
buildRequestId,
commandId,
- (GrpcRemoteCache) cache,
+ (RemoteExecutionCache) cache,
executor,
retrier,
digestUtil,
@@ -167,7 +164,7 @@
/** Returns the remote cache object if any. */
@Nullable
- AbstractRemoteActionCache getRemoteCache() {
+ RemoteCache getRemoteCache() {
return cache;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 2d82bd6..72c6d54 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -65,11 +65,11 @@
@GuardedBy("lock")
final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();
- private final AbstractRemoteActionCache remoteCache;
+ private final RemoteCache remoteCache;
private final Path execRoot;
private final Context ctx;
- RemoteActionInputFetcher(AbstractRemoteActionCache remoteCache, Path execRoot, Context ctx) {
+ RemoteActionInputFetcher(RemoteCache remoteCache, Path execRoot, Context ctx) {
this.remoteCache = Preconditions.checkNotNull(remoteCache);
this.execRoot = Preconditions.checkNotNull(execRoot);
this.ctx = Preconditions.checkNotNull(ctx);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 1aacf14..4df9100 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -50,11 +50,11 @@
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.FileMetadata;
-import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.SymlinkMetadata;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.DirectoryMetadata;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.FileMetadata;
+import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.SymlinkMetadata;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
@@ -87,7 +87,7 @@
/** A cache for storing artifacts (input and output) as well as the output of running an action. */
@ThreadSafety.ThreadSafe
-public abstract class AbstractRemoteActionCache implements MissingDigestsFinder, AutoCloseable {
+public class RemoteCache implements AutoCloseable {
/** See {@link SpawnExecutionContext#lockOutputFiles()}. */
@FunctionalInterface
@@ -103,49 +103,21 @@
((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]);
}
+ protected final RemoteCacheClient cacheProtocol;
protected final RemoteOptions options;
protected final DigestUtil digestUtil;
- public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
+ public RemoteCache(
+ RemoteCacheClient cacheProtocol, RemoteOptions options, DigestUtil digestUtil) {
+ this.cacheProtocol = cacheProtocol;
this.options = options;
this.digestUtil = digestUtil;
}
- /**
- * Attempts to look up the given action in the remote cache and return its result, if present.
- * Returns {@code null} if there is no such entry. Note that a successful result from this method
- * does not guarantee the availability of the corresponding output files in the remote cache.
- *
- * @throws IOException if the remote cache is unavailable.
- */
- @Nullable
- abstract ActionResult getCachedActionResult(ActionKey actionKey)
- throws IOException, InterruptedException;
-
- protected abstract void setCachedActionResult(ActionKey actionKey, ActionResult action)
- throws IOException, InterruptedException;
-
- /**
- * Uploads a file
- *
- * <p>Any errors are being propagated via the returned future. If the future completes without
- * errors the upload was successful.
- *
- * @param digest the digest of the file.
- * @param file the file to upload.
- */
- protected abstract ListenableFuture<Void> uploadFile(Digest digest, Path file);
-
- /**
- * Uploads a BLOB.
- *
- * <p>Any errors are being propagated via the returned future. If the future completes without
- * errors the upload was successful
- *
- * @param digest the digest of the blob.
- * @param data the blob to upload.
- */
- protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
+ public ActionResult downloadActionResult(ActionKey actionKey)
+ throws IOException, InterruptedException {
+ return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey));
+ }
/**
* Upload the result of a locally executed action to the remote cache.
@@ -167,7 +139,7 @@
resultBuilder.setExitCode(exitCode);
ActionResult result = resultBuilder.build();
if (exitCode == 0 && !action.getDoNotCache()) {
- setCachedActionResult(actionKey, result);
+ cacheProtocol.uploadActionResult(actionKey, result);
}
return result;
}
@@ -209,19 +181,20 @@
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());
- ImmutableSet<Digest> digestsToUpload = Utils.getFromFuture(findMissingDigests(digests));
+ ImmutableSet<Digest> digestsToUpload =
+ Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
- uploads.add(uploadFile(digest, file));
+ uploads.add(cacheProtocol.uploadFile(digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
- uploads.add(uploadBlob(digest, blob));
+ uploads.add(cacheProtocol.uploadBlob(digest, blob));
}
}
@@ -255,13 +228,6 @@
}
/**
- * Downloads a blob with a content hash {@code digest} to {@code out}.
- *
- * @return a future that completes after the download completes (succeeds / fails).
- */
- protected abstract ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
-
- /**
* Downloads a blob with content hash {@code digest} and stores its content in memory.
*
* @return a future that completes after the download completes (succeeds / fails). If successful,
@@ -274,7 +240,7 @@
ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
SettableFuture<byte[]> outerF = SettableFuture.create();
Futures.addCallback(
- downloadBlob(digest, bOut),
+ cacheProtocol.downloadBlob(digest, bOut),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
@@ -490,7 +456,7 @@
OutputStream out = new LazyFileOutputStream(path);
SettableFuture<Void> outerF = SettableFuture.create();
- ListenableFuture<Void> f = downloadBlob(digest, out);
+ ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
Futures.addCallback(
f,
new FutureCallback<Void>() {
@@ -530,7 +496,7 @@
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
- downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
+ cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
(d) -> null,
directExecutor()));
}
@@ -540,7 +506,7 @@
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
- downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
+ cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
(d) -> null,
directExecutor()));
}
@@ -857,7 +823,7 @@
* Adds an action and command protos to upload. They need to be uploaded as part of the action
* result.
*/
- public void addAction(SimpleBlobStore.ActionKey actionKey, Action action, Command command) {
+ public void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Command command) {
digestToBlobs.put(actionKey.getDigest(), action.toByteString());
digestToBlobs.put(action.getCommandDigest(), command.toByteString());
}
@@ -993,20 +959,11 @@
}
}
- protected void verifyContents(String expectedHash, String actualHash) throws IOException {
- if (!expectedHash.equals(actualHash)) {
- String msg =
- String.format(
- "An output download failed, because the expected hash"
- + "'%s' did not match the received hash '%s'.",
- expectedHash, actualHash);
- throw new IOException(msg);
- }
- }
-
/** Release resources associated with the cache. The cache may not be used after calling this. */
@Override
- public abstract void close();
+ public void close() {
+ cacheProtocol.close();
+ }
/**
* Creates an {@link OutputStream} that isn't actually opened until the first data is written.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
similarity index 66%
rename from src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
index 89a3bbc..aeaabac 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -18,11 +18,12 @@
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpBlobStore;
-import com.google.devtools.build.lib.remote.disk.OnDiskBlobStore;
-import com.google.devtools.build.lib.remote.http.HttpBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpCacheClient;
+import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
+import com.google.devtools.build.lib.remote.http.HttpCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.netty.channel.unix.DomainSocketAddress;
@@ -31,36 +32,29 @@
import javax.annotation.Nullable;
/**
- * A factory class for providing a {@link SimpleBlobStore} to be used with {@link
- * SimpleBlobStoreActionCache}. Currently implemented with HTTP or local.
+ * A factory class for providing a {@link RemoteCacheClient}. Currently implemented for HTTP and
+ * disk caching.
*/
-public final class SimpleBlobStoreFactory {
+public final class RemoteCacheClientFactory {
- private SimpleBlobStoreFactory() {}
+ private RemoteCacheClientFactory() {}
- public static SimpleBlobStore create(RemoteOptions remoteOptions, @Nullable Path casPath) {
- if (isHttpUrlOptions(remoteOptions)) {
- return createHttp(remoteOptions, /* creds= */ null);
- } else if (casPath != null) {
- return new OnDiskBlobStore(casPath);
- } else {
- return null;
- }
- }
-
- public static SimpleBlobStore create(
- RemoteOptions options, @Nullable Credentials creds, Path workingDirectory)
+ public static RemoteCacheClient create(
+ RemoteOptions options,
+ @Nullable Credentials creds,
+ Path workingDirectory,
+ DigestUtil digestUtil)
throws IOException {
-
Preconditions.checkNotNull(workingDirectory, "workingDirectory");
if (isHttpUrlOptions(options) && isDiskCache(options)) {
- return createCombinedCache(workingDirectory, options.diskCache, options, creds);
+ return createCombinedCache(workingDirectory, options.diskCache, options, creds, digestUtil);
}
if (isHttpUrlOptions(options)) {
- return createHttp(options, creds);
+ return createHttp(options, creds, digestUtil);
}
if (isDiskCache(options)) {
- return createDiskCache(workingDirectory, options.diskCache);
+ return createDiskCache(
+ workingDirectory, options.diskCache, options.remoteVerifyDownloads, digestUtil);
}
throw new IllegalArgumentException(
"Unrecognized RemoteOptions configuration: remote Http cache URL and/or local disk cache"
@@ -71,7 +65,8 @@
return isHttpUrlOptions(options) || isDiskCache(options);
}
- private static SimpleBlobStore createHttp(RemoteOptions options, Credentials creds) {
+ private static RemoteCacheClient createHttp(
+ RemoteOptions options, Credentials creds, DigestUtil digestUtil) {
Preconditions.checkNotNull(options.remoteCache, "remoteCache");
try {
@@ -82,22 +77,26 @@
if (options.remoteProxy != null) {
if (options.remoteProxy.startsWith("unix:")) {
- return HttpBlobStore.create(
+ return HttpCacheClient.create(
new DomainSocketAddress(options.remoteProxy.replaceFirst("^unix:", "")),
uri,
options.remoteTimeout,
options.remoteMaxConnections,
+ options.remoteVerifyDownloads,
ImmutableList.copyOf(options.remoteHeaders),
+ digestUtil,
creds);
} else {
throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy);
}
} else {
- return HttpBlobStore.create(
+ return HttpCacheClient.create(
uri,
options.remoteTimeout,
options.remoteMaxConnections,
+ options.remoteVerifyDownloads,
ImmutableList.copyOf(options.remoteHeaders),
+ digestUtil,
creds);
}
} catch (Exception e) {
@@ -105,29 +104,38 @@
}
}
- private static SimpleBlobStore createDiskCache(Path workingDirectory, PathFragment diskCachePath)
+ private static RemoteCacheClient createDiskCache(
+ Path workingDirectory,
+ PathFragment diskCachePath,
+ boolean verifyDownloads,
+ DigestUtil digestUtil)
throws IOException {
Path cacheDir =
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
if (!cacheDir.exists()) {
cacheDir.createDirectoryAndParents();
}
- return new OnDiskBlobStore(cacheDir);
+ return new DiskCacheClient(cacheDir, verifyDownloads, digestUtil);
}
- private static SimpleBlobStore createCombinedCache(
- Path workingDirectory, PathFragment diskCachePath, RemoteOptions options, Credentials cred)
+ private static RemoteCacheClient createCombinedCache(
+ Path workingDirectory,
+ PathFragment diskCachePath,
+ RemoteOptions options,
+ Credentials cred,
+ DigestUtil digestUtil)
throws IOException {
-
Path cacheDir =
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
if (!cacheDir.exists()) {
cacheDir.createDirectoryAndParents();
}
- OnDiskBlobStore diskCache = new OnDiskBlobStore(cacheDir);
- SimpleBlobStore httpCache = createHttp(options, cred);
- return new CombinedDiskHttpBlobStore(diskCache, httpCache);
+ DiskCacheClient diskCache =
+ new DiskCacheClient(cacheDir, options.remoteVerifyDownloads, digestUtil);
+ RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
+
+ return new CombinedDiskHttpCacheClient(diskCache, httpCache);
}
private static boolean isDiskCache(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
new file mode 100644
index 0000000..bef7cc9
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -0,0 +1,128 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static java.lang.String.format;
+
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
+import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.Utils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/** A {@link RemoteCache} with additional functionality needed for remote execution. */
+public class RemoteExecutionCache extends RemoteCache {
+
+ public RemoteExecutionCache(
+ GrpcCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
+ super(protocolImpl, options, digestUtil);
+ }
+
+ private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blobs)
+ throws IOException, InterruptedException {
+ List<ListenableFuture<Void>> uploads = new ArrayList<>();
+
+ for (Map.Entry<Digest, Path> entry : files.entrySet()) {
+ uploads.add(cacheProtocol.uploadFile(entry.getKey(), entry.getValue()));
+ }
+
+ for (Map.Entry<Digest, ByteString> entry : blobs.entrySet()) {
+ uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
+ }
+
+ try {
+ for (ListenableFuture<Void> upload : uploads) {
+ upload.get();
+ }
+ } catch (ExecutionException e) {
+ // Cancel remaining uploads.
+ for (ListenableFuture<Void> upload : uploads) {
+ upload.cancel(/* mayInterruptIfRunning= */ true);
+ }
+
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ Throwables.propagateIfPossible(cause, InterruptedException.class);
+ throw new IOException(cause);
+ }
+ }
+
+ /**
+ * Ensures that the tree structure of the inputs, the input files themselves, and the command are
+ * available in the remote cache, such that the tree can be reassembled and executed on another
+ * machine given the root digest.
+ *
+ * <p>The cache may check whether files or parts of the tree structure are already present, and do
+ * not need to be uploaded again.
+ *
+ * <p>Note that this method is only required for remote execution, not for caching itself.
+ * However, remote execution uses a cache to store input files, and that may be a separate
+ * end-point from the executor itself, so the functionality lives here.
+ */
+ public void ensureInputsPresent(
+ MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
+ throws IOException, InterruptedException {
+ Iterable<Digest> allDigests =
+ Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
+ ImmutableSet<Digest> missingDigests =
+ Utils.getFromFuture(cacheProtocol.findMissingDigests(allDigests));
+ Map<Digest, Path> filesToUpload = new HashMap<>();
+ Map<Digest, ByteString> blobsToUpload = new HashMap<>();
+ for (Digest missingDigest : missingDigests) {
+ Directory node = merkleTree.getDirectoryByDigest(missingDigest);
+ if (node != null) {
+ blobsToUpload.put(missingDigest, node.toByteString());
+ continue;
+ }
+
+ PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
+ if (file != null) {
+ if (file.getBytes() != null) {
+ blobsToUpload.put(missingDigest, file.getBytes());
+ continue;
+ }
+ filesToUpload.put(missingDigest, file.getPath());
+ continue;
+ }
+
+ Message message = additionalInputs.get(missingDigest);
+ if (message != null) {
+ blobsToUpload.put(missingDigest, message.toByteString());
+ continue;
+ }
+
+ throw new IOException(
+ format(
+ "findMissingDigests returned a missing digest that has not been requested: %s",
+ missingDigest));
+ }
+
+ uploadMissing(filesToUpload, blobsToUpload);
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 1598e25..4eba377 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -43,6 +43,7 @@
import com.google.devtools.build.lib.exec.ExecutorBuilder;
import com.google.devtools.build.lib.packages.TargetUtils;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -160,8 +161,8 @@
DigestHashFunction hashFn = env.getRuntime().getFileSystem().getDigestFunction();
DigestUtil digestUtil = new DigestUtil(hashFn);
- boolean enableBlobStoreCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions);
- boolean enableGrpcCache = GrpcRemoteCache.isRemoteCacheOptions(remoteOptions);
+ boolean enableBlobStoreCache = RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions);
+ boolean enableGrpcCache = GrpcCacheClient.isRemoteCacheOptions(remoteOptions);
boolean enableRemoteExecution = shouldEnableRemoteExecution(remoteOptions);
if (enableBlobStoreCache && enableRemoteExecution) {
throw new AbruptExitException(
@@ -206,7 +207,7 @@
interceptors.toArray(new ClientInterceptor[0])));
}
RemoteRetrier executeRetrier = null;
- AbstractRemoteActionCache cache = null;
+ RemoteCacheClient cacheClient = null;
if (enableGrpcCache || !Strings.isNullOrEmpty(remoteOptions.remoteExecutor)) {
rpcRetrier =
new RemoteRetrier(
@@ -259,8 +260,8 @@
remoteOptions.remoteTimeout,
rpcRetrier);
cacheChannel.release();
- cache =
- new GrpcRemoteCache(
+ cacheClient =
+ new GrpcCacheClient(
cacheChannel.retain(),
credentials,
remoteOptions,
@@ -273,7 +274,7 @@
buildEventArtifactUploaderFactoryDelegate.init(
new ByteStreamBuildEventArtifactUploaderFactory(
uploader,
- cache,
+ cacheClient,
cacheChannel.authority(),
requestContext,
remoteOptions.remoteInstanceName));
@@ -281,13 +282,11 @@
if (enableBlobStoreCache) {
executeRetrier = null;
- cache =
- new SimpleBlobStoreActionCache(
+ cacheClient =
+ RemoteCacheClientFactory.create(
remoteOptions,
- SimpleBlobStoreFactory.create(
- remoteOptions,
- GoogleAuthUtils.newCredentials(authAndTlsOptions),
- Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory")),
+ GoogleAuthUtils.newCredentials(authAndTlsOptions),
+ Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
digestUtil);
}
@@ -306,15 +305,18 @@
retrier);
execChannel.release();
Preconditions.checkState(
- cache instanceof GrpcRemoteCache,
+ cacheClient instanceof GrpcCacheClient,
"Only the gRPC cache is support for remote execution");
+ RemoteExecutionCache remoteCache =
+ new RemoteExecutionCache((GrpcCacheClient) cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteExecution(
- env, (GrpcRemoteCache) cache, executor, executeRetrier, digestUtil, logDir);
- } else if (cache != null) {
+ env, remoteCache, executor, executeRetrier, digestUtil, logDir);
+ } else if (cacheClient != null) {
+ RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
- env, cache, executeRetrier, digestUtil);
+ env, remoteCache, executeRetrier, digestUtil);
}
} catch (IOException e) {
env.getReporter().handle(Event.error(e.getMessage()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 3346b27..fc8064f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -20,7 +20,6 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import io.grpc.Status;
-import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
@@ -28,16 +27,28 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
/** Specific retry logic for remote execution/caching. */
public class RemoteRetrier extends Retrier {
+ @Nullable
+ private static Status fromException(Exception e) {
+ for (Throwable cause = e; cause != null; cause = cause.getCause()) {
+ if (cause instanceof StatusRuntimeException) {
+ return ((StatusRuntimeException) cause).getStatus();
+ }
+ }
+ return null;
+ }
+
public static final Predicate<? super Exception> RETRIABLE_GRPC_ERRORS =
e -> {
- if (!(e instanceof StatusException) && !(e instanceof StatusRuntimeException)) {
+ Status s = fromException(e);
+ if (s == null) {
+ // It's not a gRPC error.
return false;
}
- Status s = Status.fromThrowable(e);
switch (s.getCode()) {
case CANCELLED:
return !Thread.currentThread().isInterrupted();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index 52fb136..d32a79f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -46,7 +46,7 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -75,7 +75,7 @@
private final Path execRoot;
private final RemoteOptions options;
- private final AbstractRemoteActionCache remoteCache;
+ private final RemoteCache remoteCache;
private final String buildRequestId;
private final String commandId;
@@ -94,7 +94,7 @@
RemoteSpawnCache(
Path execRoot,
RemoteOptions options,
- AbstractRemoteActionCache remoteCache,
+ RemoteCache remoteCache,
String buildRequestId,
String commandId,
@Nullable Reporter cmdlineReporter,
@@ -153,7 +153,7 @@
try {
ActionResult result;
try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
- result = remoteCache.getCachedActionResult(actionKey);
+ result = remoteCache.downloadActionResult(actionKey);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index cef0be7..bd35367 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -57,7 +57,7 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
@@ -99,7 +99,7 @@
private final boolean verboseFailures;
@Nullable private final Reporter cmdlineReporter;
- private final GrpcRemoteCache remoteCache;
+ private final RemoteExecutionCache remoteCache;
@Nullable private final GrpcRemoteExecutor remoteExecutor;
@Nullable private final RemoteRetrier retrier;
private final String buildRequestId;
@@ -125,7 +125,7 @@
@Nullable Reporter cmdlineReporter,
String buildRequestId,
String commandId,
- GrpcRemoteCache remoteCache,
+ RemoteExecutionCache remoteCache,
GrpcRemoteExecutor remoteExecutor,
@Nullable RemoteRetrier retrier,
DigestUtil digestUtil,
@@ -191,7 +191,7 @@
// Try to lookup the action in the action cache.
ActionResult cachedResult;
try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
- cachedResult = acceptCachedResult ? remoteCache.getCachedActionResult(actionKey) : null;
+ cachedResult = acceptCachedResult ? remoteCache.downloadActionResult(actionKey) : null;
}
if (cachedResult != null) {
if (cachedResult.getExitCode() != 0) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
deleted file mode 100644
index 363340c..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import build.bazel.remote.execution.v2.ActionResult;
-import build.bazel.remote.execution.v2.Digest;
-import build.bazel.remote.execution.v2.Directory;
-import build.bazel.remote.execution.v2.DirectoryNode;
-import build.bazel.remote.execution.v2.FileNode;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.hash.HashingOutputStream;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
-import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.remote.util.Utils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.OutputStream;
-import javax.annotation.Nullable;
-
-/**
- * A RemoteActionCache implementation that uses a simple blob store for files and action output.
- *
- * <p>The thread safety is guaranteed by the underlying simple blob store.
- *
- * <p>Note that this class is used from src/tools/remote.
- */
-@ThreadSafe
-public class SimpleBlobStoreActionCache extends AbstractRemoteActionCache {
- protected final SimpleBlobStore blobStore;
-
- public SimpleBlobStoreActionCache(
- RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) {
- super(options, digestUtil);
- this.blobStore = blobStore;
- }
-
- public void downloadTree(Digest rootDigest, Path rootLocation)
- throws IOException, InterruptedException {
- rootLocation.createDirectoryAndParents();
- Directory directory = Directory.parseFrom(Utils.getFromFuture(downloadBlob(rootDigest)));
- for (FileNode file : directory.getFilesList()) {
- Path dst = rootLocation.getRelative(file.getName());
- Utils.getFromFuture(downloadFile(dst, file.getDigest()));
- dst.setExecutable(file.getIsExecutable());
- }
- for (DirectoryNode child : directory.getDirectoriesList()) {
- downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
- }
- }
-
- @Override
- public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
- return blobStore.uploadFile(digest, file);
- }
-
- @Override
- public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
- return blobStore.uploadBlob(digest, data);
- }
-
- @Override
- public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
- return Futures.immediateFuture(ImmutableSet.copyOf(digests));
- }
-
- @Override
- public ActionResult getCachedActionResult(ActionKey actionKey)
- throws IOException, InterruptedException {
- return Utils.getFromFuture(blobStore.downloadActionResult(actionKey));
- }
-
- public void setCachedActionResult(ActionKey actionKey, ActionResult result)
- throws IOException, InterruptedException {
- blobStore.uploadActionResult(actionKey, result);
- }
-
- @Override
- public void close() {
- blobStore.close();
- }
-
- @Override
- protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
- SettableFuture<Void> outerF = SettableFuture.create();
- @Nullable
- HashingOutputStream hashOut =
- options.remoteVerifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
- Futures.addCallback(
- blobStore.downloadBlob(digest, hashOut != null ? hashOut : out),
- new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void unused) {
- try {
- if (hashOut != null) {
- verifyContents(digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
- }
- out.flush();
- outerF.set(null);
- } catch (IOException e) {
- outerF.setException(e);
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- outerF.setException(throwable);
- }
- },
- MoreExecutors.directExecutor());
- return outerF;
- }
-
- public DigestUtil getDigestUtil() {
- return digestUtil;
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
deleted file mode 100644
index c840c0d..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-load("@rules_java//java:defs.bzl", "java_library")
-
-package(default_visibility = ["//src:__subpackages__"])
-
-filegroup(
- name = "srcs",
- srcs = glob(["**"]),
- visibility = ["//src/main/java/com/google/devtools/build/lib/remote:__pkg__"],
-)
-
-java_library(
- name = "blobstore",
- srcs = glob(["*.java"]),
- tags = ["bazel"],
- deps = [
- "//src/main/java/com/google/devtools/build/lib/remote/common",
- "//src/main/java/com/google/devtools/build/lib/remote/util",
- "//src/main/java/com/google/devtools/build/lib/vfs",
- "//src/main/java/com/google/devtools/common/options",
- "//third_party:guava",
- "//third_party/protobuf:protobuf_java",
- "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
- ],
-)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
deleted file mode 100644
index 2aaa588..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote.blobstore;
-
-
-/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
diff --git a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java
rename to src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
index 1a6bd9e..c9c1d40 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
@@ -11,14 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.google.devtools.build.lib.remote;
+package com.google.devtools.build.lib.remote.common;
import build.bazel.remote.execution.v2.Digest;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
/** Supports querying a remote cache whether it contains a list of blobs. */
-interface MissingDigestsFinder {
+public interface MissingDigestsFinder {
/**
* Returns a set of digests that the remote cache does not know about. The returned set is
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
similarity index 90%
rename from src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index da6e6de..2c1bb83 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -29,13 +29,14 @@
*
* <p>Implementations must be thread-safe.
*/
-public interface SimpleBlobStore {
+public interface RemoteCacheClient extends MissingDigestsFinder {
/**
* A key in the remote action cache. The type wraps around a {@link Digest} of an {@link Action}.
* Action keys are special in that they aren't content-addressable but refer to action results.
*/
final class ActionKey {
+
private final Digest digest;
public Digest getDigest() {
@@ -45,6 +46,21 @@
public ActionKey(Digest digest) {
this.digest = Preconditions.checkNotNull(digest, "digest");
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof ActionKey)) {
+ return false;
+ }
+
+ ActionKey otherKey = (ActionKey) other;
+ return digest.equals(otherKey.digest);
+ }
+
+ @Override
+ public int hashCode() {
+ return digest.hashCode();
+ }
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
similarity index 80%
rename from src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
index 2f38e3a..cc1c068 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
@@ -16,10 +16,11 @@
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -28,16 +29,16 @@
import java.util.concurrent.ExecutionException;
/**
- * A {@link SimpleBlobStore} implementation combining two blob stores. A local disk blob store and a
- * remote blob store. If a blob isn't found in the first store, the second store is used, and the
+ * A {@link RemoteCacheClient} implementation combining two blob stores. A local disk blob store and
+ * a remote blob store. If a blob isn't found in the first store, the second store is used, and the
* blob added to the first. Put puts the blob on both stores.
*/
-public final class CombinedDiskHttpBlobStore implements SimpleBlobStore {
+public final class CombinedDiskHttpCacheClient implements RemoteCacheClient {
- private final SimpleBlobStore remoteCache;
- private final OnDiskBlobStore diskCache;
+ private final RemoteCacheClient remoteCache;
+ private final DiskCacheClient diskCache;
- public CombinedDiskHttpBlobStore(OnDiskBlobStore diskCache, SimpleBlobStore remoteCache) {
+ public CombinedDiskHttpCacheClient(DiskCacheClient diskCache, RemoteCacheClient remoteCache) {
this.diskCache = Preconditions.checkNotNull(diskCache);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
}
@@ -81,6 +82,20 @@
return Futures.immediateFuture(null);
}
+ @Override
+ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+ ListenableFuture<ImmutableSet<Digest>> remoteQuery = remoteCache.findMissingDigests(digests);
+ ListenableFuture<ImmutableSet<Digest>> diskQuery = diskCache.findMissingDigests(digests);
+ return Futures.whenAllSucceed(remoteQuery, diskQuery)
+ .call(
+ () ->
+ ImmutableSet.<Digest>builder()
+ .addAll(remoteQuery.get())
+ .addAll(diskQuery.get())
+ .build(),
+ MoreExecutors.directExecutor());
+ }
+
private Path newTempPath() {
return diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
similarity index 76%
rename from src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index ddd0cec..68ad316 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -15,11 +15,15 @@
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashingOutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
@@ -27,14 +31,21 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;
+import javax.annotation.Nullable;
/** A on-disk store for the remote action cache. */
-public class OnDiskBlobStore implements SimpleBlobStore {
- private final Path root;
+public class DiskCacheClient implements RemoteCacheClient {
+
private static final String ACTION_KEY_PREFIX = "ac_";
- public OnDiskBlobStore(Path root) {
+ private final Path root;
+ private final boolean verifyDownloads;
+ private final DigestUtil digestUtil;
+
+ public DiskCacheClient(Path root, boolean verifyDownloads, DigestUtil digestUtil) {
this.root = root;
+ this.verifyDownloads = verifyDownloads;
+ this.digestUtil = digestUtil;
}
/** Returns {@code true} if the provided {@code key} is stored in the CAS. */
@@ -68,7 +79,23 @@
@Override
public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
- return download(digest, out, /* isActionCache= */ false);
+ @Nullable
+ HashingOutputStream hashOut = verifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
+ return Futures.transformAsync(
+ download(digest, hashOut != null ? hashOut : out, /* isActionCache= */ false),
+ (v) -> {
+ try {
+ if (hashOut != null) {
+ Utils.verifyBlobContents(
+ digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
+ }
+ out.flush();
+ return Futures.immediateFuture(null);
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ },
+ MoreExecutors.directExecutor());
}
@Override
@@ -108,6 +135,11 @@
return Futures.immediateFuture(null);
}
+ @Override
+ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+ return Futures.immediateFuture(ImmutableSet.copyOf(digests));
+ }
+
protected Path toPath(String key, boolean actionResult) {
return root.getChild(getDiskKey(key, actionResult));
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
index 9361235..e82e236 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
@@ -103,7 +103,7 @@
if (!uri.getPath().endsWith("/")) {
builder.append("/");
}
- builder.append(isCas ? HttpBlobStore.CAS_PREFIX : HttpBlobStore.AC_PREFIX);
+ builder.append(isCas ? HttpCacheClient.CAS_PREFIX : HttpCacheClient.AC_PREFIX);
builder.append(hash);
return builder.toString();
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
similarity index 92%
rename from src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
rename to src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 19e9bd9..c0f0051 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -17,11 +17,15 @@
import build.bazel.remote.execution.v2.Digest;
import com.google.auth.Credentials;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashingOutputStream;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
@@ -84,7 +88,7 @@
import javax.net.ssl.SSLEngine;
/**
- * Implementation of {@link SimpleBlobStore} that can talk to a HTTP/1.1 backend.
+ * Implementation of {@link RemoteCacheClient} that can talk to a HTTP/1.1 backend.
*
* <p>Blobs (Binary large objects) are uploaded using the {@code PUT} method. Action cache blobs are
* stored under the path {@code /ac/base16-key}. CAS (Content Addressable Storage) blobs are stored
@@ -107,7 +111,7 @@
*
* <p>The implementation currently does not support transfer encoding chunked.
*/
-public final class HttpBlobStore implements SimpleBlobStore {
+public final class HttpCacheClient implements RemoteCacheClient {
public static final String AC_PREFIX = "ac/";
public static final String CAS_PREFIX = "cas/";
@@ -122,6 +126,8 @@
private final int timeoutSeconds;
private final ImmutableList<Entry<String, String>> extraHttpHeaders;
private final boolean useTls;
+ private final boolean verifyDownloads;
+ private final DigestUtil digestUtil;
private final Object closeLock = new Object();
@@ -136,51 +142,61 @@
@GuardedBy("credentialsLock")
private long lastRefreshTime;
- public static HttpBlobStore create(
+ public static HttpCacheClient create(
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
+ boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
+ DigestUtil digestUtil,
@Nullable final Credentials creds)
throws Exception {
- return new HttpBlobStore(
+ return new HttpCacheClient(
NioEventLoopGroup::new,
NioSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
+ verifyDownloads,
extraHttpHeaders,
+ digestUtil,
creds,
null);
}
- public static HttpBlobStore create(
+ public static HttpCacheClient create(
DomainSocketAddress domainSocketAddress,
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
+ boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
+ DigestUtil digestUtil,
@Nullable final Credentials creds)
throws Exception {
if (KQueue.isAvailable()) {
- return new HttpBlobStore(
+ return new HttpCacheClient(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
+ verifyDownloads,
extraHttpHeaders,
+ digestUtil,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
- return new HttpBlobStore(
+ return new HttpCacheClient(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
+ verifyDownloads,
extraHttpHeaders,
+ digestUtil,
creds,
domainSocketAddress);
} else {
@@ -188,13 +204,15 @@
}
}
- private HttpBlobStore(
+ private HttpCacheClient(
Function<Integer, EventLoopGroup> newEventLoopGroup,
Class<? extends Channel> channelClass,
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
+ boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
+ DigestUtil digestUtil,
@Nullable final Credentials creds,
@Nullable SocketAddress socketAddress)
throws Exception {
@@ -261,6 +279,8 @@
this.creds = creds;
this.timeoutSeconds = timeoutSeconds;
this.extraHttpHeaders = extraHttpHeaders;
+ this.verifyDownloads = verifyDownloads;
+ this.digestUtil = digestUtil;
}
@SuppressWarnings("FutureReturnValueIgnored")
@@ -419,7 +439,23 @@
@Override
public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
- return get(digest, out, /* casDownload= */ true);
+ final HashingOutputStream hashOut =
+ verifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
+ return Futures.transformAsync(
+ get(digest, hashOut != null ? hashOut : out, /* casDownload= */ true),
+ (v) -> {
+ try {
+ if (hashOut != null) {
+ Utils.verifyBlobContents(
+ digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
+ }
+ out.flush();
+ return Futures.immediateFuture(null);
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ },
+ MoreExecutors.directExecutor());
}
@SuppressWarnings("FutureReturnValueIgnored")
@@ -607,6 +643,11 @@
return Futures.immediateFuture(null);
}
+ @Override
+ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) {
+ return Futures.immediateFuture(ImmutableSet.copyOf(digests));
+ }
+
@SuppressWarnings("FutureReturnValueIgnored")
private void putAfterCredentialRefresh(UploadCommand cmd) throws InterruptedException {
Channel ch = null;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index 6fc1446..ee3f06d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -23,7 +23,7 @@
import com.google.common.io.BaseEncoding;
import com.google.devtools.build.lib.actions.cache.DigestUtils;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.Message;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
index 7191511..e707260 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
@@ -18,7 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index f1c3083..79f7c68 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -26,7 +26,7 @@
import com.google.devtools.build.lib.actions.SpawnResult;
import com.google.devtools.build.lib.actions.SpawnResult.Status;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
+import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.protobuf.ByteString;
@@ -149,6 +149,17 @@
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
}
+ public static void verifyBlobContents(String expectedHash, String actualHash) throws IOException {
+ if (!expectedHash.equals(actualHash)) {
+ String msg =
+ String.format(
+ "An output download failed, because the expected hash"
+ + "'%s' did not match the received hash '%s'.",
+ expectedHash, actualHash);
+ throw new IOException(msg);
+ }
+ }
+
/** An in-memory output file. */
public static final class InMemoryOutput {
private final ActionInput output;