Deflake remote tests by setting an `OnCancelHandler`

Found to fix flaky local behavior of remote tests when invoking `bazel shutdown`:
```
Load240418 11:57:38.114:XT 85 [com.google.devtools.build.remote.worker.ExecutionServer.lambda$waitExecution$1] Work failed: 9bcaa872-d921-4889-a762-66c242993c59
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
	at io.grpc.Status.asRuntimeException(Status.java:526)
	at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)
	at com.google.devtools.build.remote.worker.ExecutionServer.lambda$waitExecution$1(ExecutionServer.java:171)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.afterRanInterruptiblySuccess(TrustedListenableFutureTask.java:136)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:89)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
```

Closes #22049.

PiperOrigin-RevId: 626953580
Change-Id: Id914dfd77496a50427f59638bd81794f021b9425
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index f1da0d2..aa4f28c 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -58,6 +58,7 @@
 import com.google.rpc.Status;
 import io.grpc.StatusException;
 import io.grpc.protobuf.StatusProto;
+import io.grpc.stub.ServerCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -149,6 +150,12 @@
                   .build()));
       return;
     }
+    ((ServerCallStreamObserver<Operation>) responseObserver)
+        .setOnCancelHandler(
+            () -> {
+              future.cancel(true);
+              operationsCache.remove(opName);
+            });
     waitExecution(opName, future, responseObserver);
   }
 
@@ -209,6 +216,12 @@
     ListenableFuture<ActionResult> future =
         executorService.submit(() -> execute(context, request, opName));
     operationsCache.put(opName, future);
+    ((ServerCallStreamObserver<Operation>) responseObserver)
+        .setOnCancelHandler(
+            () -> {
+              future.cancel(true);
+              operationsCache.remove(opName);
+            });
     // Send the first operation.
     responseObserver.onNext(Operation.newBuilder().setName(opName).build());
     // When the operation completes, send the result.