Delete CompletableFuture.

I noticed that CompletableFuture became more prevalant in https://github.com/bazelbuild/bazel/commit/a226eed7a48aa3637c8846b475cf429b0b41caf7. However, I find it's generally simpler to use plain-old ListenableFuture.addListener to propagate cancellations.

Closes #15469.

PiperOrigin-RevId: 449461782
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index b54b6d2..5899321 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -14,9 +14,7 @@
 
 package com.google.devtools.build.lib.remote;
 
-import static com.google.bytestream.ByteStreamGrpc.getReadMethod;
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
 
 import build.bazel.remote.execution.v2.ActionCacheGrpc;
 import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
@@ -43,6 +41,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
@@ -51,7 +50,6 @@
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
-import com.google.devtools.build.lib.remote.util.CompletableFuture;
 import com.google.devtools.build.lib.remote.util.DigestOutputStream;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -60,7 +58,7 @@
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
 import io.grpc.Channel;
-import io.grpc.ClientCall;
+import io.grpc.Context;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
@@ -366,84 +364,86 @@
       Channel channel) {
     String resourceName =
         getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
-    CompletableFuture<Long> future = CompletableFuture.create();
+    SettableFuture<Long> future = SettableFuture.create();
     OutputStream out;
     try {
       out = options.cacheCompression ? new ZstdDecompressingOutputStream(rawOut) : rawOut;
     } catch (IOException e) {
       return Futures.immediateFailedFuture(e);
     }
-    ByteStreamStub stub = bsAsyncStub(context, channel);
-    ClientCall<ReadRequest, ReadResponse> clientCall =
-        stub.getChannel().newCall(getReadMethod(), stub.getCallOptions());
-    future.setCancelCallback(() -> clientCall.cancel("Cancelled", /* cause= */ null));
-    asyncServerStreamingCall(
-        clientCall,
-        ReadRequest.newBuilder()
-            .setResourceName(resourceName)
-            .setReadOffset(rawOut.getCount())
-            .build(),
-        new StreamObserver<ReadResponse>() {
+    Context.CancellableContext grpcContext = Context.current().withCancellation();
+    future.addListener(() -> grpcContext.cancel(null), MoreExecutors.directExecutor());
+    grpcContext.run(
+        () ->
+            bsAsyncStub(context, channel)
+                .read(
+                    ReadRequest.newBuilder()
+                        .setResourceName(resourceName)
+                        .setReadOffset(rawOut.getCount())
+                        .build(),
+                    new StreamObserver<ReadResponse>() {
+                      @Override
+                      public void onNext(ReadResponse readResponse) {
+                        ByteString data = readResponse.getData();
+                        try {
+                          data.writeTo(out);
+                        } catch (IOException e) {
+                          // Cancel the call.
+                          throw new RuntimeException(e);
+                        }
+                        // reset the stall backoff because we've made progress or been kept alive
+                        progressiveBackoff.reset();
+                      }
 
-          @Override
-          public void onNext(ReadResponse readResponse) {
-            ByteString data = readResponse.getData();
-            try {
-              data.writeTo(out);
-            } catch (IOException e) {
-              // Cancel the call.
-              throw new RuntimeException(e);
-            }
-            // reset the stall backoff because we've made progress or been kept alive
-            progressiveBackoff.reset();
-          }
+                      @Override
+                      public void onError(Throwable t) {
+                        if (rawOut.getCount() == digest.getSizeBytes()) {
+                          // If the file was fully downloaded, it doesn't matter if there was an
+                          // error at
+                          // the end of the stream.
+                          logger.atInfo().withCause(t).log(
+                              "ignoring error because file was fully received");
+                          onCompleted();
+                          return;
+                        }
+                        releaseOut();
+                        Status status = Status.fromThrowable(t);
+                        if (status.getCode() == Status.Code.NOT_FOUND) {
+                          future.setException(new CacheNotFoundException(digest));
+                        } else {
+                          future.setException(t);
+                        }
+                      }
 
-          @Override
-          public void onError(Throwable t) {
-            if (rawOut.getCount() == digest.getSizeBytes()) {
-              // If the file was fully downloaded, it doesn't matter if there was an error at
-              // the end of the stream.
-              logger.atInfo().withCause(t).log("ignoring error because file was fully received");
-              onCompleted();
-              return;
-            }
-            releaseOut();
-            Status status = Status.fromThrowable(t);
-            if (status.getCode() == Status.Code.NOT_FOUND) {
-              future.setException(new CacheNotFoundException(digest));
-            } else {
-              future.setException(t);
-            }
-          }
+                      @Override
+                      public void onCompleted() {
+                        try {
+                          if (digestSupplier != null) {
+                            Utils.verifyBlobContents(digest, digestSupplier.get());
+                          }
+                          out.flush();
+                          future.set(rawOut.getCount());
+                        } catch (IOException e) {
+                          future.setException(e);
+                        } catch (RuntimeException e) {
+                          logger.atWarning().withCause(e).log("Unexpected exception");
+                          future.setException(e);
+                        } finally {
+                          releaseOut();
+                        }
+                      }
 
-          @Override
-          public void onCompleted() {
-            try {
-              if (digestSupplier != null) {
-                Utils.verifyBlobContents(digest, digestSupplier.get());
-              }
-              out.flush();
-              future.set(rawOut.getCount());
-            } catch (IOException e) {
-              future.setException(e);
-            } catch (RuntimeException e) {
-              logger.atWarning().withCause(e).log("Unexpected exception");
-              future.setException(e);
-            } finally {
-              releaseOut();
-            }
-          }
-
-          private void releaseOut() {
-            if (out instanceof ZstdDecompressingOutputStream) {
-              try {
-                ((ZstdDecompressingOutputStream) out).closeShallow();
-              } catch (IOException e) {
-                logger.atWarning().withCause(e).log("failed to cleanly close output stream");
-              }
-            }
-          }
-        });
+                      private void releaseOut() {
+                        if (out instanceof ZstdDecompressingOutputStream) {
+                          try {
+                            ((ZstdDecompressingOutputStream) out).closeShallow();
+                          } catch (IOException e) {
+                            logger.atWarning().withCause(e).log(
+                                "failed to cleanly close output stream");
+                          }
+                        }
+                      }
+                    }));
     return future;
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 504c186..8678b76 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -41,7 +41,6 @@
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
-import com.google.devtools.build.lib.remote.util.CompletableFuture;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.RxFutures;
 import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
@@ -331,45 +330,20 @@
     reporter.started();
     OutputStream out = new ReportingOutputStream(new LazyFileOutputStream(path), reporter);
 
-    CompletableFuture<Void> outerF = CompletableFuture.create();
     ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
-    outerF.setCancelCallback(() -> f.cancel(/* mayInterruptIfRunning= */ true));
-    Futures.addCallback(
-        f,
-        new FutureCallback<Void>() {
-          @Override
-          public void onSuccess(Void result) {
-            try {
-              out.close();
-              outerF.set(null);
-              reporter.finished();
-            } catch (IOException e) {
-              outerF.setException(e);
-            } catch (RuntimeException e) {
-              logger.atWarning().withCause(e).log("Unexpected exception");
-              outerF.setException(e);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            try {
-              out.close();
-              reporter.finished();
-            } catch (IOException e) {
-              if (t != e) {
-                t.addSuppressed(e);
-              }
-            } catch (RuntimeException e) {
-              logger.atWarning().withCause(e).log("Unexpected exception");
-              t.addSuppressed(e);
-            } finally {
-              outerF.setException(t);
-            }
+    f.addListener(
+        () -> {
+          try {
+            out.close();
+            reporter.finished();
+          } catch (IOException e) {
+            logger.atWarning().withCause(e).log(
+                "Unexpected exception closing output stream after downloading %s/%d to %s",
+                digest.getHash(), digest.getSizeBytes(), path);
           }
         },
         directExecutor());
-    return outerF;
+    return f;
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java b/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java
deleted file mode 100644
index 6a6566d..0000000
--- a/src/main/java/com/google/devtools/build/lib/remote/util/CompletableFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2022 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote.util;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import io.reactivex.rxjava3.disposables.Disposable;
-import io.reactivex.rxjava3.functions.Action;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
-
-/**
- * A {@link com.google.common.util.concurrent.ListenableFuture} whose result can be set by a {@link
- * #set(Object)} or {@link #setException(Throwable)}.
- *
- * <p>It differs from {@link com.google.common.util.concurrent.SettableFuture} that it provides
- * {@link #setCancelCallback(Disposable)} for callers to register a callback which is called when
- * the future is cancelled.
- */
-public final class CompletableFuture<T> extends AbstractFuture<T> {
-
-  public static <T> CompletableFuture<T> create() {
-    return new CompletableFuture<>();
-  }
-
-  private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();
-
-  public void setCancelCallback(Action action) {
-    setCancelCallback(Disposable.fromAction(action));
-  }
-
-  public void setCancelCallback(Disposable cancelCallback) {
-    this.cancelCallback.set(cancelCallback);
-    // Just in case it was already canceled before we set the callback.
-    doCancelIfCancelled();
-  }
-
-  private void doCancelIfCancelled() {
-    if (isCancelled()) {
-      Disposable callback = cancelCallback.getAndSet(null);
-      if (callback != null) {
-        callback.dispose();
-      }
-    }
-  }
-
-  @Override
-  protected void afterDone() {
-    doCancelIfCancelled();
-  }
-
-  // Allow set to be called by other members.
-  @Override
-  public boolean set(@Nullable T t) {
-    return super.set(t);
-  }
-
-  // Allow setException to be called by other members.
-  @Override
-  public boolean setException(Throwable throwable) {
-    return super.setException(throwable);
-  }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
index efd3c77..f04838b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
@@ -14,10 +14,12 @@
 package com.google.devtools.build.lib.remote.util;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.reactivex.rxjava3.annotations.NonNull;
 import io.reactivex.rxjava3.core.Completable;
 import io.reactivex.rxjava3.core.CompletableEmitter;
@@ -189,12 +191,18 @@
    * the {@link Completable} will automatically be cancelled.
    */
   public static ListenableFuture<Void> toListenableFuture(Completable completable) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
+    SettableFuture<Void> future = SettableFuture.create();
     completable.subscribe(
         new CompletableObserver() {
           @Override
           public void onSubscribe(Disposable d) {
-            future.setCancelCallback(d);
+            future.addListener(
+                () -> {
+                  if (future.isCancelled()) {
+                    d.dispose();
+                  }
+                },
+                directExecutor());
           }
 
           @Override
@@ -222,12 +230,18 @@
    * the {@link Single} will automatically be cancelled.
    */
   public static <T> ListenableFuture<T> toListenableFuture(Single<T> single) {
-    CompletableFuture<T> future = new CompletableFuture<>();
+    SettableFuture<T> future = SettableFuture.create();
     single.subscribe(
         new SingleObserver<T>() {
           @Override
           public void onSubscribe(Disposable d) {
-            future.setCancelCallback(d);
+            future.addListener(
+                () -> {
+                  if (future.isCancelled()) {
+                    d.dispose();
+                  }
+                },
+                directExecutor());
           }
 
           @Override