blob: 6f7562c247f67d4863e9daefc1513ff7bbc2e3a7 [file] [log] [blame]
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.runtime;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.io.OutputStream;
/**
* {@link OutputStream} suitably synchronized for producer-consumer use cases.
* The method {@link #readAndReset()} allows to read the bytes accumulated so far
* and simultaneously truncate precisely the bytes read. Moreover, upon such a reset
* the amount of memory retained is reset to a small constant. This is a difference
* with resecpt to the behaviour of the standard classes {@link ByteArrayOutputStream}
* which only resets the index but keeps the array. This difference matters, as we need
* to support output peeks without retaining this ammount of memory for the rest of the
* build.
*
* <p>This class is expected to be used with the {@link BuildEventStreamer}.
*/
public class SynchronizedOutputStream extends OutputStream {
// The maximal amount of bytes we intend to store in the buffer. However,
// the requirement that a single write be written in one go is more important,
// so the actual size we store in this buffer can be the maximum (not the sum)
// of this value and the amount of bytes written in a single call to the
// {@link write(byte[] buffer, int offset, int count)} method.
private final long maxBufferedLength;
private byte[] buf;
private long count;
private boolean discardAll;
// The event streamer that is supposed to flush stdout/stderr.
private BuildEventStreamer streamer;
public SynchronizedOutputStream(long maxBufferedLength) {
buf = new byte[64];
count = 0;
discardAll = false;
this.maxBufferedLength = maxBufferedLength;
}
public void registerStreamer(BuildEventStreamer streamer) {
this.streamer = streamer;
}
public synchronized void setDiscardAll() {
discardAll = true;
count = 0;
buf = null;
}
/**
* Read the contents of the stream and simultaneously clear them. Also, reset the amount of
* memory retained to a constant amount.
*/
public synchronized String readAndReset() {
String content = new String(buf, 0, (int) count, UTF_8);
buf = new byte[64];
count = 0;
return content;
}
@Override
public void write(int oneByte) throws IOException {
if (discardAll) {
return;
}
// We change the dependency with respect to that of the super class: write(int)
// now calls write(int[], int, int) which is implemented without any dependencies.
write(new byte[] {(byte) oneByte}, 0, 1);
}
@Override
public void write(byte[] buffer, int offset, int count) throws IOException {
// As we base the less common write(int) on this method, we may not depend not call write(int)
// directly or indirectly (e.g., by calling super.write(int[], int, int)).
synchronized (this) {
if (discardAll) {
return;
}
}
boolean shouldFlush = false;
// As we have to do the flushing outside the synchronized block, we have to expect
// other writes to come immediately after flushing, so we have to do the check inside
// a while loop.
boolean didWrite = false;
while (!didWrite) {
synchronized (this) {
if (this.count + (long) count < maxBufferedLength || this.count == 0) {
if (this.count + (long) count >= (long) buf.length) {
// We need to increase the buffer; if within the permissible range range for array
// sizes, we at least double it, otherwise we only increase as far as needed.
long newsize;
if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) {
newsize = 2 * (long) buf.length + count;
} else {
newsize = this.count + count;
}
byte[] newbuf = new byte[(int) newsize];
System.arraycopy(buf, 0, newbuf, 0, (int) this.count);
this.buf = newbuf;
}
System.arraycopy(buffer, offset, buf, (int) this.count, count);
this.count += (long) count;
didWrite = true;
} else {
shouldFlush = true;
}
if (this.count >= maxBufferedLength) {
shouldFlush = true;
}
}
if (shouldFlush && streamer != null) {
streamer.flush();
shouldFlush = false;
}
}
}
}