remote: concurrent blob downloads. Fixes #5215
This change introduces concurrent downloads of action outputs
for remote caching/execution. So far, for an action we would
download one output after the other which isn't as bad as it
sounds as we would typically run dozens or hundreds of actions
in parallel. However, for actions with a lot of outputs or graphs
that allow limited parallelism we expect this change to positively
impact performance.
Note, that with this change the AbstractRemoteActionCache will
attempt to always download all outputs concurrently. The actual
parallelism is controlled by the underlying network transport.
The gRPC transport currently enforces no limits on the concurrent
calls, which should be fine given that all calls are multiplexed
on a single network connection. The HTTP/1.1 transport also
enforces no parallelism by default, but I have added the
--remote_max_connections=INT flag which allows to specify an upper
bound on the number of network connections to be open concurrently.
I have introduced this flag as a defensive mechanism for users
who's environment might enforce an upper bound on the number of open
connections, as with this change its possible for the number of
concurrently open connections to dramatically increase (from
NumParallelActions to NumParallelActions * SumParallelActionOutputs).
A side effect of this change is that it puts the infrastructure
for retries and circuit breaking for the HttpBlobStore in place.
RELNOTES: None
PiperOrigin-RevId: 199005510
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
index 1cd1ef9..ef90223 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
@@ -13,6 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.EnvironmentalExecException;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
@@ -35,10 +43,13 @@
import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.devtools.remoteexecution.v1test.Tree;
import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -48,12 +59,23 @@
/** A cache for storing artifacts (input and output) as well as the output of running an action. */
@ThreadSafety.ThreadSafe
public abstract class AbstractRemoteActionCache implements AutoCloseable {
+
+ private static final ListenableFuture<Void> COMPLETED_SUCCESS = SettableFuture.create();
+ private static final ListenableFuture<byte[]> EMPTY_BYTES = SettableFuture.create();
+
+ static {
+ ((SettableFuture<Void>) COMPLETED_SUCCESS).set(null);
+ ((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]);
+ }
+
protected final RemoteOptions options;
protected final DigestUtil digestUtil;
+ private final Retrier retrier;
- public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
+ public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil, Retrier retrier) {
this.options = options;
this.digestUtil = digestUtil;
+ this.retrier = retrier;
}
/**
@@ -101,23 +123,40 @@
throws ExecException, IOException, InterruptedException;
/**
- * Download a remote blob to a local destination.
+ * Downloads a blob with a content hash {@code digest} to {@code out}.
*
- * @param digest The digest of the remote blob.
- * @param dest The path to the local file.
- * @throws IOException if download failed.
+ * @return a future that completes after the download completes (succeeds / fails).
*/
- protected abstract void downloadBlob(Digest digest, Path dest)
- throws IOException, InterruptedException;
+ protected abstract ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
/**
- * Download a remote blob and store it in memory.
+ * Downloads a blob with content hash {@code digest} and stores its content in memory.
*
- * @param digest The digest of the remote blob.
- * @return The remote blob.
- * @throws IOException if download failed.
+ * @return a future that completes after the download completes (succeeds / fails). If successful,
+ * the content is stored in the future's {@code byte[]}.
*/
- protected abstract byte[] downloadBlob(Digest digest) throws IOException, InterruptedException;
+ public ListenableFuture<byte[]> downloadBlob(Digest digest) {
+ if (digest.getSizeBytes() == 0) {
+ return EMPTY_BYTES;
+ }
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
+ SettableFuture<byte[]> outerF = SettableFuture.create();
+ Futures.addCallback(
+ downloadBlob(digest, bOut),
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void aVoid) {
+ outerF.set(bOut.toByteArray());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ outerF.setException(t);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
+ }
/**
* Download the output files and directory trees of a remotely executed action to the local
@@ -132,22 +171,66 @@
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
try {
+ Context ctx = Context.current();
+ List<FuturePathBooleanTuple> fileDownloads =
+ Collections.synchronizedList(
+ new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount()));
for (OutputFile file : result.getOutputFilesList()) {
Path path = execRoot.getRelative(file.getPath());
- downloadFile(path, file.getDigest(), file.getIsExecutable(), file.getContent());
+ ListenableFuture<Void> download =
+ retrier.executeAsync(
+ () -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent())));
+ fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
}
+
+ List<ListenableFuture<Void>> dirDownloads =
+ new ArrayList<>(result.getOutputDirectoriesCount());
for (OutputDirectory dir : result.getOutputDirectoriesList()) {
- byte[] b = downloadBlob(dir.getTreeDigest());
- Tree tree = Tree.parseFrom(b);
- Map<Digest, Directory> childrenMap = new HashMap<>();
- for (Directory child : tree.getChildrenList()) {
- childrenMap.put(digestUtil.compute(child), child);
- }
- Path path = execRoot.getRelative(dir.getPath());
- downloadDirectory(path, tree.getRoot(), childrenMap);
+ SettableFuture<Void> dirDownload = SettableFuture.create();
+ ListenableFuture<byte[]> protoDownload =
+ retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest())));
+ Futures.addCallback(
+ protoDownload,
+ new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(byte[] b) {
+ try {
+ Tree tree = Tree.parseFrom(b);
+ Map<Digest, Directory> childrenMap = new HashMap<>();
+ for (Directory child : tree.getChildrenList()) {
+ childrenMap.put(digestUtil.compute(child), child);
+ }
+ Path path = execRoot.getRelative(dir.getPath());
+ fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx));
+ dirDownload.set(null);
+ } catch (IOException e) {
+ dirDownload.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ dirDownload.setException(t);
+ }
+ },
+ MoreExecutors.directExecutor());
+ dirDownloads.add(dirDownload);
}
- // TODO(ulfjack): use same code as above also for stdout / stderr if applicable.
- downloadOutErr(result, outErr);
+
+ fileDownloads.addAll(downloadOutErr(result, outErr, ctx));
+
+ for (ListenableFuture<Void> dirDownload : dirDownloads) {
+ // Block on all directory download futures, so that we can be sure that we have discovered
+ // all file downloads and can subsequently safely iterate over the list of file downloads.
+ getFromFuture(dirDownload);
+ }
+
+ for (FuturePathBooleanTuple download : fileDownloads) {
+ getFromFuture(download.getFuture());
+ if (download.getPath() != null) {
+ download.getPath().setExecutable(download.isExecutable());
+ }
+ }
} catch (IOException downloadException) {
try {
// Delete any (partially) downloaded output files, since any subsequent local execution
@@ -178,19 +261,51 @@
}
}
+ /** Tuple of {@code ListenableFuture, Path, boolean}. */
+ private static class FuturePathBooleanTuple {
+ private final ListenableFuture<?> future;
+ private final Path path;
+ private final boolean isExecutable;
+
+ public FuturePathBooleanTuple(ListenableFuture<?> future, Path path, boolean isExecutable) {
+ this.future = future;
+ this.path = path;
+ this.isExecutable = isExecutable;
+ }
+
+ public ListenableFuture<?> getFuture() {
+ return future;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public boolean isExecutable() {
+ return isExecutable;
+ }
+ }
+
/**
* Download a directory recursively. The directory is represented by a {@link Directory} protobuf
* message, and the descendant directories are in {@code childrenMap}, accessible through their
* digest.
*/
- private void downloadDirectory(Path path, Directory dir, Map<Digest, Directory> childrenMap)
- throws IOException, InterruptedException {
+ private List<FuturePathBooleanTuple> downloadDirectory(
+ Path path, Directory dir, Map<Digest, Directory> childrenMap, Context ctx)
+ throws IOException {
// Ensure that the directory is created here even though the directory might be empty
- FileSystemUtils.createDirectoryAndParents(path);
+ path.createDirectoryAndParents();
+ List<FuturePathBooleanTuple> downloads = new ArrayList<>(dir.getFilesCount());
for (FileNode child : dir.getFilesList()) {
Path childPath = path.getRelative(child.getName());
- downloadFile(childPath, child.getDigest(), child.getIsExecutable(), null);
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () -> ctx.call(() -> downloadFile(childPath, child.getDigest(), null))),
+ childPath,
+ child.getIsExecutable()));
}
for (DirectoryNode child : dir.getDirectoriesList()) {
@@ -207,55 +322,93 @@
+ childDigest
+ "not found");
}
- downloadDirectory(childPath, childDir, childrenMap);
+ downloads.addAll(downloadDirectory(childPath, childDir, childrenMap, ctx));
}
+
+ return downloads;
}
/**
* Download a file (that is not a directory). If the {@code content} is not given, the content is
* fetched from the digest.
*/
- public void downloadFile(
- Path path, Digest digest, boolean isExecutable, @Nullable ByteString content)
- throws IOException, InterruptedException {
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+ public ListenableFuture<Void> downloadFile(Path path, Digest digest, @Nullable ByteString content)
+ throws IOException {
+ Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
FileSystemUtils.writeContent(path, new byte[0]);
- } else {
- if (content != null && !content.isEmpty()) {
- try (OutputStream stream = path.getOutputStream()) {
- content.writeTo(stream);
- }
- } else {
- downloadBlob(digest, path);
- Digest receivedDigest = digestUtil.compute(path);
- if (!receivedDigest.equals(digest)) {
- throw new IOException("Digest does not match " + receivedDigest + " != " + digest);
- }
- }
+ return COMPLETED_SUCCESS;
}
- path.setExecutable(isExecutable);
+
+ if (content != null && !content.isEmpty()) {
+ try (OutputStream stream = path.getOutputStream()) {
+ content.writeTo(stream);
+ }
+ return COMPLETED_SUCCESS;
+ }
+
+ OutputStream out = new LazyFileOutputStream(path);
+ SettableFuture<Void> outerF = SettableFuture.create();
+ ListenableFuture<Void> f = downloadBlob(digest, out);
+ Futures.addCallback(
+ f,
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ try {
+ out.close();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ outerF.setException(t);
+ try {
+ out.close();
+ } catch (IOException e) {
+ // Intentionally left empty. The download already failed, so we can ignore
+ // the error on close().
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
}
- private void downloadOutErr(ActionResult result, FileOutErr outErr)
- throws IOException, InterruptedException {
+ private List<FuturePathBooleanTuple> downloadOutErr(
+ ActionResult result, FileOutErr outErr, Context ctx) throws IOException {
+ List<FuturePathBooleanTuple> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} else if (result.hasStdoutDigest()) {
- byte[] stdoutBytes = downloadBlob(result.getStdoutDigest());
- outErr.getOutputStream().write(stdoutBytes);
- outErr.getOutputStream().flush();
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () ->
+ ctx.call(
+ () -> downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()))),
+ null,
+ false));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} else if (result.hasStderrDigest()) {
- byte[] stderrBytes = downloadBlob(result.getStderrDigest());
- outErr.getErrorStream().write(stderrBytes);
- outErr.getErrorStream().flush();
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () ->
+ ctx.call(
+ () -> downloadBlob(result.getStderrDigest(), outErr.getErrorStream()))),
+ null,
+ false));
}
+ return downloads;
}
/** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -291,8 +444,7 @@
* <p>Attempting to a upload symlink results in a {@link
* com.google.build.lib.actions.ExecException}, since cachable actions shouldn't emit symlinks.
*/
- public void addFiles(Collection<Path> files)
- throws ExecException, IOException, InterruptedException {
+ public void addFiles(Collection<Path> files) throws ExecException, IOException {
for (Path file : files) {
// TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
// rely on the local spawn runner to stat the files, instead of statting here.
@@ -398,4 +550,55 @@
/** Release resources associated with the cache. The cache may not be used after calling this. */
@Override
public abstract void close();
+
+ /**
+ * Creates an {@link OutputStream} that isn't actually opened until the first data is written.
+ * This is useful to only have as many open file descriptors as necessary at a time to avoid
+ * running into system limits.
+ */
+ private static class LazyFileOutputStream extends OutputStream {
+
+ private final Path path;
+ private OutputStream out;
+
+ public LazyFileOutputStream(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureOpen();
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureOpen();
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureOpen();
+ out.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ ensureOpen();
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ ensureOpen();
+ out.close();
+ }
+
+ private void ensureOpen() throws IOException {
+ if (out == null) {
+ out = path.getOutputStream();
+ }
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 75d58ad..001ba22 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.bytestream.ByteStreamGrpc;
@@ -28,8 +27,6 @@
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
@@ -42,7 +39,6 @@
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
-import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -51,7 +47,6 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -71,7 +66,6 @@
private final CallCredentials callCredentials;
private final long callTimeoutSecs;
private final RemoteRetrier retrier;
- private final ListeningScheduledExecutorService retryService;
private final Object lock = new Object();
@@ -92,17 +86,13 @@
* @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be
* complete. The timeout resets between retries
* @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings.
- * @param retryService the executor service to schedule retries on. It's the responsibility of the
- * caller to properly shutdown the service after use. Users should avoid shutting down the
- * service before {@link #shutdown()} has been called
*/
public ByteStreamUploader(
@Nullable String instanceName,
Channel channel,
@Nullable CallCredentials callCredentials,
long callTimeoutSecs,
- RemoteRetrier retrier,
- ListeningScheduledExecutorService retryService) {
+ RemoteRetrier retrier) {
checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
this.instanceName = instanceName;
@@ -110,7 +100,6 @@
this.callCredentials = callCredentials;
this.callTimeoutSecs = callTimeoutSecs;
this.retrier = retrier;
- this.retryService = retryService;
}
/**
@@ -192,27 +181,29 @@
}
@VisibleForTesting
- ListenableFuture<Void> uploadBlobAsync(Chunker chunker)
- throws IOException {
+ ListenableFuture<Void> uploadBlobAsync(Chunker chunker) {
Digest digest = checkNotNull(chunker.digest());
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
- ListenableFuture<Void> uploadResult = uploadsInProgress.get(digest);
- if (uploadResult == null) {
- uploadResult = SettableFuture.create();
- uploadResult.addListener(
- () -> {
- synchronized (lock) {
- uploadsInProgress.remove(digest);
- }
- },
- MoreExecutors.directExecutor());
- startAsyncUploadWithRetry(
- chunker, retrier.newBackoff(), (SettableFuture<Void>) uploadResult);
- uploadsInProgress.put(digest, uploadResult);
+ ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
+ if (inProgress != null) {
+ return inProgress;
}
+
+ final SettableFuture<Void> uploadResult = SettableFuture.create();
+ uploadResult.addListener(
+ () -> {
+ synchronized (lock) {
+ uploadsInProgress.remove(digest);
+ }
+ },
+ MoreExecutors.directExecutor());
+ Context ctx = Context.current();
+ retrier.executeAsync(
+ () -> ctx.call(() -> startAsyncUpload(chunker, uploadResult)), uploadResult);
+ uploadsInProgress.put(digest, uploadResult);
return uploadResult;
}
}
@@ -224,77 +215,23 @@
}
}
- private void startAsyncUploadWithRetry(
- Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) {
-
- AsyncUpload.Listener listener =
- new AsyncUpload.Listener() {
- @Override
- public void success() {
- overallUploadResult.set(null);
- }
-
- @Override
- public void failure(Status status) {
- StatusException cause = status.asException();
- long nextDelayMillis = backoffTimes.nextDelayMillis();
- if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) {
- // Out of retries or status not retriable.
- RetryException error =
- new RetryException(
- "Out of retries or status not retriable.",
- backoffTimes.getRetryAttempts(),
- cause);
- overallUploadResult.setException(error);
- } else {
- retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult);
- }
- }
-
- private void retryAsyncUpload(
- long nextDelayMillis,
- Chunker chunker,
- Retrier.Backoff backoffTimes,
- SettableFuture<Void> overallUploadResult) {
- try {
- ListenableScheduledFuture<?> schedulingResult =
- retryService.schedule(
- Context.current()
- .wrap(
- () ->
- startAsyncUploadWithRetry(
- chunker, backoffTimes, overallUploadResult)),
- nextDelayMillis,
- MILLISECONDS);
- // In case the scheduled execution errors, we need to notify the overallUploadResult.
- schedulingResult.addListener(
- () -> {
- try {
- schedulingResult.get();
- } catch (Exception e) {
- overallUploadResult.setException(
- new RetryException(
- "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e));
- }
- },
- MoreExecutors.directExecutor());
- } catch (RejectedExecutionException e) {
- // May be thrown by .schedule(...) if i.e. the executor is shutdown.
- overallUploadResult.setException(
- new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e));
- }
- }
- };
-
+ /**
+ * Starts a file upload an returns a future representing the upload. The {@code
+ * overallUploadResult} future propagates cancellations from the caller to the upload.
+ */
+ private ListenableFuture<Void> startAsyncUpload(
+ Chunker chunker, ListenableFuture<Void> overallUploadResult) {
+ SettableFuture<Void> currUpload = SettableFuture.create();
try {
chunker.reset();
} catch (IOException e) {
- overallUploadResult.setException(e);
- return;
+ currUpload.setException(e);
+ return currUpload;
}
AsyncUpload newUpload =
- new AsyncUpload(channel, callCredentials, callTimeoutSecs, instanceName, chunker, listener);
+ new AsyncUpload(
+ channel, callCredentials, callTimeoutSecs, instanceName, chunker, currUpload);
overallUploadResult.addListener(
() -> {
if (overallUploadResult.isCancelled()) {
@@ -303,22 +240,17 @@
},
MoreExecutors.directExecutor());
newUpload.start();
+ return currUpload;
}
private static class AsyncUpload {
- interface Listener {
- void success();
-
- void failure(Status status);
- }
-
private final Channel channel;
private final CallCredentials callCredentials;
private final long callTimeoutSecs;
private final String instanceName;
private final Chunker chunker;
- private final Listener listener;
+ private final SettableFuture<Void> uploadResult;
private ClientCall<WriteRequest, WriteResponse> call;
@@ -328,13 +260,13 @@
long callTimeoutSecs,
String instanceName,
Chunker chunker,
- Listener listener) {
+ SettableFuture<Void> uploadResult) {
this.channel = channel;
this.callCredentials = callCredentials;
this.callTimeoutSecs = callTimeoutSecs;
this.instanceName = instanceName;
this.chunker = chunker;
- this.listener = listener;
+ this.uploadResult = uploadResult;
}
void start() {
@@ -358,9 +290,9 @@
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
- listener.success();
+ uploadResult.set(null);
} else {
- listener.failure(status);
+ uploadResult.setException(status.asRuntimeException());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 8f65021..48bd4c8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -15,15 +15,16 @@
package com.google.devtools.build.lib.remote;
import com.google.bytestream.ByteStreamGrpc;
-import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.hash.HashingOutputStream;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
@@ -51,16 +52,14 @@
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import java.io.ByteArrayOutputStream;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@@ -70,8 +69,6 @@
private final Channel channel;
private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
- private final ListeningScheduledExecutorService retryScheduler =
- MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@VisibleForTesting
public GrpcRemoteCache(
@@ -80,13 +77,14 @@
RemoteOptions options,
RemoteRetrier retrier,
DigestUtil digestUtil) {
- super(options, digestUtil);
+ super(options, digestUtil, retrier);
this.credentials = credentials;
this.channel = channel;
this.retrier = retrier;
- uploader = new ByteStreamUploader(options.remoteInstanceName, channel, credentials,
- options.remoteTimeout, retrier, retryScheduler);
+ uploader =
+ new ByteStreamUploader(
+ options.remoteInstanceName, channel, credentials, options.remoteTimeout, retrier);
}
private ContentAddressableStorageBlockingStub casBlockingStub() {
@@ -96,8 +94,8 @@
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
- private ByteStreamBlockingStub bsBlockingStub() {
- return ByteStreamGrpc.newBlockingStub(channel)
+ private ByteStreamStub bsAsyncStub() {
+ return ByteStreamGrpc.newStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
@@ -112,7 +110,6 @@
@Override
public void close() {
- retryScheduler.shutdownNow();
uploader.shutdown();
}
@@ -174,62 +171,63 @@
uploader.uploadBlobs(toUpload);
}
- /**
- * This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not
- * allow throwing such an exception. Any caller must make sure to catch the
- * {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
- * in the context of {@link RemoteRetrier#execute}, that's perfectly safe.
- */
- private void readBlob(Digest digest, OutputStream stream)
- throws IOException, StatusRuntimeException {
+ @Override
+ protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digestUtil.toString(digest);
- Iterator<ReadResponse> replies = bsBlockingStub()
- .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
- while (replies.hasNext()) {
- replies.next().getData().writeTo(stream);
- }
- }
- @Override
- protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
- try {
- retrier.execute(
- () -> {
- try (OutputStream stream = dest.getOutputStream()) {
- readBlob(digest, stream);
- }
- return null;
- });
- } catch (RetryException e) {
- if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- throw e;
- }
- }
+ HashingOutputStream hashOut = digestUtil.newHashingOutputStream(out);
+ SettableFuture<Void> outerF = SettableFuture.create();
+ bsAsyncStub()
+ .read(
+ ReadRequest.newBuilder().setResourceName(resourceName).build(),
+ new StreamObserver<ReadResponse>() {
+ @Override
+ public void onNext(ReadResponse readResponse) {
+ try {
+ readResponse.getData().writeTo(hashOut);
+ } catch (IOException e) {
+ outerF.setException(e);
+ // Cancel the call.
+ throw new RuntimeException(e);
+ }
+ }
- @Override
- protected byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
- if (digest.getSizeBytes() == 0) {
- return new byte[0];
- }
- try {
- return retrier.execute(
- () -> {
- ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
- readBlob(digest, stream);
- return stream.toByteArray();
- });
- } catch (RetryException e) {
- if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- throw e;
- }
+ @Override
+ public void onError(Throwable t) {
+ if (t instanceof StatusRuntimeException
+ && ((StatusRuntimeException) t).getStatus().getCode()
+ == Status.NOT_FOUND.getCode()) {
+ outerF.setException(new CacheNotFoundException(digest, digestUtil));
+ } else {
+ outerF.setException(t);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ String expectedHash = digest.getHash();
+ String actualHash = DigestUtil.hashCodeToString(hashOut.hash());
+ if (!expectedHash.equals(actualHash)) {
+ String msg =
+ String.format(
+ "Expected hash '%s' does not match received hash '%s'.",
+ expectedHash, actualHash);
+ outerF.setException(new IOException(msg));
+ } else {
+ try {
+ out.flush();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ }
+ }
+ });
+ return outerF;
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 11a6119..9201c94 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -16,6 +16,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
@@ -40,6 +42,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import java.io.IOException;
+import java.util.concurrent.Executors;
import java.util.logging.Logger;
/** RemoteModule provides distributed cache and remote execution for Bazel. */
@@ -87,7 +90,8 @@
}
private final CasPathConverter converter = new CasPathConverter();
-
+ private final ListeningScheduledExecutorService retryScheduler =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
private RemoteActionContextProvider actionContextProvider;
@Override
@@ -149,11 +153,14 @@
logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock());
}
- RemoteRetrier retrier =
- new RemoteRetrier(
- remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
final AbstractRemoteActionCache cache;
if (enableBlobStoreCache) {
+ Retrier retrier =
+ new Retrier(
+ () -> Retrier.RETRIES_DISABLED,
+ (e) -> false,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
cache =
new SimpleBlobStoreActionCache(
remoteOptions,
@@ -161,6 +168,7 @@
remoteOptions,
GoogleAuthUtils.newCredentials(authAndTlsOptions),
env.getWorkingDirectory()),
+ retrier,
digestUtil);
} else if (enableGrpcCache || remoteOptions.remoteExecutor != null) {
// If a remote executor but no remote cache is specified, assume both at the same target.
@@ -169,6 +177,12 @@
if (logger != null) {
ch = ClientInterceptors.intercept(ch, logger);
}
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
cache =
new GrpcRemoteCache(
ch,
@@ -180,11 +194,15 @@
cache = null;
}
- // TODO(davido): The naming is wrong here. "Remote"-prefix in RemoteActionCache class has no
- // meaning.
final GrpcRemoteExecutor executor;
if (remoteOptions.remoteExecutor != null) {
Channel ch = GoogleAuthUtils.newChannel(remoteOptions.remoteExecutor, authAndTlsOptions);
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
if (logger != null) {
ch = ClientInterceptors.intercept(ch, logger);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index ebef95e8..f3fa147 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -36,16 +36,14 @@
public String remoteHttpCache;
@Option(
- name = "remote_rest_cache_pool_size",
- defaultValue = "20",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "Size of the HTTP pool for making requests to the REST cache.",
- deprecationWarning =
- "The value will be ignored and the option will be removed in the next "
- + "release. Bazel selects the ideal pool size automatically."
- )
- public int restCachePoolSize;
+ name = "remote_max_connections",
+ defaultValue = "0",
+ documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+ effectTags = {OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
+ help =
+ "The max. number of concurrent network connections to the remote cache/executor. By "
+ + "default Bazel selects the ideal number of connections automatically.")
+ public int remoteMaxConnections;
@Option(
name = "remote_executor",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 27de8b7..11d2481 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
@@ -28,8 +29,7 @@
/**
* Specific retry logic for remote execution/caching.
*
- * <p>A call can disable retries by throwing a {@link PassThroughException}.
- * <code>
+ * <p>A call can disable retries by throwing a {@link PassThroughException}. <code>
* RemoteRetrier r = ...;
* try {
* r.execute(() -> {
@@ -42,7 +42,7 @@
* }
* </code>
*/
-class RemoteRetrier extends Retrier {
+public class RemoteRetrier extends Retrier {
/**
* Wraps around an {@link Exception} to make it pass through a single layer of retries.
@@ -75,24 +75,36 @@
}
};
- public RemoteRetrier(RemoteOptions options, Predicate<? super Exception> shouldRetry,
+ public RemoteRetrier(
+ RemoteOptions options,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- this(options.experimentalRemoteRetry
- ? () -> new ExponentialBackoff(options)
- : () -> RETRIES_DISABLED,
+ this(
+ options.experimentalRemoteRetry
+ ? () -> new ExponentialBackoff(options)
+ : () -> RETRIES_DISABLED,
shouldRetry,
+ retryScheduler,
circuitBreaker);
}
- public RemoteRetrier(Supplier<Backoff> backoff, Predicate<? super Exception> shouldRetry,
+ public RemoteRetrier(
+ Supplier<Backoff> backoff,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- super(backoff, supportPassthrough(shouldRetry), circuitBreaker);
+ super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker);
}
@VisibleForTesting
- RemoteRetrier(Supplier<Backoff> backoff, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker, Sleeper sleeper) {
- super(backoff, supportPassthrough(shouldRetry), circuitBreaker, sleeper);
+ RemoteRetrier(
+ Supplier<Backoff> backoff,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
+ CircuitBreaker circuitBreaker,
+ Sleeper sleeper) {
+ super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker, sleeper);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fb28fff..f533cf7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -234,7 +236,7 @@
logPath = parent.getRelative(e.getKey());
logCount++;
try {
- remoteCache.downloadFile(logPath, e.getValue().getDigest(), false, null);
+ getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest(), null));
} catch (IOException ex) {
reportOnce(Event.warn("Failed downloading server logs from the remote cache."));
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
index 770098c..a329c04 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -15,9 +15,19 @@
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncCallable;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import java.io.IOException;
import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -30,13 +40,10 @@
* delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports
* circuit breaking to stop execution in case of high failure rates.
*/
-// TODO(buchgr): Move to a different package and use it for BES code.
@ThreadSafe
-class Retrier {
+public class Retrier {
- /**
- * A backoff strategy.
- */
+ /** A backoff strategy. */
public interface Backoff {
/**
@@ -55,12 +62,12 @@
/**
* The circuit breaker allows to reject execution when failure rates are high.
*
- * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are
- * executed and retried in this state. However, if error rates are high a circuit breaker can
- * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with
- * a {@link RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS}
- * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once
- * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
+ * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are executed
+ * and retried in this state. However, if error rates are high a circuit breaker can choose to
+ * transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with a {@link
+ * RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS} can
+ * periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once and
+ * in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
*
* <p>A circuit breaker implementation must be thread-safe.
*
@@ -68,6 +75,7 @@
*/
public interface CircuitBreaker {
+ /** The state of the circuit breaker. */
enum State {
/**
* Calls are executed and retried in case of failure.
@@ -91,26 +99,28 @@
REJECT_CALLS
}
- /**
- * Returns the current {@link State} of the circuit breaker.
- */
+ /** Returns the current {@link State} of the circuit breaker. */
State state();
- /**
- * Called after an execution failed.
- */
+ /** Called after an execution failed. */
void recordFailure();
- /**
- * Called after an execution succeeded.
- */
+ /** Called after an execution succeeded. */
void recordSuccess();
}
+ /**
+ * {@link Sleeper#sleep(long)} is called to pause between synchronous retries ({@link
+ * #execute(Callable)}.
+ */
public interface Sleeper {
void sleep(long millis) throws InterruptedException;
}
+ /**
+ * Wraps around the actual cause for the retry. Contains information about the number of retry
+ * attempts.
+ */
public static class RetryException extends IOException {
private final int attempts;
@@ -126,14 +136,15 @@
}
/**
- * Returns the number of times a {@link Callable} has been executed before this exception
- * was thrown.
+ * Returns the number of times a {@link Callable} has been executed before this exception was
+ * thrown.
*/
public int getAttempts() {
return attempts;
}
}
+ /** Thrown if the call was stopped by a circuit breaker. */
public static class CircuitBreakerException extends RetryException {
private CircuitBreakerException(String message, int numRetries, Exception cause) {
@@ -145,48 +156,60 @@
}
}
- public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() {
- @Override
- public State state() {
- return State.ACCEPT_CALLS;
- }
+ /** Disables circuit breaking. */
+ public static final CircuitBreaker ALLOW_ALL_CALLS =
+ new CircuitBreaker() {
+ @Override
+ public State state() {
+ return State.ACCEPT_CALLS;
+ }
- @Override
- public void recordFailure() {
- }
+ @Override
+ public void recordFailure() {}
- @Override
- public void recordSuccess() {
- }
- };
+ @Override
+ public void recordSuccess() {}
+ };
- public static final Backoff RETRIES_DISABLED = new Backoff() {
- @Override
- public long nextDelayMillis() {
- return -1;
- }
+ /** Disables retries. */
+ public static final Backoff RETRIES_DISABLED =
+ new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return -1;
+ }
- @Override
- public int getRetryAttempts() {
- return 0;
- }
- };
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ };
private final Supplier<Backoff> backoffSupplier;
private final Predicate<? super Exception> shouldRetry;
private final CircuitBreaker circuitBreaker;
+ private final ListeningScheduledExecutorService retryService;
private final Sleeper sleeper;
- public Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
+ public Retrier(
+ Supplier<Backoff> backoffSupplier,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- this(backoffSupplier, shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
+ this(
+ backoffSupplier, shouldRetry, retryScheduler, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
}
@VisibleForTesting
- Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker, Sleeper sleeper) {
+ Retrier(
+ Supplier<Backoff> backoffSupplier,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryService,
+ CircuitBreaker circuitBreaker,
+ Sleeper sleeper) {
this.backoffSupplier = backoffSupplier;
this.shouldRetry = shouldRetry;
+ this.retryService = retryService;
this.circuitBreaker = circuitBreaker;
this.sleeper = sleeper;
}
@@ -197,13 +220,13 @@
*
* <p>{@link InterruptedException} is not retried.
*
- * @param call the {@link Callable} to execute.
+ * @param call the {@link Callable} to execute.
* @throws RetryException if the {@code call} didn't succeed within the framework specified by
- * {@code backoffSupplier} and {@code shouldRetry}.
- * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
- * tripped.
+ * {@code backoffSupplier} and {@code shouldRetry}.
+ * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
+ * tripped.
* @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the
- * current thread's interrupted flag is set.
+ * current thread's interrupted flag is set.
*/
public <T> T execute(Callable<T> call) throws RetryException, InterruptedException {
final Backoff backoff = newBackoff();
@@ -230,8 +253,8 @@
e = (Exception) e.getCause();
}
if (State.TRIAL_CALL.equals(circuitState)) {
- throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0,
- e);
+ throw new CircuitBreakerException(
+ "Call failed in circuit breaker half open state.", 0, e);
}
int attempts = backoff.getRetryAttempts();
if (!shouldRetry.test(e)) {
@@ -247,8 +270,89 @@
}
}
- //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader
- // <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, ScheduledExecutorService executor)
+ /**
+ * Executes an {@link AsyncCallable}, retrying execution in case of failure and returning a {@link
+ * ListenableFuture} pointing to the result/error.
+ */
+ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
+ SettableFuture<T> f = SettableFuture.create();
+ executeAsync(call, f);
+ return f;
+ }
+
+ /**
+ * Executes an {@link AsyncCallable}, retrying execution in case of failure and uses the provided
+ * {@code promise} to point to the result/error.
+ */
+ public <T> void executeAsync(AsyncCallable<T> call, SettableFuture<T> promise) {
+ Preconditions.checkNotNull(call);
+ Preconditions.checkNotNull(promise);
+ Backoff backoff = newBackoff();
+ executeAsync(call, promise, backoff);
+ }
+
+ private <T> void executeAsync(AsyncCallable<T> call, SettableFuture<T> outerF, Backoff backoff) {
+ Preconditions.checkState(!outerF.isDone(), "outerF completed already.");
+ try {
+ Futures.addCallback(
+ call.call(),
+ new FutureCallback<T>() {
+ @Override
+ public void onSuccess(T t) {
+ outerF.set(t);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ onExecuteAsyncFailure(t, call, outerF, backoff);
+ }
+ },
+ MoreExecutors.directExecutor());
+ } catch (Exception e) {
+ onExecuteAsyncFailure(e, call, outerF, backoff);
+ }
+ }
+
+ private <T> void onExecuteAsyncFailure(
+ Throwable t, AsyncCallable<T> call, SettableFuture<T> outerF, Backoff backoff) {
+ long waitMillis = backoff.nextDelayMillis();
+ if (waitMillis >= 0 && t instanceof Exception && isRetriable((Exception) t)) {
+ try {
+ ListenableScheduledFuture<?> sf =
+ retryService.schedule(
+ () -> executeAsync(call, outerF, backoff), waitMillis, TimeUnit.MILLISECONDS);
+ Futures.addCallback(
+ sf,
+ new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object o) {
+ // Submitted successfully. Intentionally left empty.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
+ outerF.setException(
+ new RetryException(
+ "Scheduled execution errored.", backoff.getRetryAttempts(), e));
+ }
+ },
+ MoreExecutors.directExecutor());
+ } catch (RejectedExecutionException e) {
+ // May be thrown by .schedule(...) if i.e. the executor is shutdown.
+ outerF.setException(
+ new RetryException("Rejected by executor.", backoff.getRetryAttempts(), e));
+ }
+ } else {
+ Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
+ String message =
+ waitMillis >= 0
+ ? "Status not retriable."
+ : "Exhaused retry attempts (" + backoff.getRetryAttempts() + ")";
+ RetryException error = new RetryException(message, backoff.getRetryAttempts(), e);
+ outerF.setException(error);
+ }
+ }
public Backoff newBackoff() {
return backoffSupplier.get();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 803946c..b1b0d60 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,6 +14,13 @@
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
@@ -24,7 +31,6 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
@@ -57,8 +63,8 @@
private final SimpleBlobStore blobStore;
public SimpleBlobStoreActionCache(
- RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) {
- super(options, digestUtil);
+ RemoteOptions options, SimpleBlobStore blobStore, Retrier retrier, DigestUtil digestUtil) {
+ super(options, digestUtil, retrier);
this.blobStore = blobStore;
}
@@ -79,11 +85,12 @@
public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, InterruptedException {
- FileSystemUtils.createDirectoryAndParents(rootLocation);
- Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
+ rootLocation.createDirectoryAndParents();
+ Directory directory = Directory.parseFrom(getFromFuture(downloadBlob(rootDigest)));
for (FileNode file : directory.getFilesList()) {
- downloadFile(
- rootLocation.getRelative(file.getName()), file.getDigest(), file.getIsExecutable(), null);
+ Path dst = rootLocation.getRelative(file.getName());
+ getFromFuture(downloadFile(dst, file.getDigest(), null));
+ dst.setExecutable(file.getIsExecutable());
}
for (DirectoryNode child : directory.getDirectoriesList()) {
downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
@@ -218,26 +225,31 @@
}
@Override
- protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
- try (OutputStream out = dest.getOutputStream()) {
- boolean success = blobStore.get(digest.getHash(), out);
- if (!success) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- }
- }
+ protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ SettableFuture<Void> outerF = SettableFuture.create();
+ Futures.addCallback(
+ blobStore.get(digest.getHash(), out),
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean found) {
+ if (found) {
+ try {
+ out.flush();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ } else {
+ outerF.setException(new CacheNotFoundException(digest, digestUtil));
+ }
+ }
- @Override
- public byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
- if (digest.getSizeBytes() == 0) {
- return new byte[0];
- }
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- boolean success = blobStore.get(digest.getHash(), out);
- if (!success) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- return out.toByteArray();
- }
+ @Override
+ public void onFailure(Throwable throwable) {
+ outerF.setException(throwable);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
index 4736756..7d893fb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
@@ -40,6 +40,7 @@
return new HttpBlobStore(
URI.create(options.remoteHttpCache),
(int) TimeUnit.SECONDS.toMillis(options.remoteTimeout),
+ options.remoteMaxConnections,
creds);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
index 247315b..cd0b329 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
@@ -11,6 +11,7 @@
srcs = glob(["*.java"]),
tags = ["bazel"],
deps = [
+ "//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 28088d2..85cbc87 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -15,10 +15,13 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
public final class ConcurrentMapBlobStore implements SimpleBlobStore {
@@ -34,19 +37,26 @@
}
@Override
- public boolean get(String key, OutputStream out) throws IOException {
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
byte[] data = map.get(key);
+ SettableFuture<Boolean> f = SettableFuture.create();
if (data == null) {
- return false;
+ f.set(false);
+ } else {
+ try {
+ out.write(data);
+ f.set(true);
+ } catch (IOException e) {
+ f.setException(e);
+ }
}
- out.write(data);
- return true;
+ return f;
}
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return get(key, out);
+ return getFromFuture(get(key, out));
}
@Override
@@ -57,10 +67,22 @@
}
@Override
- public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ public void putActionResult(String key, byte[] in) {
map.put(key, in);
}
@Override
public void close() {}
+
+ private static <T> T getFromFuture(ListenableFuture<T> f)
+ throws IOException, InterruptedException {
+ try {
+ return f.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new IOException(e.getCause());
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 0114310..6556746 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -13,7 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.vfs.Path;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -35,21 +39,26 @@
}
@Override
- public boolean get(String key, OutputStream out) throws IOException {
- Path f = toPath(key);
- if (!f.exists()) {
- return false;
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
+ SettableFuture<Boolean> f = SettableFuture.create();
+ Path p = toPath(key);
+ if (!p.exists()) {
+ f.set(false);
+ } else {
+ try (InputStream in = p.getInputStream()) {
+ ByteStreams.copy(in, out);
+ f.set(true);
+ } catch (IOException e) {
+ f.setException(e);
+ }
}
- try (InputStream in = f.getInputStream()) {
- ByteStreams.copy(in, out);
- }
- return true;
+ return f;
}
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return get(key, out);
+ return getFromFuture(get(key, out));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
index b7e4db2..3bf6746 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore;
+import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,7 +37,7 @@
*
* @return {@code true} if the {@code key} was found. {@code false} otherwise.
*/
- boolean get(String key, OutputStream out) throws IOException, InterruptedException;
+ ListenableFuture<Boolean> get(String key, OutputStream out);
/**
* Fetches the BLOB associated with the {@code key} from the Action Cache and writes it to {@code
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
index e532922..8ba1f88 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
@@ -16,6 +16,7 @@
],
deps = [
"//src/main/java/com/google/devtools/build/lib/remote/blobstore",
+ "//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/common/options",
"//third_party:auth",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
index 857dfe4..a017d2a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
@@ -13,7 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore.http;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.auth.Credentials;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -23,6 +27,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
@@ -39,6 +44,8 @@
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
@@ -86,9 +93,9 @@
Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");
private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */);
- private final ChannelPool downloadChannels;
- private final ChannelPool uploadChannels;
+ private final ChannelPool channelPool;
private final URI uri;
+ private final int timeoutMillis;
private final Object credentialsLock = new Object();
@@ -98,7 +105,9 @@
@GuardedBy("credentialsLock")
private long lastRefreshTime;
- public HttpBlobStore(URI uri, int timeoutMillis, @Nullable final Credentials creds)
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public HttpBlobStore(
+ URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds)
throws Exception {
boolean useTls = uri.getScheme().equals("https");
if (uri.getPort() == -1) {
@@ -129,52 +138,48 @@
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.group(eventLoop)
.remoteAddress(uri.getHost(), uri.getPort());
- downloadChannels =
- new SimpleChannelPool(
- clientBootstrap,
- new ChannelPoolHandler() {
- @Override
- public void channelReleased(Channel ch) {
- ch.pipeline().remove("read-timeout-handler");
+ ChannelPoolHandler channelPoolHandler =
+ new ChannelPoolHandler() {
+ @Override
+ public void channelReleased(Channel ch) {}
+
+ @Override
+ public void channelAcquired(Channel ch) {}
+
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline p = ch.pipeline();
+ if (sslCtx != null) {
+ SSLEngine engine = sslCtx.newEngine(ch.alloc());
+ engine.setUseClientMode(true);
+ p.addFirst("ssl-handler", new SslHandler(engine));
+ }
+ }
+ };
+ if (remoteMaxConnections > 0) {
+ channelPool = new FixedChannelPool(clientBootstrap, channelPoolHandler, remoteMaxConnections);
+ } else {
+ channelPool = new SimpleChannelPool(clientBootstrap, channelPoolHandler);
+ }
+ this.creds = creds;
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private Channel acquireUploadChannel() throws InterruptedException {
+ Promise<Channel> channelReady = eventLoop.next().newPromise();
+ channelPool
+ .acquire()
+ .addListener(
+ (Future<Channel> channelAcquired) -> {
+ if (!channelAcquired.isSuccess()) {
+ channelReady.setFailure(channelAcquired.cause());
+ return;
}
- @Override
- public void channelAcquired(Channel ch) {
- ch.pipeline()
- .addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
- }
-
- @Override
- public void channelCreated(Channel ch) {
+ try {
+ Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
- p.addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
- if (sslCtx != null) {
- SSLEngine engine = sslCtx.newEngine(ch.alloc());
- engine.setUseClientMode(true);
- p.addFirst(new SslHandler(engine));
- }
- p.addLast(new HttpClientCodec());
- p.addLast(new HttpDownloadHandler(creds));
- }
- });
- uploadChannels =
- new SimpleChannelPool(
- clientBootstrap,
- new ChannelPoolHandler() {
- @Override
- public void channelReleased(Channel ch) {}
-
- @Override
- public void channelAcquired(Channel ch) {}
-
- @Override
- public void channelCreated(Channel ch) {
- ChannelPipeline p = ch.pipeline();
- if (sslCtx != null) {
- SSLEngine engine = sslCtx.newEngine(ch.alloc());
- engine.setUseClientMode(true);
- p.addFirst(new SslHandler(engine));
- }
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen at random. We only expect HTTP servers to respond with
// an error message in the body and that should always be less than 10KiB.
@@ -182,26 +187,87 @@
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpUploadHandler(creds));
+
+ channelReady.setSuccess(ch);
+ } catch (Throwable t) {
+ channelReady.setFailure(t);
}
});
- this.creds = creds;
+
+ try {
+ return channelReady.get();
+ } catch (ExecutionException e) {
+ PlatformDependent.throwException(e.getCause());
+ return null;
+ }
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void releaseUploadChannel(Channel ch) {
+ if (ch.isOpen()) {
+ ch.pipeline().remove(HttpResponseDecoder.class);
+ ch.pipeline().remove(HttpObjectAggregator.class);
+ ch.pipeline().remove(HttpRequestEncoder.class);
+ ch.pipeline().remove(ChunkedWriteHandler.class);
+ ch.pipeline().remove(HttpUploadHandler.class);
+ }
+ channelPool.release(ch);
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private Future<Channel> acquireDownloadChannel() {
+ Promise<Channel> channelReady = eventLoop.next().newPromise();
+ channelPool
+ .acquire()
+ .addListener(
+ (Future<Channel> channelAcquired) -> {
+ if (!channelAcquired.isSuccess()) {
+ channelReady.setFailure(channelAcquired.cause());
+ return;
+ }
+
+ try {
+ Channel ch = channelAcquired.getNow();
+ ChannelPipeline p = ch.pipeline();
+ ch.pipeline()
+ .addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
+ p.addLast(new HttpClientCodec());
+ p.addLast(new HttpDownloadHandler(creds));
+
+ channelReady.setSuccess(ch);
+ } catch (Throwable t) {
+ channelReady.setFailure(t);
+ }
+ });
+
+ return channelReady;
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void releaseDownloadChannel(Channel ch) {
+ if (ch.isOpen()) {
+ // The channel might have been closed due to an error, in which case its pipeline
+ // has already been cleared. Closed channels can't be reused.
+ ch.pipeline().remove(ReadTimeoutHandler.class);
+ ch.pipeline().remove(HttpClientCodec.class);
+ ch.pipeline().remove(HttpDownloadHandler.class);
+ }
+ channelPool.release(ch);
}
@Override
- public boolean containsKey(String key) throws IOException, InterruptedException {
+ public boolean containsKey(String key) {
throw new UnsupportedOperationException("HTTP Caching does not use this method.");
}
@Override
- public boolean get(String key, OutputStream out) throws IOException, InterruptedException {
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
return get(key, out, true);
}
@SuppressWarnings("FutureReturnValueIgnored")
- private boolean get(String key, final OutputStream out, boolean casDownload)
- throws IOException, InterruptedException {
+ private ListenableFuture<Boolean> get(String key, final OutputStream out, boolean casDownload) {
final AtomicBoolean dataWritten = new AtomicBoolean();
-
OutputStream wrappedOut =
new OutputStream() {
// OutputStream.close() does nothing, which is what we want to ensure that the
@@ -226,62 +292,90 @@
}
};
DownloadCommand download = new DownloadCommand(uri, casDownload, key, wrappedOut);
+ SettableFuture<Boolean> outerF = SettableFuture.create();
+ acquireDownloadChannel()
+ .addListener(
+ (Future<Channel> chP) -> {
+ if (!chP.isSuccess()) {
+ outerF.setException(chP.cause());
+ return;
+ }
- Channel ch = null;
- try {
- ch = acquireDownloadChannel();
- ChannelFuture downloadFuture = ch.writeAndFlush(download);
- downloadFuture.sync();
- return true;
- } catch (Exception e) {
- // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
- // checked exception that hasn't been declared in the method signature.
- if (e instanceof HttpException) {
- HttpResponse response = ((HttpException) e).response();
- if (!dataWritten.get() && authTokenExpired(response)) {
- // The error is due to an auth token having expired. Let's try again.
- refreshCredentials();
- return getAfterCredentialRefresh(download);
- }
- if (cacheMiss(response.status())) {
- return false;
- }
- }
- throw e;
- } finally {
- if (ch != null) {
- downloadChannels.release(ch);
- }
- }
+ Channel ch = chP.getNow();
+ ch.writeAndFlush(download)
+ .addListener(
+ (f) -> {
+ try {
+ if (f.isSuccess()) {
+ outerF.set(true);
+ } else {
+ Throwable cause = f.cause();
+ // cause can be of type HttpException, because Netty uses
+ // Unsafe.throwException to
+ // re-throw a checked exception that hasn't been declared in the method
+ // signature.
+ if (cause instanceof HttpException) {
+ HttpResponse response = ((HttpException) cause).response();
+ if (!dataWritten.get() && authTokenExpired(response)) {
+ // The error is due to an auth token having expired. Let's try
+ // again.
+ refreshCredentials();
+ getAfterCredentialRefresh(download, outerF);
+ return;
+ } else if (cacheMiss(response.status())) {
+ outerF.set(false);
+ return;
+ }
+ }
+ outerF.setException(cause);
+ }
+ } finally {
+ releaseDownloadChannel(ch);
+ }
+ });
+ });
+ return outerF;
}
@SuppressWarnings("FutureReturnValueIgnored")
- private boolean getAfterCredentialRefresh(DownloadCommand cmd) throws InterruptedException {
- Channel ch = null;
- try {
- ch = acquireDownloadChannel();
- ChannelFuture downloadFuture = ch.writeAndFlush(cmd);
- downloadFuture.sync();
- return true;
- } catch (Exception e) {
- if (e instanceof HttpException) {
- HttpResponse response = ((HttpException) e).response();
- if (cacheMiss(response.status())) {
- return false;
- }
- }
- throw e;
- } finally {
- if (ch != null) {
- downloadChannels.release(ch);
- }
- }
+ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture<Boolean> outerF) {
+ acquireDownloadChannel()
+ .addListener(
+ (Future<Channel> chP) -> {
+ if (!chP.isSuccess()) {
+ outerF.setException(chP.cause());
+ return;
+ }
+
+ Channel ch = chP.getNow();
+ ch.writeAndFlush(cmd)
+ .addListener(
+ (f) -> {
+ try {
+ if (f.isSuccess()) {
+ outerF.set(true);
+ } else {
+ Throwable cause = f.cause();
+ if (cause instanceof HttpException) {
+ HttpResponse response = ((HttpException) cause).response();
+ if (cacheMiss(response.status())) {
+ outerF.set(false);
+ return;
+ }
+ }
+ outerF.setException(cause);
+ }
+ } finally {
+ releaseDownloadChannel(ch);
+ }
+ });
+ });
}
@Override
public boolean getActionResult(String actionKey, OutputStream out)
throws IOException, InterruptedException {
- return get(actionKey, out, false);
+ return getFromFuture(get(actionKey, out, false));
}
@Override
@@ -329,7 +423,7 @@
} finally {
in.close();
if (ch != null) {
- uploadChannels.release(ch);
+ releaseUploadChannel(ch);
}
}
}
@@ -343,7 +437,7 @@
uploadFuture.sync();
} finally {
if (ch != null) {
- uploadChannels.release(ch);
+ releaseUploadChannel(ch);
}
}
}
@@ -376,8 +470,7 @@
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void close() {
- downloadChannels.close();
- uploadChannels.close();
+ channelPool.close();
eventLoop.shutdownGracefully();
}
@@ -403,24 +496,6 @@
}
}
- private Channel acquireDownloadChannel() throws InterruptedException {
- try {
- return downloadChannels.acquire().get();
- } catch (ExecutionException e) {
- PlatformDependent.throwException(e.getCause());
- return null;
- }
- }
-
- private Channel acquireUploadChannel() throws InterruptedException {
- try {
- return uploadChannels.acquire().get();
- } catch (ExecutionException e) {
- PlatformDependent.throwException(e.getCause());
- return null;
- }
- }
-
private void refreshCredentials() throws IOException {
synchronized (credentialsLock) {
long now = System.currentTimeMillis();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index e4dc92a..ae8b109 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -17,6 +17,8 @@
import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
+import com.google.common.hash.HashingOutputStream;
+import com.google.common.io.BaseEncoding;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.cache.DigestUtils;
@@ -29,6 +31,7 @@
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
/** Utility methods to work with {@link Digest}. */
public class DigestUtil {
@@ -106,6 +109,14 @@
return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build();
}
+ public static String hashCodeToString(HashCode hash) {
+ return BaseEncoding.base16().lowerCase().encode(hash.asBytes());
+ }
+
+ public HashingOutputStream newHashingOutputStream(OutputStream out) {
+ return new HashingOutputStream(hashFn.getHash(), out);
+ }
+
public String toString(Digest digest) {
return digest.getHash() + "/" + digest.getSizeBytes();
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
new file mode 100644
index 0000000..95a02ab
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -0,0 +1,43 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/** Utility methods for the remote package. * */
+public class Utils {
+
+ private Utils() {}
+
+ /**
+ * Returns the result of a {@link ListenableFuture} if successful, or throws any checked {@link
+ * Exception} directly if it's an {@link IOException} or else wraps it in an {@link IOException}.
+ */
+ public static <T> T getFromFuture(ListenableFuture<T> f)
+ throws IOException, InterruptedException {
+ try {
+ return f.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw new IOException(e.getCause());
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index db1f0c3..731610e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -65,7 +65,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -85,8 +87,7 @@
private static final String INSTANCE_NAME = "foo";
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
- private final ListeningScheduledExecutorService retryService =
- MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ private static ListeningScheduledExecutorService retryService;
private Server server;
private Channel channel;
@@ -95,6 +96,11 @@
@Mock private Retrier.Backoff mockBackoff;
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public final void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
@@ -118,17 +124,20 @@
withEmptyMetadata.detach(prevContext);
server.shutdownNow();
- retryService.shutdownNow();
server.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test(timeout = 10000)
public void singleBlobUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -198,9 +207,9 @@
public void multipleBlobsUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
int numUploads = 100;
Map<String, byte[]> blobsByHash = new HashMap<>();
@@ -285,15 +294,15 @@
withEmptyMetadata.detach(prevContext);
}
- @Test(timeout = 20000)
+ @Test
public void contextShouldBePreservedUponRetries() throws Exception {
Context prevContext = withEmptyMetadata.attach();
// We upload blobs with different context, and retry 3 times for each upload.
// We verify that the correct metadata is passed to the server with every blob.
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(3, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
List<Chunker> builders = new ArrayList<>(toUpload.size());
@@ -383,9 +392,8 @@
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -445,9 +453,9 @@
public void errorsShouldBeReported() throws IOException, InterruptedException {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -475,9 +483,9 @@
public void shutdownShouldCancelOngoingUploads() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -532,10 +540,12 @@
@Test(timeout = 10000)
public void failureInRetryExecutorShouldBeHandled() throws Exception {
Context prevContext = withEmptyMetadata.attach();
+ ListeningScheduledExecutorService retryService =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -567,9 +577,9 @@
public void resourceNameWithoutInstanceName() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -610,9 +620,10 @@
new RemoteRetrier(
() -> new FixedBackoff(1, 0),
/* No Status is retriable. */ (e) -> false,
+ retryService,
Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
AtomicInteger numCalls = new AtomicInteger();
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 522241c..25228a5 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
@@ -27,6 +28,8 @@
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
@@ -71,8 +74,11 @@
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -95,6 +101,12 @@
private Server fakeServer;
private Context withEmptyMetadata;
private Context prevContext;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public final void setUp() throws Exception {
@@ -129,6 +141,11 @@
fakeServer.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
private static class CallCredentialsInterceptor implements ClientInterceptor {
private final CallCredentials credentials;
@@ -166,7 +183,10 @@
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
new RemoteRetrier(
- remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryService,
+ Retrier.ALLOW_ALL_CALLS);
return new GrpcRemoteCache(
ClientInterceptors.intercept(
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
@@ -182,7 +202,7 @@
GrpcRemoteCache client = newClient();
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
// Will not call the mock Bytestream interface at all.
- assertThat(client.downloadBlob(emptyDigest)).isEmpty();
+ assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
}
@Test
@@ -199,7 +219,7 @@
responseObserver.onCompleted();
}
});
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
@@ -220,7 +240,7 @@
responseObserver.onCompleted();
}
});
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index c6758c1..e8b1313 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -94,8 +96,11 @@
import java.util.Collection;
import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -127,6 +132,7 @@
private RemoteSpawnRunner client;
private FileOutErr outErr;
private Server fakeServer;
+ private static ListeningScheduledExecutorService retryService;
private final SpawnExecutionContext simplePolicy =
new SpawnExecutionContext() {
@@ -182,6 +188,11 @@
}
};
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public final void setUp() throws Exception {
String fakeServerName = "fake server for " + getClass();
@@ -238,7 +249,8 @@
outErr = new FileOutErr(stdout, stderr);
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
- new RemoteRetrier(options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
+ new RemoteRetrier(
+ options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, Retrier.ALLOW_ALL_CALLS);
Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
GrpcRemoteExecutor executor =
new GrpcRemoteExecutor(channel, null, options.remoteTimeout, retrier);
@@ -268,6 +280,11 @@
fakeServer.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void cacheHit() throws Exception {
serviceRegistry.addService(
@@ -909,10 +926,10 @@
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
- // First read is a cache miss, next read succeeds.
+ // First read is a retriable error, next read succeeds.
if (first) {
first = false;
- responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
} else {
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("stdout")).build());
@@ -968,7 +985,7 @@
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
- assertThat(result.isCacheHit()).isFalse();
+ assertThat(result.isCacheHit()).isTrue();
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
index 68ce454..d9b08d2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
@@ -19,6 +19,8 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.Range;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
@@ -27,9 +29,12 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -46,12 +51,23 @@
}
private RemoteRetrierTest.Foo fooMock;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public void setUp() {
fooMock = Mockito.mock(RemoteRetrierTest.Foo.class);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void testExponentialBackoff() throws Exception {
Retrier.Backoff backoff =
@@ -93,7 +109,7 @@
options.experimentalRemoteRetry = false;
RemoteRetrier retrier =
- Mockito.spy(new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS));
+ Mockito.spy(new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS));
when(fooMock.foo())
.thenReturn("bla")
.thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
@@ -106,8 +122,14 @@
public void testNonRetriableError() throws Exception {
Supplier<Backoff> s =
() -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2);
- RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> false,
- Retrier.ALLOW_ALL_CALLS, Mockito.mock(Sleeper.class)));
+ RemoteRetrier retrier =
+ Mockito.spy(
+ new RemoteRetrier(
+ s,
+ (e) -> false,
+ retryService,
+ Retrier.ALLOW_ALL_CALLS,
+ Mockito.mock(Sleeper.class)));
when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
assertThrows(retrier, 1);
Mockito.verify(fooMock, Mockito.times(1)).foo();
@@ -118,8 +140,9 @@
Supplier<Backoff> s =
() -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2);
Sleeper sleeper = Mockito.mock(Sleeper.class);
- RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> true,
- Retrier.ALLOW_ALL_CALLS, sleeper));
+ RemoteRetrier retrier =
+ Mockito.spy(
+ new RemoteRetrier(s, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS, sleeper));
when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
assertThrows(retrier, 3);
@@ -135,7 +158,8 @@
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
options.experimentalRemoteRetry = false;
- RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ RemoteRetrier retrier =
+ new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
try {
retrier.execute(() -> {
throw thrown;
@@ -151,7 +175,8 @@
StatusRuntimeException thrown = Status.Code.UNKNOWN.toStatus().asRuntimeException();
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ RemoteRetrier retrier =
+ new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
AtomicInteger numCalls = new AtomicInteger();
try {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 3c8293a..b325eeb 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
@@ -514,6 +515,7 @@
logDir);
Digest logDigest = digestUtil.computeAsUtf8("bla");
+ Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
when(executor.executeRemotely(any(ExecuteRequest.class)))
.thenReturn(
ExecuteResponse.newBuilder()
@@ -522,6 +524,9 @@
LogFile.newBuilder().setHumanReadable(true).setDigest(logDigest).build())
.setResult(ActionResult.newBuilder().setExitCode(31).build())
.build());
+ SettableFuture<Void> completed = SettableFuture.create();
+ completed.set(null);
+ when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -530,8 +535,7 @@
assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(false), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
}
@Test
@@ -551,6 +555,7 @@
logDir);
Digest logDigest = digestUtil.computeAsUtf8("bla");
+ Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
com.google.rpc.Status timeoutStatus =
com.google.rpc.Status.newBuilder().setCode(Code.DEADLINE_EXCEEDED.getNumber()).build();
ExecuteResponse resp =
@@ -562,6 +567,9 @@
when(executor.executeRemotely(any(ExecuteRequest.class)))
.thenThrow(new Retrier.RetryException(
"", 1, new ExecutionStatusException(resp.getStatus(), resp)));
+ SettableFuture<Void> completed = SettableFuture.create();
+ completed.set(null);
+ when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -570,8 +578,7 @@
assertThat(res.status()).isEqualTo(Status.TIMEOUT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(false), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
}
@Test
@@ -609,9 +616,7 @@
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never())
- .downloadFile(
- any(Path.class), any(Digest.class), any(Boolean.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
}
@Test
@@ -649,9 +654,7 @@
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never())
- .downloadFile(
- any(Path.class), any(Digest.class), any(Boolean.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
index 945c27d..624d074 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
@@ -21,16 +21,22 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,19 +55,31 @@
private static final Predicate<Exception> RETRY_ALL = (e) -> true;
private static final Predicate<Exception> RETRY_NONE = (e) -> false;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(alwaysOpen.state()).thenReturn(State.ACCEPT_CALLS);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void retryShouldWork_failure() throws Exception {
// Test that a call is retried according to the backoff.
// All calls fail.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
try {
r.execute(() -> {
throw new Exception("call failed");
@@ -81,7 +99,7 @@
// All calls fail.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_NONE, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_NONE, retryService, alwaysOpen);
try {
r.execute(() -> {
throw new Exception("call failed");
@@ -101,7 +119,7 @@
// The last call succeeds.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
AtomicInteger numCalls = new AtomicInteger();
int val = r.execute(() -> {
numCalls.incrementAndGet();
@@ -121,7 +139,7 @@
// Test that nested calls using retries compose as expected.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/1);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
AtomicInteger attemptsLvl0 = new AtomicInteger();
AtomicInteger attemptsLvl1 = new AtomicInteger();
@@ -152,7 +170,7 @@
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
r.execute(() -> {
@@ -174,7 +192,7 @@
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
cb.trialCall();
@@ -192,7 +210,7 @@
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
cb.trialCall();
@@ -214,7 +232,7 @@
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
Thread.currentThread().interrupt();
@@ -230,7 +248,7 @@
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
Thread.currentThread().interrupt();
@@ -242,6 +260,26 @@
}
}
+ @Test
+ public void asyncRetryShouldWork() throws Exception {
+ // Test that a call is retried according to the backoff.
+ // All calls fail.
+
+ Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/ 2);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
+ try {
+ r.executeAsync(
+ () -> {
+ throw new Exception("call failed");
+ })
+ .get();
+ fail("exception expected.");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause()).isInstanceOf(RetryException.class);
+ assertThat(((RetryException) e.getCause()).getAttempts()).isEqualTo(3);
+ }
+ }
+
/**
* Simple circuit breaker that trips after N consecutive failures.
*/
diff --git a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
index 594d56b..83d5bc3 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
@@ -14,12 +14,16 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.clock.JavaClock;
+import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -38,8 +42,11 @@
import io.grpc.Context;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -54,6 +61,14 @@
private FakeActionInputFileCache fakeFileCache;
private Context withEmptyMetadata;
private Context prevContext;
+ private Retrier retrier;
+
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public final void setUp() throws Exception {
@@ -62,7 +77,23 @@
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
+ retrier =
+ new Retrier(
+ () ->
+ new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return -1;
+ }
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ },
+ (e) -> false,
+ retryService,
+ RemoteRetrier.ALLOW_ALL_CALLS);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
@@ -78,13 +109,21 @@
withEmptyMetadata.detach(prevContext);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
private SimpleBlobStoreActionCache newClient() {
return newClient(new ConcurrentHashMap<>());
}
private SimpleBlobStoreActionCache newClient(ConcurrentMap<String, byte[]> map) {
return new SimpleBlobStoreActionCache(
- Options.getDefaults(RemoteOptions.class), new ConcurrentMapBlobStore(map), DIGEST_UTIL);
+ Options.getDefaults(RemoteOptions.class),
+ new ConcurrentMapBlobStore(map),
+ retrier,
+ DIGEST_UTIL);
}
@Test
@@ -92,7 +131,7 @@
SimpleBlobStoreActionCache client = newClient();
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
// Will not call the mock Bytestream interface at all.
- assertThat(client.downloadBlob(emptyDigest)).isEmpty();
+ assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
}
@Test
@@ -101,7 +140,7 @@
Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
map.put(digest.getHash(), "abcdefg".getBytes(Charsets.UTF_8));
final SimpleBlobStoreActionCache client = newClient(map);
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
index 6850f9e..c677874 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore.http;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.util.Collections.singletonList;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -93,8 +94,8 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
}
@@ -115,8 +116,8 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
closeServerChannel(server);
@@ -137,9 +138,9 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
- blobStore.get("key", out);
+ getFromFuture(blobStore.get("key", out));
assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
verify(credentials, times(1)).refresh();
verify(credentials, times(2)).getRequestMetadata(any(URI.class));
@@ -166,7 +167,7 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
ByteArrayInputStream in = new ByteArrayInputStream(data);
blobStore.put("key", data.length, in);
@@ -194,8 +195,8 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
@@ -221,7 +222,7 @@
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
blobStore.put("key", 1, new ByteArrayInputStream(new byte[] {0}));
fail("Exception expected.");
} catch (Exception e) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
index 3fbbd14..4010243 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.remote.worker;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
@@ -80,7 +81,7 @@
try {
// This still relies on the blob size to be small enough to fit in memory.
// TODO(olaola): refactor to fix this if the need arises.
- Chunker c = new Chunker(cache.downloadBlob(digest), digestUtil);
+ Chunker c = new Chunker(getFromFuture(cache.downloadBlob(digest)), digestUtil);
while (c.hasNext()) {
responseObserver.onNext(
ReadResponse.newBuilder().setData(c.next().getData()).build());
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index 0912b1f..f950698 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.remote.worker;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
@@ -178,7 +179,7 @@
try {
command =
com.google.devtools.remoteexecution.v1test.Command.parseFrom(
- cache.downloadBlob(action.getCommandDigest()));
+ getFromFuture(cache.downloadBlob(action.getCommandDigest())));
cache.downloadTree(action.getInputRootDigest(), execRoot);
} catch (CacheNotFoundException e) {
throw StatusUtils.notFoundError(e.getMissingDigest());
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index a55d2d9..6f6b2c1 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -24,7 +24,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.RemoteOptions;
+import com.google.devtools.build.lib.remote.RemoteRetrier;
+import com.google.devtools.build.lib.remote.Retrier;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
@@ -66,6 +70,7 @@
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -267,18 +272,28 @@
blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
}
+ ListeningScheduledExecutorService retryService =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryService,
+ Retrier.ALLOW_ALL_CALLS);
DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction());
RemoteWorker worker =
new RemoteWorker(
fs,
remoteWorkerOptions,
- new SimpleBlobStoreActionCache(remoteOptions, blobStore, digestUtil),
+ new SimpleBlobStoreActionCache(remoteOptions, blobStore, retrier, digestUtil),
sandboxPath,
digestUtil);
final Server server = worker.startServer();
worker.createPidFile();
server.awaitTermination();
+ retryService.shutdownNow();
}
private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {