Ensure no writes occur after StreamedTestOutput#close
There's a race on interrupt where the backing thread could write a little more
content before realizing it was supposed to exit. The new tests expose that race
with a moderate --runs_per_test.
RELNOTES: None
PiperOrigin-RevId: 299906419
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
index 6d68f36..996df02 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/StandaloneTestStrategy.java
@@ -298,7 +298,7 @@
Closeable streamed = null;
if (executionOptions.testOutput.equals(TestOutputFormat.STREAMED)) {
streamed =
- new StreamedTestOutput(
+ createStreamedTestOutput(
Reporter.outErrForReporter(actionExecutionContext.getEventHandler()), out);
}
long startTimeMillis = actionExecutionContext.getClock().currentTimeMillis();
diff --git a/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
new file mode 100644
index 0000000..363553a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/exec/StreamedTestOutput.java
@@ -0,0 +1,78 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.exec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.devtools.build.lib.util.io.FileWatcher;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+/** Implements the --test_output=streamed option. */
+class StreamedTestOutput implements Closeable {
+ private static final int JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS = 30;
+
+ private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
+ private final FileWatcher watcher;
+ private final Path testLogPath;
+ private final OutErr outErr;
+
+ StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+ this.testLogPath = testLogPath;
+ this.outErr = outErr;
+ this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
+ this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
+ watcher.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ watcher.stopPumping();
+ try {
+ // The watcher thread might leak if the following call is interrupted.
+ // This is a relatively minor issue since the worst it could do is
+ // write one additional line from the test.log to the console later on
+ // in the build.
+ watcher.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ watcher.interrupt();
+ Uninterruptibles.joinUninterruptibly(
+ watcher, JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS, TimeUnit.SECONDS);
+ Preconditions.checkState(
+ !watcher.isAlive(),
+ "Watcher thread failed to exit for %s seconds after interrupt",
+ JOIN_ON_INTERRUPT_GRACE_PERIOD_SECONDS);
+ }
+
+ // It's unclear if writing this after interrupt is desirable, but it's been this way forever.
+ if (!headerFilter.foundHeader()) {
+ try (InputStream input = testLogPath.getInputStream()) {
+ ByteStreams.copy(input, outErr.getOutputStream());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ FileWatcher getFileWatcher() {
+ return watcher;
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
index e800f0e..97ef786 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/TestStrategy.java
@@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
@@ -41,7 +40,6 @@
import com.google.devtools.build.lib.events.EventKind;
import com.google.devtools.build.lib.util.Fingerprint;
import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.io.FileWatcher;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
@@ -423,39 +421,8 @@
}
}
- /** Implements the --test_output=streamed option. */
- protected static class StreamedTestOutput implements Closeable {
- private final TestLogHelper.FilterTestHeaderOutputStream headerFilter;
- private final FileWatcher watcher;
- private final Path testLogPath;
- private final OutErr outErr;
-
- public StreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
- this.testLogPath = testLogPath;
- this.outErr = outErr;
- this.headerFilter = TestLogHelper.getHeaderFilteringOutputStream(outErr.getOutputStream());
- this.watcher = new FileWatcher(testLogPath, OutErr.create(headerFilter, headerFilter), false);
- watcher.start();
- }
-
- @Override
- public void close() throws IOException {
- watcher.stopPumping();
- try {
- // The watcher thread might leak if the following call is interrupted.
- // This is a relatively minor issue since the worst it could do is
- // write one additional line from the test.log to the console later on
- // in the build.
- watcher.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!headerFilter.foundHeader()) {
- try (InputStream input = testLogPath.getInputStream()) {
- ByteStreams.copy(input, outErr.getOutputStream());
- }
- }
- }
+ protected Closeable createStreamedTestOutput(OutErr outErr, Path testLogPath) throws IOException {
+ return new StreamedTestOutput(outErr, testLogPath);
}
private static final class ShardKey {
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 1dbca0c..a625313 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1444,6 +1444,7 @@
"//src/main/java/com/google/devtools/build/lib/shell",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/testutils",
"//src/main/java/com/google/devtools/build/lib/util/io",
+ "//src/main/java/com/google/devtools/build/lib/util/io:out-err",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:output_service",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
diff --git a/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java b/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java
new file mode 100644
index 0000000..cc04ead
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/exec/StreamedTestOutputTest.java
@@ -0,0 +1,153 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.exec;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.util.OS;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StreamedTestOutput}. */
+@RunWith(JUnit4.class)
+public class StreamedTestOutputTest {
+
+ private final InMemoryFileSystem fileSystem = new InMemoryFileSystem();
+
+ @Test
+ public void testEmptyFile() throws IOException {
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeContent(watchedPath, new byte[0]);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ try (StreamedTestOutput underTest =
+ new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+ assertThat(out.toByteArray()).isEmpty();
+ assertThat(err.toByteArray()).isEmpty();
+ }
+
+ @Test
+ public void testNoHeaderOutputsEntireFile() throws IOException {
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeContent(watchedPath, StandardCharsets.UTF_8, "random\nlines\n");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ try (StreamedTestOutput underTest =
+ new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+ assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("random\nlines\n");
+ assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+ }
+
+ @Test
+ public void testOnlyOutputsContentsAfterHeaderWhenPresent() throws IOException {
+ if (OS.getCurrent() == OS.WINDOWS) {
+ // TODO(b/151095783): Disabled because underlying code doesn't respect system line separator.
+ return;
+ }
+
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeLinesAs(
+ watchedPath,
+ StandardCharsets.UTF_8,
+ "ignored",
+ "lines",
+ TestLogHelper.HEADER_DELIMITER,
+ "included",
+ "lines");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ try (StreamedTestOutput underTest =
+ new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"))) {}
+
+ assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("included\nlines\n");
+ assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+ }
+
+ @Test
+ public void testWatcherDoneAfterClose() throws IOException {
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeLinesAs(
+ watchedPath,
+ StandardCharsets.UTF_8,
+ TestLogHelper.HEADER_DELIMITER,
+ Strings.repeat("x", 10 << 20));
+ StreamedTestOutput underTest =
+ new StreamedTestOutput(
+ OutErr.create(ByteStreams.nullOutputStream(), ByteStreams.nullOutputStream()),
+ fileSystem.getPath("/myfile"));
+ underTest.close();
+ assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+ }
+
+ @Test
+ public void testInterruptWaitsForWatcherToClose() throws IOException {
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeLinesAs(
+ watchedPath,
+ StandardCharsets.UTF_8,
+ TestLogHelper.HEADER_DELIMITER,
+ Strings.repeat("x", 10 << 20));
+
+ StreamedTestOutput underTest =
+ new StreamedTestOutput(
+ OutErr.create(ByteStreams.nullOutputStream(), ByteStreams.nullOutputStream()),
+ fileSystem.getPath("/myfile"));
+ try {
+ Thread.currentThread().interrupt();
+ underTest.close();
+ assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+ } finally {
+ // Both checks that the interrupt bit was reset and clears it for later tests.
+ assertThat(Thread.interrupted()).isTrue();
+ }
+ }
+
+ @Test
+ public void testOutputsFileWithHeaderRegardlessOfInterrupt() throws IOException {
+ Path watchedPath = fileSystem.getPath("/myfile");
+ FileSystemUtils.writeContent(watchedPath, StandardCharsets.UTF_8, "blahblahblah");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ StreamedTestOutput underTest =
+ new StreamedTestOutput(OutErr.create(out, err), fileSystem.getPath("/myfile"));
+ try {
+ Thread.currentThread().interrupt();
+ underTest.close();
+ assertThat(underTest.getFileWatcher().isAlive()).isFalse();
+ } finally {
+ // Both checks that the interrupt bit was reset and clears it for later tests.
+ assertThat(Thread.interrupted()).isTrue();
+ }
+
+ assertThat(out.toString(StandardCharsets.UTF_8.name())).isEqualTo("blahblahblah");
+ assertThat(err.toString(StandardCharsets.UTF_8.name())).isEmpty();
+ }
+}