Remote: gRPC load balancing. (Part 5)

Refactor ReferenceCountedChannel to use DynamicConnectionPool when creating new calls. This change allows the existing remote execution/cache clients to dynamically create new connections on demand.

This change adds rxjava3 to the final jar, so the size of install_base increases (~6M on macOS).

PiperOrigin-RevId: 359687769
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index a532787..9cd30be 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -37,6 +37,7 @@
 import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TestUtils;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -44,7 +45,6 @@
 import com.google.protobuf.ByteString;
 import io.grpc.BindableService;
 import io.grpc.CallCredentials;
-import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.Server;
 import io.grpc.ServerCall;
@@ -61,6 +61,7 @@
 import io.grpc.stub.MetadataUtils;
 import io.grpc.stub.StreamObserver;
 import io.grpc.util.MutableHandlerRegistry;
+import io.reactivex.rxjava3.core.Single;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -98,8 +99,9 @@
   private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
   private ListeningScheduledExecutorService retryService;
 
+  private final String serverName = "Server for " + this.getClass();
   private Server server;
-  private ManagedChannel channel;
+  private ChannelConnectionFactory channelConnectionFactory;
   private RemoteActionExecutionContext context;
 
   @Mock private Retrier.Backoff mockBackoff;
@@ -108,13 +110,24 @@
   public final void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
 
-    String serverName = "Server for " + this.getClass();
     server =
         InProcessServerBuilder.forName(serverName)
             .fallbackHandlerRegistry(serviceRegistry)
             .build()
             .start();
-    channel = InProcessChannelBuilder.forName(serverName).build();
+    channelConnectionFactory =
+        new ChannelConnectionFactory() {
+          @Override
+          public Single<? extends ChannelConnection> create() {
+            return Single.just(
+                new ChannelConnection(InProcessChannelBuilder.forName(serverName).build()));
+          }
+
+          @Override
+          public int maxConcurrency() {
+            return 100;
+          }
+        };
     RequestMetadata metadata =
         TracingMetadataUtils.buildMetadata(
             "none",
@@ -131,8 +144,6 @@
     retryService.awaitTermination(
         com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
-    channel.shutdownNow();
-    channel.awaitTermination(5, TimeUnit.SECONDS);
     server.shutdownNow();
     server.awaitTermination();
   }
@@ -144,7 +155,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -221,7 +232,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
             retrier);
@@ -338,7 +349,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             1,
             retrier);
@@ -397,7 +408,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
             retrier);
@@ -468,7 +479,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
             retrier);
@@ -506,7 +517,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
             retrier);
@@ -548,7 +559,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -583,7 +594,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -699,9 +710,21 @@
         new ByteStreamUploader(
             INSTANCE_NAME,
             new ReferenceCountedChannel(
-                InProcessChannelBuilder.forName("Server for " + this.getClass())
-                    .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
-                    .build()),
+                new ChannelConnectionFactory() {
+                  @Override
+                  public Single<? extends ChannelConnection> create() {
+                    return Single.just(
+                        new ChannelConnection(
+                            InProcessChannelBuilder.forName(serverName)
+                                .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
+                                .build()));
+                  }
+
+                  @Override
+                  public int maxConcurrency() {
+                    return 100;
+                  }
+                }),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -762,7 +785,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -828,7 +851,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -861,7 +884,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -925,7 +948,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -963,7 +986,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             /* instanceName= */ null,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -1006,7 +1029,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             /* instanceName= */ null,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -1042,7 +1065,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -1120,7 +1143,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -1201,7 +1224,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
             retrier);
@@ -1258,7 +1281,7 @@
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
-            new ReferenceCountedChannel(channel),
+            new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
             retrier);