Refactor remote package to only have one remote cache impl.

Historically the remote package shipped two implementations for remote caching: GrpcRemoteCache and SimpleBlobStoreActionCache. Today there is no good reason to keep this structure and the duplication of code and tests that comes with it.

This is the final refactoring of a long series:
* 5fc91a7
* 12ebb84
* 8ac6bdc
* 9fb83b4
* 36611c3
* a6b5b05
* d40933d
* 797f292

This change makes it so that there is one RemoteCache that has a RemoteCacheClient. The RemoteCacheClient has one implementation per caching protocol:
* DiskCacheClient (formerly OnDiskBlobStore)
* GrpcCacheClient (formerly GrpcRemoteCache)
* HttpCacheClient (formerly HttpBlobStore)
* CombinedDiskHttpCacheClient (formerly CombinedDiskHttpBlobStore)

A single RemoteCache type for all protocols will allow composition of caching protocols, in particular gRPC and disk caching. Additionally, this change will allow us to fix a longstanding bug in the HTTP caching protocol.

Closes #10200.

PiperOrigin-RevId: 279723411
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 4b6264d..31d074e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -7,7 +7,6 @@ srcs = glob(["**"]) + [ "//src/main/java/com/google/devtools/build/lib/remote/common:srcs", "//src/main/java/com/google/devtools/build/lib/remote/disk:srcs", - "//src/main/java/com/google/devtools/build/lib/remote/blobstore:srcs", "//src/main/java/com/google/devtools/build/lib/remote/http:srcs", "//src/main/java/com/google/devtools/build/lib/remote/logging:srcs", "//src/main/java/com/google/devtools/build/lib/remote/options:srcs", @@ -40,7 +39,6 @@ "//src/main/java/com/google/devtools/build/lib/buildeventstream", "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/profiler", - "//src/main/java/com/google/devtools/build/lib/remote/blobstore", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/http",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index b312d57..107023c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -28,6 +28,7 @@ import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.PathConverter; import com.google.devtools.build.lib.collect.ImmutableIterable; +import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import io.grpc.Context;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java index e6b6cad..978c10f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
@@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; +import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory; import com.google.devtools.build.lib.runtime.CommandEnvironment;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java similarity index 76% rename from src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java rename to src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index c8709b3..4d694f0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -15,15 +15,14 @@ package com.google.devtools.build.lib.remote; import static com.google.common.base.Strings.isNullOrEmpty; -import static java.lang.String.format; import build.bazel.remote.execution.v2.ActionCacheGrpc; import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub; +import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub; import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub; import build.bazel.remote.execution.v2.Digest; -import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.FindMissingBlobsRequest; import build.bazel.remote.execution.v2.FindMissingBlobsResponse; import build.bazel.remote.execution.v2.GetActionResultRequest; @@ -37,7 +36,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.hash.HashCode; import com.google.common.hash.HashingOutputStream; import com.google.common.util.concurrent.FutureCallback; @@ -48,26 +46,24 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; -import com.google.devtools.build.lib.remote.merkletree.MerkleTree; -import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes; +import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import 
com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; import io.grpc.CallCredentials; import io.grpc.Context; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -76,9 +72,11 @@ /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */ @ThreadSafe -public class GrpcRemoteCache extends AbstractRemoteActionCache { +public class GrpcCacheClient implements RemoteCacheClient, MissingDigestsFinder { private final CallCredentials credentials; private final ReferenceCountedChannel channel; + private final RemoteOptions options; + private final DigestUtil digestUtil; private final RemoteRetrier retrier; private final ByteStreamUploader uploader; private final int maxMissingBlobsDigestsPerMessage; @@ -86,16 +84,17 @@ private AtomicBoolean closed = new AtomicBoolean(); @VisibleForTesting - public GrpcRemoteCache( + public GrpcCacheClient( ReferenceCountedChannel channel, CallCredentials credentials, RemoteOptions options, RemoteRetrier retrier, DigestUtil digestUtil, ByteStreamUploader uploader) { - super(options, digestUtil); this.credentials = credentials; this.channel = channel; + this.options = options; + this.digestUtil = digestUtil; this.retrier = retrier; this.uploader = uploader; maxMissingBlobsDigestsPerMessage = computeMaxMissingBlobsDigestsPerMessage(); @@ -141,6 +140,13 @@ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); } + private ActionCacheFutureStub acFutureStub() { + return 
ActionCacheGrpc.newFutureStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) + .withCallCredentials(credentials) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + @Override public void close() { if (closed.getAndSet(true)) { @@ -199,60 +205,28 @@ return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request))); } - /** - * Ensures that the tree structure of the inputs, the input files themselves, and the command are - * available in the remote cache, such that the tree can be reassembled and executed on another - * machine given the root digest. - * - * <p>The cache may check whether files or parts of the tree structure are already present, and do - * not need to be uploaded again. - * - * <p>Note that this method is only required for remote execution, not for caching itself. - * However, remote execution uses a cache to store input files, and that may be a separate - * end-point from the executor itself, so the functionality lives here. 
- */ - public void ensureInputsPresent( - MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot) - throws IOException, InterruptedException { - Iterable<Digest> allDigests = - Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet()); - ImmutableSet<Digest> missingDigests = Utils.getFromFuture(findMissingDigests(allDigests)); - Map<HashCode, Chunker> inputsToUpload = Maps.newHashMapWithExpectedSize(missingDigests.size()); - for (Digest missingDigest : missingDigests) { - Directory node = merkleTree.getDirectoryByDigest(missingDigest); - HashCode hash = HashCode.fromString(missingDigest.getHash()); - if (node != null) { - Chunker c = Chunker.builder().setInput(node.toByteArray()).build(); - inputsToUpload.put(hash, c); - continue; - } + private ListenableFuture<ActionResult> handleStatus(ListenableFuture<ActionResult> download) { + return Futures.catchingAsync( + download, + StatusRuntimeException.class, + (sre) -> + sre.getStatus().getCode() == Code.NOT_FOUND + // Return null to indicate that it was a cache miss. + ? 
Futures.immediateFuture(null) + : Futures.immediateFailedFuture(new IOException(sre)), + MoreExecutors.directExecutor()); + } - PathOrBytes file = merkleTree.getFileByDigest(missingDigest); - if (file != null) { - final Chunker c; - if (file.getPath() != null) { - c = Chunker.builder().setInput(missingDigest.getSizeBytes(), file.getPath()).build(); - } else { - c = Chunker.builder().setInput(file.getBytes().toByteArray()).build(); - } - inputsToUpload.put(hash, c); - continue; - } - - Message message = additionalInputs.get(missingDigest); - if (message != null) { - Chunker c = Chunker.builder().setInput(message.toByteArray()).build(); - inputsToUpload.put(hash, c); - continue; - } - - throw new IOException( - format( - "findMissingDigests returned a missing digest that has not been requested: %s", - missingDigest)); - } - - uploader.uploadBlobs(inputsToUpload, /* forceUpload= */ true); + @Override + public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) { + GetActionResultRequest request = + GetActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .build(); + Context ctx = Context.current(); + return retrier.executeAsync( + () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request)))); } private static String digestToString(Digest digest) { @@ -260,7 +234,25 @@ } @Override - protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { + public void uploadActionResult(ActionKey actionKey, ActionResult actionResult) + throws IOException, InterruptedException { + try { + retrier.execute( + () -> + acBlockingStub() + .updateActionResult( + UpdateActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .setActionResult(actionResult) + .build())); + } catch (StatusRuntimeException e) { + throw new IOException(e); + } + } + + @Override + public ListenableFuture<Void> 
downloadBlob(Digest digest, OutputStream out) { if (digest.getSizeBytes() == 0) { return Futures.immediateFuture(null); } @@ -363,7 +355,7 @@ public void onCompleted() { try { if (hashSupplier != null) { - verifyContents( + Utils.verifyBlobContents( digest.getHash(), DigestUtil.hashCodeToString(hashSupplier.get())); } out.flush(); @@ -377,7 +369,7 @@ } @Override - protected ListenableFuture<Void> uploadFile(Digest digest, Path path) { + public ListenableFuture<Void> uploadFile(Digest digest, Path path) { return uploader.uploadBlobAsync( HashCode.fromString(digest.getHash()), Chunker.builder().setInput(digest.getSizeBytes(), path).build(), @@ -385,51 +377,10 @@ } @Override - protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) { + public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) { return uploader.uploadBlobAsync( HashCode.fromString(digest.getHash()), Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true); } - - // Execution Cache API - - @Override - public ActionResult getCachedActionResult(ActionKey actionKey) - throws IOException, InterruptedException { - try { - return retrier.execute( - () -> - acBlockingStub() - .getActionResult( - GetActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .build())); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { - // Return null to indicate that it was a cache miss. 
- return null; - } - throw new IOException(e); - } - } - - @Override - protected void setCachedActionResult(ActionKey actionKey, ActionResult result) - throws IOException, InterruptedException { - try { - retrier.execute( - () -> - acBlockingStub() - .updateActionResult( - UpdateActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .setActionResult(result) - .build())); - } catch (StatusRuntimeException e) { - throw new IOException(e); - } - } }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java index 535bba1..a929254 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -43,7 +43,7 @@ */ final class RemoteActionContextProvider extends ActionContextProvider { private final CommandEnvironment env; - private final AbstractRemoteActionCache cache; + private final RemoteCache cache; @Nullable private final GrpcRemoteExecutor executor; private final RemoteRetrier retrier; private final DigestUtil digestUtil; @@ -53,7 +53,7 @@ private RemoteActionContextProvider( CommandEnvironment env, - AbstractRemoteActionCache cache, + RemoteCache cache, @Nullable GrpcRemoteExecutor executor, RemoteRetrier retrier, DigestUtil digestUtil, @@ -67,17 +67,14 @@ } public static RemoteActionContextProvider createForRemoteCaching( - CommandEnvironment env, - AbstractRemoteActionCache cache, - RemoteRetrier retrier, - DigestUtil digestUtil) { + CommandEnvironment env, RemoteCache cache, RemoteRetrier retrier, DigestUtil digestUtil) { return new RemoteActionContextProvider( env, cache, /*executor=*/ null, retrier, digestUtil, /*logDir=*/ null); } public static RemoteActionContextProvider createForRemoteExecution( CommandEnvironment env, - GrpcRemoteCache cache, + RemoteExecutionCache cache, GrpcRemoteExecutor executor, RemoteRetrier retrier, DigestUtil digestUtil, @@ -116,7 +113,7 @@ env.getReporter(), buildRequestId, commandId, - (GrpcRemoteCache) cache, + (RemoteExecutionCache) cache, executor, retrier, digestUtil, @@ -167,7 +164,7 @@ /** Returns the remote cache object if any. */ @Nullable - AbstractRemoteActionCache getRemoteCache() { + RemoteCache getRemoteCache() { return cache; }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java index 2d82bd6..72c6d54 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -65,11 +65,11 @@ @GuardedBy("lock") final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>(); - private final AbstractRemoteActionCache remoteCache; + private final RemoteCache remoteCache; private final Path execRoot; private final Context ctx; - RemoteActionInputFetcher(AbstractRemoteActionCache remoteCache, Path execRoot, Context ctx) { + RemoteActionInputFetcher(RemoteCache remoteCache, Path execRoot, Context ctx) { this.remoteCache = Preconditions.checkNotNull(remoteCache); this.execRoot = Preconditions.checkNotNull(execRoot); this.ctx = Preconditions.checkNotNull(ctx);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java similarity index 92% rename from src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index 1aacf14..4df9100 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -50,11 +50,11 @@ import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; -import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata; -import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.FileMetadata; -import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.SymlinkMetadata; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.DirectoryMetadata; +import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.FileMetadata; +import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.SymlinkMetadata; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; @@ -87,7 +87,7 @@ /** A cache for storing artifacts (input and output) as well as the output of running an action. */ @ThreadSafety.ThreadSafe -public abstract class AbstractRemoteActionCache implements MissingDigestsFinder, AutoCloseable { +public class RemoteCache implements AutoCloseable { /** See {@link SpawnExecutionContext#lockOutputFiles()}. 
*/ @FunctionalInterface @@ -103,49 +103,21 @@ ((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]); } + protected final RemoteCacheClient cacheProtocol; protected final RemoteOptions options; protected final DigestUtil digestUtil; - public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) { + public RemoteCache( + RemoteCacheClient cacheProtocol, RemoteOptions options, DigestUtil digestUtil) { + this.cacheProtocol = cacheProtocol; this.options = options; this.digestUtil = digestUtil; } - /** - * Attempts to look up the given action in the remote cache and return its result, if present. - * Returns {@code null} if there is no such entry. Note that a successful result from this method - * does not guarantee the availability of the corresponding output files in the remote cache. - * - * @throws IOException if the remote cache is unavailable. - */ - @Nullable - abstract ActionResult getCachedActionResult(ActionKey actionKey) - throws IOException, InterruptedException; - - protected abstract void setCachedActionResult(ActionKey actionKey, ActionResult action) - throws IOException, InterruptedException; - - /** - * Uploads a file - * - * <p>Any errors are being propagated via the returned future. If the future completes without - * errors the upload was successful. - * - * @param digest the digest of the file. - * @param file the file to upload. - */ - protected abstract ListenableFuture<Void> uploadFile(Digest digest, Path file); - - /** - * Uploads a BLOB. - * - * <p>Any errors are being propagated via the returned future. If the future completes without - * errors the upload was successful - * - * @param digest the digest of the blob. - * @param data the blob to upload. 
- */ - protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data); + public ActionResult downloadActionResult(ActionKey actionKey) + throws IOException, InterruptedException { + return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey)); + } /** * Upload the result of a locally executed action to the remote cache. @@ -167,7 +139,7 @@ resultBuilder.setExitCode(exitCode); ActionResult result = resultBuilder.build(); if (exitCode == 0 && !action.getDoNotCache()) { - setCachedActionResult(actionKey, result); + cacheProtocol.uploadActionResult(actionKey, result); } return result; } @@ -209,19 +181,20 @@ digests.addAll(digestToFile.keySet()); digests.addAll(digestToBlobs.keySet()); - ImmutableSet<Digest> digestsToUpload = Utils.getFromFuture(findMissingDigests(digests)); + ImmutableSet<Digest> digestsToUpload = + Utils.getFromFuture(cacheProtocol.findMissingDigests(digests)); ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder(); for (Digest digest : digestsToUpload) { Path file = digestToFile.get(digest); if (file != null) { - uploads.add(uploadFile(digest, file)); + uploads.add(cacheProtocol.uploadFile(digest, file)); } else { ByteString blob = digestToBlobs.get(digest); if (blob == null) { String message = "FindMissingBlobs call returned an unknown digest: " + digest; throw new IOException(message); } - uploads.add(uploadBlob(digest, blob)); + uploads.add(cacheProtocol.uploadBlob(digest, blob)); } } @@ -255,13 +228,6 @@ } /** - * Downloads a blob with a content hash {@code digest} to {@code out}. - * - * @return a future that completes after the download completes (succeeds / fails). - */ - protected abstract ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out); - - /** * Downloads a blob with content hash {@code digest} and stores its content in memory. * * @return a future that completes after the download completes (succeeds / fails). 
If successful, @@ -274,7 +240,7 @@ ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes()); SettableFuture<byte[]> outerF = SettableFuture.create(); Futures.addCallback( - downloadBlob(digest, bOut), + cacheProtocol.downloadBlob(digest, bOut), new FutureCallback<Void>() { @Override public void onSuccess(Void aVoid) { @@ -490,7 +456,7 @@ OutputStream out = new LazyFileOutputStream(path); SettableFuture<Void> outerF = SettableFuture.create(); - ListenableFuture<Void> f = downloadBlob(digest, out); + ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out); Futures.addCallback( f, new FutureCallback<Void>() { @@ -530,7 +496,7 @@ } else if (result.hasStdoutDigest()) { downloads.add( Futures.transform( - downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()), + cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()), (d) -> null, directExecutor())); } @@ -540,7 +506,7 @@ } else if (result.hasStderrDigest()) { downloads.add( Futures.transform( - downloadBlob(result.getStderrDigest(), outErr.getErrorStream()), + cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()), (d) -> null, directExecutor())); } @@ -857,7 +823,7 @@ * Adds an action and command protos to upload. They need to be uploaded as part of the action * result. 
*/ - public void addAction(SimpleBlobStore.ActionKey actionKey, Action action, Command command) { + public void addAction(RemoteCacheClient.ActionKey actionKey, Action action, Command command) { digestToBlobs.put(actionKey.getDigest(), action.toByteString()); digestToBlobs.put(action.getCommandDigest(), command.toByteString()); } @@ -993,20 +959,11 @@ } } - protected void verifyContents(String expectedHash, String actualHash) throws IOException { - if (!expectedHash.equals(actualHash)) { - String msg = - String.format( - "An output download failed, because the expected hash" - + "'%s' did not match the received hash '%s'.", - expectedHash, actualHash); - throw new IOException(msg); - } - } - /** Release resources associated with the cache. The cache may not be used after calling this. */ @Override - public abstract void close(); + public void close() { + cacheProtocol.close(); + } /** * Creates an {@link OutputStream} that isn't actually opened until the first data is written.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java similarity index 66% rename from src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java rename to src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index 89a3bbc..aeaabac 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -18,11 +18,12 @@ import com.google.common.base.Ascii; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; -import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpBlobStore; -import com.google.devtools.build.lib.remote.disk.OnDiskBlobStore; -import com.google.devtools.build.lib.remote.http.HttpBlobStore; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.disk.CombinedDiskHttpCacheClient; +import com.google.devtools.build.lib.remote.disk.DiskCacheClient; +import com.google.devtools.build.lib.remote.http.HttpCacheClient; import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import io.netty.channel.unix.DomainSocketAddress; @@ -31,36 +32,29 @@ import javax.annotation.Nullable; /** - * A factory class for providing a {@link SimpleBlobStore} to be used with {@link - * SimpleBlobStoreActionCache}. Currently implemented with HTTP or local. + * A factory class for providing a {@link RemoteCacheClient}. Currently implemented for HTTP and + * disk caching. 
*/ -public final class SimpleBlobStoreFactory { +public final class RemoteCacheClientFactory { - private SimpleBlobStoreFactory() {} + private RemoteCacheClientFactory() {} - public static SimpleBlobStore create(RemoteOptions remoteOptions, @Nullable Path casPath) { - if (isHttpUrlOptions(remoteOptions)) { - return createHttp(remoteOptions, /* creds= */ null); - } else if (casPath != null) { - return new OnDiskBlobStore(casPath); - } else { - return null; - } - } - - public static SimpleBlobStore create( - RemoteOptions options, @Nullable Credentials creds, Path workingDirectory) + public static RemoteCacheClient create( + RemoteOptions options, + @Nullable Credentials creds, + Path workingDirectory, + DigestUtil digestUtil) throws IOException { - Preconditions.checkNotNull(workingDirectory, "workingDirectory"); if (isHttpUrlOptions(options) && isDiskCache(options)) { - return createCombinedCache(workingDirectory, options.diskCache, options, creds); + return createCombinedCache(workingDirectory, options.diskCache, options, creds, digestUtil); } if (isHttpUrlOptions(options)) { - return createHttp(options, creds); + return createHttp(options, creds, digestUtil); } if (isDiskCache(options)) { - return createDiskCache(workingDirectory, options.diskCache); + return createDiskCache( + workingDirectory, options.diskCache, options.remoteVerifyDownloads, digestUtil); } throw new IllegalArgumentException( "Unrecognized RemoteOptions configuration: remote Http cache URL and/or local disk cache" @@ -71,7 +65,8 @@ return isHttpUrlOptions(options) || isDiskCache(options); } - private static SimpleBlobStore createHttp(RemoteOptions options, Credentials creds) { + private static RemoteCacheClient createHttp( + RemoteOptions options, Credentials creds, DigestUtil digestUtil) { Preconditions.checkNotNull(options.remoteCache, "remoteCache"); try { @@ -82,22 +77,26 @@ if (options.remoteProxy != null) { if (options.remoteProxy.startsWith("unix:")) { - return HttpBlobStore.create( + 
return HttpCacheClient.create( new DomainSocketAddress(options.remoteProxy.replaceFirst("^unix:", "")), uri, options.remoteTimeout, options.remoteMaxConnections, + options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), + digestUtil, creds); } else { throw new Exception("Remote cache proxy unsupported: " + options.remoteProxy); } } else { - return HttpBlobStore.create( + return HttpCacheClient.create( uri, options.remoteTimeout, options.remoteMaxConnections, + options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), + digestUtil, creds); } } catch (Exception e) { @@ -105,29 +104,38 @@ } } - private static SimpleBlobStore createDiskCache(Path workingDirectory, PathFragment diskCachePath) + private static RemoteCacheClient createDiskCache( + Path workingDirectory, + PathFragment diskCachePath, + boolean verifyDownloads, + DigestUtil digestUtil) throws IOException { Path cacheDir = workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath")); if (!cacheDir.exists()) { cacheDir.createDirectoryAndParents(); } - return new OnDiskBlobStore(cacheDir); + return new DiskCacheClient(cacheDir, verifyDownloads, digestUtil); } - private static SimpleBlobStore createCombinedCache( - Path workingDirectory, PathFragment diskCachePath, RemoteOptions options, Credentials cred) + private static RemoteCacheClient createCombinedCache( + Path workingDirectory, + PathFragment diskCachePath, + RemoteOptions options, + Credentials cred, + DigestUtil digestUtil) throws IOException { - Path cacheDir = workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath")); if (!cacheDir.exists()) { cacheDir.createDirectoryAndParents(); } - OnDiskBlobStore diskCache = new OnDiskBlobStore(cacheDir); - SimpleBlobStore httpCache = createHttp(options, cred); - return new CombinedDiskHttpBlobStore(diskCache, httpCache); + DiskCacheClient diskCache = + new DiskCacheClient(cacheDir, options.remoteVerifyDownloads, 
digestUtil); + RemoteCacheClient httpCache = createHttp(options, cred, digestUtil); + + return new CombinedDiskHttpCacheClient(diskCache, httpCache); } private static boolean isDiskCache(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java new file mode 100644 index 0000000..bef7cc9 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -0,0 +1,146 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static java.lang.String.format;
+
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.Directory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
+import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
+import com.google.devtools.build.lib.remote.options.RemoteOptions;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.Utils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/** A {@link RemoteCache} with additional functionality needed for remote execution. */
+public class RemoteExecutionCache extends RemoteCache {
+
+  public RemoteExecutionCache(
+      GrpcCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
+    super(protocolImpl, options, digestUtil);
+  }
+
+  /**
+   * Uploads the given files and blobs and blocks until every upload has completed.
+   *
+   * <p>If waiting ends early for any reason (an upload fails, or the calling thread is
+   * interrupted), all uploads that are still outstanding are cancelled before the error is
+   * propagated.
+   *
+   * @param files digests mapped to the paths of the files to upload
+   * @param blobs digests mapped to the in-memory contents to upload
+   * @throws IOException if an upload fails
+   * @throws InterruptedException if the wait, or one of the uploads, is interrupted
+   */
+  private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blobs)
+      throws IOException, InterruptedException {
+    List<ListenableFuture<Void>> uploads = new ArrayList<>();
+
+    for (Map.Entry<Digest, Path> entry : files.entrySet()) {
+      uploads.add(cacheProtocol.uploadFile(entry.getKey(), entry.getValue()));
+    }
+
+    for (Map.Entry<Digest, ByteString> entry : blobs.entrySet()) {
+      uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
+    }
+
+    boolean allSucceeded = false;
+    try {
+      for (ListenableFuture<Void> upload : uploads) {
+        upload.get();
+      }
+      allSucceeded = true;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      Throwables.propagateIfPossible(cause, InterruptedException.class);
+      throw new IOException(cause);
+    } finally {
+      if (!allSucceeded) {
+        // Cancel remaining uploads. This also runs when the wait was interrupted, so that
+        // in-flight uploads do not outlive the aborted call. Cancelling a completed future is
+        // a no-op.
+        for (ListenableFuture<Void> upload : uploads) {
+          upload.cancel(/* mayInterruptIfRunning= */ true);
+        }
+      }
+    }
+  }
+
+  /**
+   * Ensures that the tree structure of the inputs, the input files themselves, and the command are
+   * available in the remote cache, such that the tree can be reassembled and executed on another
+   * machine given the root digest.
+   *
+   * <p>The cache may check whether files or parts of the tree structure are already present, and do
+   * not need to be uploaded again.
+   *
+   * <p>Note that this method is only required for remote execution, not for caching itself.
+   * However, remote execution uses a cache to store input files, and that may be a separate
+   * end-point from the executor itself, so the functionality lives here.
+   */
+  public void ensureInputsPresent(
+      MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
+      throws IOException, InterruptedException {
+    Iterable<Digest> allDigests =
+        Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
+    ImmutableSet<Digest> missingDigests =
+        Utils.getFromFuture(cacheProtocol.findMissingDigests(allDigests));
+    Map<Digest, Path> filesToUpload = new HashMap<>();
+    Map<Digest, ByteString> blobsToUpload = new HashMap<>();
+    for (Digest missingDigest : missingDigests) {
+      Directory node = merkleTree.getDirectoryByDigest(missingDigest);
+      if (node != null) {
+        blobsToUpload.put(missingDigest, node.toByteString());
+        continue;
+      }
+
+      PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
+      if (file != null) {
+        if (file.getBytes() != null) {
+          blobsToUpload.put(missingDigest, file.getBytes());
+          continue;
+        }
+        filesToUpload.put(missingDigest, file.getPath());
+        continue;
+      }
+
+      Message message = additionalInputs.get(missingDigest);
+      if (message != null) {
+        blobsToUpload.put(missingDigest, message.toByteString());
+        continue;
+      }
+
+      throw new IOException(
+          format(
+              "findMissingDigests returned a missing digest that has not been requested: %s",
+              missingDigest));
+    }
+
+    uploadMissing(filesToUpload, blobsToUpload);
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 1598e25..4eba377 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -43,6 +43,7 @@ import com.google.devtools.build.lib.exec.ExecutorBuilder; import com.google.devtools.build.lib.packages.TargetUtils; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; @@ -160,8 +161,8 @@ DigestHashFunction hashFn = env.getRuntime().getFileSystem().getDigestFunction(); DigestUtil digestUtil = new DigestUtil(hashFn); - boolean enableBlobStoreCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions); - boolean enableGrpcCache = GrpcRemoteCache.isRemoteCacheOptions(remoteOptions); + boolean enableBlobStoreCache = RemoteCacheClientFactory.isRemoteCacheOptions(remoteOptions); + boolean enableGrpcCache = GrpcCacheClient.isRemoteCacheOptions(remoteOptions); boolean enableRemoteExecution = shouldEnableRemoteExecution(remoteOptions); if (enableBlobStoreCache && enableRemoteExecution) { throw new AbruptExitException( @@ -206,7 +207,7 @@ interceptors.toArray(new ClientInterceptor[0]))); } RemoteRetrier executeRetrier = null; - AbstractRemoteActionCache cache = null; + RemoteCacheClient cacheClient = null; if (enableGrpcCache || !Strings.isNullOrEmpty(remoteOptions.remoteExecutor)) { rpcRetrier = new RemoteRetrier( @@ -259,8 +260,8 @@ remoteOptions.remoteTimeout, rpcRetrier); cacheChannel.release(); - cache = - new GrpcRemoteCache( + cacheClient = + new GrpcCacheClient( cacheChannel.retain(), credentials, remoteOptions, @@ -273,7 +274,7 @@ buildEventArtifactUploaderFactoryDelegate.init( new ByteStreamBuildEventArtifactUploaderFactory( uploader, - cache, + cacheClient, cacheChannel.authority(), requestContext, remoteOptions.remoteInstanceName)); @@ -281,13 +282,11 @@ if (enableBlobStoreCache) { executeRetrier = null; - cache = - new 
SimpleBlobStoreActionCache( + cacheClient = + RemoteCacheClientFactory.create( remoteOptions, - SimpleBlobStoreFactory.create( - remoteOptions, - GoogleAuthUtils.newCredentials(authAndTlsOptions), - Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory")), + GoogleAuthUtils.newCredentials(authAndTlsOptions), + Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), digestUtil); } @@ -306,15 +305,18 @@ retrier); execChannel.release(); Preconditions.checkState( - cache instanceof GrpcRemoteCache, + cacheClient instanceof GrpcCacheClient, "Only the gRPC cache is support for remote execution"); + RemoteExecutionCache remoteCache = + new RemoteExecutionCache((GrpcCacheClient) cacheClient, remoteOptions, digestUtil); actionContextProvider = RemoteActionContextProvider.createForRemoteExecution( - env, (GrpcRemoteCache) cache, executor, executeRetrier, digestUtil, logDir); - } else if (cache != null) { + env, remoteCache, executor, executeRetrier, digestUtil, logDir); + } else if (cacheClient != null) { + RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil); actionContextProvider = RemoteActionContextProvider.createForRemoteCaching( - env, cache, executeRetrier, digestUtil); + env, remoteCache, executeRetrier, digestUtil); } } catch (IOException e) { env.getReporter().handle(Event.error(e.getMessage()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java index 3346b27..fc8064f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.devtools.build.lib.remote.options.RemoteOptions; import io.grpc.Status; -import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Duration; @@ -28,16 +27,28 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; import java.util.function.Supplier; +import javax.annotation.Nullable; /** Specific retry logic for remote execution/caching. */ public class RemoteRetrier extends Retrier { + @Nullable + private static Status fromException(Exception e) { + for (Throwable cause = e; cause != null; cause = cause.getCause()) { + if (cause instanceof StatusRuntimeException) { + return ((StatusRuntimeException) cause).getStatus(); + } + } + return null; + } + public static final Predicate<? super Exception> RETRIABLE_GRPC_ERRORS = e -> { - if (!(e instanceof StatusException) && !(e instanceof StatusRuntimeException)) { + Status s = fromException(e); + if (s == null) { + // It's not a gRPC error. return false; } - Status s = Status.fromThrowable(e); switch (s.getCode()) { case CANCELLED: return !Thread.currentThread().isInterrupted();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index 52fb136..d32a79f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -46,7 +46,7 @@ import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; @@ -75,7 +75,7 @@ private final Path execRoot; private final RemoteOptions options; - private final AbstractRemoteActionCache remoteCache; + private final RemoteCache remoteCache; private final String buildRequestId; private final String commandId; @@ -94,7 +94,7 @@ RemoteSpawnCache( Path execRoot, RemoteOptions options, - AbstractRemoteActionCache remoteCache, + RemoteCache remoteCache, String buildRequestId, String commandId, @Nullable Reporter cmdlineReporter, @@ -153,7 +153,7 @@ try { ActionResult result; try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { - result = remoteCache.getCachedActionResult(actionKey); + result = remoteCache.downloadActionResult(actionKey); } // In case the remote cache returned a failed action (exit code != 0) we treat it as a // cache miss
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index cef0be7..bd35367 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -57,7 +57,7 @@ import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; @@ -99,7 +99,7 @@ private final boolean verboseFailures; @Nullable private final Reporter cmdlineReporter; - private final GrpcRemoteCache remoteCache; + private final RemoteExecutionCache remoteCache; @Nullable private final GrpcRemoteExecutor remoteExecutor; @Nullable private final RemoteRetrier retrier; private final String buildRequestId; @@ -125,7 +125,7 @@ @Nullable Reporter cmdlineReporter, String buildRequestId, String commandId, - GrpcRemoteCache remoteCache, + RemoteExecutionCache remoteCache, GrpcRemoteExecutor remoteExecutor, @Nullable RemoteRetrier retrier, DigestUtil digestUtil, @@ -191,7 +191,7 @@ // Try to lookup the action in the action cache. ActionResult cachedResult; try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { - cachedResult = acceptCachedResult ? remoteCache.getCachedActionResult(actionKey) : null; + cachedResult = acceptCachedResult ? remoteCache.downloadActionResult(actionKey) : null; } if (cachedResult != null) { if (cachedResult.getExitCode() != 0) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java deleted file mode 100644 index 363340c..0000000 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ /dev/null
@@ -1,137 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import build.bazel.remote.execution.v2.ActionResult; -import build.bazel.remote.execution.v2.Digest; -import build.bazel.remote.execution.v2.Directory; -import build.bazel.remote.execution.v2.DirectoryNode; -import build.bazel.remote.execution.v2.FileNode; -import com.google.common.collect.ImmutableSet; -import com.google.common.hash.HashingOutputStream; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; -import com.google.devtools.build.lib.remote.options.RemoteOptions; -import com.google.devtools.build.lib.remote.util.DigestUtil; -import com.google.devtools.build.lib.remote.util.Utils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.io.OutputStream; -import javax.annotation.Nullable; - -/** - * A RemoteActionCache implementation that uses a simple blob 
store for files and action output. - * - * <p>The thread safety is guaranteed by the underlying simple blob store. - * - * <p>Note that this class is used from src/tools/remote. - */ -@ThreadSafe -public class SimpleBlobStoreActionCache extends AbstractRemoteActionCache { - protected final SimpleBlobStore blobStore; - - public SimpleBlobStoreActionCache( - RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) { - super(options, digestUtil); - this.blobStore = blobStore; - } - - public void downloadTree(Digest rootDigest, Path rootLocation) - throws IOException, InterruptedException { - rootLocation.createDirectoryAndParents(); - Directory directory = Directory.parseFrom(Utils.getFromFuture(downloadBlob(rootDigest))); - for (FileNode file : directory.getFilesList()) { - Path dst = rootLocation.getRelative(file.getName()); - Utils.getFromFuture(downloadFile(dst, file.getDigest())); - dst.setExecutable(file.getIsExecutable()); - } - for (DirectoryNode child : directory.getDirectoriesList()) { - downloadTree(child.getDigest(), rootLocation.getRelative(child.getName())); - } - } - - @Override - public ListenableFuture<Void> uploadFile(Digest digest, Path file) { - return blobStore.uploadFile(digest, file); - } - - @Override - public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) { - return blobStore.uploadBlob(digest, data); - } - - @Override - public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) { - return Futures.immediateFuture(ImmutableSet.copyOf(digests)); - } - - @Override - public ActionResult getCachedActionResult(ActionKey actionKey) - throws IOException, InterruptedException { - return Utils.getFromFuture(blobStore.downloadActionResult(actionKey)); - } - - public void setCachedActionResult(ActionKey actionKey, ActionResult result) - throws IOException, InterruptedException { - blobStore.uploadActionResult(actionKey, result); - } - - @Override - public void close() { - 
blobStore.close(); - } - - @Override - protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { - SettableFuture<Void> outerF = SettableFuture.create(); - @Nullable - HashingOutputStream hashOut = - options.remoteVerifyDownloads ? digestUtil.newHashingOutputStream(out) : null; - Futures.addCallback( - blobStore.downloadBlob(digest, hashOut != null ? hashOut : out), - new FutureCallback<Void>() { - @Override - public void onSuccess(Void unused) { - try { - if (hashOut != null) { - verifyContents(digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash())); - } - out.flush(); - outerF.set(null); - } catch (IOException e) { - outerF.setException(e); - } - } - - @Override - public void onFailure(Throwable throwable) { - outerF.setException(throwable); - } - }, - MoreExecutors.directExecutor()); - return outerF; - } - - public DigestUtil getDigestUtil() { - return digestUtil; - } -}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD deleted file mode 100644 index c840c0d..0000000 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD +++ /dev/null
@@ -1,24 +0,0 @@ -load("@rules_java//java:defs.bzl", "java_library") - -package(default_visibility = ["//src:__subpackages__"]) - -filegroup( - name = "srcs", - srcs = glob(["**"]), - visibility = ["//src/main/java/com/google/devtools/build/lib/remote:__pkg__"], -) - -java_library( - name = "blobstore", - srcs = glob(["*.java"]), - tags = ["bazel"], - deps = [ - "//src/main/java/com/google/devtools/build/lib/remote/common", - "//src/main/java/com/google/devtools/build/lib/remote/util", - "//src/main/java/com/google/devtools/build/lib/vfs", - "//src/main/java/com/google/devtools/common/options", - "//third_party:guava", - "//third_party/protobuf:protobuf_java", - "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", - ], -)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java deleted file mode 100644 index 2aaa588..0000000 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java +++ /dev/null
@@ -1,17 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote.blobstore; - - -/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
diff --git a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java similarity index 92% rename from src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java rename to src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java index 1a6bd9e..c9c1d40 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/MissingDigestsFinder.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/MissingDigestsFinder.java
@@ -11,14 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package com.google.devtools.build.lib.remote; +package com.google.devtools.build.lib.remote.common; import build.bazel.remote.execution.v2.Digest; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; /** Supports querying a remote cache whether it contains a list of blobs. */ -interface MissingDigestsFinder { +public interface MissingDigestsFinder { /** * Returns a set of digests that the remote cache does not know about. The returned set is
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java similarity index 90% rename from src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java rename to src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index da6e6de..2c1bb83 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -29,13 +29,14 @@ * * <p>Implementations must be thread-safe. */ -public interface SimpleBlobStore { +public interface RemoteCacheClient extends MissingDigestsFinder { /** * A key in the remote action cache. The type wraps around a {@link Digest} of an {@link Action}. * Action keys are special in that they aren't content-addressable but refer to action results. */ final class ActionKey { + private final Digest digest; public Digest getDigest() { @@ -45,6 +46,21 @@ public ActionKey(Digest digest) { this.digest = Preconditions.checkNotNull(digest, "digest"); } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ActionKey)) { + return false; + } + + ActionKey otherKey = (ActionKey) other; + return digest.equals(otherKey.digest); + } + + @Override + public int hashCode() { + return digest.hashCode(); + } } /**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java similarity index 80% rename from src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java rename to src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java index 2f38e3a..cc1c068 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpCacheClient.java
@@ -16,10 +16,11 @@ import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; @@ -28,16 +29,16 @@ import java.util.concurrent.ExecutionException; /** - * A {@link SimpleBlobStore} implementation combining two blob stores. A local disk blob store and a - * remote blob store. If a blob isn't found in the first store, the second store is used, and the + * A {@link RemoteCacheClient} implementation combining two blob stores. A local disk blob store and + * a remote blob store. If a blob isn't found in the first store, the second store is used, and the * blob added to the first. Put puts the blob on both stores. 
*/ -public final class CombinedDiskHttpBlobStore implements SimpleBlobStore { +public final class CombinedDiskHttpCacheClient implements RemoteCacheClient { - private final SimpleBlobStore remoteCache; - private final OnDiskBlobStore diskCache; + private final RemoteCacheClient remoteCache; + private final DiskCacheClient diskCache; - public CombinedDiskHttpBlobStore(OnDiskBlobStore diskCache, SimpleBlobStore remoteCache) { + public CombinedDiskHttpCacheClient(DiskCacheClient diskCache, RemoteCacheClient remoteCache) { this.diskCache = Preconditions.checkNotNull(diskCache); this.remoteCache = Preconditions.checkNotNull(remoteCache); } @@ -81,6 +82,20 @@ return Futures.immediateFuture(null); } + @Override + public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) { + ListenableFuture<ImmutableSet<Digest>> remoteQuery = remoteCache.findMissingDigests(digests); + ListenableFuture<ImmutableSet<Digest>> diskQuery = diskCache.findMissingDigests(digests); + return Futures.whenAllSucceed(remoteQuery, diskQuery) + .call( + () -> + ImmutableSet.<Digest>builder() + .addAll(remoteQuery.get()) + .addAll(diskQuery.get()) + .build(), + MoreExecutors.directExecutor()); + } + private Path newTempPath() { return diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false); }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java similarity index 76% rename from src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java rename to src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java index ddd0cec..68ad316 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -15,11 +15,15 @@ import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashingOutputStream; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; @@ -27,14 +31,21 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.UUID; +import javax.annotation.Nullable; /** A on-disk store for the remote action cache. */ -public class OnDiskBlobStore implements SimpleBlobStore { - private final Path root; +public class DiskCacheClient implements RemoteCacheClient { + private static final String ACTION_KEY_PREFIX = "ac_"; - public OnDiskBlobStore(Path root) { + private final Path root; + private final boolean verifyDownloads; + private final DigestUtil digestUtil; + + public DiskCacheClient(Path root, boolean verifyDownloads, DigestUtil digestUtil) { this.root = root; + this.verifyDownloads = verifyDownloads; + this.digestUtil = digestUtil; } /** Returns {@code true} if the provided {@code key} is stored in the CAS. */ @@ -68,7 +79,23 @@ @Override public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { - return download(digest, out, /* isActionCache= */ false); + @Nullable + HashingOutputStream hashOut = verifyDownloads ? digestUtil.newHashingOutputStream(out) : null; + return Futures.transformAsync( + download(digest, hashOut != null ? 
hashOut : out, /* isActionCache= */ false), + (v) -> { + try { + if (hashOut != null) { + Utils.verifyBlobContents( + digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash())); + } + out.flush(); + return Futures.immediateFuture(null); + } catch (IOException e) { + return Futures.immediateFailedFuture(e); + } + }, + MoreExecutors.directExecutor()); } @Override @@ -108,6 +135,11 @@ return Futures.immediateFuture(null); } + @Override + public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) { + return Futures.immediateFuture(ImmutableSet.copyOf(digests)); + } + protected Path toPath(String key, boolean actionResult) { return root.getChild(getDiskKey(key, actionResult)); }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java index 9361235..e82e236 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/AbstractHttpHandler.java
@@ -103,7 +103,7 @@ if (!uri.getPath().endsWith("/")) { builder.append("/"); } - builder.append(isCas ? HttpBlobStore.CAS_PREFIX : HttpBlobStore.AC_PREFIX); + builder.append(isCas ? HttpCacheClient.CAS_PREFIX : HttpCacheClient.AC_PREFIX); builder.append(hash); return builder.toString(); }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java similarity index 92% rename from src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java rename to src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 19e9bd9..c0f0051 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -17,11 +17,15 @@ import build.bazel.remote.execution.v2.Digest; import com.google.auth.Credentials; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashingOutputStream; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; @@ -84,7 +88,7 @@ import javax.net.ssl.SSLEngine; /** - * Implementation of {@link SimpleBlobStore} that can talk to a HTTP/1.1 backend. + * Implementation of {@link RemoteCacheClient} that can talk to a HTTP/1.1 backend. * * <p>Blobs (Binary large objects) are uploaded using the {@code PUT} method. Action cache blobs are * stored under the path {@code /ac/base16-key}. CAS (Content Addressable Storage) blobs are stored @@ -107,7 +111,7 @@ * * <p>The implementation currently does not support transfer encoding chunked. 
*/ -public final class HttpBlobStore implements SimpleBlobStore { +public final class HttpCacheClient implements RemoteCacheClient { public static final String AC_PREFIX = "ac/"; public static final String CAS_PREFIX = "cas/"; @@ -122,6 +126,8 @@ private final int timeoutSeconds; private final ImmutableList<Entry<String, String>> extraHttpHeaders; private final boolean useTls; + private final boolean verifyDownloads; + private final DigestUtil digestUtil; private final Object closeLock = new Object(); @@ -136,51 +142,61 @@ @GuardedBy("credentialsLock") private long lastRefreshTime; - public static HttpBlobStore create( + public static HttpCacheClient create( URI uri, int timeoutSeconds, int remoteMaxConnections, + boolean verifyDownloads, ImmutableList<Entry<String, String>> extraHttpHeaders, + DigestUtil digestUtil, @Nullable final Credentials creds) throws Exception { - return new HttpBlobStore( + return new HttpCacheClient( NioEventLoopGroup::new, NioSocketChannel.class, uri, timeoutSeconds, remoteMaxConnections, + verifyDownloads, extraHttpHeaders, + digestUtil, creds, null); } - public static HttpBlobStore create( + public static HttpCacheClient create( DomainSocketAddress domainSocketAddress, URI uri, int timeoutSeconds, int remoteMaxConnections, + boolean verifyDownloads, ImmutableList<Entry<String, String>> extraHttpHeaders, + DigestUtil digestUtil, @Nullable final Credentials creds) throws Exception { if (KQueue.isAvailable()) { - return new HttpBlobStore( + return new HttpCacheClient( KQueueEventLoopGroup::new, KQueueDomainSocketChannel.class, uri, timeoutSeconds, remoteMaxConnections, + verifyDownloads, extraHttpHeaders, + digestUtil, creds, domainSocketAddress); } else if (Epoll.isAvailable()) { - return new HttpBlobStore( + return new HttpCacheClient( EpollEventLoopGroup::new, EpollDomainSocketChannel.class, uri, timeoutSeconds, remoteMaxConnections, + verifyDownloads, extraHttpHeaders, + digestUtil, creds, domainSocketAddress); } else { @@ -188,13 
+204,15 @@ } } - private HttpBlobStore( + private HttpCacheClient( Function<Integer, EventLoopGroup> newEventLoopGroup, Class<? extends Channel> channelClass, URI uri, int timeoutSeconds, int remoteMaxConnections, + boolean verifyDownloads, ImmutableList<Entry<String, String>> extraHttpHeaders, + DigestUtil digestUtil, @Nullable final Credentials creds, @Nullable SocketAddress socketAddress) throws Exception { @@ -261,6 +279,8 @@ this.creds = creds; this.timeoutSeconds = timeoutSeconds; this.extraHttpHeaders = extraHttpHeaders; + this.verifyDownloads = verifyDownloads; + this.digestUtil = digestUtil; } @SuppressWarnings("FutureReturnValueIgnored") @@ -419,7 +439,23 @@ @Override public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { - return get(digest, out, /* casDownload= */ true); + final HashingOutputStream hashOut = + verifyDownloads ? digestUtil.newHashingOutputStream(out) : null; + return Futures.transformAsync( + get(digest, hashOut != null ? hashOut : out, /* casDownload= */ true), + (v) -> { + try { + if (hashOut != null) { + Utils.verifyBlobContents( + digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash())); + } + out.flush(); + return Futures.immediateFuture(null); + } catch (IOException e) { + return Futures.immediateFailedFuture(e); + } + }, + MoreExecutors.directExecutor()); } @SuppressWarnings("FutureReturnValueIgnored") @@ -607,6 +643,11 @@ return Futures.immediateFuture(null); } + @Override + public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(Iterable<Digest> digests) { + return Futures.immediateFuture(ImmutableSet.copyOf(digests)); + } + @SuppressWarnings("FutureReturnValueIgnored") private void putAfterCredentialRefresh(UploadCommand cmd) throws InterruptedException { Channel ch = null;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java index 6fc1446..ee3f06d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -23,7 +23,7 @@ import com.google.common.io.BaseEncoding; import com.google.devtools.build.lib.actions.cache.DigestUtils; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.Message;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java index 7191511..e707260 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java
@@ -18,7 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.devtools.build.lib.analysis.BlazeVersionInfo; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import io.grpc.ClientInterceptor; import io.grpc.Context; import io.grpc.Contexts;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index f1c3083..79f7c68 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -26,7 +26,7 @@ import com.google.devtools.build.lib.actions.SpawnResult; import com.google.devtools.build.lib.actions.SpawnResult.Status; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.protobuf.ByteString; @@ -149,6 +149,18 @@ .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor()); } + /** Throws an {@link IOException} if {@code actualHash} does not equal {@code expectedHash}. */ + public static void verifyBlobContents(String expectedHash, String actualHash) throws IOException { + if (!expectedHash.equals(actualHash)) { + String msg = + String.format( + "An output download failed, because the expected hash" + + " '%s' did not match the received hash '%s'.", + expectedHash, actualHash); + throw new IOException(msg); + } + } + /** An in-memory output file. */ public static final class InMemoryOutput { private final ActionInput output;