Adding async proto or text logging utility class.

WANT_LGTM=buchgr
TESTED=unit tests, 500 runs per test
RELNOTES: None
PiperOrigin-RevId: 188093043
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index 814716d..5dcca6b 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -118,6 +118,7 @@
         "//src/main/java/com/google/devtools/build/lib/profiler",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//third_party:guava",
+        "//third_party/protobuf:protobuf_java",
     ],
 )
 
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
new file mode 100644
index 0000000..70284d0
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
@@ -0,0 +1,236 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An output stream supporting anynchronous writes, backed by a file.
+ *
+ * <p>We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It gets
+ * tricky when it comes to {@link #closeAsync()}, as we may only complete the returned future when
+ * all writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites}
+ * to keep track of the number of writes that have not completed yet. It's incremented before a new
+ * write and decremented after a write has completed. When it's {@code 0} it's safe to complete the
+ * close future.
+ */
+@ThreadSafety.ThreadSafe
+public class AsynchronousFileOutputStream extends OutputStream {
+  private final AsynchronousFileChannel ch;
+  private final WriteCompletionHandler completionHandler = new WriteCompletionHandler();
+  // The offset in the file to begin the next write at.
+  private long writeOffset;
+  // Number of writes that haven't completed yet.
+  private long outstandingWrites;
+  // The future returned by closeAsync().
+  private SettableFuture<Void> closeFuture;
+  // To store any exception raised from the writes.
+  private final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+  public AsynchronousFileOutputStream(String filename) throws IOException {
+    this(
+        AsynchronousFileChannel.open(
+            Paths.get(filename),
+            StandardOpenOption.WRITE,
+            StandardOpenOption.CREATE,
+            StandardOpenOption.TRUNCATE_EXISTING));
+  }
+
+  @VisibleForTesting
+  public AsynchronousFileOutputStream(AsynchronousFileChannel ch) throws IOException {
+    this.ch = ch;
+  }
+
+  public void write(String message) {
+    write(message.getBytes(UTF_8));
+  }
+
+  /**
+   * Writes a delimited protocol buffer message in the same format as {@link
+   * MessageLite#writeDelimitedTo(java.io.OutputStream)}.
+   *
+   * <p>Unfortunately, {@link MessageLite#writeDelimitedTo(java.io.OutputStream)} may result in
+   * multiple calls to write on the underlying stream, so we have to provide this method here
+   * instead of the caller using it directly.
+   */
+  public void write(Message m) {
+    Preconditions.checkNotNull(m);
+    final int size = m.getSerializedSize();
+    ByteArrayOutputStream bos =
+        new ByteArrayOutputStream(CodedOutputStream.computeRawVarint32Size(size) + size);
+    try {
+      m.writeDelimitedTo(bos);
+    } catch (IOException e) {
+      // This should never happen with an in-memory stream.
+      exception.compareAndSet(null, new IllegalStateException(e.toString()));
+      return;
+    }
+    write(bos.toByteArray());
+  }
+
+  @Override
+  public void write(int b) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Writes the byte buffer into the file asynchronously.
+   *
+   * <p>The writes are guaranteed to land in the output file in the same order that they were
+   * called; However, some writes may fail, leaving the file partially corrupted. In case a write
+   * fails, an exception will be propagated in close, but remaining writes will be allowed to
+   * continue.
+   */
+  @Override
+  public synchronized void write(byte[] data) {
+    Preconditions.checkNotNull(data);
+    Preconditions.checkState(ch.isOpen());
+
+    if (closeFuture != null) {
+      throw new IllegalStateException("Attempting to write to stream after close");
+    }
+
+    outstandingWrites++;
+    ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler);
+    writeOffset += data.length;
+  }
+
+  /* Returns whether the stream is open for writing. */
+  public boolean isOpen() {
+    return ch.isOpen();
+  }
+
+  /**
+   * Closes the stream without waiting until pending writes are committed, and supressing errors.
+   *
+   * <p>Pending writes will still continue asynchronously, but any errors will be ignored.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void closeNow() {
+    closeAsync();
+  }
+
+  /**
+   * Closes the stream and blocks until all pending writes are completed.
+   *
+   * Throws an exception if any of the writes or the close itself have failed.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      closeAsync().get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Write interrupted");
+    } catch (ExecutionException e) {
+      Throwable c = e.getCause();
+      Throwables.throwIfUnchecked(c);
+      if (c instanceof IOException) {
+        throw (IOException) c;
+      }
+      throw new IOException("Exception within stream close: " + c);
+    }
+  }
+
+  /**
+   * Flushes the currently ongoing writes into the channel.
+   *
+   * Throws an exception if any of the writes or the close itself have failed.
+   */
+  @Override
+  public void flush() throws IOException {
+    ch.force(true);
+  }
+
+  /**
+   * Closes the channel, if close was invoked and there are no outstanding writes. Should only be
+   * called in a synchronized context.
+   */
+  private void closeIfNeeded() {
+    if (closeFuture == null || outstandingWrites > 0) {
+      return;
+    }
+    try {
+      flush();
+      ch.close();
+    } catch (Exception e) {
+      exception.compareAndSet(null, e);
+    } finally {
+      Throwable e = exception.get();
+      if (e == null) {
+        closeFuture.set(null);
+      } else {
+        closeFuture.setException(e);
+      }
+    }
+  }
+
+  /**
+   * Returns a future that will close the stream when all pending writes are completed.
+   *
+   * Any failed writes will propagate an exception.
+   */
+  public synchronized ListenableFuture<Void> closeAsync() {
+    if (closeFuture != null) {
+      return closeFuture;
+    }
+    closeFuture = SettableFuture.create();
+    closeIfNeeded();
+    return closeFuture;
+  }
+
+  /**
+   * Handler that's notified when a write completes.
+   */
+  private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
+
+    @Override
+    public void completed(Integer result, Void attachment) {
+      countWritesAndTryClose();
+    }
+
+    @Override
+    public void failed(Throwable e, Void attachment) {
+      exception.compareAndSet(null, e);
+      countWritesAndTryClose();
+    }
+
+    private void countWritesAndTryClose() {
+      synchronized (AsynchronousFileOutputStream.this) {
+        Preconditions.checkState(outstandingWrites > 0);
+        outstandingWrites--;
+        closeIfNeeded();
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index c7d819a..99d4335 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -314,6 +314,8 @@
         ":testutil",
         "//src/main/java/com/google/devtools/build/lib:io",
         "//src/main/java/com/google/devtools/build/lib:util",
+        "//src/main/protobuf:bazel_flags_java_proto",
+        "//third_party:mockito",
     ],
 )
 
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
new file mode 100644
index 0000000..39c4d6b
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
@@ -0,0 +1,367 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.runtime.commands.proto.BazelFlagsProto.FlagInfo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests {@link AsynchronousFileOutputStream}. */
+@RunWith(JUnit4.class)
+public class AsynchronousFileOutputStreamTest {
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  @Mock AsynchronousFileChannel mockChannel;
+  Random random = ThreadLocalRandom.current();
+  static final char[] RAND_CHARS = "abcdefghijklmnopqrstuvwxzy0123456789-".toCharArray();
+  static final int RAND_STRING_LENGTH = 10;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @After
+  public void validateMocks() {
+    Mockito.validateMockitoUsage();
+  }
+
+  private FlagInfo generateRandomMessage() {
+    FlagInfo.Builder b = FlagInfo.newBuilder();
+    b.setName(generateRandomString() + "a");  // Name is required, cannot be empty.
+    b.setHasNegativeFlag(random.nextBoolean());
+    b.setDocumentation(generateRandomString());
+    int commandsSize = random.nextInt(5);
+    for (int i = 0; i < commandsSize; ++i) {
+      b.addCommands(generateRandomString());
+    }
+    return b.build();
+  }
+
+  private String generateRandomString() {
+    int len = random.nextInt(RAND_STRING_LENGTH + 1);
+    char[] data = new char[len];
+    for (int i = 0; i < len; ++i) {
+      data[i] = RAND_CHARS[random.nextInt(RAND_CHARS.length)];
+    }
+    return new String(data);
+  }
+
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    Path logPath = tmp.newFile().toPath();
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString());
+    Thread[] writers = new Thread[10];
+    final CountDownLatch start = new CountDownLatch(writers.length);
+    for (int i = 0; i < writers.length; ++i) {
+      String name = "Thread # " + i;
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            start.countDown();
+            start.await();
+          } catch (InterruptedException e) {
+            return;
+          }
+          for (int j = 0; j < 10; ++j) {
+            out.write(name + " time # " + j + "\n");
+          }
+        }
+      };
+      writers[i] = thread;
+      thread.start();
+    }
+    for (int i = 0; i < writers.length; ++i) {
+      writers[i].join();
+    }
+    out.close();
+    String contents =
+        new String(ByteStreams.toByteArray(Files.newInputStream(logPath)), StandardCharsets.UTF_8);
+    for (int i = 0; i < writers.length; ++i) {
+      for (int j = 0; j < 10; ++j) {
+        assertThat(contents).contains("Thread # " + i + " time # " + j + "\n");
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentProtoWrites() throws Exception {
+    Path logPath = tmp.newFile().toPath();
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString());
+    ArrayList<FlagInfo> messages = new ArrayList<>();
+    for (int i = 0; i < 100; ++i) {
+      messages.add(generateRandomMessage());
+    }
+    Thread[] writers = new Thread[messages.size() / 10];
+    final CountDownLatch start = new CountDownLatch(writers.length);
+    for (int i = 0; i < writers.length; ++i) {
+      int startIndex = i * 10;
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            start.countDown();
+            start.await();
+          } catch (InterruptedException e) {
+            return;
+          }
+          for (int j = startIndex; j < startIndex + 10; ++j) {
+            out.write(messages.get(j));
+          }
+        }
+      };
+      writers[i] = thread;
+      thread.start();
+    }
+    for (int i = 0; i < writers.length; ++i) {
+      writers[i].join();
+    }
+    out.close();
+    ArrayList<FlagInfo> readMessages = new ArrayList<>();
+    try (InputStream in = Files.newInputStream(logPath)) {
+      for (int i = 0; i < messages.size(); ++i) {
+        readMessages.add(FlagInfo.parseDelimitedFrom(in));
+      }
+    }
+    assertThat(readMessages).containsExactlyElementsIn(messages);
+  }
+
+  @Test
+  public void testFailedClosePropagatesIOException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    IOException ex = new IOException("foo");
+    Mockito.doThrow(ex).when(mockChannel).close();
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.completed(0, null); // We ignore the arguments.
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+
+    try {
+      out.close();
+      fail("Expected an IOException");
+    } catch (IOException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testFailedClosePropagatesUncheckedException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    RuntimeException ex = new RuntimeException("foo");
+    Mockito.doThrow(ex).when(mockChannel).close();
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.completed(0, null); // We ignore the arguments.
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+
+    try {
+      out.close();
+      fail("Expected a RuntimeException");
+    } catch (RuntimeException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testFailedForcePropagatesIOException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    IOException ex = new IOException("foo");
+    Mockito.doThrow(ex).when(mockChannel).force(eq(true));
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.completed(0, null); // We ignore the arguments.
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+
+    try {
+      out.close();
+      fail("Expected an IOException");
+    } catch (IOException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testFailedForcePropagatesUncheckedException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    RuntimeException ex = new RuntimeException("foo");
+    Mockito.doThrow(ex).when(mockChannel).force(eq(true));
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.completed(0, null); // We ignore the arguments.
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+
+    try {
+      out.close();
+      fail("Expected a RuntimeException");
+    } catch (RuntimeException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testFailedWritePropagatesIOException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    IOException ex = new IOException("foo");
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.failed(ex, null);
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+    out.write("blo");
+
+    try {
+      out.close();
+      fail("Expected an IOException");
+    } catch (IOException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testFailedWritePropagatesUncheckedException() throws Exception {
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+    when(mockChannel.isOpen()).thenReturn(true);
+    RuntimeException ex = new RuntimeException("foo");
+    Mockito.doAnswer(
+            invocationOnMock -> {
+              @SuppressWarnings("unchecked")
+              CompletionHandler<Integer, Void> handler =
+                  (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+              handler.failed(ex, null);
+              return null;
+            })
+        .when(mockChannel)
+        .write(
+            any(ByteBuffer.class),
+            any(Integer.class),
+            eq(null),
+            Mockito.<CompletionHandler<Integer, Void>>anyObject());
+    out.write("bla");
+    out.write("blo");
+
+    try {
+      out.close();
+      fail("Expected a RuntimeException");
+    } catch (RuntimeException expected) {
+      assertThat(expected).hasMessageThat().isEqualTo("foo");
+    }
+  }
+
+  @Test
+  public void testWriteAfterCloseThrowsException() throws Exception {
+    Path logPath = tmp.newFile().toPath();
+    AsynchronousFileChannel ch = AsynchronousFileChannel.open(
+            logPath,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.CREATE,
+            StandardOpenOption.TRUNCATE_EXISTING);
+    AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(ch);
+    out.write("bla");
+    ch.close();
+
+    try {
+      out.write("blo");
+      fail("Expected an IllegalStateException");
+    } catch (IllegalStateException expected) {
+      // Expected.
+    }
+  }
+}