Refactor SimpleBlobStore#get(...) and SimpleBlobStore#getActionResult(...)
This change updates the method signatures to be compatible with those in
AbstractRemoteActionCache:
1) Future<Boolean> get(String, OutputStream) -> Future<Void> downloadBlob(Digest, OutputStream)
2) Future<Boolean> getActionResult(String, OutputStream) -> Future<ActionResult> downloadActionResult(ActionKey)
The refactoring uncovered a bug in CombinedDiskHttpBlobStore. We did not
close the output stream *before* moving a file to the disk cache, thus
potentially moving a corrupted file to the disk cache as any errors
would only be reported afterwards.
Closes #9200.
PiperOrigin-RevId: 278830531
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 9b7a720..3336316 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -48,6 +48,7 @@
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
@@ -250,6 +251,10 @@
uploader.uploadBlobs(inputsToUpload, /* forceUpload= */ true);
}
+ private static String digestToString(Digest digest) {
+ return digest.getHash() + "/" + digest.getSizeBytes();
+ }
+
@Override
protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
@@ -259,7 +264,7 @@
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
- resourceName += "blobs/" + digestUtil.toString(digest);
+ resourceName += "blobs/" + digestToString(digest);
@Nullable Supplier<HashCode> hashSupplier = null;
if (options.remoteVerifyDownloads) {
@@ -344,7 +349,7 @@
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
- future.setException(new CacheNotFoundException(digest, digestUtil));
+ future.setException(new CacheNotFoundException(digest));
} else {
future.setException(t);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
index 97feb0f..2d82bd6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java
@@ -29,6 +29,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 24e158a..1598e25 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -42,6 +42,7 @@
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.exec.ExecutorBuilder;
import com.google.devtools.build.lib.packages.TargetUtils;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.logging.LoggingInterceptor;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index 4d738d5..52fb136 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -45,6 +45,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 181b343..cef0be7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -56,6 +56,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 11a2fde..363340c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -34,8 +34,6 @@
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.Nullable;
@@ -89,30 +87,12 @@
@Override
public ActionResult getCachedActionResult(ActionKey actionKey)
throws IOException, InterruptedException {
- try {
- byte[] data = downloadActionResult(actionKey.getDigest());
- return ActionResult.parseFrom(data);
- } catch (InvalidProtocolBufferException | CacheNotFoundException e) {
- return null;
- }
- }
-
- private byte[] downloadActionResult(Digest digest) throws IOException, InterruptedException {
- if (digest.getSizeBytes() == 0) {
- return new byte[0];
- }
- // This unconditionally downloads the whole blob into memory!
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- boolean success = getFromFuture(blobStore.getActionResult(digest.getHash(), out));
- if (!success) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- return out.toByteArray();
+ return Utils.getFromFuture(blobStore.downloadActionResult(actionKey));
}
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
- blobStore.putActionResult(actionKey, result);
+ blobStore.uploadActionResult(actionKey, result);
}
@Override
@@ -127,22 +107,18 @@
HashingOutputStream hashOut =
options.remoteVerifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
Futures.addCallback(
- blobStore.get(digest.getHash(), hashOut != null ? hashOut : out),
- new FutureCallback<Boolean>() {
+ blobStore.downloadBlob(digest, hashOut != null ? hashOut : out),
+ new FutureCallback<Void>() {
@Override
- public void onSuccess(Boolean found) {
- if (found) {
- try {
- if (hashOut != null) {
- verifyContents(digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
- }
- out.flush();
- outerF.set(null);
- } catch (IOException e) {
- outerF.setException(e);
+ public void onSuccess(Void unused) {
+ try {
+ if (hashOut != null) {
+ verifyContents(digest.getHash(), DigestUtil.hashCodeToString(hashOut.hash()));
}
- } else {
- outerF.setException(new CacheNotFoundException(digest, digestUtil));
+ out.flush();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java
similarity index 72%
rename from src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
rename to src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java
index 03a0fee..cad87a1 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java
@@ -12,21 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.google.devtools.build.lib.remote;
+package com.google.devtools.build.lib.remote.common;
import build.bazel.remote.execution.v2.Digest;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
import java.io.IOException;
/**
- * An exception to indicate cache misses.
- * TODO(olaola): have a class of checked RemoteCacheExceptions.
+ * An exception to indicate cache misses. TODO(olaola): have a class of checked
+ * RemoteCacheExceptions.
*/
public final class CacheNotFoundException extends IOException {
private final Digest missingDigest;
- CacheNotFoundException(Digest missingDigest, DigestUtil digestUtil) {
- super("Missing digest: " + digestUtil.toString(missingDigest));
+ public CacheNotFoundException(Digest missingDigest) {
+ super("Missing digest: " + missingDigest.getHash() + "/" + missingDigest.getSizeBytes());
this.missingDigest = missingDigest;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
index 92af8c7..da6e6de 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java
@@ -14,8 +14,10 @@
package com.google.devtools.build.lib.remote.common;
+import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
@@ -23,15 +25,15 @@
import java.io.OutputStream;
/**
- * An interface for storing BLOBs each one indexed by a string (hash in hexadecimal).
+ * An interface for a remote caching protocol.
*
* <p>Implementations must be thread-safe.
*/
public interface SimpleBlobStore {
/**
- * A special type of Digest that is used only as a remote action cache key. This is a separate
- * type in order to prevent accidentally using other Digests as action keys.
+ * A key in the remote action cache. The type wraps around a {@link Digest} of an {@link Action}.
+ * Action keys are special in that they aren't content-addressable but refer to action results.
*/
final class ActionKey {
private final Digest digest;
@@ -41,49 +43,58 @@
}
public ActionKey(Digest digest) {
- this.digest = digest;
+ this.digest = Preconditions.checkNotNull(digest, "digest");
}
}
/**
- * Fetches the BLOB associated with the {@code key} from the CAS and writes it to {@code out}.
+ * Downloads an action result for the {@code actionKey}.
*
- * <p>The caller is responsible to close {@code out}.
- *
- * @return {@code true} if the {@code key} was found. {@code false} otherwise.
+ * @param actionKey The digest of the {@link Action} that generated the action result.
+ * @return A Future representing pending download of an action result. If an action result for
+ * {@code actionKey} cannot be found the result of the Future is {@code null}.
*/
- ListenableFuture<Boolean> get(String key, OutputStream out);
+ ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey);
/**
- * Fetches the BLOB associated with the {@code key} from the Action Cache and writes it to {@code
- * out}.
+ * Uploads an action result for the {@code actionKey}.
*
- * <p>The caller is responsible to close {@code out}.
- *
- * @return {@code true} if the {@code key} was found. {@code false} otherwise.
+ * @param actionKey The digest of the {@link Action} that generated the action result.
+ * @param actionResult The action result to associate with the {@code actionKey}.
+ * @throws IOException If there is an error uploading the action result.
+ * @throws InterruptedException In case the thread is interrupted.
*/
- ListenableFuture<Boolean> getActionResult(String actionKey, OutputStream out);
-
- /** Uploads an {@link ActionResult} keyed by the action hash to the action cache. */
- void putActionResult(ActionKey actionDigest, ActionResult actionResult)
+ void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException;
- /** Close resources associated with the blob store. */
- void close();
+ /**
+ * Downloads a BLOB for the given {@code digest} and writes it to {@code out}.
+ *
+ * <p>It's the caller's responsibility to close {@code out}.
+ *
+ * @return A Future representing pending completion of the download. If a BLOB for {@code digest}
+ * does not exist in the cache the Future fails with a {@link CacheNotFoundException}.
+ */
+ ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
/**
- * Uploads a file.
+ * Uploads a {@code file} to the CAS.
*
- * @param digest the digest of the file.
- * @param file the file to upload.
+ * @param digest The digest of the file.
+ * @param file The file to upload.
+ * @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadFile(Digest digest, Path file);
/**
- * Uploads a BLOB.
+ * Uploads a BLOB to the CAS.
*
- * @param digest the digest of the blob.
- * @param data the blob to upload.
+ * @param digest The digest of the blob.
+ * @param data The BLOB to upload.
+ * @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
+
+ /** Close resources associated with the remote cache. */
+ void close();
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
index c9377e4..2f38e3a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java
@@ -26,8 +26,6 @@
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
/**
* A {@link SimpleBlobStore} implementation combining two blob stores. A local disk blob store and a
@@ -35,7 +33,6 @@
* blob added to the first. Put puts the blob on both stores.
*/
public final class CombinedDiskHttpBlobStore implements SimpleBlobStore {
- private static final Logger logger = Logger.getLogger(CombinedDiskHttpBlobStore.class.getName());
private final SimpleBlobStore remoteCache;
private final OnDiskBlobStore diskCache;
@@ -46,10 +43,10 @@
}
@Override
- public void putActionResult(ActionKey actionKey, ActionResult actionResult)
+ public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
- diskCache.putActionResult(actionKey, actionResult);
- remoteCache.putActionResult(actionKey, actionResult);
+ diskCache.uploadActionResult(actionKey, actionResult);
+ remoteCache.uploadActionResult(actionKey, actionResult);
}
@Override
@@ -84,77 +81,74 @@
return Futures.immediateFuture(null);
}
- @Override
- public ListenableFuture<Boolean> get(String key, OutputStream out) {
- return get(key, out, /* actionResult= */ false);
+ private Path newTempPath() {
+ return diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false);
}
- private ListenableFuture<Boolean> get(String key, OutputStream out, boolean actionResult) {
- boolean foundOnDisk =
- actionResult ? diskCache.containsActionResult(key) : diskCache.contains(key);
+ private static ListenableFuture<Void> closeStreamOnError(
+ ListenableFuture<Void> f, OutputStream out) {
+ return Futures.catchingAsync(
+ f,
+ Exception.class,
+ (rootCause) -> {
+ try {
+ out.close();
+ } catch (IOException e) {
+ rootCause.addSuppressed(e);
+ }
+ return Futures.immediateFailedFuture(rootCause);
+ },
+ MoreExecutors.directExecutor());
+ }
- if (foundOnDisk) {
- return getFromCache(diskCache, key, out, actionResult);
- } else {
- return getFromRemoteAndSaveToDisk(key, out, actionResult);
+ @Override
+ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ if (diskCache.contains(digest)) {
+ return diskCache.downloadBlob(digest, out);
}
- }
- @Override
- public ListenableFuture<Boolean> getActionResult(String key, OutputStream out) {
- return get(key, out, /* actionResult= */ true);
- }
-
- private ListenableFuture<Boolean> getFromRemoteAndSaveToDisk(
- String key, OutputStream out, boolean actionResult) {
- // Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
- Path temp = diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false);
-
- OutputStream tempOut;
+ Path tempPath = newTempPath();
+ final OutputStream tempOut;
try {
- tempOut = temp.getOutputStream();
+ tempOut = tempPath.getOutputStream();
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
- ListenableFuture<Boolean> chained =
+
+ ListenableFuture<Void> download =
+ closeStreamOnError(remoteCache.downloadBlob(digest, tempOut), tempOut);
+ ListenableFuture<Void> saveToDiskAndTarget =
Futures.transformAsync(
- getFromCache(remoteCache, key, tempOut, actionResult),
- (found) -> {
- if (!found) {
- return Futures.immediateFuture(false);
- } else {
- saveToDiskCache(key, temp, actionResult);
- return getFromCache(diskCache, key, out, actionResult);
+ download,
+ (unused) -> {
+ try {
+ tempOut.close();
+ diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
}
+ return diskCache.downloadBlob(digest, out);
},
MoreExecutors.directExecutor());
- chained.addListener(
- () -> {
- try {
- tempOut.close();
- } catch (IOException e) {
- // not sure what to do here, we either are here because of another exception being
- // thrown, or we have successfully used the file we are trying (and failing) to close
- logger.log(Level.WARNING, "Failed to close temporary file on get", e);
+ return saveToDiskAndTarget;
+ }
+
+ @Override
+ public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+ if (diskCache.containsActionResult(actionKey)) {
+ return diskCache.downloadActionResult(actionKey);
+ }
+
+ return Futures.transformAsync(
+ remoteCache.downloadActionResult(actionKey),
+ (actionResult) -> {
+ if (actionResult == null) {
+ return Futures.immediateFuture(null);
+ } else {
+ diskCache.uploadActionResult(actionKey, actionResult);
+ return Futures.immediateFuture(actionResult);
}
},
MoreExecutors.directExecutor());
- return chained;
- }
-
- private void saveToDiskCache(String key, Path temp, boolean actionResult) throws IOException {
- Path target = diskCache.toPath(key, actionResult);
- // TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the
- // case of machine crashes (the OS may reorder the writes and the rename).
- temp.renameTo(target);
- }
-
- private ListenableFuture<Boolean> getFromCache(
- SimpleBlobStore blobStore, String key, OutputStream tempOut, boolean actionResult) {
- if (!actionResult) {
- return blobStore.get(key, tempOut);
- } else {
- return blobStore.getActionResult(key, tempOut);
- }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
index 205dd9f..ddd0cec 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/OnDiskBlobStore.java
@@ -18,8 +18,9 @@
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -37,39 +38,48 @@
}
/** Returns {@code true} if the provided {@code key} is stored in the CAS. */
- public boolean contains(String key) {
- return toPath(key, /* actionResult= */ false).exists();
+ public boolean contains(Digest digest) {
+ return toPath(digest.getHash(), /* actionResult= */ false).exists();
}
/** Returns {@code true} if the provided {@code key} is stored in the Action Cache. */
- public boolean containsActionResult(String key) {
- return toPath(key, /* actionResult= */ true).exists();
+ public boolean containsActionResult(ActionKey actionKey) {
+ return toPath(actionKey.getDigest().getHash(), /* actionResult= */ true).exists();
}
- @Override
- public ListenableFuture<Boolean> get(String key, OutputStream out) {
- SettableFuture<Boolean> f = SettableFuture.create();
- Path p = toPath(key, /* actionResult= */ false);
+ public void captureFile(Path src, Digest digest, boolean isActionCache) throws IOException {
+ Path target = toPath(digest.getHash(), isActionCache);
+ src.renameTo(target);
+ }
+
+ private ListenableFuture<Void> download(Digest digest, OutputStream out, boolean isActionCache) {
+ Path p = toPath(digest.getHash(), isActionCache);
if (!p.exists()) {
- f.set(false);
+ return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
} else {
try (InputStream in = p.getInputStream()) {
ByteStreams.copy(in, out);
- f.set(true);
+ return Futures.immediateFuture(null);
} catch (IOException e) {
- f.setException(e);
+ return Futures.immediateFailedFuture(e);
}
}
- return f;
}
@Override
- public ListenableFuture<Boolean> getActionResult(String key, OutputStream out) {
- return get(getDiskKey(key, /* actionResult= */ true), out);
+ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ return download(digest, out, /* isActionCache= */ false);
}
@Override
- public void putActionResult(ActionKey actionKey, ActionResult actionResult) throws IOException {
+ public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+ return Utils.downloadAsActionResult(
+ actionKey, (digest, out) -> download(digest, out, /* isActionCache= */ true));
+ }
+
+ @Override
+ public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
+ throws IOException {
try (InputStream data = actionResult.toByteString().newInput()) {
saveFile(getDiskKey(actionKey.getDigest().getHash(), /* actionResult= */ true), data);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
index 4ef3b21..643c45d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.http;
+import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Preconditions;
import java.io.OutputStream;
import java.net.URI;
@@ -22,13 +23,13 @@
private final URI uri;
private final boolean casDownload;
- private final String hash;
+ private final Digest digest;
private final OutputStream out;
- protected DownloadCommand(URI uri, boolean casDownload, String hash, OutputStream out) {
+ protected DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) {
this.uri = Preconditions.checkNotNull(uri);
this.casDownload = casDownload;
- this.hash = Preconditions.checkNotNull(hash);
+ this.digest = Preconditions.checkNotNull(digest);
this.out = Preconditions.checkNotNull(out);
}
@@ -40,8 +41,8 @@
return casDownload;
}
- public String hash() {
- return hash;
+ public Digest digest() {
+ return digest;
}
public OutputStream out() {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
index 8065a62..19e9bd9 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpBlobStore.java
@@ -20,7 +20,9 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
+import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.netty.bootstrap.Bootstrap;
@@ -416,12 +418,12 @@
}
@Override
- public ListenableFuture<Boolean> get(String key, OutputStream out) {
- return get(key, out, true);
+ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ return get(digest, out, /* casDownload= */ true);
}
@SuppressWarnings("FutureReturnValueIgnored")
- private ListenableFuture<Boolean> get(String key, final OutputStream out, boolean casDownload) {
+ private ListenableFuture<Void> get(Digest digest, final OutputStream out, boolean casDownload) {
final AtomicBoolean dataWritten = new AtomicBoolean();
OutputStream wrappedOut =
new OutputStream() {
@@ -446,8 +448,8 @@
out.flush();
}
};
- DownloadCommand download = new DownloadCommand(uri, casDownload, key, wrappedOut);
- SettableFuture<Boolean> outerF = SettableFuture.create();
+ DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut);
+ SettableFuture<Void> outerF = SettableFuture.create();
acquireDownloadChannel()
.addListener(
(Future<Channel> chP) -> {
@@ -457,12 +459,12 @@
}
Channel ch = chP.getNow();
- ch.writeAndFlush(download)
+ ch.writeAndFlush(downloadCmd)
.addListener(
(f) -> {
try {
if (f.isSuccess()) {
- outerF.set(true);
+ outerF.set(null);
} else {
Throwable cause = f.cause();
// cause can be of type HttpException, because Netty uses
@@ -475,10 +477,10 @@
// The error is due to an auth token having expired. Let's try
// again.
refreshCredentials();
- getAfterCredentialRefresh(download, outerF);
+ getAfterCredentialRefresh(downloadCmd, outerF);
return;
} else if (cacheMiss(response.status())) {
- outerF.set(false);
+ outerF.setException(new CacheNotFoundException(digest));
return;
}
}
@@ -493,7 +495,7 @@
}
@SuppressWarnings("FutureReturnValueIgnored")
- private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture<Boolean> outerF) {
+ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture<Void> outerF) {
acquireDownloadChannel()
.addListener(
(Future<Channel> chP) -> {
@@ -508,13 +510,13 @@
(f) -> {
try {
if (f.isSuccess()) {
- outerF.set(true);
+ outerF.set(null);
} else {
Throwable cause = f.cause();
if (cause instanceof HttpException) {
HttpResponse response = ((HttpException) cause).response();
if (cacheMiss(response.status())) {
- outerF.set(false);
+ outerF.setException(new CacheNotFoundException(cmd.digest()));
return;
}
}
@@ -528,8 +530,9 @@
}
@Override
- public ListenableFuture<Boolean> getActionResult(String actionKey, OutputStream out) {
- return get(actionKey, out, false);
+ public ListenableFuture<ActionResult> downloadActionResult(ActionKey actionKey) {
+ return Utils.downloadAsActionResult(
+ actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false));
}
@SuppressWarnings("FutureReturnValueIgnored")
@@ -633,7 +636,7 @@
}
@Override
- public void putActionResult(ActionKey actionKey, ActionResult actionResult)
+ public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
ByteString serialized = actionResult.toByteString();
try (InputStream in = serialized.newInput()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
index 3359820..2ba92ec 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java
@@ -136,7 +136,7 @@
}
DownloadCommand cmd = (DownloadCommand) msg;
out = cmd.out();
- path = constructPath(cmd.uri(), cmd.hash(), cmd.casDownload());
+ path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload());
HttpRequest request = buildRequest(path, constructHost(cmd.uri()));
addCredentialHeaders(request, cmd.uri());
addExtraRemoteHeaders(request);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index b17770c..6fc1446 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -116,7 +116,7 @@
return new HashingOutputStream(hashFn.getHashFunction(), out);
}
- public String toString(Digest digest) {
+ public static String toString(Digest digest) {
return digest.getHash() + "/" + digest.getSizeBytes();
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index 4d824e0..f1c3083 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -13,20 +13,31 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
+import build.bazel.remote.execution.v2.ActionResult;
+import build.bazel.remote.execution.v2.Digest;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
import com.google.devtools.build.lib.actions.SpawnResult.Status;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
import javax.annotation.Nullable;
/** Utility methods for the remote package. * */
@@ -119,6 +130,25 @@
return e.getMessage();
}
+ @SuppressWarnings("ProtoParseWithRegistry")
+ public static ListenableFuture<ActionResult> downloadAsActionResult(
+ ActionKey actionDigest,
+ BiFunction<Digest, OutputStream, ListenableFuture<Void>> downloadFunction) {
+ ByteArrayOutputStream data = new ByteArrayOutputStream(/* size= */ 1024);
+ ListenableFuture<Void> download = downloadFunction.apply(actionDigest.getDigest(), data);
+ return FluentFuture.from(download)
+ .transformAsync(
+ (v) -> {
+ try {
+ return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
+ } catch (InvalidProtocolBufferException e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ },
+ MoreExecutors.directExecutor())
+ .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
+ }
+
/** An in-memory output file. */
public static final class InMemoryOutput {
private final ActionInput output;