Display remote action as running only if it is actually executing.

From #11801

> due to bazel displaying full parallelism even if actions are actually queued waiting for gRPC connections

This PR makes Bazel display an action as running only if it is actually executing remotely, as a first step toward fixing the parallelism limitation.

Before this change, with `--jobs=500`, the progress bar is displayed as:
```
[1 / 501] 500 actions running
```

However, the number of actions actually running on remote workers is not 500, since some actions are pending because of the limitation. With this change, the progress bar now becomes:
```
[1 / 501] 500 actions, 200 running
```

Closes #11922.

PiperOrigin-RevId: 326604891
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 8dd55a4..3773b5f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -21,6 +21,8 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -30,9 +32,11 @@
 
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.ExecuteOperationMetadata;
 import build.bazel.remote.execution.v2.ExecuteRequest;
 import build.bazel.remote.execution.v2.ExecuteResponse;
 import build.bazel.remote.execution.v2.ExecutedActionMetadata;
+import build.bazel.remote.execution.v2.ExecutionStage.Value;
 import build.bazel.remote.execution.v2.LogFile;
 import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.ImmutableClassToInstanceMap;
@@ -69,8 +73,10 @@
 import com.google.devtools.build.lib.exec.ExecutionOptions;
 import com.google.devtools.build.lib.exec.RemoteLocalFallbackRegistry;
 import com.google.devtools.build.lib.exec.SpawnRunner;
+import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus;
 import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
 import com.google.devtools.build.lib.exec.util.FakeOwner;
+import com.google.devtools.build.lib.remote.GrpcRemoteExecutor.ExecuteOperationUpdateReceiver;
 import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
@@ -87,6 +93,8 @@
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
 import com.google.devtools.common.options.Options;
+import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Durations;
 import com.google.protobuf.util.Timestamps;
@@ -105,6 +113,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -180,7 +189,9 @@
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(succeeded);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
 
     Spawn spawn = simpleSpawnWithExecutionInfo(NO_CACHE);
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -188,7 +199,8 @@
     runner.exec(spawn, policy);
 
     ArgumentCaptor<ExecuteRequest> requestCaptor = ArgumentCaptor.forClass(ExecuteRequest.class);
-    verify(executor).executeRemotely(requestCaptor.capture());
+    verify(executor)
+        .executeRemotely(requestCaptor.capture(), any(ExecuteOperationUpdateReceiver.class));
     assertThat(requestCaptor.getValue().getSkipCacheLookup()).isTrue();
     assertThat(requestCaptor.getValue().getResultsCachePolicy().getPriority()).isEqualTo(1);
     assertThat(requestCaptor.getValue().getExecutionPolicy().getPriority()).isEqualTo(2);
@@ -220,7 +232,9 @@
     RemoteSpawnRunner runner = newSpawnRunner();
 
     // Throw an IOException to trigger the local fallback.
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(IOException.class);
 
     Spawn spawn = simpleSpawnWithExecutionInfo(NO_CACHE);
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -244,7 +258,9 @@
     RemoteSpawnRunner runner = spy(newSpawnRunner());
 
     // Throw an IOException to trigger the local fallback.
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(IOException.class);
 
     SpawnResult res =
         new SpawnResult.Builder()
@@ -277,7 +293,9 @@
     RemoteSpawnRunner runner = spy(newSpawnRunner());
 
     // Throw an IOException to trigger the local fallback.
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(IOException.class);
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -310,7 +328,9 @@
 
     RemoteSpawnRunner runner = spy(newSpawnRunner());
     // Throw an IOException to trigger the local fallback.
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(IOException.class);
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -348,14 +368,17 @@
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(succeeded);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
 
     runner.exec(spawn, policy);
 
     ArgumentCaptor<ExecuteRequest> requestCaptor = ArgumentCaptor.forClass(ExecuteRequest.class);
-    verify(executor).executeRemotely(requestCaptor.capture());
+    verify(executor)
+        .executeRemotely(requestCaptor.capture(), any(ExecuteOperationUpdateReceiver.class));
     assertThat(requestCaptor.getValue().getSkipCacheLookup()).isTrue();
   }
 
@@ -372,7 +395,9 @@
 
     RemoteSpawnRunner runner = newSpawnRunner(reporter);
     // Trigger local fallback
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(new IOException());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -413,7 +438,9 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
     // Trigger local fallback
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(new IOException());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -439,7 +466,9 @@
 
     RemoteSpawnRunner runner = newSpawnRunner();
     // Trigger local fallback
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(new IOException());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -464,7 +493,9 @@
 
     when(cache.downloadActionResult(any(ActionKey.class), /* inlineOutErr= */ eq(false)))
         .thenReturn(null);
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(new IOException());
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -483,7 +514,8 @@
     RemoteSpawnRunner runner = newSpawnRunner();
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenReturn(
             ExecuteResponse.newBuilder()
                 .putServerLogs(
@@ -501,7 +533,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).downloadFile(eq(logPath), eq(logDigest));
   }
 
@@ -518,7 +551,8 @@
                 "logname", LogFile.newBuilder().setHumanReadable(true).setDigest(logDigest).build())
             .setStatus(timeoutStatus)
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenThrow(new IOException(new ExecutionStatusException(resp.getStatus(), resp)));
     SettableFuture<Void> completed = SettableFuture.create();
     completed.set(null);
@@ -530,7 +564,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.TIMEOUT);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).downloadFile(eq(logPath), eq(logDigest));
   }
 
@@ -540,7 +575,8 @@
 
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     ActionResult result = ActionResult.newBuilder().setExitCode(31).build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenReturn(
             ExecuteResponse.newBuilder()
                 .putServerLogs("logname", LogFile.newBuilder().setDigest(logDigest).build())
@@ -552,7 +588,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class), any());
     verify(cache, never()).downloadFile(any(Path.class), any(Digest.class));
   }
@@ -563,7 +600,8 @@
 
     Digest logDigest = digestUtil.computeAsUtf8("bla");
     ActionResult result = ActionResult.newBuilder().setExitCode(0).build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenReturn(
             ExecuteResponse.newBuilder()
                 .putServerLogs(
@@ -578,7 +616,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.SUCCESS);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class), any());
     verify(cache, never()).downloadFile(any(Path.class), any(Digest.class));
   }
@@ -599,7 +638,9 @@
         .download(eq(cachedResult), any(Path.class), any(FileOutErr.class), any());
     ActionResult execResult = ActionResult.newBuilder().setExitCode(31).build();
     ExecuteResponse succeeded = ExecuteResponse.newBuilder().setResult(execResult).build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(succeeded);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
     doNothing().when(cache).download(eq(execResult), any(Path.class), any(FileOutErr.class), any());
 
     Spawn spawn = newSimpleSpawn();
@@ -610,7 +651,8 @@
     assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
     assertThat(res.exitCode()).isEqualTo(31);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
   }
 
   @Test
@@ -627,7 +669,8 @@
     ExecuteResponse cachedResponse =
         ExecuteResponse.newBuilder().setResult(cachedResult).setCachedResult(true).build();
     ExecuteResponse executedResponse = ExecuteResponse.newBuilder().setResult(execResult).build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenReturn(cachedResponse)
         .thenReturn(executedResponse);
     Exception downloadFailure =
@@ -646,7 +689,8 @@
     assertThat(res.exitCode()).isEqualTo(31);
 
     ArgumentCaptor<ExecuteRequest> requestCaptor = ArgumentCaptor.forClass(ExecuteRequest.class);
-    verify(executor, times(2)).executeRemotely(requestCaptor.capture());
+    verify(executor, times(2))
+        .executeRemotely(requestCaptor.capture(), any(ExecuteOperationUpdateReceiver.class));
     List<ExecuteRequest> requests = requestCaptor.getAllValues();
     // first request should have been executed without skip cache lookup
     assertThat(requests.get(0).getSkipCacheLookup()).isFalse();
@@ -673,7 +717,8 @@
                     .setCode(Code.DEADLINE_EXCEEDED.getNumber())
                     .build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenThrow(new IOException(new ExecutionStatusException(resp.getStatus(), resp)));
 
     Spawn spawn = newSimpleSpawn();
@@ -683,7 +728,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.TIMEOUT);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any());
   }
 
@@ -707,7 +753,8 @@
                     .setCode(Code.DEADLINE_EXCEEDED.getNumber())
                     .build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class)))
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
         .thenThrow(new IOException(new ExecutionStatusException(resp.getStatus(), resp)));
 
     Spawn spawn = newSimpleSpawn();
@@ -717,7 +764,8 @@
     SpawnResult res = runner.exec(spawn, policy);
     assertThat(res.status()).isEqualTo(Status.TIMEOUT);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any());
     verify(localRunner, never()).exec(eq(spawn), eq(policy));
   }
@@ -735,7 +783,9 @@
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(33).build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(failed);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(failed);
 
     Spawn spawn = newSimpleSpawn();
 
@@ -745,7 +795,8 @@
     assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
     assertThat(res.exitCode()).isEqualTo(33);
 
-    verify(executor).executeRemotely(any(ExecuteRequest.class));
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
     verify(cache, never()).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any());
     verify(localRunner, never()).exec(eq(spawn), eq(policy));
   }
@@ -761,7 +812,9 @@
 
     when(cache.downloadActionResult(any(ActionKey.class), /* inlineOutErr= */ eq(false)))
         .thenReturn(null);
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException("reasons"));
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenThrow(new IOException("reasons"));
 
     Spawn spawn = newSimpleSpawn();
     SpawnExecutionContext policy = getSpawnContext(spawn);
@@ -824,7 +877,9 @@
         ExecuteResponse.newBuilder()
             .setResult(ActionResult.newBuilder().setExitCode(0).build())
             .build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(succeeded);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
 
     ImmutableList<String> args = ImmutableList.of("--foo", "--bar");
     ParamFileActionInput input =
@@ -885,7 +940,9 @@
 
     ActionResult succeededAction = ActionResult.newBuilder().setExitCode(0).build();
     ExecuteResponse succeeded = ExecuteResponse.newBuilder().setResult(succeededAction).build();
-    when(executor.executeRemotely(any(ExecuteRequest.class))).thenReturn(succeeded);
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
 
     RemoteSpawnRunner runner = newSpawnRunner();
 
@@ -898,7 +955,7 @@
     assertThat(result.status()).isEqualTo(Status.SUCCESS);
 
     // assert
-    verify(executor).executeRemotely(any());
+    verify(executor).executeRemotely(any(), any(ExecuteOperationUpdateReceiver.class));
     verify(cache)
         .downloadMinimal(
             any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any());
@@ -1009,6 +1066,152 @@
     assertThat(spawnMetrics.processOutputsTime()).isEqualTo(Duration.ofSeconds(1));
   }
 
+  @Test
+  public void shouldReportExecutingStatusWithoutMetadata() throws Exception {
+    RemoteSpawnRunner runner = newSpawnRunner();
+    ExecuteResponse succeeded =
+        ExecuteResponse.newBuilder()
+            .setResult(ActionResult.newBuilder().setExitCode(0).build())
+            .build();
+
+    Spawn spawn = newSimpleSpawn();
+    SpawnExecutionContext policy = mock(SpawnExecutionContext.class);
+    when(policy.getTimeout()).thenReturn(Duration.ZERO);
+
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenAnswer(
+            invocationOnMock -> {
+              ExecuteOperationUpdateReceiver receiver = invocationOnMock.getArgument(1);
+              verify(policy, never()).report(eq(ProgressStatus.EXECUTING), any(String.class));
+              receiver.onNextOperation(Operation.getDefaultInstance());
+              return succeeded;
+            });
+
+    SpawnResult res = runner.exec(spawn, policy);
+    assertThat(res.status()).isEqualTo(Status.SUCCESS);
+
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
+    InOrder reportOrder = inOrder(policy);
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.SCHEDULING), any(String.class));
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.EXECUTING), any(String.class));
+  }
+
+  @Test
+  public void shouldReportExecutingStatusAfterGotExecutingStageFromMetadata() throws Exception {
+    RemoteSpawnRunner runner = newSpawnRunner();
+    ExecuteResponse succeeded =
+        ExecuteResponse.newBuilder()
+            .setResult(ActionResult.newBuilder().setExitCode(0).build())
+            .build();
+
+    Spawn spawn = newSimpleSpawn();
+    SpawnExecutionContext policy = mock(SpawnExecutionContext.class);
+    when(policy.getTimeout()).thenReturn(Duration.ZERO);
+
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenAnswer(
+            invocationOnMock -> {
+              ExecuteOperationUpdateReceiver receiver = invocationOnMock.getArgument(1);
+              Operation queued =
+                  Operation.newBuilder()
+                      .setMetadata(
+                          Any.pack(
+                              ExecuteOperationMetadata.newBuilder().setStage(Value.QUEUED).build()))
+                      .build();
+              receiver.onNextOperation(queued);
+              verify(policy, never()).report(eq(ProgressStatus.EXECUTING), any(String.class));
+
+              Operation executing =
+                  Operation.newBuilder()
+                      .setMetadata(
+                          Any.pack(
+                              ExecuteOperationMetadata.newBuilder()
+                                  .setStage(Value.EXECUTING)
+                                  .build()))
+                      .build();
+              receiver.onNextOperation(executing);
+
+              return succeeded;
+            });
+
+    SpawnResult res = runner.exec(spawn, policy);
+    assertThat(res.status()).isEqualTo(Status.SUCCESS);
+
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
+    InOrder reportOrder = inOrder(policy);
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.SCHEDULING), any(String.class));
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.EXECUTING), any(String.class));
+  }
+
+  @Test
+  public void shouldReportExecutingStatusIfNoExecutingStatusFromMetadata() throws Exception {
+    RemoteSpawnRunner runner = newSpawnRunner();
+    ExecuteResponse succeeded =
+        ExecuteResponse.newBuilder()
+            .setResult(ActionResult.newBuilder().setExitCode(0).build())
+            .build();
+
+    Spawn spawn = newSimpleSpawn();
+    SpawnExecutionContext policy = mock(SpawnExecutionContext.class);
+    when(policy.getTimeout()).thenReturn(Duration.ZERO);
+
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenAnswer(
+            invocationOnMock -> {
+              ExecuteOperationUpdateReceiver receiver = invocationOnMock.getArgument(1);
+              Operation completed =
+                  Operation.newBuilder()
+                      .setMetadata(
+                          Any.pack(
+                              ExecuteOperationMetadata.newBuilder()
+                                  .setStage(Value.COMPLETED)
+                                  .build()))
+                      .build();
+              receiver.onNextOperation(completed);
+              return succeeded;
+            });
+
+    SpawnResult res = runner.exec(spawn, policy);
+    assertThat(res.status()).isEqualTo(Status.SUCCESS);
+
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
+    InOrder reportOrder = inOrder(policy);
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.SCHEDULING), any(String.class));
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.EXECUTING), any(String.class));
+  }
+
+  @Test
+  public void shouldReportExecutingStatusEvenNoOperationFromServer() throws Exception {
+    RemoteSpawnRunner runner = newSpawnRunner();
+    ExecuteResponse succeeded =
+        ExecuteResponse.newBuilder()
+            .setResult(ActionResult.newBuilder().setExitCode(0).build())
+            .build();
+
+    Spawn spawn = newSimpleSpawn();
+    SpawnExecutionContext policy = mock(SpawnExecutionContext.class);
+    when(policy.getTimeout()).thenReturn(Duration.ZERO);
+
+    when(executor.executeRemotely(
+            any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class)))
+        .thenReturn(succeeded);
+
+    SpawnResult res = runner.exec(spawn, policy);
+    assertThat(res.status()).isEqualTo(Status.SUCCESS);
+
+    verify(executor)
+        .executeRemotely(any(ExecuteRequest.class), any(ExecuteOperationUpdateReceiver.class));
+    InOrder reportOrder = inOrder(policy);
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.SCHEDULING), any(String.class));
+    reportOrder.verify(policy, times(1)).report(eq(ProgressStatus.EXECUTING), any(String.class));
+  }
+
   private static Spawn newSimpleSpawn(Artifact... outputs) {
     return simpleSpawnWithExecutionInfo(ImmutableMap.of(), outputs);
   }