Ensure no writes occur after StreamedTestOutput#close

There's a race on interrupt where the backing thread could write a little more
content before realizing it was supposed to exit. The new tests expose that race
with a moderate --runs_per_test.

RELNOTES: None
PiperOrigin-RevId: 299906419
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
index 6d68f36..996df02 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
@@ -298,7 +298,7 @@
     Closeable streamed = null;
     if (executionOptions.testOutput.equals(TestOutputFormat.STREAMED)) {
       streamed =
-          new StreamedTestOutput(
+          createStreamedTestOutput(
               Reporter.outErrForReporter(actionExecutionContext.getEventHandler()), out);
     }
     long startTimeMillis = actionExecutionContext.getClock().currentTimeMillis();
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
new file mode 100644
index 0000000..363553a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
@@ -0,0 +1,78 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.exec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.devtools.build.lib.util.io.FileWatcher;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+/** Implements the --test_output=streamed option. */
+class StreamedTestOutput implements Closeable {
+  private static final int JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS = 30;
+
+  private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
+  private final FileWatcher watcher;
+  private final Path testLogPath;
+  private final OutErr outErr;
+
+  StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+    this.testLogPath = testLogPath;
+    this.outErr = outErr;
+    this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
+    this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
+    watcher.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    watcher.stopPumping();
+    try {
+      // The watcher thread might leak if the following call is interrupted.
+      // This is a relatively minor issue since the worst it could do is
+      // write one additional line from the test.log to the console later on
+      // in the build.
+      watcher.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      watcher.interrupt();
+      Uninterruptibles.joinUninterruptibly(
+          watcher, JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS, TimeUnit.SECONDS);
+      Preconditions.checkState(
+          !watcher.isAlive(),
+          "Watcher thread failed to exit for %s seconds after interrupt",
+          JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS);
+    }
+
+    // It's unclear if writing this after interrupt is desirable, but it's been this way forever.
+    if (!headerFilter.foundHeader()) {
+      try (InputStream input = testLogPath.getInputStream()) {
+        ByteStreams.copy(input, outErr.getOutputStream());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  FileWatcher getFileWatcher() {
+    return watcher;
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
index e800f0e..97ef786 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -41,7 +40,6 @@
 import com.google.devtools.build.lib.events.EventKind;
 import com.google.devtools.build.lib.util.Fingerprint;
 import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.io.FileWatcher;
 import com.google.devtools.build.lib.util.io.OutErr;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
@@ -423,39 +421,8 @@
     }
   }
 
-  /** Implements the --test_output=streamed option. */
-  protected static class StreamedTestOutput implements Closeable {
-    private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
-    private final FileWatcher watcher;
-    private final Path testLogPath;
-    private final OutErr outErr;
-
-    public StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
-      this.testLogPath = testLogPath;
-      this.outErr = outErr;
-      this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
-      this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
-      watcher.start();
-    }
-
-    @Override
-    public void close() throws IOException {
-      watcher.stopPumping();
-      try {
-        // The watcher thread might leak if the following call is interrupted.
-        // This is a relatively minor issue since the worst it could do is
-        // write one additional line from the test.log to the console later on
-        // in the build.
-        watcher.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      if (!headerFilter.foundHeader()) {
-        try (InputStream input = testLogPath.getInputStream()) {
-          ByteStreams.copy(input, outErr.getOutputStream());
-        }
-      }
-    }
+  protected Closeable createStreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+    return new StreamedTestOutput(outErr, testLogPath);
   }
 
   private static final class ShardKey {
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 1dbca0c..a625313 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1444,6 +1444,7 @@
         "//src/main/java/com/google/devtools/build/lib/shell",
         "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/testutils",
         "//src/main/java/com/google/devtools/build/lib/util/io",
+        "//src/main/java/com/google/devtools/build/lib/util/io:out-err",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/build/lib/vfs:output_service",
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
diff --git a/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java b/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java
new file mode 100644
index 0000000..cc04ead
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java
@@ -0,0 +1,153 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.exec;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.util.OS;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StreamedTestOutput}. */
+@RunWith(JUnit4.class)
+public class StreamedTestOutputTest {
+
+  private final InMemoryFileSystem fileSystem = new InMemoryFileSystem();
+
+  @Test
+  public void testEmptyFile() throws IOException {
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeContent(watchedPath, new byte[0]);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    try (StreamedTestOutput underTest =
+        new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+    assertThat(out.toByteArray()).isEmpty();
+    assertThat(err.toByteArray()).isEmpty();
+  }
+
+  @Test
+  public void testNoHeaderOutputsEntireFile() throws IOException {
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeContent(watchedPath, StandardCharsets.UTF_8, "random\nlines\n");
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    try (StreamedTestOutput underTest =
+        new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+    assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("random\nlines\n");
+    assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+  }
+
+  @Test
+  public void testOnlyOutputsContentsAfterHeaderWhenPresent() throws IOException {
+    if (OS.getCurrent() == OS.WINDOWS) {
+      // TODO(b/151095783): Disabled because underlying code doesn't respect system line separator.
+      return;
+    }
+
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeLinesAs(
+        watchedPath,
+        StandardCharsets.UTF_8,
+        "ignored",
+        "lines",
+        TestLogHelper.HEADER_DELIMITER,
+        "included",
+        "lines");
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    try (StreamedTestOutput underTest =
+        new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+    assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("included\nlines\n");
+    assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+  }
+
+  @Test
+  public void testWatcherDoneAfterClose() throws IOException {
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeLinesAs(
+        watchedPath,
+        StandardCharsets.UTF_8,
+        TestLogHelper.HEADER_DELIMITER,
+        Strings.repeat("x", 10 << 20));
+    StreamedTestOutput underTest =
+        new StreamedTestOutput(
+            OutErr.create(ByteStreams.nullOutputStream(), ByteStreams.nullOutputStream()),
+            fileSystem.getPath("/myfile"));
+    underTest.close();
+    assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+  }
+
+  @Test
+  public void testInterruptWaitsForWatcherToClose() throws IOException {
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeLinesAs(
+        watchedPath,
+        StandardCharsets.UTF_8,
+        TestLogHelper.HEADER_DELIMITER,
+        Strings.repeat("x", 10 << 20));
+
+    StreamedTestOutput underTest =
+        new StreamedTestOutput(
+            OutErr.create(ByteStreams.nullOutputStream(), ByteStreams.nullOutputStream()),
+            fileSystem.getPath("/myfile"));
+    try {
+      Thread.currentThread().interrupt();
+      underTest.close();
+      assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+    } finally {
+      // Both checks that the interrupt bit was reset and clears it for later tests.
+      assertThat(Thread.interrupted()).isTrue();
+    }
+  }
+
+  @Test
+  public void testOutputsFileWithHeaderRegardlessOfInterrupt() throws IOException {
+    Path watchedPath = fileSystem.getPath("/myfile");
+    FileSystemUtils.writeContent(watchedPath, StandardCharsets.UTF_8, "blahblahblah");
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    StreamedTestOutput underTest =
+        new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"));
+    try {
+      Thread.currentThread().interrupt();
+      underTest.close();
+      assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+    } finally {
+      // Both checks that the interrupt bit was reset and clears it for later tests.
+      assertThat(Thread.interrupted()).isTrue();
+    }
+
+    assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("blahblahblah");
+    assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+  }
+}