Remote: Don't blocking-get when acquiring gRPC connections. With the recent change to limit the maximum number of gRPC connections by default, acquiring a connection can suspend a thread if no connection is available. gRPC calls are scheduled on a dedicated background thread pool. Workers in the thread pool are responsible for acquiring a connection before starting the RPC call. This can lead to a race condition in which a worker thread handles some gRPC calls and then switches to a new call, which tries to acquire a new connection. If the number of connections has reached the maximum, the worker thread is suspended and never gets a chance to switch back to the previous calls. The connections held by those previous calls are, hence, never released. This PR changes the code to not use a blocking get when acquiring gRPC connections. Fixes #14363. Closes #14416. PiperOrigin-RevId: 416282883
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 1eaa3fd..a5745bf 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -138,9 +138,10 @@ ], deps = [ "//src/main/java/com/google/devtools/build/lib/remote/grpc", + "//src/main/java/com/google/devtools/build/lib/remote/util", "//third_party:guava", - "//third_party:jsr305", "//third_party:netty", + "//third_party:rxjava3", "//third_party/grpc:grpc-jar", ], )
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 2087492..e409fdf 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -24,6 +24,7 @@ import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub; import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.annotations.VisibleForTesting; @@ -388,7 +389,7 @@ private class AsyncUpload { private final RemoteActionExecutionContext context; - private final Channel channel; + private final ReferenceCountedChannel channel; private final CallCredentialsProvider callCredentialsProvider; private final long callTimeoutSecs; private final Retrier retrier; @@ -399,7 +400,7 @@ AsyncUpload( RemoteActionExecutionContext context, - Channel channel, + ReferenceCountedChannel channel, CallCredentialsProvider callCredentialsProvider, long callTimeoutSecs, Retrier retrier, @@ -480,7 +481,7 @@ MoreExecutors.directExecutor()); } - private ByteStreamFutureStub bsFutureStub() { + private ByteStreamFutureStub bsFutureStub(Channel channel) { return ByteStreamGrpc.newFutureStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata())) @@ -491,7 +492,10 @@ private ListenableFuture<Void> callAndQueryOnFailure( AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) { return Futures.catchingAsync( - call(committedOffset), + Futures.transform( + channel.withChannelFuture(channel -> call(committedOffset, channel)), + written -> null, + MoreExecutors.directExecutor()), Exception.class, (e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff), MoreExecutors.directExecutor()); @@ -528,10 +532,14 @@ AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) { ListenableFuture<Long> committedSizeFuture = Futures.transform( - bsFutureStub() - .queryWriteStatus( - 
QueryWriteStatusRequest.newBuilder().setResourceName(resourceName).build()), - (response) -> response.getCommittedSize(), + channel.withChannelFuture( + channel -> + bsFutureStub(channel) + .queryWriteStatus( + QueryWriteStatusRequest.newBuilder() + .setResourceName(resourceName) + .build())), + QueryWriteStatusResponse::getCommittedSize, MoreExecutors.directExecutor()); ListenableFuture<Long> guardedCommittedSizeFuture = Futures.catchingAsync( @@ -561,14 +569,14 @@ MoreExecutors.directExecutor()); } - private ListenableFuture<Void> call(AtomicLong committedOffset) { + private ListenableFuture<Long> call(AtomicLong committedOffset, Channel channel) { CallOptions callOptions = CallOptions.DEFAULT .withCallCredentials(callCredentialsProvider.getCallCredentials()) .withDeadlineAfter(callTimeoutSecs, SECONDS); call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions); - SettableFuture<Void> uploadResult = SettableFuture.create(); + SettableFuture<Long> uploadResult = SettableFuture.create(); ClientCall.Listener<WriteResponse> callListener = new ClientCall.Listener<WriteResponse>() { @@ -596,7 +604,7 @@ @Override public void onClose(Status status, Metadata trailers) { if (status.isOk()) { - uploadResult.set(null); + uploadResult.set(committedOffset.get()); } else { uploadResult.setException(status.asRuntimeException()); }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java index 41f5306..d50a77c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
@@ -35,12 +35,13 @@ import com.google.longrunning.Operation; import com.google.longrunning.Operation.ResultCase; import com.google.rpc.Status; +import io.grpc.Channel; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import io.reactivex.rxjava3.functions.Function; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import javax.annotation.Nullable; /** @@ -73,7 +74,7 @@ this.retrier = retrier; } - private ExecutionBlockingStub executionBlockingStub(RequestMetadata metadata) { + private ExecutionBlockingStub executionBlockingStub(RequestMetadata metadata, Channel channel) { return ExecutionGrpc.newBlockingStub(channel) .withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata)) .withCallCredentials(callCredentialsProvider.getCallCredentials()) @@ -90,7 +91,8 @@ // Count retry times for WaitExecution() calls and is reset when we receive any response from // the server that is not an error. private final ProgressiveBackoff waitExecutionBackoff; - private final Supplier<ExecutionBlockingStub> executionBlockingStubSupplier; + private final Function<ExecuteRequest, Iterator<Operation>> executeFunction; + private final Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction; // Last response (without error) we received from server. 
private Operation lastOperation; @@ -100,14 +102,16 @@ OperationObserver observer, RemoteRetrier retrier, CallCredentialsProvider callCredentialsProvider, - Supplier<ExecutionBlockingStub> executionBlockingStubSupplier) { + Function<ExecuteRequest, Iterator<Operation>> executeFunction, + Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction) { this.request = request; this.observer = observer; this.retrier = retrier; this.callCredentialsProvider = callCredentialsProvider; this.executeBackoff = this.retrier.newBackoff(); this.waitExecutionBackoff = new ProgressiveBackoff(this.retrier::newBackoff); - this.executionBlockingStubSupplier = executionBlockingStubSupplier; + this.executeFunction = executeFunction; + this.waitExecutionFunction = waitExecutionFunction; } ExecuteResponse start() throws IOException, InterruptedException { @@ -168,9 +172,9 @@ Preconditions.checkState(lastOperation == null); try { - Iterator<Operation> operationStream = executionBlockingStubSupplier.get().execute(request); + Iterator<Operation> operationStream = executeFunction.apply(request); return handleOperationStream(operationStream); - } catch (StatusRuntimeException e) { + } catch (Throwable e) { // If lastOperation is not null, we know the execution request is accepted by the server. In // this case, we will fallback to WaitExecution() loop when the stream is broken. if (lastOperation != null) { @@ -188,17 +192,20 @@ WaitExecutionRequest request = WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build(); try { - Iterator<Operation> operationStream = - executionBlockingStubSupplier.get().waitExecution(request); + Iterator<Operation> operationStream = waitExecutionFunction.apply(request); return handleOperationStream(operationStream); - } catch (StatusRuntimeException e) { + } catch (Throwable e) { // A NOT_FOUND error means Operation was lost on the server, retry Execute(). // // However, we only retry Execute() if executeBackoff should retry. 
Also increase the retry // counter at the same time (done by nextDelayMillis()). - if (e.getStatus().getCode() == Code.NOT_FOUND && executeBackoff.nextDelayMillis(e) >= 0) { - lastOperation = null; - return null; + if (e instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException) e; + if (sre.getStatus().getCode() == Code.NOT_FOUND + && executeBackoff.nextDelayMillis(sre) >= 0) { + lastOperation = null; + return null; + } } throw new IOException(e); } @@ -321,7 +328,16 @@ observer, retrier, callCredentialsProvider, - () -> this.executionBlockingStub(context.getRequestMetadata())); + (req) -> + channel.withChannelBlocking( + channel -> + this.executionBlockingStub(context.getRequestMetadata(), channel) + .execute(req)), + (req) -> + channel.withChannelBlocking( + channel -> + this.executionBlockingStub(context.getRequestMetadata(), channel) + .waitExecution(req))); return execution.start(); }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index e35d4c6..717504b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -56,6 +56,7 @@ import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import io.grpc.Channel; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -122,7 +123,8 @@ return (options.maxOutboundMessageSize - overhead) / digestSize; } - private ContentAddressableStorageFutureStub casFutureStub(RemoteActionExecutionContext context) { + private ContentAddressableStorageFutureStub casFutureStub( + RemoteActionExecutionContext context, Channel channel) { return ContentAddressableStorageGrpc.newFutureStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()), @@ -131,7 +133,7 @@ .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS); } - private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) { + private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context, Channel channel) { return ByteStreamGrpc.newStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()), @@ -140,7 +142,8 @@ .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS); } - private ActionCacheFutureStub acFutureStub(RemoteActionExecutionContext context) { + private ActionCacheFutureStub acFutureStub( + RemoteActionExecutionContext context, Channel channel) { return ActionCacheGrpc.newFutureStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()), @@ -222,7 +225,11 @@ private ListenableFuture<FindMissingBlobsResponse> getMissingDigests( RemoteActionExecutionContext context, FindMissingBlobsRequest request) { return Utils.refreshIfUnauthenticatedAsync( - () -> retrier.executeAsync(() -> casFutureStub(context).findMissingBlobs(request)), + () -> + retrier.executeAsync( + () -> + channel.withChannelFuture( + channel -> casFutureStub(context, 
channel).findMissingBlobs(request))), callCredentialsProvider); } @@ -254,7 +261,10 @@ return Utils.refreshIfUnauthenticatedAsync( () -> retrier.executeAsync( - () -> handleStatus(acFutureStub(context).getActionResult(request))), + () -> + handleStatus( + channel.withChannelFuture( + channel -> acFutureStub(context, channel).getActionResult(request)))), callCredentialsProvider); } @@ -267,13 +277,15 @@ retrier.executeAsync( () -> Futures.catchingAsync( - acFutureStub(context) - .updateActionResult( - UpdateActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .setActionResult(actionResult) - .build()), + channel.withChannelFuture( + channel -> + acFutureStub(context, channel) + .updateActionResult( + UpdateActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .setActionResult(actionResult) + .build())), StatusRuntimeException.class, (sre) -> Futures.immediateFailedFuture(new IOException(sre)), MoreExecutors.directExecutor())), @@ -317,18 +329,26 @@ @Nullable Supplier<Digest> digestSupplier) { AtomicLong offset = new AtomicLong(0); ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff); - ListenableFuture<Void> downloadFuture = + ListenableFuture<Long> downloadFuture = Utils.refreshIfUnauthenticatedAsync( () -> retrier.executeAsync( () -> - requestRead( - context, offset, progressiveBackoff, digest, out, digestSupplier), + channel.withChannelFuture( + channel -> + requestRead( + context, + offset, + progressiveBackoff, + digest, + out, + digestSupplier, + channel)), progressiveBackoff), callCredentialsProvider); return Futures.catchingAsync( - downloadFuture, + Futures.transform(downloadFuture, bytesWritten -> null, MoreExecutors.directExecutor()), StatusRuntimeException.class, (e) -> Futures.immediateFailedFuture(new IOException(e)), MoreExecutors.directExecutor()); @@ -343,17 +363,18 @@ return 
resourceName + DigestUtil.toString(digest); } - private ListenableFuture<Void> requestRead( + private ListenableFuture<Long> requestRead( RemoteActionExecutionContext context, AtomicLong offset, ProgressiveBackoff progressiveBackoff, Digest digest, CountingOutputStream out, - @Nullable Supplier<Digest> digestSupplier) { + @Nullable Supplier<Digest> digestSupplier, + Channel channel) { String resourceName = getResourceName(options.remoteInstanceName, digest, options.cacheCompression); - SettableFuture<Void> future = SettableFuture.create(); - bsAsyncStub(context) + SettableFuture<Long> future = SettableFuture.create(); + bsAsyncStub(context, channel) .read( ReadRequest.newBuilder() .setResourceName(resourceName) @@ -400,7 +421,7 @@ Utils.verifyBlobContents(digest, digestSupplier.get()); } out.flush(); - future.set(null); + future.set(offset.get()); } catch (IOException e) { future.setException(e); } catch (RuntimeException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 0b8c3fa..df3872e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -30,6 +30,7 @@ import com.google.devtools.build.lib.remote.util.Utils; import com.google.longrunning.Operation; import com.google.rpc.Status; +import io.grpc.Channel; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; @@ -57,7 +58,7 @@ this.retrier = retrier; } - private ExecutionBlockingStub execBlockingStub(RequestMetadata metadata) { + private ExecutionBlockingStub execBlockingStub(RequestMetadata metadata, Channel channel) { return ExecutionGrpc.newBlockingStub(channel) .withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata)) .withCallCredentials(callCredentialsProvider.getCallCredentials()); @@ -152,9 +153,17 @@ WaitExecutionRequest.newBuilder() .setName(operation.get().getName()) .build(); - replies = execBlockingStub(context.getRequestMetadata()).waitExecution(wr); + replies = + channel.withChannelBlocking( + channel -> + execBlockingStub(context.getRequestMetadata(), channel) + .waitExecution(wr)); } else { - replies = execBlockingStub(context.getRequestMetadata()).execute(request); + replies = + channel.withChannelBlocking( + channel -> + execBlockingStub(context.getRequestMetadata(), channel) + .execute(request)); } try { while (replies.hasNext()) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 36df5e7..ee67160 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java
@@ -13,26 +13,23 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; import com.google.devtools.build.lib.remote.grpc.SharedConnectionFactory.SharedConnection; -import io.grpc.CallOptions; +import com.google.devtools.build.lib.remote.util.RxFutures; import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ForwardingClientCall; -import io.grpc.ForwardingClientCallListener; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import io.reactivex.rxjava3.annotations.CheckReturnValue; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleSource; +import io.reactivex.rxjava3.functions.Function; import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; /** * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. @@ -41,7 +38,7 @@ * * <p>See {@link ReferenceCounted} for more information about reference counting. 
*/ -public class ReferenceCountedChannel extends Channel implements ReferenceCounted { +public class ReferenceCountedChannel implements ReferenceCounted { private final DynamicConnectionPool dynamicConnectionPool; private final AbstractReferenceCounted referenceCounted = new AbstractReferenceCounted() { @@ -59,7 +56,6 @@ return this; } }; - private final AtomicReference<String> authorityRef = new AtomicReference<>(); public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory) { this(connectionFactory, /*maxConnections=*/ 0); @@ -75,93 +71,42 @@ return dynamicConnectionPool.isClosed(); } - /** A {@link ClientCall} which call {@link SharedConnection#close()} after the RPC is closed. */ - static class ConnectionCleanupCall<ReqT, RespT> - extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> { - private final SharedConnection connection; - - protected ConnectionCleanupCall(ClientCall<ReqT, RespT> delegate, SharedConnection connection) { - super(delegate); - this.connection = connection; - } - - @Override - public void start(Listener<RespT> responseListener, Metadata headers) { - super.start( - new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>( - responseListener) { - @Override - public void onClose(Status status, Metadata trailers) { - try { - connection.close(); - } catch (IOException e) { - throw new AssertionError(e.getMessage(), e); - } finally { - super.onClose(status, trailers); - } - } - }, - headers); - } + @CheckReturnValue + public <T> ListenableFuture<T> withChannelFuture( + Function<Channel, ? 
extends ListenableFuture<T>> source) { + return RxFutures.toListenableFuture( + withChannel(channel -> RxFutures.toSingle(() -> source.apply(channel), directExecutor()))); } - private static class CloseOnStartClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> { - private final Status status; - - CloseOnStartClientCall(Status status) { - this.status = status; - } - - @Override - public void start(Listener<RespT> responseListener, Metadata headers) { - responseListener.onClose(status, new Metadata()); - } - - @Override - public void request(int numMessages) {} - - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) {} - - @Override - public void halfClose() {} - - @Override - public void sendMessage(ReqT message) {} - } - - private SharedConnection acquireSharedConnection() throws IOException, InterruptedException { + public <T> T withChannelBlocking(Function<Channel, T> source) + throws IOException, InterruptedException { try { - SharedConnection sharedConnection = dynamicConnectionPool.create().blockingGet(); - ChannelConnection connection = (ChannelConnection) sharedConnection.getUnderlyingConnection(); - authorityRef.compareAndSet(null, connection.getChannel().authority()); - return sharedConnection; + return withChannel(channel -> Single.just(source.apply(channel))).blockingGet(); } catch (RuntimeException e) { - Throwables.throwIfInstanceOf(e.getCause(), IOException.class); - Throwables.throwIfInstanceOf(e.getCause(), InterruptedException.class); + Throwable cause = e.getCause(); + if (cause != null) { + throwIfInstanceOf(cause, IOException.class); + throwIfInstanceOf(cause, InterruptedException.class); + } throw e; } } - @Override - public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( - MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { - try { - SharedConnection sharedConnection = acquireSharedConnection(); - return new ConnectionCleanupCall<>( - 
sharedConnection.call(methodDescriptor, callOptions), sharedConnection); - } catch (IOException e) { - return new CloseOnStartClientCall<>(Status.UNKNOWN.withCause(e)); - } catch (InterruptedException e) { - return new CloseOnStartClientCall<>(Status.CANCELLED.withCause(e)); - } - } - - @Override - public String authority() { - String authority = authorityRef.get(); - checkNotNull(authority, "create a connection first to get the authority"); - return authority; + @CheckReturnValue + public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) { + return dynamicConnectionPool + .create() + .flatMap( + sharedConnection -> + Single.using( + () -> sharedConnection, + conn -> { + ChannelConnection connection = + (ChannelConnection) sharedConnection.getUnderlyingConnection(); + Channel channel = connection.getChannel(); + return source.apply(channel); + }, + SharedConnection::close)); } @Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 4e2db8d..d21a3ee 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -96,6 +96,7 @@ import com.google.devtools.common.options.OptionsBase; import com.google.devtools.common.options.OptionsParsingResult; import io.grpc.CallCredentials; +import io.grpc.Channel; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -516,7 +517,15 @@ String remoteBytestreamUriPrefix = remoteOptions.remoteBytestreamUriPrefix; if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) { - remoteBytestreamUriPrefix = cacheChannel.authority(); + try { + remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority); + } catch (IOException e) { + handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); + return; + } catch (InterruptedException e) { + handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE); + return; + } if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) { remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName; }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index 6eb03ce..6d48648 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java
@@ -31,6 +31,7 @@ import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import io.grpc.CallCredentials; +import io.grpc.Channel; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.List; @@ -59,7 +60,8 @@ this.retrier = retrier; } - private CapabilitiesBlockingStub capabilitiesBlockingStub(RemoteActionExecutionContext context) { + private CapabilitiesBlockingStub capabilitiesBlockingStub( + RemoteActionExecutionContext context, Channel channel) { return CapabilitiesGrpc.newBlockingStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata())) @@ -77,7 +79,10 @@ instanceName == null ? GetCapabilitiesRequest.getDefaultInstance() : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); - return retrier.execute(() -> capabilitiesBlockingStub(context).getCapabilities(request)); + return retrier.execute( + () -> + channel.withChannelBlocking( + channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); } catch (StatusRuntimeException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java index 5dbbb07..b9b3912 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java +++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -354,8 +354,11 @@ try { return uploadAsync(context, remoteCache, reporter).blockingGet(); } catch (RuntimeException e) { - throwIfInstanceOf(e.getCause(), InterruptedException.class); - throwIfInstanceOf(e.getCause(), IOException.class); + Throwable cause = e.getCause(); + if (cause != null) { + throwIfInstanceOf(cause, InterruptedException.class); + throwIfInstanceOf(cause, IOException.class); + } throw e; } }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java index a0bc56b..c3456eb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java
@@ -38,6 +38,7 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import io.grpc.CallCredentials; +import io.grpc.Channel; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.io.OutputStream; @@ -122,7 +123,12 @@ newFetchBlobRequest(options.remoteInstanceName, urls, authHeaders, checksum, canonicalId); try { FetchBlobResponse response = - retrier.execute(() -> fetchBlockingStub(remoteActionExecutionContext).fetchBlob(request)); + retrier.execute( + () -> + channel.withChannelBlocking( + channel -> + fetchBlockingStub(remoteActionExecutionContext, channel) + .fetchBlob(request))); final Digest blobDigest = response.getBlobDigest(); retrier.execute( @@ -172,7 +178,8 @@ return requestBuilder.build(); } - private FetchBlockingStub fetchBlockingStub(RemoteActionExecutionContext context) { + private FetchBlockingStub fetchBlockingStub( + RemoteActionExecutionContext context, Channel channel) { return FetchGrpc.newBlockingStub(channel) .withInterceptors( TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java index 7eb07d4..d86cfd8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
@@ -13,7 +13,6 @@ // limitations under the License. package com.google.devtools.build.lib.remote.util; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.util.concurrent.AbstractFuture; @@ -31,7 +30,7 @@ import io.reactivex.rxjava3.core.SingleOnSubscribe; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; -import java.util.concurrent.Callable; +import io.reactivex.rxjava3.functions.Supplier; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +47,7 @@ * completed. * * <p>A {@link ListenableFuture} represents some computation that is already in progress. We use - * {@link Callable} here to defer the execution of the thing that produces ListenableFuture until + * {@link Supplier} here to defer the execution of the thing that produces ListenableFuture until * there is subscriber. * * <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple @@ -57,19 +56,19 @@ * <p>Disposes the Completable to cancel the underlying ListenableFuture. 
*/ public static Completable toCompletable( - Callable<ListenableFuture<Void>> callable, Executor executor) { - return Completable.create(new OnceCompletableOnSubscribe(callable, executor)); + Supplier<ListenableFuture<Void>> supplier, Executor executor) { + return Completable.create(new OnceCompletableOnSubscribe(supplier, executor)); } private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe { private final AtomicBoolean subscribed = new AtomicBoolean(false); - private final Callable<ListenableFuture<Void>> callable; + private final Supplier<ListenableFuture<Void>> supplier; private final Executor executor; private OnceCompletableOnSubscribe( - Callable<ListenableFuture<Void>> callable, Executor executor) { - this.callable = callable; + Supplier<ListenableFuture<Void>> supplier, Executor executor) { + this.supplier = supplier; this.executor = executor; } @@ -77,7 +76,7 @@ public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable { try { checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice"); - ListenableFuture<Void> future = callable.call(); + ListenableFuture<Void> future = supplier.get(); Futures.addCallback( future, new FutureCallback<Void>() { @@ -120,7 +119,7 @@ * completed. * * <p>A {@link ListenableFuture} represents some computation that is already in progress. We use - * {@link Callable} here to defer the execution of the thing that produces ListenableFuture until + * {@link Supplier} here to defer the execution of the thing that produces ListenableFuture until * there is subscriber. * * <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple @@ -128,18 +127,18 @@ * * <p>Disposes the Single to cancel the underlying ListenableFuture. 
*/ - public static <T> Single<T> toSingle(Callable<ListenableFuture<T>> callable, Executor executor) { - return Single.create(new OnceSingleOnSubscribe<>(callable, executor)); + public static <T> Single<T> toSingle(Supplier<ListenableFuture<T>> supplier, Executor executor) { + return Single.create(new OnceSingleOnSubscribe<>(supplier, executor)); } private static class OnceSingleOnSubscribe<T> implements SingleOnSubscribe<T> { private final AtomicBoolean subscribed = new AtomicBoolean(false); - private final Callable<ListenableFuture<T>> callable; + private final Supplier<ListenableFuture<T>> supplier; private final Executor executor; - private OnceSingleOnSubscribe(Callable<ListenableFuture<T>> callable, Executor executor) { - this.callable = callable; + private OnceSingleOnSubscribe(Supplier<ListenableFuture<T>> supplier, Executor executor) { + this.supplier = supplier; this.executor = executor; } @@ -147,13 +146,12 @@ public void subscribe(@NonNull SingleEmitter<T> emitter) throws Throwable { try { checkState(!subscribed.getAndSet(true), "This single cannot be subscribed to twice"); - ListenableFuture<T> future = callable.call(); + ListenableFuture<T> future = supplier.get(); Futures.addCallback( future, new FutureCallback<T>() { @Override public void onSuccess(@Nullable T t) { - checkNotNull(t, "value in future onSuccess callback is null"); emitter.onSuccess(t); }