buchgr | 2dbf36e | 2017-05-30 10:51:43 +0200 | [diff] [blame] | 1 | // Copyright 2017 The Bazel Authors. All rights reserved. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package com.google.devtools.build.lib.runtime; |
| 16 | |
| 17 | import static java.nio.charset.StandardCharsets.UTF_8; |
| 18 | |
| 19 | import java.io.IOException; |
| 20 | import java.io.OutputStream; |
| 21 | |
| 22 | /** |
| 23 | * {@link OutputStream} suitably synchronized for producer-consumer use cases. |
| 24 | * The method {@link #readAndReset()} allows to read the bytes accumulated so far |
| 25 | * and simultaneously truncate precisely the bytes read. Moreover, upon such a reset |
| 26 | * the amount of memory retained is reset to a small constant. This is a difference |
| 27 | * with resecpt to the behaviour of the standard classes {@link ByteArrayOutputStream} |
| 28 | * which only resets the index but keeps the array. This difference matters, as we need |
| 29 | * to support output peeks without retaining this ammount of memory for the rest of the |
| 30 | * build. |
| 31 | * |
| 32 | * <p>This class is expected to be used with the {@link BuildEventStreamer}. |
| 33 | */ |
| 34 | public class SynchronizedOutputStream extends OutputStream { |
| 35 | |
| 36 | // The maximal amount of bytes we intend to store in the buffer. However, |
| 37 | // the requirement that a single write be written in one go is more important, |
| 38 | // so the actual size we store in this buffer can be the maximum (not the sum) |
| 39 | // of this value and the amount of bytes written in a single call to the |
| 40 | // {@link write(byte[] buffer, int offset, int count)} method. |
| 41 | private static final long MAX_BUFFERED_LENGTH = 10 * 1024; |
| 42 | |
| 43 | private byte[] buf; |
| 44 | private long count; |
| 45 | private boolean discardAll; |
| 46 | |
| 47 | // The event streamer that is supposed to flush stdout/stderr. |
| 48 | private BuildEventStreamer streamer; |
| 49 | |
| 50 | public SynchronizedOutputStream() { |
| 51 | buf = new byte[64]; |
| 52 | count = 0; |
| 53 | discardAll = false; |
| 54 | } |
| 55 | |
| 56 | public void registerStreamer(BuildEventStreamer streamer) { |
| 57 | this.streamer = streamer; |
| 58 | } |
| 59 | |
| 60 | public synchronized void setDiscardAll() { |
| 61 | discardAll = true; |
| 62 | count = 0; |
| 63 | buf = null; |
| 64 | } |
| 65 | |
| 66 | /** |
| 67 | * Read the contents of the stream and simultaneously clear them. Also, reset the amount of |
| 68 | * memory retained to a constant amount. |
| 69 | */ |
| 70 | public synchronized String readAndReset() { |
| 71 | String content = new String(buf, 0, (int) count, UTF_8); |
| 72 | buf = new byte[64]; |
| 73 | count = 0; |
| 74 | return content; |
| 75 | } |
| 76 | |
| 77 | @Override |
| 78 | public void write(int oneByte) throws IOException { |
| 79 | if (discardAll) { |
| 80 | return; |
| 81 | } |
| 82 | // We change the dependency with respect to that of the super class: write(int) |
| 83 | // now calls write(int[], int, int) which is implemented without any dependencies. |
| 84 | write(new byte[] {(byte) oneByte}, 0, 1); |
| 85 | } |
| 86 | |
| 87 | @Override |
| 88 | public void write(byte[] buffer, int offset, int count) throws IOException { |
| 89 | // As we base the less common write(int) on this method, we may not depend not call write(int) |
| 90 | // directly or indirectly (e.g., by calling super.write(int[], int, int)). |
| 91 | synchronized (this) { |
| 92 | if (discardAll) { |
| 93 | return; |
| 94 | } |
| 95 | } |
| 96 | boolean shouldFlush = false; |
| 97 | // As we have to do the flushing outside the synchronized block, we have to expect |
| 98 | // other writes to come immediately after flushing, so we have to do the check inside |
| 99 | // a while loop. |
| 100 | boolean didWrite = false; |
| 101 | while (!didWrite) { |
| 102 | synchronized (this) { |
| 103 | if (this.count + (long) count < MAX_BUFFERED_LENGTH || this.count == 0) { |
| 104 | if (this.count + (long) count >= (long) buf.length) { |
| 105 | // We need to increase the buffer; if within the permissible range range for array |
| 106 | // sizes, we at least double it, otherwise we only increase as far as needed. |
| 107 | long newsize; |
| 108 | if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) { |
| 109 | newsize = 2 * (long) buf.length + count; |
| 110 | } else { |
| 111 | newsize = this.count + count; |
| 112 | } |
| 113 | byte[] newbuf = new byte[(int) newsize]; |
| 114 | System.arraycopy(buf, 0, newbuf, 0, (int) this.count); |
| 115 | this.buf = newbuf; |
| 116 | } |
| 117 | System.arraycopy(buffer, offset, buf, (int) this.count, count); |
| 118 | this.count += (long) count; |
| 119 | didWrite = true; |
| 120 | } else { |
| 121 | shouldFlush = true; |
| 122 | } |
| 123 | if (this.count >= MAX_BUFFERED_LENGTH) { |
| 124 | shouldFlush = true; |
| 125 | } |
| 126 | } |
| 127 | if (shouldFlush && streamer != null) { |
| 128 | streamer.flush(); |
| 129 | shouldFlush = false; |
| 130 | } |
| 131 | } |
| 132 | } |
| 133 | } |