blob: c5aa657a61e66b7aa82b24cad23a967640f4a0ec [file] [log] [blame]
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import javax.annotation.Nullable;
/** Utility methods for the Rx. * */
public class RxUtils {
private RxUtils() {}
/** Result of an I/O operation to remote cache. */
public static class TransferResult {
private static final TransferResult OK = new TransferResult(null, false);
private static final TransferResult INTERRUPTED = new TransferResult(null, true);
public static TransferResult ok() {
return OK;
}
public static TransferResult interrupted() {
return INTERRUPTED;
}
public static TransferResult error(IOException error) {
return new TransferResult(error, false);
}
@Nullable private final IOException error;
private final boolean interrupted;
TransferResult(@Nullable IOException error, boolean interrupted) {
this.error = error;
this.interrupted = interrupted;
}
/** Returns {@code true} if the operation succeed. */
public boolean isOk() {
return error == null && !interrupted;
}
/** Returns {@code true} if the operation failed. */
public boolean isError() {
return error != null;
}
public boolean isInterrupted() {
return interrupted;
}
/** Returns the IO error if the operation failed. */
@Nullable
public IOException getError() {
return error;
}
}
/**
* Converts the {@link Completable} to {@link Single} which will emit {@link TransferResult} on
* complete or IO errors. Other errors will be propagated to downstream.
*/
public static Single<TransferResult> toTransferResult(Completable completable) {
return completable
.toSingleDefault(TransferResult.ok())
.onErrorResumeNext(
error -> {
if (error instanceof IOException) {
return Single.just(TransferResult.error((IOException) error));
} else if (error instanceof InterruptedException) {
return Single.just(TransferResult.interrupted());
} else {
return Single.error(error);
}
});
}
private static class BulkTransferExceptionCollector {
private BulkTransferException bulkTransferException;
private boolean interrupted = false;
void onResult(TransferResult result) {
if (result.isOk()) {
return;
}
if (result.isInterrupted()) {
interrupted = true;
return;
}
IOException error = checkNotNull(result.getError());
if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(error);
}
Completable toCompletable() {
if (interrupted) {
return Completable.error(new InterruptedException());
}
if (bulkTransferException != null) {
return Completable.error(bulkTransferException);
}
return Completable.complete();
}
}
/**
* Returns a {@link Completable} which will complete when the {@link Flowable} complete.
*
* <p>Errors of {@link TransferResult#getError()} are wrapped in {@link BulkTransferException}.
* Other errors are propagated to downstream.
*/
public static Completable mergeBulkTransfer(Flowable<TransferResult> transfers) {
return transfers
.collectInto(new BulkTransferExceptionCollector(), BulkTransferExceptionCollector::onResult)
.flatMapCompletable(BulkTransferExceptionCollector::toCompletable);
}
/**
* Returns a {@link Completable} which will complete when all the passed in {@link Completable}s
* complete.
*
* <p>{@link IOException}s emitted by the passed in {@link Completable}s are wrapped in {@link
* BulkTransferException}. Other errors are propagated to downstream.
*/
public static Completable mergeBulkTransfer(Completable... transfers) {
Flowable<TransferResult> flowable =
Flowable.fromArray(transfers).flatMapSingle(RxUtils::toTransferResult);
return mergeBulkTransfer(flowable);
}
}