Deflake remote tests by setting an `OnCancelHandler`
Found to fix flaky local behavior of remote tests when invoking `bazel shutdown`:
```
Load240418 11:57:38.114:XT 85 [com.google.devtools.build.remote.worker.ExecutionServer.lambda$waitExecution$1] Work failed: 9bcaa872-d921-4889-a762-66c242993c59
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
at io.grpc.Status.asRuntimeException(Status.java:526)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)
at com.google.devtools.build.remote.worker.ExecutionServer.lambda$waitExecution$1(ExecutionServer.java:171)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.afterRanInterruptiblySuccess(TrustedListenableFutureTask.java:136)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:89)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
```
Closes #22049.
PiperOrigin-RevId: 626953580
Change-Id: Id914dfd77496a50427f59638bd81794f021b9425
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
index f1da0d2..aa4f28c 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java
@@ -58,6 +58,7 @@
import com.google.rpc.Status;
import io.grpc.StatusException;
import io.grpc.protobuf.StatusProto;
+import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -149,6 +150,12 @@
.build()));
return;
}
+ ((ServerCallStreamObserver<Operation>) responseObserver)
+ .setOnCancelHandler(
+ () -> {
+ future.cancel(true);
+ operationsCache.remove(opName);
+ });
waitExecution(opName, future, responseObserver);
}
@@ -209,6 +216,12 @@
ListenableFuture<ActionResult> future =
executorService.submit(() -> execute(context, request, opName));
operationsCache.put(opName, future);
+ ((ServerCallStreamObserver<Operation>) responseObserver)
+ .setOnCancelHandler(
+ () -> {
+ future.cancel(true);
+ operationsCache.remove(opName);
+ });
// Send the first operation.
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
// When the operation completes, send the result.