Delete CompletableFuture.
I noticed that CompletableFuture became more prevalant in https://github.com/bazelbuild/bazel/commit/a226eed7a48aa3637c8846b475cf429b0b41caf7. However, I find it's generally simpler to use plain-old ListenableFuture.addListener to propagate cancellations.
Closes #15469.
PiperOrigin-RevId: 449461782
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index b54b6d2..5899321 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -14,9 +14,7 @@
package com.google.devtools.build.lib.remote;
-import static com.google.bytestream.ByteStreamGrpc.getReadMethod;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
@@ -43,6 +41,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
@@ -51,7 +50,6 @@
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -60,7 +58,7 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
-import io.grpc.ClientCall;
+import io.grpc.Context;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
@@ -366,84 +364,86 @@
Channel channel) {
String resourceName =
getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
- CompletableFuture<Long> future = CompletableFuture.create();
+ SettableFuture<Long> future = SettableFuture.create();
OutputStream out;
try {
out = options.cacheCompression ? new ZstdDecompressingOutputStream(rawOut) : rawOut;
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
- ByteStreamStub stub = bsAsyncStub(context, channel);
- ClientCall<ReadRequest, ReadResponse> clientCall =
- stub.getChannel().newCall(getReadMethod(), stub.getCallOptions());
- future.setCancelCallback(() -> clientCall.cancel("Cancelled", /* cause= */ null));
- asyncServerStreamingCall(
- clientCall,
- ReadRequest.newBuilder()
- .setResourceName(resourceName)
- .setReadOffset(rawOut.getCount())
- .build(),
- new StreamObserver<ReadResponse>() {
+ Context.CancellableContext grpcContext = Context.current().withCancellation();
+ future.addListener(() -> grpcContext.cancel(null), MoreExecutors.directExecutor());
+ grpcContext.run(
+ () ->
+ bsAsyncStub(context, channel)
+ .read(
+ ReadRequest.newBuilder()
+ .setResourceName(resourceName)
+ .setReadOffset(rawOut.getCount())
+ .build(),
+ new StreamObserver<ReadResponse>() {
+ @Override
+ public void onNext(ReadResponse readResponse) {
+ ByteString data = readResponse.getData();
+ try {
+ data.writeTo(out);
+ } catch (IOException e) {
+ // Cancel the call.
+ throw new RuntimeException(e);
+ }
+ // reset the stall backoff because we've made progress or been kept alive
+ progressiveBackoff.reset();
+ }
- @Override
- public void onNext(ReadResponse readResponse) {
- ByteString data = readResponse.getData();
- try {
- data.writeTo(out);
- } catch (IOException e) {
- // Cancel the call.
- throw new RuntimeException(e);
- }
- // reset the stall backoff because we've made progress or been kept alive
- progressiveBackoff.reset();
- }
+ @Override
+ public void onError(Throwable t) {
+ if (rawOut.getCount() == digest.getSizeBytes()) {
+ // If the file was fully downloaded, it doesn't matter if there was an
+ // error at
+ // the end of the stream.
+ logger.atInfo().withCause(t).log(
+ "ignoring error because file was fully received");
+ onCompleted();
+ return;
+ }
+ releaseOut();
+ Status status = Status.fromThrowable(t);
+ if (status.getCode() == Status.Code.NOT_FOUND) {
+ future.setException(new CacheNotFoundException(digest));
+ } else {
+ future.setException(t);
+ }
+ }
- @Override
- public void onError(Throwable t) {
- if (rawOut.getCount() == digest.getSizeBytes()) {
- // If the file was fully downloaded, it doesn't matter if there was an error at
- // the end of the stream.
- logger.atInfo().withCause(t).log("ignoring error because file was fully received");
- onCompleted();
- return;
- }
- releaseOut();
- Status status = Status.fromThrowable(t);
- if (status.getCode() == Status.Code.NOT_FOUND) {
- future.setException(new CacheNotFoundException(digest));
- } else {
- future.setException(t);
- }
- }
+ @Override
+ public void onCompleted() {
+ try {
+ if (digestSupplier != null) {
+ Utils.verifyBlobContents(digest, digestSupplier.get());
+ }
+ out.flush();
+ future.set(rawOut.getCount());
+ } catch (IOException e) {
+ future.setException(e);
+ } catch (RuntimeException e) {
+ logger.atWarning().withCause(e).log("Unexpected exception");
+ future.setException(e);
+ } finally {
+ releaseOut();
+ }
+ }
- @Override
- public void onCompleted() {
- try {
- if (digestSupplier != null) {
- Utils.verifyBlobContents(digest, digestSupplier.get());
- }
- out.flush();
- future.set(rawOut.getCount());
- } catch (IOException e) {
- future.setException(e);
- } catch (RuntimeException e) {
- logger.atWarning().withCause(e).log("Unexpected exception");
- future.setException(e);
- } finally {
- releaseOut();
- }
- }
-
- private void releaseOut() {
- if (out instanceof ZstdDecompressingOutputStream) {
- try {
- ((ZstdDecompressingOutputStream) out).closeShallow();
- } catch (IOException e) {
- logger.atWarning().withCause(e).log("failed to cleanly close output stream");
- }
- }
- }
- });
+ private void releaseOut() {
+ if (out instanceof ZstdDecompressingOutputStream) {
+ try {
+ ((ZstdDecompressingOutputStream) out).closeShallow();
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log(
+ "failed to cleanly close output stream");
+ }
+ }
+ }
+ }));
return future;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 504c186..8678b76 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -41,7 +41,6 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
-import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
@@ -331,45 +330,20 @@
reporter.started();
OutputStream out = new ReportingOutputStream(new LazyFileOutputStream(path), reporter);
- CompletableFuture<Void> outerF = CompletableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
- outerF.setCancelCallback(() -> f.cancel(/* mayInterruptIfRunning= */ true));
- Futures.addCallback(
- f,
- new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- try {
- out.close();
- outerF.set(null);
- reporter.finished();
- } catch (IOException e) {
- outerF.setException(e);
- } catch (RuntimeException e) {
- logger.atWarning().withCause(e).log("Unexpected exception");
- outerF.setException(e);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- try {
- out.close();
- reporter.finished();
- } catch (IOException e) {
- if (t != e) {
- t.addSuppressed(e);
- }
- } catch (RuntimeException e) {
- logger.atWarning().withCause(e).log("Unexpected exception");
- t.addSuppressed(e);
- } finally {
- outerF.setException(t);
- }
+ f.addListener(
+ () -> {
+ try {
+ out.close();
+ reporter.finished();
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log(
+ "Unexpected exception closing output stream after downloading %s/%d to %s",
+ digest.getHash(), digest.getSizeBytes(), path);
}
},
directExecutor());
- return outerF;
+ return f;
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java b/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java
deleted file mode 100644
index 6a6566d..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2022 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote.util;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import io.reactivex.rxjava3.disposables.Disposable;
-import io.reactivex.rxjava3.functions.Action;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
-
-/**
- * A {@link com.google.common.util.concurrent.ListenableFuture} whose result can be set by a {@link
- * #set(Object)} or {@link #setException(Throwable)}.
- *
- * <p>It differs from {@link com.google.common.util.concurrent.SettableFuture} that it provides
- * {@link #setCancelCallback(Disposable)} for callers to register a callback which is called when
- * the future is cancelled.
- */
-public final class CompletableFuture<T> extends AbstractFuture<T> {
-
- public static <T> CompletableFuture<T> create() {
- return new CompletableFuture<>();
- }
-
- private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();
-
- public void setCancelCallback(Action action) {
- setCancelCallback(Disposable.fromAction(action));
- }
-
- public void setCancelCallback(Disposable cancelCallback) {
- this.cancelCallback.set(cancelCallback);
- // Just in case it was already canceled before we set the callback.
- doCancelIfCancelled();
- }
-
- private void doCancelIfCancelled() {
- if (isCancelled()) {
- Disposable callback = cancelCallback.getAndSet(null);
- if (callback != null) {
- callback.dispose();
- }
- }
- }
-
- @Override
- protected void afterDone() {
- doCancelIfCancelled();
- }
-
- // Allow set to be called by other members.
- @Override
- public boolean set(@Nullable T t) {
- return super.set(t);
- }
-
- // Allow setException to be called by other members.
- @Override
- public boolean setException(Throwable throwable) {
- return super.setException(throwable);
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
index efd3c77..f04838b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
@@ -14,10 +14,12 @@
package com.google.devtools.build.lib.remote.util;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
@@ -189,12 +191,18 @@
* the {@link Completable} will automatically be cancelled.
*/
public static ListenableFuture<Void> toListenableFuture(Completable completable) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ SettableFuture<Void> future = SettableFuture.create();
completable.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
- future.setCancelCallback(d);
+ future.addListener(
+ () -> {
+ if (future.isCancelled()) {
+ d.dispose();
+ }
+ },
+ directExecutor());
}
@Override
@@ -222,12 +230,18 @@
* the {@link Single} will automatically be cancelled.
*/
public static <T> ListenableFuture<T> toListenableFuture(Single<T> single) {
- CompletableFuture<T> future = new CompletableFuture<>();
+ SettableFuture<T> future = SettableFuture.create();
single.subscribe(
new SingleObserver<T>() {
@Override
public void onSubscribe(Disposable d) {
- future.setCancelCallback(d);
+ future.addListener(
+ () -> {
+ if (future.isCancelled()) {
+ d.dispose();
+ }
+ },
+ directExecutor());
}
@Override