De-flake //src/test/java/com/google/devtools/build/lib/buildeventstream/transports:BuildEventTransportTest

BinaryFormatFileTransportTest was making a series of assertions about the internal state of the FileTransport (more specifically the SequentialWriter) which was causing some flakiness as it asserted on a temporary state of the transport. Most of those assertions aren't really useful since they're not testing the public API of the class but instead it's testing implementation details.

With this change we remove the offending assertions and add the visibility to the members/methods of FileTransport. Because of this JsonFormatFileTransportTest needs to be updated too (which arguably should be the way it should have been done in the first place).

$ bazel test //src/test/java/com/google/devtools/build/lib/buildeventstream/transports:BuildEventTransportTest --runs_per_test=1000
//src/test/java/com/google/devtools/build/lib/buildeventstream/transports:BuildEventTransportTest PASSED in 15.1s

PiperOrigin-RevId: 245268064
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index fd1679f..0551bad 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -59,7 +59,7 @@
 
   private final BuildEventProtocolOptions options;
   private final BuildEventArtifactUploader uploader;
-  @VisibleForTesting final SequentialWriter writer;
+  private final SequentialWriter writer;
   private final ArtifactGroupNamer namer;
 
   FileTransport(
@@ -77,23 +77,23 @@
   @VisibleForTesting
   static final class SequentialWriter implements Runnable {
     private static final Logger logger = Logger.getLogger(SequentialWriter.class.getName());
-    private static final ListenableFuture<BuildEventStreamProtos.BuildEvent> CLOSE =
-        Futures.immediateCancelledFuture();
+    private static final ListenableFuture<BuildEventStreamProtos.BuildEvent> CLOSE_EVENT_FUTURE =
+        Futures.immediateFailedFuture(
+            new IllegalStateException(
+                "A FileTransport is trying to write CLOSE_EVENT_FUTURE, this is a bug."));
+    private static final Duration FLUSH_INTERVAL = Duration.ofMillis(250);
 
     private final Thread writerThread;
-    @VisibleForTesting OutputStream out;
-    @VisibleForTesting static final Duration FLUSH_INTERVAL = Duration.ofMillis(250);
+    private final OutputStream out;
     private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
-
     private final BuildEventArtifactUploader uploader;
     private final AtomicBoolean isClosed = new AtomicBoolean();
+    private final SettableFuture<Void> closeFuture = SettableFuture.create();
 
     @VisibleForTesting
     final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
         new LinkedBlockingDeque<>();
 
-    private final SettableFuture<Void> closeFuture = SettableFuture.create();
-
     SequentialWriter(
         BufferedOutputStream outputStream,
         Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
@@ -115,7 +115,7 @@
       try {
         Instant prevFlush = Instant.now();
         while ((buildEventF = pendingWrites.poll(FLUSH_INTERVAL.toMillis(), TimeUnit.MILLISECONDS))
-            != CLOSE) {
+            != CLOSE_EVENT_FUTURE) {
           if (buildEventF != null) {
             BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
             byte[] serialized = serializeFunc.apply(buildEvent);
@@ -163,13 +163,13 @@
       }
       try {
         pendingWrites.clear();
-        pendingWrites.put(CLOSE);
+        pendingWrites.put(CLOSE_EVENT_FUTURE);
       } catch (InterruptedException e) {
         logger.log(Level.SEVERE, "Failed to immediately close the sequential writer.", e);
       }
     }
 
-    public ListenableFuture<Void> close() {
+    ListenableFuture<Void> close() {
       if (isClosed.getAndSet(true)) {
         return closeFuture;
       } else if (closeFuture.isDone()) {
@@ -186,7 +186,7 @@
           MoreExecutors.directExecutor());
 
       try {
-        pendingWrites.put(CLOSE);
+        pendingWrites.put(CLOSE_EVENT_FUTURE);
       } catch (InterruptedException e) {
         closeNow();
         logger.log(Level.SEVERE, "Failed to close the sequential writer.", e);
@@ -194,11 +194,15 @@
       }
       return closeFuture;
     }
+
+    private Duration getFlushInterval() {
+      return FLUSH_INTERVAL;
+    }
   }
 
   @Override
   public void sendBuildEvent(BuildEvent event) {
-    if (writer.closeFuture.isDone()) {
+    if (writer.isClosed.get()) {
       return;
     }
     if (!writer.pendingWrites.add(asStreamProto(event, namer))) {
@@ -251,4 +255,10 @@
   public BuildEventArtifactUploader getUploader() {
     return uploader;
   }
+
+  /** Determines how often the {@link FileTransport} flushes events. */
+  Duration getFlushInterval() {
+    return writer.getFlushInterval();
+  }
 }
+
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
index 49dbf16..bbb7fd8 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java
@@ -159,7 +159,6 @@
         .hasMessageThat()
         .contains("Unable to write all BEP events to file due to 'Task was cancelled.'");
 
-    assertThat(transport.writer.pendingWrites).isEmpty();
     try (InputStream in = new FileInputStream(output)) {
       assertThat(BuildEventStreamProtos.BuildEvent.parseDelimitedFrom(in)).isNull();
       assertThat(in.available()).isEqualTo(0);
@@ -185,9 +184,7 @@
             new LocalFilesArtifactUploader(),
             artifactGroupNamer);
 
-    // Close the stream.
-    transport.writer.out.close();
-    assertThat(transport.writer.pendingWrites.isEmpty()).isTrue();
+    transport.close().get();
 
     // This should not throw an exception.
     transport.sendBuildEvent(buildEvent);
@@ -224,8 +221,6 @@
     // This should not throw an exception, but also not perform any write.
     transport.sendBuildEvent(buildEvent);
 
-    assertThat(transport.writer.pendingWrites.isEmpty()).isTrue();
-
     // There should have only been one write.
     try (InputStream in = new FileInputStream(output)) {
       assertThat(BuildEventStreamProtos.BuildEvent.parseDelimitedFrom(in)).isEqualTo(started);
@@ -268,8 +263,6 @@
     transport.sendBuildEvent(event2);
     transport.close().get();
 
-    assertThat(transport.writer.pendingWrites.isEmpty()).isTrue();
-
     try (InputStream in = new FileInputStream(output)) {
       assertThat(BuildEventStreamProtos.BuildEvent.parseDelimitedFrom(in))
           .isEqualTo(event1.asStreamProto(null));
@@ -309,7 +302,6 @@
     transport.sendBuildEvent(event1);
     transport.close().get();
 
-    assertThat(transport.writer.pendingWrites).isEmpty();
     try (InputStream in = new FileInputStream(output)) {
       assertThat(BuildEventStreamProtos.BuildEvent.parseDelimitedFrom(in))
           .isEqualTo(event1.asStreamProto(null));
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
index f469f9a..1191d02 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
@@ -102,44 +102,39 @@
 
   /**
    * A thin wrapper around an OutputStream that counts number of bytes written and verifies flushes.
+   *
+   * <p>The methods below need to be syncrhonized because they override methods from {@link
+   * BufferedOutputStream} *not* because there's a concurrent access to the stream.
    */
-  private static final class WrappedOutputStream extends OutputStream {
-    private final OutputStream out;
+  private static final class WrappedOutputStream extends BufferedOutputStream {
     private long byteCount;
     private int flushCount;
 
-    public WrappedOutputStream(OutputStream out) {
+    WrappedOutputStream(OutputStream out) {
+      super(out);
       this.out = out;
     }
 
-    public long getByteCount() {
-      return byteCount;
-    }
-
-    public int getFlushCount() {
-      return flushCount;
-    }
-
     @Override
-    public void write(int b) throws IOException {
+    public synchronized void write(int b) throws IOException {
       out.write(b);
       byteCount++;
     }
 
     @Override
-    public void write(byte[] b) throws IOException {
+    public synchronized void write(byte[] b) throws IOException {
       out.write(b);
       byteCount += b.length;
     }
 
     @Override
-    public void write(byte[] b, int off, int len) throws IOException {
+    public synchronized void write(byte[] b, int off, int len) throws IOException {
       out.write(b, off, len);
       byteCount += len;
     }
 
     @Override
-    public void flush() throws IOException {
+    public synchronized void flush() throws IOException {
       out.flush();
       flushCount++;
     }
@@ -150,31 +145,29 @@
     File output = tmp.newFile();
     BufferedOutputStream outputStream =
         new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath())));
+    WrappedOutputStream wrappedOutputStream = new WrappedOutputStream(outputStream);
 
     BuildEventStreamProtos.BuildEvent started =
         BuildEventStreamProtos.BuildEvent.newBuilder()
             .setStarted(BuildStarted.newBuilder().setCommand("build"))
             .build();
     when(buildEvent.asStreamProto(Matchers.<BuildEventContext>any())).thenReturn(started);
+
     JsonFormatFileTransport transport =
         new JsonFormatFileTransport(
-            outputStream,
-            defaultOpts,
-            new LocalFilesArtifactUploader(),
-            artifactGroupNamer);
-    WrappedOutputStream out = new WrappedOutputStream(transport.writer.out);
-    transport.writer.out = out;
+            wrappedOutputStream, defaultOpts, new LocalFilesArtifactUploader(), artifactGroupNamer);
+
     transport.sendBuildEvent(buildEvent);
-    Thread.sleep(FileTransport.SequentialWriter.FLUSH_INTERVAL.toMillis() * 3);
+    Thread.sleep(transport.getFlushInterval().toMillis() * 3);
 
     // Some users, e.g. Tulsi, use JSON build event output for interactive use and expect the stream
     // to be flushed at regular short intervals.
-    assertThat(out.getFlushCount()).isGreaterThan(0);
+    assertThat(wrappedOutputStream.flushCount).isGreaterThan(0);
 
     // We know that large writes get flushed; test is valuable only if we check small writes,
     // meaning smaller than 8192, the default buffer size used by BufferedOutputStream.
-    assertThat(out.getByteCount()).isLessThan(8192L);
-    assertThat(out.getByteCount()).isGreaterThan(0L);
+    assertThat(wrappedOutputStream.byteCount).isLessThan(8192L);
+    assertThat(wrappedOutputStream.byteCount).isGreaterThan(0L);
 
     transport.close().get();
   }