Handle completed errors with reexecution

A completed operation must be reexecuted, rather than waited for in the
case of a retriable error, since the server has indicated that its
result will not change at any point forward.
ExperimentalGrpcRemoteExecutor will interpret completions as a signal to
reset execution.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
index 28662b9..2a48df2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
@@ -34,6 +34,7 @@
 import com.google.devtools.build.lib.remote.util.Utils;
 import com.google.longrunning.Operation;
 import com.google.longrunning.Operation.ResultCase;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.rpc.Status;
 import io.grpc.Channel;
 import io.grpc.Status.Code;
@@ -94,8 +95,10 @@
     private final Function<ExecuteRequest, Iterator<Operation>> executeFunction;
     private final Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction;
 
-    // Last response (without error) we received from server.
+    // Last response we received from server.
     private Operation lastOperation;
+    // Last response (without error) we received from server.
+    private Operation lastValidOperation;
 
     Execution(
         ExecuteRequest request,
@@ -133,7 +136,7 @@
       // - Any call can return an Operation object with an error status in the result. Such
       //   Operations are completed and failed; however, some of these errors may be retriable.
       //   These errors should trigger a retry of the Execute call, resulting in a new Operation.
-      Preconditions.checkState(lastOperation == null);
+      Preconditions.checkState(lastValidOperation == null);
 
       ExecuteResponse response = null;
       // Exit the loop as long as we get a response from either Execute() or WaitExecution().
@@ -155,9 +158,12 @@
         // The cases to exit the loop:
         //   1. Received the final response.
         //   2. Received a un-retriable gRPC error.
-        //   3. Received NOT_FOUND error where we will retry Execute() (by returning null).
         //   4. Received consecutive retriable gRPC errors (up to max retry times).
-        if (response == null) {
+        //
+        // The cases to retry Execute() (by resetting lastValidOperation and receive a null response)
+        //   1. Received NOT_FOUND error.
+        //   2. Received an error from a complete operation.
+        if (response == null && lastValidOperation != null) {
           response =
               retrier.execute(
                   () ->
@@ -171,17 +177,22 @@
 
     @Nullable
     ExecuteResponse execute() throws IOException {
-      Preconditions.checkState(lastOperation == null);
+      Preconditions.checkState(lastValidOperation == null);
 
       try {
         Iterator<Operation> operationStream = executeFunction.apply(request);
         return handleOperationStream(operationStream);
       } catch (Throwable e) {
-        // If lastOperation is not null, we know the execution request is accepted by the server. In
+        // If lastValidOperation is not null, we know the execution request is accepted by the server. In
         // this case, we will fallback to WaitExecution() loop when the stream is broken.
-        if (lastOperation != null) {
-          // By returning null, we are going to fallback to WaitExecution() loop.
-          return null;
+        if (lastValidOperation != null) {
+          if (!lastOperation.getDone()) {
+            // By returning null, we are going to fallback to WaitExecution() loop.
+            return null;
+          }
+
+          // all done operations must be reexecuted
+          lastValidOperation = null;
         }
         throw new IOException(e);
       }
@@ -189,10 +200,10 @@
 
     @Nullable
     ExecuteResponse waitExecution() throws IOException {
-      Preconditions.checkState(lastOperation != null);
+      Preconditions.checkState(lastValidOperation != null);
 
       WaitExecutionRequest request =
-          WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
+          WaitExecutionRequest.newBuilder().setName(lastValidOperation.getName()).build();
       try {
         Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
         return handleOperationStream(operationStream);
@@ -205,10 +216,18 @@
           StatusRuntimeException sre = (StatusRuntimeException) e;
           if (sre.getStatus().getCode() == Code.NOT_FOUND
               && executeBackoff.nextDelayMillis(sre) >= 0) {
-            lastOperation = null;
+            lastValidOperation = null;
             return null;
           }
         }
+
+        // all completed operations must be reexecuted
+        if (lastOperation.getDone()
+            && e instanceof Exception
+            && executeBackoff.nextDelayMillis((Exception) e) >= 0) {
+          lastValidOperation = null;
+          return null;
+        }
         throw new IOException(e);
       }
     }
@@ -218,10 +237,14 @@
       try {
         while (operationStream.hasNext()) {
           Operation operation = operationStream.next();
+
+          // We've received any response, it may indicate completion, requiring reexecution
+          lastOperation = operation;
+
           ExecuteResponse response = extractResponseOrThrowIfError(operation);
 
           // At this point, we successfully received a response that is not an error.
-          lastOperation = operation;
+          lastValidOperation = lastOperation;
 
           // We don't want to reset executeBackoff since if there is an error:
           //   1. If happened before we received a first response, we want to ensure the retry
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
index ff3b6c7..6af02ff 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
@@ -231,7 +231,7 @@
 
   @Test
   public void executeRemotely_executeAndWait() throws Exception {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
 
     ExecuteResponse response =
@@ -244,7 +244,7 @@
 
   @Test
   public void executeRemotely_executeAndRetryWait() throws Exception {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
 
     ExecuteResponse response =
@@ -257,10 +257,10 @@
 
   @Test
   public void executeRemotely_executeAndRetryWait_forever() throws Exception {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     int errorTimes = MAX_RETRY_ATTEMPTS * 2;
     for (int i = 0; i < errorTimes; ++i) {
-      executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.DEADLINE_EXCEEDED);
+      executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Status.DEADLINE_EXCEEDED.asRuntimeException());
     }
     executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
 
@@ -274,9 +274,9 @@
 
   @Test
   public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) {
-      executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
+      executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Status.UNAVAILABLE.asRuntimeException());
     }
 
     assertThrows(
@@ -348,8 +348,8 @@
   @Test
   public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
       throws IOException, InterruptedException {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
-    executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
+    executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Status.UNAUTHENTICATED.asRuntimeException());
     executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);
 
     ExecuteResponse response =
@@ -362,9 +362,9 @@
 
   @Test
   public void executeRemotely_retryExecuteIfNotFound() throws IOException, InterruptedException {
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.NOT_FOUND);
-    executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+    executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
     executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
 
     ExecuteResponse response =
@@ -378,7 +378,7 @@
   @Test
   public void executeRemotely_notFoundLoop_reportError() {
     for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) {
-      executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+      executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
       executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.NOT_FOUND);
     }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
index 4c9d939..5eaebc8 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
@@ -115,12 +115,14 @@
     }
 
     public void thenError(Code code) {
-      Operation operation =
-          Operation.newBuilder()
-              .setName(getResourceName(request))
-              .setDone(true)
-              .setError(Status.newBuilder().setCode(code.getNumber()))
-              .build();
+      // From REAPI Spec:
+      // > Errors discovered during creation of the `Operation` will be reported
+      // > as gRPC Status errors, while errors that occurred while running the
+      // > action will be reported in the `status` field of the `ExecuteResponse`. The
+      // > server MUST NOT set the `error` field of the `Operation` proto.
+      Operation operation = doneOperation(request, ExecuteResponse.newBuilder()
+          .setStatus(Status.newBuilder().setCode(code.getNumber()))
+          .build());
       operations.add(() -> operation);
       finish();
     }
@@ -133,7 +135,7 @@
       finish();
     }
 
-    private void finish() {
+    public void finish() {
       String name = getResourceName(request);
       provider.append(name, ImmutableList.copyOf(operations));
     }