Remote: gRPC load balancing. (Part 5)
Refactor ReferenceCountedChannel to use DynamicConnectionPool when creating new calls. This change allows existing remote execution/cache client dynamically create new connections on demand.
This change includes rxjava3 to final jar so the size of install_base is increased (~6M for macOS).
PiperOrigin-RevId: 359687769
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index a532787..9cd30be 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -37,6 +37,7 @@
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -44,7 +45,6 @@
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.CallCredentials;
-import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
@@ -61,6 +61,7 @@
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
+import io.reactivex.rxjava3.core.Single;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -98,8 +99,9 @@
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private ListeningScheduledExecutorService retryService;
+ private final String serverName = "Server for " + this.getClass();
private Server server;
- private ManagedChannel channel;
+ private ChannelConnectionFactory channelConnectionFactory;
private RemoteActionExecutionContext context;
@Mock private Retrier.Backoff mockBackoff;
@@ -108,13 +110,24 @@
public final void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- String serverName = "Server for " + this.getClass();
server =
InProcessServerBuilder.forName(serverName)
.fallbackHandlerRegistry(serviceRegistry)
.build()
.start();
- channel = InProcessChannelBuilder.forName(serverName).build();
+ channelConnectionFactory =
+ new ChannelConnectionFactory() {
+ @Override
+ public Single<? extends ChannelConnection> create() {
+ return Single.just(
+ new ChannelConnection(InProcessChannelBuilder.forName(serverName).build()));
+ }
+
+ @Override
+ public int maxConcurrency() {
+ return 100;
+ }
+ };
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(
"none",
@@ -131,8 +144,6 @@
retryService.awaitTermination(
com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- channel.shutdownNow();
- channel.awaitTermination(5, TimeUnit.SECONDS);
server.shutdownNow();
server.awaitTermination();
}
@@ -144,7 +155,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -221,7 +232,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
retrier);
@@ -338,7 +349,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
1,
retrier);
@@ -397,7 +408,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
retrier);
@@ -468,7 +479,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
retrier);
@@ -506,7 +517,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
retrier);
@@ -548,7 +559,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -583,7 +594,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -699,9 +710,21 @@
new ByteStreamUploader(
INSTANCE_NAME,
new ReferenceCountedChannel(
- InProcessChannelBuilder.forName("Server for " + this.getClass())
- .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
- .build()),
+ new ChannelConnectionFactory() {
+ @Override
+ public Single<? extends ChannelConnection> create() {
+ return Single.just(
+ new ChannelConnection(
+ InProcessChannelBuilder.forName(serverName)
+ .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
+ .build()));
+ }
+
+ @Override
+ public int maxConcurrency() {
+ return 100;
+ }
+ }),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -762,7 +785,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -828,7 +851,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -861,7 +884,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -925,7 +948,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -963,7 +986,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
/* instanceName= */ null,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -1006,7 +1029,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
/* instanceName= */ null,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -1042,7 +1065,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -1120,7 +1143,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);
@@ -1201,7 +1224,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
retrier);
@@ -1258,7 +1281,7 @@
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
- new ReferenceCountedChannel(channel),
+ new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
retrier);