| // Copyright 2021 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote.util; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import com.google.common.util.concurrent.AbstractFuture; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import io.reactivex.rxjava3.annotations.NonNull; |
| import io.reactivex.rxjava3.core.Completable; |
| import io.reactivex.rxjava3.core.CompletableEmitter; |
| import io.reactivex.rxjava3.core.CompletableObserver; |
| import io.reactivex.rxjava3.core.CompletableOnSubscribe; |
| import io.reactivex.rxjava3.core.Single; |
| import io.reactivex.rxjava3.core.SingleEmitter; |
| import io.reactivex.rxjava3.core.SingleObserver; |
| import io.reactivex.rxjava3.core.SingleOnSubscribe; |
| import io.reactivex.rxjava3.disposables.Disposable; |
| import io.reactivex.rxjava3.exceptions.Exceptions; |
| import io.reactivex.rxjava3.functions.Supplier; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| |
| /** Methods for interoperating between Rx and ListenableFuture. */ |
| public class RxFutures { |
| |
| private RxFutures() {} |
| |
| /** |
| * Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has |
| * completed. |
| * |
| * <p>A {@link ListenableFuture} represents some computation that is already in progress. We use |
| * {@link Supplier} here to defer the execution of the thing that produces ListenableFuture until |
| * there is subscriber. |
| * |
| * <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple |
| * subscriptions are not allowed. |
| * |
| * <p>Disposes the Completable to cancel the underlying ListenableFuture. |
| */ |
| public static Completable toCompletable( |
| Supplier<ListenableFuture<Void>> supplier, Executor executor) { |
| return Completable.create(new OnceCompletableOnSubscribe(supplier, executor)); |
| } |
| |
| private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe { |
| private final AtomicBoolean subscribed = new AtomicBoolean(false); |
| |
| private final Supplier<ListenableFuture<Void>> supplier; |
| private final Executor executor; |
| |
| private OnceCompletableOnSubscribe( |
| Supplier<ListenableFuture<Void>> supplier, Executor executor) { |
| this.supplier = supplier; |
| this.executor = executor; |
| } |
| |
| @Override |
| public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable { |
| try { |
| checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice"); |
| ListenableFuture<Void> future = supplier.get(); |
| Futures.addCallback( |
| future, |
| new FutureCallback<Void>() { |
| @Override |
| public void onSuccess(@Nullable Void t) { |
| emitter.onComplete(); |
| } |
| |
| @Override |
| public void onFailure(Throwable throwable) { |
| /* |
| * CancellationException can be thrown in two cases: |
| * 1. The ListenableFuture itself is cancelled. |
| * 2. Completable is disposed by downstream. |
| * |
| * This check is used to prevent propagating CancellationException to downstream |
| * when it has already disposed the Completable. |
| */ |
| if (throwable instanceof CancellationException && emitter.isDisposed()) { |
| return; |
| } |
| |
| emitter.onError(throwable); |
| } |
| }, |
| executor); |
| emitter.setCancellable(() -> future.cancel(true)); |
| } catch (Throwable t) { |
| // We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer |
| // to pass RuntimeExceptions and Errors down to the subscriber except for certain |
| // "fatal" exceptions. |
| Exceptions.throwIfFatal(t); |
| executor.execute(() -> emitter.onError(t)); |
| } |
| } |
| } |
| |
| /** |
| * Returns a {@link Single} that is complete once the supplied {@link ListenableFuture} has |
| * completed. |
| * |
| * <p>A {@link ListenableFuture} represents some computation that is already in progress. We use |
| * {@link Supplier} here to defer the execution of the thing that produces ListenableFuture until |
| * there is subscriber. |
| * |
| * <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple |
| * subscriptions are not allowed. |
| * |
| * <p>Disposes the Single to cancel the underlying ListenableFuture. |
| */ |
| public static <T> Single<T> toSingle(Supplier<ListenableFuture<T>> supplier, Executor executor) { |
| return Single.create(new OnceSingleOnSubscribe<>(supplier, executor)); |
| } |
| |
| private static class OnceSingleOnSubscribe<T> implements SingleOnSubscribe<T> { |
| private final AtomicBoolean subscribed = new AtomicBoolean(false); |
| |
| private final Supplier<ListenableFuture<T>> supplier; |
| private final Executor executor; |
| |
| private OnceSingleOnSubscribe(Supplier<ListenableFuture<T>> supplier, Executor executor) { |
| this.supplier = supplier; |
| this.executor = executor; |
| } |
| |
| @Override |
| public void subscribe(@NonNull SingleEmitter<T> emitter) throws Throwable { |
| try { |
| checkState(!subscribed.getAndSet(true), "This single cannot be subscribed to twice"); |
| ListenableFuture<T> future = supplier.get(); |
| Futures.addCallback( |
| future, |
| new FutureCallback<T>() { |
| @Override |
| public void onSuccess(@Nullable T t) { |
| emitter.onSuccess(t); |
| } |
| |
| @Override |
| public void onFailure(Throwable throwable) { |
| /* |
| * CancellationException can be thrown in two cases: |
| * 1. The ListenableFuture itself is cancelled. |
| * 2. Single is disposed by downstream. |
| * |
| * This check is used to prevent propagating CancellationException to downstream |
| * when it has already disposed the Single. |
| */ |
| if (throwable instanceof CancellationException && emitter.isDisposed()) { |
| return; |
| } |
| |
| emitter.onError(throwable); |
| } |
| }, |
| executor); |
| emitter.setCancellable(() -> future.cancel(true)); |
| } catch (Throwable t) { |
| // We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer |
| // to pass RuntimeExceptions and Errors down to the subscriber except for certain |
| // "fatal" exceptions. |
| Exceptions.throwIfFatal(t); |
| executor.execute(() -> emitter.onError(t)); |
| } |
| } |
| } |
| |
| /** |
| * Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed. |
| * |
| * <p>Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to |
| * the {@link Completable} will automatically be cancelled. |
| */ |
| public static ListenableFuture<Void> toListenableFuture(Completable completable) { |
| CompletableFuture<Void> future = new CompletableFuture<>(); |
| completable.subscribe( |
| new CompletableObserver() { |
| @Override |
| public void onSubscribe(Disposable d) { |
| future.setCancelCallback(d); |
| } |
| |
| @Override |
| public void onComplete() { |
| // Making the Completable as complete. |
| future.set(null); |
| } |
| |
| @Override |
| public void onError(Throwable e) { |
| future.setException(e); |
| } |
| }); |
| return future; |
| } |
| |
| /** |
| * Returns a {@link ListenableFuture} that is complete once the {@link Single} has succeeded. |
| * |
| * <p>Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to |
| * the {@link Single} will automatically be cancelled. |
| */ |
| public static <T> ListenableFuture<T> toListenableFuture(Single<T> single) { |
| CompletableFuture<T> future = new CompletableFuture<>(); |
| single.subscribe( |
| new SingleObserver<T>() { |
| @Override |
| public void onSubscribe(Disposable d) { |
| future.setCancelCallback(d); |
| } |
| |
| @Override |
| public void onSuccess(@NonNull T t) { |
| future.set(t); |
| } |
| |
| @Override |
| public void onError(Throwable e) { |
| future.setException(e); |
| } |
| }); |
| return future; |
| } |
| |
| private static final class CompletableFuture<T> extends AbstractFuture<T> { |
| private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>(); |
| |
| private void setCancelCallback(Disposable cancelCallback) { |
| this.cancelCallback.set(cancelCallback); |
| // Just in case it was already canceled before we set the callback. |
| doCancelIfCancelled(); |
| } |
| |
| private void doCancelIfCancelled() { |
| if (isCancelled()) { |
| Disposable callback = cancelCallback.getAndSet(null); |
| if (callback != null) { |
| callback.dispose(); |
| } |
| } |
| } |
| |
| @Override |
| protected void afterDone() { |
| doCancelIfCancelled(); |
| } |
| |
| // Allow set to be called by other members. |
| @Override |
| protected boolean set(@Nullable T t) { |
| return super.set(t); |
| } |
| |
| // Allow setException to be called by other members. |
| @Override |
| protected boolean setException(Throwable throwable) { |
| return super.setException(throwable); |
| } |
| } |
| } |