Ensure no writes occur after StreamedTestOutput#close

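There's a race on interrupt: if close() was interrupted while joining the
watcher thread, it stopped waiting, and the backing thread could write a little
more content after close() returned, before realizing it was supposed to exit.
close() now interrupts the watcher and waits (up to 30 seconds) for it to exit.
The new tests expose that race with a moderate --runs_per_test.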

RELNOTES: None
PiperOrigin-RevId: 299906419
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
index 6d68f36..996df02 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
@@ -298,7 +298,7 @@
     Closeable streamed = null;
     if (executionOptions.testOutput.equals(TestOutputFormat.STREAMED)) {
       streamed =
-          new StreamedTestOutput(
+          createStreamedTestOutput(
               Reporter.outErrForReporter(actionExecutionContext.getEventHandler()), out);
     }
     long startTimeMillis = actionExecutionContext.getClock().currentTimeMillis();
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
new file mode 100644
index 0000000..363553a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
@@ -0,0 +1,78 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.exec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.devtools.build.lib.util.io.FileWatcher;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+/** Implements the --test_output=streamed option. */
+class StreamedTestOutput implements Closeable {
+  private static final int JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS = 30;
+
+  private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
+  private final FileWatcher watcher;
+  private final Path testLogPath;
+  private final OutErr outErr;
+
+  StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+    this.testLogPath = testLogPath;
+    this.outErr = outErr;
+    this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
+    this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
+    watcher.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    watcher.stopPumping();
+    try {
+      // Wait for the watcher thread to exit. If this join is interrupted, the
+      // catch block below interrupts the watcher and waits (bounded by the grace
+      // period) for it to exit, so that it cannot write more test.log content to
+      // the console after close() returns.
+      watcher.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      watcher.interrupt();
+      Uninterruptibles.joinUninterruptibly(
+          watcher, JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS, TimeUnit.SECONDS);
+      Preconditions.checkState(
+          !watcher.isAlive(),
+          "Watcher thread failed to exit for %s seconds after interrupt",
+          JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS);
+    }
+
+    // Unclear whether copying test.log after an interrupt is desirable, but it has always done so.
+    if (!headerFilter.foundHeader()) {
+      try (InputStream input = testLogPath.getInputStream()) {
+        ByteStreams.copy(input, outErr.getOutputStream());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  FileWatcher getFileWatcher() {
+    return watcher;
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
index e800f0e..97ef786 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -41,7 +40,6 @@
 import com.google.devtools.build.lib.events.EventKind;
 import com.google.devtools.build.lib.util.Fingerprint;
 import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.io.FileWatcher;
 import com.google.devtools.build.lib.util.io.OutErr;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
@@ -423,39 +421,8 @@
     }
   }
 
-  /** Implements the --test_output=streamed option. */
-  protected static class StreamedTestOutput implements Closeable {
-    private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
-    private final FileWatcher watcher;
-    private final Path testLogPath;
-    private final OutErr outErr;
-
-    public StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
-      this.testLogPath = testLogPath;
-      this.outErr = outErr;
-      this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
-      this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
-      watcher.start();
-    }
-
-    @Override
-    public void close() throws IOException {
-      watcher.stopPumping();
-      try {
-        // The watcher thread might leak if the following call is interrupted.
-        // This is a relatively minor issue since the worst it could do is
-        // write one additional line from the test.log to the console later on
-        // in the build.
-        watcher.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      if (!headerFilter.foundHeader()) {
-        try (InputStream input = testLogPath.getInputStream()) {
-          ByteStreams.copy(input, outErr.getOutputStream());
-        }
-      }
-    }
+  protected Closeable createStreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+    return new StreamedTestOutput(outErr, testLogPath);
   }
 
   private static final class ShardKey {