remote: concurrent blob downloads. Fixes #5215
This change introduces concurrent downloads of action outputs
for remote caching/execution. Previously, an action's outputs were
downloaded one after the other, which isn't as bad as it sounds
because we typically run dozens or hundreds of actions in parallel.
However, for actions with many outputs, or for graphs that allow
only limited parallelism, we expect this change to improve
performance.
Note that with this change the AbstractRemoteActionCache will
always attempt to download all outputs concurrently. The actual
parallelism is controlled by the underlying network transport.
The gRPC transport currently enforces no limit on the number of
concurrent calls, which should be fine given that all calls are
multiplexed on a single network connection. The HTTP/1.1 transport
also enforces no limit on parallelism by default, but I have added the
--remote_max_connections=INT flag, which allows specifying an upper
bound on the number of network connections that may be open concurrently.
I have introduced this flag as a defensive mechanism for users
whose environment might enforce an upper bound on the number of open
connections, as with this change it is possible for the number of
concurrently open connections to increase dramatically (from
NumParallelActions to NumParallelActions * SumParallelActionOutputs).
A side effect of this change is that it puts the infrastructure
for retries and circuit breaking for the HttpBlobStore in place.
RELNOTES: None
PiperOrigin-RevId: 199005510
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index db1f0c3..731610e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -65,7 +65,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -85,8 +87,7 @@
private static final String INSTANCE_NAME = "foo";
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
- private final ListeningScheduledExecutorService retryService =
- MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ private static ListeningScheduledExecutorService retryService;
private Server server;
private Channel channel;
@@ -95,6 +96,11 @@
@Mock private Retrier.Backoff mockBackoff;
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public final void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
@@ -118,17 +124,20 @@
withEmptyMetadata.detach(prevContext);
server.shutdownNow();
- retryService.shutdownNow();
server.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test(timeout = 10000)
public void singleBlobUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -198,9 +207,9 @@
public void multipleBlobsUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
int numUploads = 100;
Map<String, byte[]> blobsByHash = new HashMap<>();
@@ -285,15 +294,15 @@
withEmptyMetadata.detach(prevContext);
}
- @Test(timeout = 20000)
+ @Test
public void contextShouldBePreservedUponRetries() throws Exception {
Context prevContext = withEmptyMetadata.attach();
// We upload blobs with different context, and retry 3 times for each upload.
// We verify that the correct metadata is passed to the server with every blob.
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(3, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
List<Chunker> builders = new ArrayList<>(toUpload.size());
@@ -383,9 +392,8 @@
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -445,9 +453,9 @@
public void errorsShouldBeReported() throws IOException, InterruptedException {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -475,9 +483,9 @@
public void shutdownShouldCancelOngoingUploads() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -532,10 +540,12 @@
@Test(timeout = 10000)
public void failureInRetryExecutorShouldBeHandled() throws Exception {
Context prevContext = withEmptyMetadata.attach();
+ ListeningScheduledExecutorService retryService =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -567,9 +577,9 @@
public void resourceNameWithoutInstanceName() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -610,9 +620,10 @@
new RemoteRetrier(
() -> new FixedBackoff(1, 0),
/* No Status is retriable. */ (e) -> false,
+ retryService,
Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
AtomicInteger numCalls = new AtomicInteger();