Allow cancelling test runs when detecting flakes
When using --runs_per_test_detects_flakes, Bazel runs multiple actions
in parallel and aggregates their results such that failed runs are
ignored if there is at least one passing run. This change adds a new
flag --experimental_cancel_concurrent_tests, which causes Bazel to
cancel any concurrent runs of the same test shard as soon as the first
passing run is observed. This can reduce the end-to-end build time for
large flaky tests.
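This is only the first half of the implementation. The code path to
interrupt a running action is currently only used to end the entire
build, and does not post a TestResult for the interrupted action, which
leads to confusing command-line reporting of the results. In addition,
cancellation only takes effect with spawn runners that support
asynchronous execution; it does not currently work with local
execution.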
PiperOrigin-RevId: 275199895
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionBuilder.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionBuilder.java
index 6b06426..c59c4b7 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionBuilder.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionBuilder.java
@@ -366,6 +366,10 @@
           coverageArtifacts.add(coverageArtifact);
         }
 
+        // Cancel concurrent runs only if one passing run suffices (--runs_per_test_detects_flakes).
+        boolean cancelConcurrentTests =
+            testConfiguration.runsPerTestDetectsFlakes()
+                && testConfiguration.cancelConcurrentTests();
         TestRunnerAction testRunnerAction =
             new TestRunnerAction(
                 ruleContext.getActionOwner(),
@@ -388,7 +392,8 @@
                 (!isUsingTestWrapperInsteadOfTestSetupScript
                         || executionSettings.needsShell(isExecutedOnWindows))
                     ? ShToolchain.getPathOrError(ruleContext)
-                    : null);
+                    : null,
+                /* cancelConcurrentTestsOnSuccess= */ cancelConcurrentTests);
 
         testOutputs.addAll(testRunnerAction.getSpawnOutputs());
         testOutputs.addAll(testRunnerAction.getOutputs());
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionContext.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionContext.java
index 67a8503..23a6436 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionContext.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestActionContext.java
@@ -17,6 +17,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ActionContext;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
+import com.google.devtools.build.lib.actions.ActionOwner;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.SpawnResult;
 import com.google.devtools.build.lib.vfs.Path;
@@ -36,13 +37,29 @@
   /** Returns whether test_keep_going is enabled. */
   boolean isTestKeepGoing();
 
-  /**
-   * Creates a cached test result.
-   */
+  /** Creates a cached test result. */
   TestResult newCachedTestResult(Path execRoot, TestRunnerAction action, TestResultData cached)
       throws IOException;
 
   /**
+   * Returns a listenable future that is unique for any given combination of owner and shard number,
+   * i.e., that is cached and shared across different runs within the same shard of the same
+   * target. This is to facilitate cross-action cancellation: if {@code runs_per_test} and {@code
+   * runs_per_test_detects_flakes} are both set, then it is sufficient to have a single passing
+   * result per shard, and any concurrent actions can be cancelled.
+   *
+   * <p>Note that the output files of a test are named after the owner, which guarantees that there
+   * are no two tests with the same owner.
+   *
+   * @param owner the owner of the test action
+   * @param shardNum the shard number of the test action
+   * @return the future shared by all runs of that shard, or {@code null}, in which case no
+   *     cross-action cancellation takes place
+   */
+  @Nullable
+  ListenableFuture<Void> getTestCancelFuture(ActionOwner owner, int shardNum);
+
+  /**
    * An object representing an individual test attempt result. Note that {@link TestRunnerSpawn} is
    * generic in a subtype of this type; this interface only provide a tiny amount of generic
    * top-level functionality necessary to share code between the different {@link TestActionContext}
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestConfiguration.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestConfiguration.java
index b9a9995..3f5e72d 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestConfiguration.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestConfiguration.java
@@ -188,6 +188,16 @@
     public boolean runsPerTestDetectsFlakes;
 
     @Option(
+        name = "experimental_cancel_concurrent_tests",
+        defaultValue = "false",
+        documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+        effectTags = {OptionEffectTag.AFFECTS_OUTPUTS, OptionEffectTag.LOADING_AND_ANALYSIS},
+        help =
+            "If true, then Blaze will cancel concurrently running tests on the first successful "
+                + "run. It has no effect unless --runs_per_test_detects_flakes is also set.")
+    public boolean cancelConcurrentTests;
+
+    @Option(
         name = "coverage_support",
         converter = LabelConverter.class,
         defaultValue = "@bazel_tools//tools/test:coverage_support",
@@ -332,6 +342,11 @@
     return options.runsPerTestDetectsFlakes;
   }
 
+  /** Returns whether {@code --experimental_cancel_concurrent_tests} is set. */
+  public boolean cancelConcurrentTests() {
+    return options.cancelConcurrentTests;
+  }
+
   /**
    * Option converter that han handle two styles of value for "--runs_per_test":
    *
diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java
index 2b7b7bc..1fe5ff0 100644
--- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java
+++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.actions.AbstractAction;
 import com.google.devtools.build.lib.actions.ActionContinuationOrResult;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -128,6 +129,9 @@
    */
   private final ImmutableIterable<String> requiredClientEnvVariables;
 
+  /** Whether to cancel concurrent runs of the same shard once one of them passes. */
+  private final boolean cancelConcurrentTestsOnSuccess;
+
   private static ImmutableList<Artifact> list(Artifact... artifacts) {
     ImmutableList.Builder<Artifact> builder = ImmutableList.builder();
     for (Artifact artifact : artifacts) {
@@ -164,7 +168,8 @@
       int runNumber,
       BuildConfiguration configuration,
       String workspaceName,
-      @Nullable PathFragment shExecutable) {
+      @Nullable PathFragment shExecutable,
+      boolean cancelConcurrentTestsOnSuccess) {
     super(
         owner,
         /*tools=*/ ImmutableList.of(),
@@ -219,6 +224,7 @@
             Iterables.concat(
                 configuration.getActionEnvironment().getInheritedEnv(),
                 configuration.getTestActionEnvironment().getInheritedEnv()));
+    this.cancelConcurrentTestsOnSuccess = cancelConcurrentTestsOnSuccess;
   }
 
   public BuildConfiguration getConfiguration() {
@@ -746,11 +752,21 @@
     try {
       TestRunnerSpawn testRunnerSpawn =
           testActionContext.createTestRunnerSpawn(this, actionExecutionContext);
-      TestAttemptContinuation beginContinuation = testRunnerSpawn.beginExecution();
-      RunAttemptsContinuation continuation =
-          new RunAttemptsContinuation(
-              testRunnerSpawn, beginContinuation, testActionContext.isTestKeepGoing());
-      return continuation;
+      ListenableFuture<Void> cancelFuture = null;
+      if (cancelConcurrentTestsOnSuccess) {
+        cancelFuture = testActionContext.getTestCancelFuture(getOwner(), shardNum);
+      }
+      TestAttemptContinuation testAttemptContinuation =
+          beginIfNotCancelled(testRunnerSpawn, cancelFuture);
+      if (testAttemptContinuation == null) {
+        touchMandatoryOutputs();
+        return ActionContinuationOrResult.of(ActionResult.create(ImmutableList.of()));
+      }
+      return new RunAttemptsContinuation(
+          testRunnerSpawn,
+          testAttemptContinuation,
+          testActionContext.isTestKeepGoing(),
+          cancelFuture);
     } catch (ExecException e) {
       throw e.toActionExecutionException(this);
     } catch (IOException e) {
@@ -758,6 +774,37 @@
     }
   }
 
+  /**
+   * Begins a new test attempt unless {@code cancelFuture} has already been cancelled.
+   *
+   * <p>Returns {@code null} without starting an attempt if the shard was cancelled, i.e., if a
+   * concurrent run of the same shard has already passed.
+   */
+  @Nullable
+  private static TestAttemptContinuation beginIfNotCancelled(
+      TestRunnerSpawn testRunnerSpawn, @Nullable ListenableFuture<Void> cancelFuture)
+      throws InterruptedException, IOException {
+    if (cancelFuture != null && cancelFuture.isCancelled()) {
+      // Don't start another attempt if the action was cancelled. Note that there is a race
+      // between checking this and starting the test action. If we lose the race, then the
+      // listener that RunAttemptsContinuation registers on the cancelFuture cancels the attempt.
+      // Note that cancellation only works with spawn runners supporting async execution, so
+      // currently does not work with local execution.
+      return null;
+    }
+    return testRunnerSpawn.beginExecution();
+  }
+
+  /**
+   * Touches all mandatory outputs of this action so that they exist after a run is cancelled, in
+   * which case the test attempt may not have written them.
+   */
+  private void touchMandatoryOutputs() throws IOException {
+    for (Artifact output : getMandatoryOutputs()) {
+      FileSystemUtils.touchFile(output.getPath());
+    }
+  }
+
   @Override
   public ActionResult execute(ActionExecutionContext actionExecutionContext)
       throws ActionExecutionException, InterruptedException {
@@ -924,6 +971,7 @@
     private final int maxAttempts;
     private final List<SpawnResult> spawnResults;
     private final List<FailedAttemptResult> failedAttempts;
+    @Nullable private final ListenableFuture<Void> cancelFuture;
 
     private RunAttemptsContinuation(
         TestRunnerSpawn testRunnerSpawn,
@@ -931,20 +979,41 @@
         boolean keepGoing,
         int maxAttempts,
         List<SpawnResult> spawnResults,
-        List<FailedAttemptResult> failedAttempts) {
+        List<FailedAttemptResult> failedAttempts,
+        @Nullable ListenableFuture<Void> cancelFuture) {
       this.testRunnerSpawn = testRunnerSpawn;
       this.testContinuation = testContinuation;
       this.keepGoing = keepGoing;
       this.maxAttempts = maxAttempts;
       this.spawnResults = spawnResults;
       this.failedAttempts = failedAttempts;
+      this.cancelFuture = cancelFuture;
+      // Cancel this attempt once a concurrent run of the same shard passes. If cancelFuture is
+      // already cancelled, addListener runs the listener immediately. A finished continuation
+      // has nothing left to cancel, so only register for attempts that are still running.
+      if (cancelFuture != null && !testContinuation.isDone()) {
+        cancelFuture.addListener(
+            () -> {
+              // This is a noop if the future is already done.
+              testContinuation.getFuture().cancel(true);
+            },
+            MoreExecutors.directExecutor());
+      }
     }
 
     RunAttemptsContinuation(
         TestRunnerSpawn testRunnerSpawn,
         TestAttemptContinuation testContinuation,
-        boolean keepGoing) {
-      this(testRunnerSpawn, testContinuation, keepGoing, 0, new ArrayList<>(), new ArrayList<>());
+        boolean keepGoing,
+        @Nullable ListenableFuture<Void> cancelFuture) {
+      this(
+          testRunnerSpawn,
+          testContinuation,
+          keepGoing,
+          0,
+          new ArrayList<>(),
+          new ArrayList<>(),
+          cancelFuture);
     }
 
     @Nullable
@@ -957,7 +1026,19 @@
     public ActionContinuationOrResult execute()
         throws ActionExecutionException, InterruptedException {
       try {
-        TestAttemptContinuation nextContinuation = testContinuation.execute();
+        TestAttemptContinuation nextContinuation;
+        try {
+          nextContinuation = testContinuation.execute();
+        } catch (InterruptedException e) {
+          if (cancelFuture != null && cancelFuture.isCancelled()) {
+            // A concurrent run of this shard passed and cancelled this attempt. Clear the
+            // interrupt bit; Thread.interrupted() clears it, isInterrupted() would not.
+            Thread.interrupted();
+            TestRunnerAction.this.touchMandatoryOutputs();
+            return ActionContinuationOrResult.of(ActionResult.create(spawnResults));
+          }
+          throw e;
+        }
         if (!nextContinuation.isDone()) {
           return new RunAttemptsContinuation(
               testRunnerSpawn,
@@ -965,7 +1046,8 @@
               keepGoing,
               maxAttempts,
               spawnResults,
-              failedAttempts);
+              failedAttempts,
+              cancelFuture);
         }
 
         TestAttemptResult result = nextContinuation.get();
@@ -983,7 +1065,12 @@
     private ActionContinuationOrResult process(TestAttemptResult result, int actualMaxAttempts)
         throws ExecException, IOException, InterruptedException {
       spawnResults.addAll(result.spawnResults());
-      if (!result.hasPassed()) {
+      if (result.hasPassed()) {
+        // One passing run suffices for this shard, so cancel the concurrent runs still in flight.
+        if (cancelFuture != null) {
+          cancelFuture.cancel(true);
+        }
+      } else {
         boolean runAnotherAttempt = failedAttempts.size() + 1 < actualMaxAttempts;
         TestRunnerSpawn nextRunner;
         if (runAnotherAttempt) {
@@ -1000,14 +1087,19 @@
           failedAttempts.add(
               testRunnerSpawn.finalizeFailedTestAttempt(result, failedAttempts.size() + 1));
 
-          TestAttemptContinuation nextContinuation = nextRunner.beginExecution();
+          TestAttemptContinuation nextContinuation = beginIfNotCancelled(nextRunner, cancelFuture);
+          if (nextContinuation == null) {
+            TestRunnerAction.this.touchMandatoryOutputs();
+            return ActionContinuationOrResult.of(ActionResult.create(spawnResults));
+          }
           return new RunAttemptsContinuation(
               nextRunner,
               nextContinuation,
               keepGoing,
               actualMaxAttempts,
               spawnResults,
-              failedAttempts);
+              failedAttempts,
+              cancelFuture);
         }
       }
       testRunnerSpawn.finalizeTest(result, failedAttempts);
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
index 59ab2d8..a6aa223 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
@@ -443,7 +443,8 @@
 
     @Override
     public TestAttemptContinuation beginExecution() throws InterruptedException, IOException {
-      prepareFileSystem(testAction, actionExecutionContext.getExecRoot(), tmpDir, workingDirectory);
+      // Prepare the file system under the same execRoot that the attempt below is started in.
+      prepareFileSystem(testAction, execRoot, tmpDir, workingDirectory);
       return beginTestAttempt(testAction, spawn, actionExecutionContext, execRoot);
     }
 
diff --git a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
index 797be9d..1ef619c 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
@@ -20,7 +20,10 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
+import com.google.devtools.build.lib.actions.ActionOwner;
 import com.google.devtools.build.lib.actions.Artifact;
 import com.google.devtools.build.lib.actions.CommandLineExpansionException;
 import com.google.devtools.build.lib.actions.ExecException;
@@ -53,10 +56,21 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 
 /** A strategy for executing a {@link TestRunnerAction}. */
 public abstract class TestStrategy implements TestActionContext {
+  /**
+   * Cancellation futures handed out by {@link #getTestCancelFuture}, one per (owner, shard).
+   *
+   * <p>NOTE(review): entries are never removed. If a strategy instance were reused across
+   * builds, a future cancelled in an earlier build could make later runs of an equal (owner,
+   * shard) pair skip execution; confirm that instances do not outlive a single build.
+   */
+  private final ConcurrentHashMap<ShardKey, ListenableFuture<Void>> futures =
+      new ConcurrentHashMap<>();
 
   /**
    * Ensures that all directories used to run test are in the correct state and their content will
@@ -147,6 +161,13 @@
     return executionOptions.testKeepGoing;
   }
 
+  @Override
+  public final ListenableFuture<Void> getTestCancelFuture(ActionOwner owner, int shardNum) {
+    // ConcurrentHashMap.computeIfAbsent is atomic, so all runs of a shard get the same future.
+    ShardKey key = new ShardKey(owner, shardNum);
+    return futures.computeIfAbsent(key, (k) -> SettableFuture.<Void>create());
+  }
+
   /**
    * Generates a command line to run for the test action, taking into account coverage and {@code
    * --run_under} settings.
@@ -432,4 +453,32 @@
       }
     }
   }
+
+  /** Key of {@link #futures}: an {@link ActionOwner} together with a shard number. */
+  private static final class ShardKey {
+    private final ActionOwner owner;
+    private final int shard;
+
+    ShardKey(ActionOwner owner, int shard) {
+      this.owner = Preconditions.checkNotNull(owner);
+      this.shard = shard;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(owner, shard);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ShardKey)) {
+        return false;
+      }
+      ShardKey s = (ShardKey) o;
+      return owner.equals(s.owner) && shard == s.shard;
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/rules/test/ExclusiveTestStrategy.java b/src/main/java/com/google/devtools/build/lib/rules/test/ExclusiveTestStrategy.java
index 99963e3..967dcd0 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/test/ExclusiveTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/test/ExclusiveTestStrategy.java
@@ -13,7 +13,9 @@
 // limitations under the License.
 package com.google.devtools.build.lib.rules.test;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
+import com.google.devtools.build.lib.actions.ActionOwner;
 import com.google.devtools.build.lib.actions.ExecException;
 import com.google.devtools.build.lib.actions.ExecutionStrategy;
 import com.google.devtools.build.lib.analysis.test.TestActionContext;
@@ -55,4 +57,14 @@
       Path execRoot, TestRunnerAction action, TestResultData cached) throws IOException {
     return parent.newCachedTestResult(execRoot, action, cached);
   }
+
+  @Override
+  public ListenableFuture<Void> getTestCancelFuture(ActionOwner owner, int shard) {
+    // Returning null disables cross-action cancellation for exclusive tests: callers treat a null
+    // future as "never cancelled". NOTE(review): consider annotating this override @Nullable to
+    // match the TestActionContext declaration.
+    // TODO(ulfjack): Exclusive tests run sequentially, and this feature exists to allow faster
+    //  aborts of concurrent actions. It's not clear what, if anything, we should do here.
+    return null;
+  }
 }