blob: a606e3d964f64390ad68250d2136128fb3b7d70f [file] [log] [blame]
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.grpc;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A {@link ConnectionPool} that creates one connection using provided {@link ConnectionFactory} and
* shares the connection upto {@code maxConcurrency}.
*
* <p>This is useful if underlying connection maintains a connection pool internally. (such as
* {@code Channel} in gRPC)
*
* <p>Connections must be closed with {@link Connection#close()} in order to be reused later.
*/
@ThreadSafe
public class SharedConnectionFactory implements ConnectionPool {
private final TokenBucket<Integer> tokenBucket;
private final ConnectionFactory factory;
@Nullable
@GuardedBy("this")
private AsyncSubject<Connection> connectionAsyncSubject = null;
private final AtomicReference<Disposable> connectionCreationDisposable =
new AtomicReference<>(null);
public SharedConnectionFactory(ConnectionFactory factory, int maxConcurrency) {
this.factory = factory;
List<Integer> initialTokens = new ArrayList<>(maxConcurrency);
for (int i = 0; i < maxConcurrency; ++i) {
initialTokens.add(i);
}
this.tokenBucket = new TokenBucket<>(initialTokens);
}
@Override
public void close() throws IOException {
tokenBucket.close();
Disposable d = connectionCreationDisposable.getAndSet(null);
if (d != null && !d.isDisposed()) {
d.dispose();
}
synchronized (this) {
if (connectionAsyncSubject != null) {
Connection connection = connectionAsyncSubject.getValue();
if (connection != null) {
connection.close();
}
if (!connectionAsyncSubject.hasComplete()) {
connectionAsyncSubject.onError(new IllegalStateException("closed"));
}
}
}
}
private AsyncSubject<Connection> createUnderlyingConnectionIfNot() {
synchronized (this) {
if (connectionAsyncSubject == null || connectionAsyncSubject.hasThrowable()) {
connectionAsyncSubject =
factory
.create()
.doOnSubscribe(connectionCreationDisposable::set)
.toObservable()
.subscribeWith(AsyncSubject.create());
}
return connectionAsyncSubject;
}
}
private Single<? extends Connection> acquireConnection() {
return Single.fromObservable(createUnderlyingConnectionIfNot());
}
/**
* Reuses the underlying {@link Connection} and wait for it to be released if is exceeding {@code
* maxConcurrency}.
*/
@Override
public Single<SharedConnection> create() {
return tokenBucket
.acquireToken()
.flatMap(
token ->
acquireConnection()
.doOnError(ignored -> tokenBucket.addToken(token))
.doOnDispose(() -> tokenBucket.addToken(token))
.map(
conn ->
new SharedConnection(
conn, /* onClose= */ () -> tokenBucket.addToken(token))));
}
/** Returns current number of available connections. */
public int numAvailableConnections() {
return tokenBucket.size();
}
/** A {@link Connection} which wraps an underlying connection and is shared between consumers. */
public static class SharedConnection implements Connection {
private final Connection connection;
private final Action onClose;
public SharedConnection(Connection connection, Action onClose) {
this.connection = connection;
this.onClose = onClose;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> call(
MethodDescriptor<ReqT, RespT> method, CallOptions options) {
return connection.call(method, options);
}
@Override
public void close() throws IOException {
try {
onClose.run();
} catch (Throwable t) {
throw new IOException(t);
}
}
/** Returns the underlying connection this shared connection built on */
public Connection getUnderlyingConnection() {
return connection;
}
}
}