blob: 3b11c2fd0db4957058f67b6a3f571eabebe57894 [file] [log] [blame]
Damien Martin-Guillerezf88f4d82015-09-25 13:56:55 +00001// Copyright 2014 The Bazel Authors. All rights reserved.
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +01002//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package com.google.devtools.build.lib.shell;
15
ulfjackf2d45952017-08-09 15:27:49 +020016import com.google.common.base.Preconditions;
Janak Ramakrishnanb0848312016-08-22 14:09:18 +000017import com.google.common.util.concurrent.Uninterruptibles;
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010018import java.io.ByteArrayOutputStream;
19import java.io.Closeable;
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.OutputStream;
23import java.util.concurrent.ExecutionException;
24import java.util.concurrent.ExecutorService;
25import java.util.concurrent.Executors;
26import java.util.concurrent.Future;
27import java.util.concurrent.ThreadFactory;
Nathan Harmata8e17c852015-08-25 18:15:15 +000028import java.util.concurrent.atomic.AtomicInteger;
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010029import java.util.logging.Level;
30import java.util.logging.Logger;
31
/**
 * This class provides convenience methods for consuming (actively reading)
 * output and error streams with different consumption policies:
 * accumulating ({@link #createAccumulatingConsumers()}) and
 * streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
 */
ulfjackf2d45952017-08-09 15:27:49 +020038final class Consumers {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010039
/**
 * Logger shared by every consumer in this file. It is looked up under the name
 * "com.google.devtools.build.lib.shell.Command" rather than under this class's own name.
 */
private static final Logger logger = Logger.getLogger("com.google.devtools.build.lib.shell.Command");

/**
 * Executor on which all consuming sinks run. It is a cached thread pool whose threads are
 * created by {@link AccumulatorThreadFactory}.
 */
private static final ExecutorService pool =
    Executors.newCachedThreadPool(new AccumulatorThreadFactory());

/** Not instantiable; this class only exposes static factory methods. */
private Consumers() {}
47
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010048 static OutErrConsumers createAccumulatingConsumers() {
ulfjackf2d45952017-08-09 15:27:49 +020049 return new OutErrConsumers(new AccumulatingConsumer(), new AccumulatingConsumer());
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010050 }
51
ulfjackf2d45952017-08-09 15:27:49 +020052 static OutErrConsumers createStreamingConsumers(OutputStream out, OutputStream err) {
53 Preconditions.checkNotNull(out);
54 Preconditions.checkNotNull(err);
55 return new OutErrConsumers(new StreamingConsumer(out), new StreamingConsumer(err));
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010056 }
57
58 static class OutErrConsumers {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010059 private final OutputConsumer out;
60 private final OutputConsumer err;
61
ulfjackf2d45952017-08-09 15:27:49 +020062 private OutErrConsumers(final OutputConsumer out, final OutputConsumer err) {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010063 this.out = out;
64 this.err = err;
65 }
66
ulfjackf2d45952017-08-09 15:27:49 +020067 void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams) {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +010068 out.registerInput(outInput, closeStreams);
69 err.registerInput(errInput, closeStreams);
70 }
71
72 void cancel() {
73 out.cancel();
74 err.cancel();
75 }
76
77 void waitForCompletion() throws IOException {
78 out.waitForCompletion();
79 err.waitForCompletion();
80 }
81
82 ByteArrayOutputStream getAccumulatedOut(){
83 return out.getAccumulatedOut();
84 }
85
86 ByteArrayOutputStream getAccumulatedErr() {
87 return err.getAccumulatedOut();
88 }
89
90 void logConsumptionStrategy() {
91 // The creation methods guarantee that the consumption strategy is
92 // the same for out and err - doesn't matter whether we call out or err,
93 // let's pick out.
94 out.logConsumptionStrategy();
95 }
96
97 }
98
99 /**
100 * This interface describes just one consumer, which consumes the
101 * InputStream provided by {@link #registerInput(InputStream, boolean)}.
102 * Implementations implement different consumption strategies.
103 */
104 private static interface OutputConsumer {
105 /**
106 * Returns whatever the consumer accumulated internally, or
107 * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate
108 * any output.
109 *
110 * @see AccumulatingConsumer
111 */
112 ByteArrayOutputStream getAccumulatedOut();
113
114 void logConsumptionStrategy();
115
116 void registerInput(InputStream in, boolean closeConsumer);
117
118 void cancel();
119
120 void waitForCompletion() throws IOException;
121 }
122
123 /**
124 * This consumer sends the input to a stream while consuming it.
125 */
Lukacs Berki2b4e2e42016-06-28 13:45:18 +0000126 private static class StreamingConsumer extends FutureConsumption {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100127 private OutputStream out;
128
129 StreamingConsumer(OutputStream out) {
130 this.out = out;
131 }
132
133 @Override
134 public ByteArrayOutputStream getAccumulatedOut() {
135 return CommandResult.NO_OUTPUT_COLLECTED;
136 }
137
138 @Override
139 public void logConsumptionStrategy() {
lberki97abb522017-09-04 18:51:57 +0200140 logger.finer("Output will be sent to streams provided by client");
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100141 }
142
143 @Override protected Runnable createConsumingAndClosingSink(InputStream in,
144 boolean closeConsumer) {
145 return new ClosingSink(in, out, closeConsumer);
146 }
147 }
148
149 /**
150 * This consumer sends the input to a {@link ByteArrayOutputStream}
151 * while consuming it. This accumulated stream can be obtained by
152 * calling {@link #getAccumulatedOut()}.
153 */
Lukacs Berki2b4e2e42016-06-28 13:45:18 +0000154 private static class AccumulatingConsumer extends FutureConsumption {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100155 private ByteArrayOutputStream out = new ByteArrayOutputStream();
156
157 @Override
158 public ByteArrayOutputStream getAccumulatedOut() {
159 return out;
160 }
161
162 @Override
163 public void logConsumptionStrategy() {
lberki97abb522017-09-04 18:51:57 +0200164 logger.finer("Output will be accumulated (promptly read off) and returned");
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100165 }
166
167 @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
168 return new ClosingSink(in, out);
169 }
170 }
171
172 /**
173 * This consumer just discards whatever it reads.
174 */
Lukacs Berki2b4e2e42016-06-28 13:45:18 +0000175 private static class DiscardingConsumer extends FutureConsumption {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100176 private DiscardingConsumer() {
177 }
178
179 @Override
180 public ByteArrayOutputStream getAccumulatedOut() {
181 return CommandResult.NO_OUTPUT_COLLECTED;
182 }
183
184 @Override
185 public void logConsumptionStrategy() {
lberki97abb522017-09-04 18:51:57 +0200186 logger.finer("Output will be ignored");
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100187 }
188
189 @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
190 return new ClosingSink(in);
191 }
192 }
193
194 /**
195 * A mixin that makes consumers active - this is where we kick of
196 * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions
197 * and wait for the consumers to complete.
198 */
199 private abstract static class FutureConsumption implements OutputConsumer {
200
201 private Future<?> future;
202
203 @Override
204 public void registerInput(InputStream in, boolean closeConsumer){
205 Runnable sink = createConsumingAndClosingSink(in, closeConsumer);
206 future = pool.submit(sink);
207 }
208
209 protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close);
210
211 @Override
212 public void cancel() {
213 future.cancel(true);
214 }
215
216 @Override
217 public void waitForCompletion() throws IOException {
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100218 try {
Janak Ramakrishnanb0848312016-08-22 14:09:18 +0000219 Uninterruptibles.getUninterruptibly(future);
220 } catch (ExecutionException ee) {
221 // Runnable threw a RuntimeException
222 Throwable nested = ee.getCause();
223 if (nested instanceof RuntimeException) {
224 final RuntimeException re = (RuntimeException) nested;
225 // The stream sink classes, unfortunately, tunnel IOExceptions
226 // out of run() in a RuntimeException. If that's the case,
227 // unpack and re-throw the IOException. Otherwise, re-throw
228 // this unexpected RuntimeException
229 final Throwable cause = re.getCause();
230 if (cause instanceof IOException) {
231 throw (IOException) cause;
232 } else {
233 throw re;
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100234 }
Janak Ramakrishnanb0848312016-08-22 14:09:18 +0000235 } else if (nested instanceof OutOfMemoryError) {
236 // OutOfMemoryError does not support exception chaining.
237 throw (OutOfMemoryError) nested;
238 } else if (nested instanceof Error) {
239 throw new Error("unhandled Error in worker thread", ee);
240 } else {
241 throw new RuntimeException("unknown execution problem", ee);
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100242 }
243 }
244 }
245 }
246
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100247 private static class AccumulatorThreadFactory implements ThreadFactory {
248
Nathan Harmata8e17c852015-08-25 18:15:15 +0000249 private static AtomicInteger threadInitNumber = new AtomicInteger(0);
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100250
251 @Override
252 public Thread newThread(final Runnable runnable) {
253 final Thread t =
254 new Thread(null,
255 runnable,
buchgrd215b642018-08-09 01:18:53 -0700256 "Command-Accumulator-Thread-" + threadInitNumber.getAndIncrement());
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100257 // Don't let this thread hold up JVM exit
258 t.setDaemon(true);
259 return t;
260 }
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100261 }
262
263 /**
264 * A sink that closes its input stream once its done.
265 */
266 private static class ClosingSink implements Runnable {
267
268 private final InputStream in;
269 private final OutputStream out;
270 private final Runnable sink;
271 private final boolean close;
272
273 /**
274 * Creates a sink that will pump InputStream <code>in</code>
275 * into OutputStream <code>out</code>.
276 */
277 ClosingSink(final InputStream in, OutputStream out) {
278 this(in, out, false);
279 }
280
281 /**
282 * Creates a sink that will read <code>in</code> and discard it.
283 */
284 ClosingSink(final InputStream in) {
285 this.sink = InputStreamSink.newRunnableSink(in);
286 this.in = in;
287 this.close = false;
288 this.out = null;
289 }
290
291 ClosingSink(final InputStream in, OutputStream out, boolean close){
292 this.sink = InputStreamSink.newRunnableSink(in, out);
293 this.in = in;
294 this.out = out;
295 this.close = close;
296 }
297
298
299 @Override
300 public void run() {
301 try {
302 sink.run();
303 } finally {
304 silentClose(in);
305 if (close && out != null) {
306 silentClose(out);
307 }
308 }
309 }
310
311 }
312
313 /**
314 * Close the <code>in</code> stream and log a warning if anything happens.
315 */
316 private static void silentClose(final Closeable closeable) {
317 try {
318 closeable.close();
319 } catch (IOException ioe) {
320 String message = "Unexpected exception while closing input stream";
lberki97abb522017-09-04 18:51:57 +0200321 logger.log(Level.WARNING, message, ioe);
Han-Wen Nienhuysd08b27f2015-02-25 16:45:20 +0100322 }
323 }
324
325}