Handle completed errors with reexecution
In the case of a retriable error, a completed operation must be
reexecuted rather than waited for, since the server has indicated that
its result will not change at any point in the future.
ExperimentalGrpcRemoteExecutor will interpret completions as a signal to
reset execution.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
index 28662b9..2a48df2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java
@@ -34,6 +34,7 @@
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.longrunning.Operation;
import com.google.longrunning.Operation.ResultCase;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;
import io.grpc.Channel;
import io.grpc.Status.Code;
@@ -94,8 +95,10 @@
private final Function<ExecuteRequest, Iterator<Operation>> executeFunction;
private final Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction;
- // Last response (without error) we received from server.
+ // Last response we received from server.
private Operation lastOperation;
+ // Last response (without error) we received from server.
+ private Operation lastValidOperation;
Execution(
ExecuteRequest request,
@@ -133,7 +136,7 @@
// - Any call can return an Operation object with an error status in the result. Such
// Operations are completed and failed; however, some of these errors may be retriable.
// These errors should trigger a retry of the Execute call, resulting in a new Operation.
- Preconditions.checkState(lastOperation == null);
+ Preconditions.checkState(lastValidOperation == null);
ExecuteResponse response = null;
// Exit the loop as long as we get a response from either Execute() or WaitExecution().
@@ -155,9 +158,12 @@
// The cases to exit the loop:
// 1. Received the final response.
// 2. Received a un-retriable gRPC error.
- // 3. Received NOT_FOUND error where we will retry Execute() (by returning null).
// 4. Received consecutive retriable gRPC errors (up to max retry times).
- if (response == null) {
+ //
+ // The cases to retry Execute() (by resetting lastValidOperation and receiving a null response):
+ // 1. Received NOT_FOUND error.
+ // 2. Received an error from a complete operation.
+ if (response == null && lastValidOperation != null) {
response =
retrier.execute(
() ->
@@ -171,17 +177,22 @@
@Nullable
ExecuteResponse execute() throws IOException {
- Preconditions.checkState(lastOperation == null);
+ Preconditions.checkState(lastValidOperation == null);
try {
Iterator<Operation> operationStream = executeFunction.apply(request);
return handleOperationStream(operationStream);
} catch (Throwable e) {
- // If lastOperation is not null, we know the execution request is accepted by the server. In
+ // If lastValidOperation is not null, we know the execution request is accepted by the server. In
// this case, we will fallback to WaitExecution() loop when the stream is broken.
- if (lastOperation != null) {
- // By returning null, we are going to fallback to WaitExecution() loop.
- return null;
+ if (lastValidOperation != null) {
+ if (!lastOperation.getDone()) {
+ // By returning null, we are going to fallback to WaitExecution() loop.
+ return null;
+ }
+
+ // all done operations must be reexecuted
+ lastValidOperation = null;
}
throw new IOException(e);
}
@@ -189,10 +200,10 @@
@Nullable
ExecuteResponse waitExecution() throws IOException {
- Preconditions.checkState(lastOperation != null);
+ Preconditions.checkState(lastValidOperation != null);
WaitExecutionRequest request =
- WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
+ WaitExecutionRequest.newBuilder().setName(lastValidOperation.getName()).build();
try {
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
return handleOperationStream(operationStream);
@@ -205,10 +216,18 @@
StatusRuntimeException sre = (StatusRuntimeException) e;
if (sre.getStatus().getCode() == Code.NOT_FOUND
&& executeBackoff.nextDelayMillis(sre) >= 0) {
- lastOperation = null;
+ lastValidOperation = null;
return null;
}
}
+
+ // all completed operations must be reexecuted
+ if (lastOperation.getDone()
+ && e instanceof Exception
+ && executeBackoff.nextDelayMillis((Exception) e) >= 0) {
+ lastValidOperation = null;
+ return null;
+ }
throw new IOException(e);
}
}
@@ -218,10 +237,14 @@
try {
while (operationStream.hasNext()) {
Operation operation = operationStream.next();
+
+ // Track every response, even a failed one: it may indicate completion, which requires reexecution
+ lastOperation = operation;
+
ExecuteResponse response = extractResponseOrThrowIfError(operation);
// At this point, we successfully received a response that is not an error.
- lastOperation = operation;
+ lastValidOperation = lastOperation;
// We don't want to reset executeBackoff since if there is an error:
// 1. If happened before we received a first response, we want to ensure the retry
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
index ff3b6c7..6af02ff 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java
@@ -231,7 +231,7 @@
@Test
public void executeRemotely_executeAndWait() throws Exception {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
ExecuteResponse response =
@@ -244,7 +244,7 @@
@Test
public void executeRemotely_executeAndRetryWait() throws Exception {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
ExecuteResponse response =
@@ -257,10 +257,10 @@
@Test
public void executeRemotely_executeAndRetryWait_forever() throws Exception {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
int errorTimes = MAX_RETRY_ATTEMPTS * 2;
for (int i = 0; i < errorTimes; ++i) {
- executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.DEADLINE_EXCEEDED);
+ executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Status.DEADLINE_EXCEEDED.asRuntimeException());
}
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
@@ -274,9 +274,9 @@
@Test
public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) {
- executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
+ executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Status.UNAVAILABLE.asRuntimeException());
}
assertThrows(
@@ -348,8 +348,8 @@
@Test
public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
throws IOException, InterruptedException {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
- executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
+ executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Status.UNAUTHENTICATED.asRuntimeException());
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);
ExecuteResponse response =
@@ -362,9 +362,9 @@
@Test
public void executeRemotely_retryExecuteIfNotFound() throws IOException, InterruptedException {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.NOT_FOUND);
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE);
ExecuteResponse response =
@@ -378,7 +378,7 @@
@Test
public void executeRemotely_notFoundLoop_reportError() {
for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) {
- executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
+ executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.NOT_FOUND);
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
index 4c9d939..5eaebc8 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java
@@ -115,12 +115,14 @@
}
public void thenError(Code code) {
- Operation operation =
- Operation.newBuilder()
- .setName(getResourceName(request))
- .setDone(true)
- .setError(Status.newBuilder().setCode(code.getNumber()))
- .build();
+ // From REAPI Spec:
+ // > Errors discovered during creation of the `Operation` will be reported
+ // > as gRPC Status errors, while errors that occurred while running the
+ // > action will be reported in the `status` field of the `ExecuteResponse`. The
+ // > server MUST NOT set the `error` field of the `Operation` proto.
+ Operation operation = doneOperation(request, ExecuteResponse.newBuilder()
+ .setStatus(Status.newBuilder().setCode(code.getNumber()))
+ .build());
operations.add(() -> operation);
finish();
}
@@ -133,7 +135,7 @@
finish();
}
- private void finish() {
+ public void finish() {
String name = getResourceName(request);
provider.append(name, ImmutableList.copyOf(operations));
}