blob: 7620ce64ef11209257e650f49b49fee1026b50c5 [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
/**
 * An output stream supporting asynchronous writes of length-delimited protocol buffer messages,
 * backed by a file.
 *
 * <p>Messages are serialized on the calling thread and handed over, through an unbounded queue, to
 * a dedicated writer thread which performs the actual I/O and finally closes the underlying stream.
 */
@ThreadSafety.ThreadSafe
public class AsynchronousMessageOutputStream<T extends Message> implements MessageOutputStream<T> {
  // Sentinel queue entry telling the writer thread to stop. It is recognized by identity, so its
  // content is irrelevant.
  private static final byte[] POISON_PILL = new byte[1];

  private final Thread writerThread;
  // Maybe we should use an ArrayBlockingQueue instead, and accept that write may block if the
  // buffer is full?
  private final BlockingQueue<byte[]> queue = new LinkedBlockingDeque<>();
  // The future returned by closeAsync().
  private final SettableFuture<Void> closeFuture = SettableFuture.create();
  // To store any exception raised from the writes.
  private final AtomicReference<Throwable> exception = new AtomicReference<>();

  /**
   * Creates a stream writing to the file at {@code path} through a buffered output stream.
   *
   * @param path the file to write to; its string form is also used to name the writer thread
   * @throws IOException if the output stream for {@code path} cannot be obtained
   */
  public AsynchronousMessageOutputStream(Path path) throws IOException {
    this(
        path.toString(),
        new BufferedOutputStream( // Use a buffer of 100 kByte, scientifically chosen at random.
            path.getOutputStream(), 100000));
  }

  /**
   * Creates a stream writing to {@code out} and starts the background writer thread.
   *
   * @param name suffix used in the writer thread's name, for diagnostics
   * @param out the stream to write to; it is closed by the writer thread when it terminates
   */
  public AsynchronousMessageOutputStream(String name, OutputStream out) {
    writerThread = new Thread(() -> drainQueueTo(out), "async-file-writer:" + name);
    writerThread.start();
  }

  /**
   * Body of the writer thread: writes queued chunks to {@code out} until the poison pill is seen,
   * the thread is interrupted, or a write fails; then closes {@code out} and completes {@link
   * #closeFuture}.
   *
   * <p>The future is completed only after {@code out.close()} has returned or thrown, so that
   * anyone observing the future never races with a still-open underlying stream.
   */
  private void drainQueueTo(OutputStream out) {
    Throwable failure = null;
    try {
      byte[] data;
      while ((data = queue.take()) != POISON_PILL) {
        out.write(data);
      }
    } catch (InterruptedException e) {
      // Exit quietly.
    } catch (Exception e) {
      // Record the failure right away so that write() can tell a failed stream from a closed one.
      exception.set(e);
      failure = e;
    } finally {
      try {
        out.close();
      } catch (Exception e) {
        if (failure == null) {
          failure = e;
        } else if (failure != e) {
          // The write failure is the primary error; keep the close failure attached to it rather
          // than losing it.
          failure.addSuppressed(e);
        }
      }
      if (failure == null) {
        closeFuture.set(null);
      } else {
        closeFuture.setException(failure);
      }
    }
  }

  /**
   * Writes a protocol buffer message in the same format as {@link
   * MessageLite#writeDelimitedTo(java.io.OutputStream)}.
   *
   * <p>This method only serializes the message and enqueues the resulting bytes; the writer thread
   * writes enqueued chunks to the underlying stream in the order in which they were enqueued.
   *
   * <p>The writer thread stops at the first failed write, which may leave the file truncated.
   * Messages that are still queued at that point, or that are passed to this method afterwards,
   * are discarded without error; the failure itself is reported by {@link #close} and {@link
   * #closeAsync}.
   *
   * @param m the message to write; must not be null
   * @throws IllegalStateException if the stream has already been closed without a write failure
   */
  @Override
  public void write(T m) {
    Preconditions.checkNotNull(m);
    if (closeFuture.isDone()) {
      if (exception.get() != null) {
        // There was a previous write failure. Silently return without doing anything.
        return;
      } else {
        // Attempted to write after closing.
        throw new IllegalStateException();
      }
    }
    final int size = m.getSerializedSize();
    // Pre-size the buffer for the varint length prefix plus the message body.
    ByteArrayOutputStream bos =
        new ByteArrayOutputStream(CodedOutputStream.computeUInt32SizeNoTag(size) + size);
    try {
      m.writeDelimitedTo(bos);
    } catch (IOException e) {
      // This should never happen with an in-memory stream. Keep the original exception as the
      // cause so that its stack trace is not lost.
      exception.compareAndSet(null, new IllegalStateException(e));
      return;
    }
    Uninterruptibles.putUninterruptibly(queue, bos.toByteArray());
  }

  /**
   * Closes the stream and blocks until all pending writes are completed.
   *
   * <p>If the calling thread is interrupted while waiting, this method stops waiting, restores the
   * thread's interrupt flag and returns normally; the close request itself stays queued.
   *
   * @throws IOException if a write or the close of the underlying stream failed with an {@link
   *     IOException}
   * @throws RuntimeException if a write or the close failed with any other exception; checked
   *     exceptions other than {@link IOException} are wrapped
   */
  @Override
  public void close() throws IOException {
    try {
      closeAsync().get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      Throwables.throwIfInstanceOf(e.getCause(), IOException.class);
      Throwables.throwIfInstanceOf(e.getCause(), RuntimeException.class);
      throw new RuntimeException(e.getCause());
    }
  }

  /**
   * Requests that the stream be closed once all pending writes are completed, without blocking.
   *
   * <p>The returned future completes after the writer thread has closed the underlying stream. It
   * fails with the exception of the first failed write or, if all writes succeeded, with the
   * exception thrown while closing the underlying stream.
   *
   * @return the future tracking completion of the close; the same instance on every call
   */
  public ListenableFuture<Void> closeAsync() {
    Uninterruptibles.putUninterruptibly(queue, POISON_PILL);
    return closeFuture;
  }
}