|  | // Copyright 2014 The Bazel Authors. All rights reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  | package com.google.devtools.build.lib.shell; | 
|  |  | 
|  | import com.google.common.base.Preconditions; | 
|  | import com.google.common.flogger.GoogleLogger; | 
|  | import com.google.common.util.concurrent.Uninterruptibles; | 
|  | import java.io.ByteArrayOutputStream; | 
|  | import java.io.Closeable; | 
|  | import java.io.IOException; | 
|  | import java.io.InputStream; | 
|  | import java.io.OutputStream; | 
|  | import java.util.concurrent.ExecutionException; | 
|  | import java.util.concurrent.ExecutorService; | 
|  | import java.util.concurrent.Executors; | 
|  | import java.util.concurrent.Future; | 
|  | import java.util.concurrent.ThreadFactory; | 
|  | import java.util.concurrent.atomic.AtomicInteger; | 
|  |  | 
/**
 * This class provides convenience methods for consuming (actively reading)
 * output and error streams with different consumption policies:
 * accumulating ({@link #createAccumulatingConsumers()}) and
 * streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
 */
|  | final class Consumers { | 
/** Logger shared by the consumers and sinks declared in this class. */
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/**
 * Cached thread pool on which the stream-draining sinks are run; its threads are created by
 * {@link AccumulatorThreadFactory}.
 */
private static final ExecutorService pool =
    Executors.newCachedThreadPool(new AccumulatorThreadFactory());

/** Not instantiable: this class only offers static factory methods. */
private Consumers() {}
|  |  | 
|  | static OutErrConsumers createAccumulatingConsumers() { | 
|  | return new OutErrConsumers(new AccumulatingConsumer(), new AccumulatingConsumer()); | 
|  | } | 
|  |  | 
|  | static OutErrConsumers createStreamingConsumers(OutputStream out, OutputStream err) { | 
|  | Preconditions.checkNotNull(out); | 
|  | Preconditions.checkNotNull(err); | 
|  | return new OutErrConsumers(new StreamingConsumer(out), new StreamingConsumer(err)); | 
|  | } | 
|  |  | 
|  | static class OutErrConsumers { | 
|  | private final OutputConsumer out; | 
|  | private final OutputConsumer err; | 
|  |  | 
|  | private OutErrConsumers(final OutputConsumer out, final OutputConsumer err) { | 
|  | this.out = out; | 
|  | this.err = err; | 
|  | } | 
|  |  | 
|  | void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams) { | 
|  | out.registerInput(outInput, closeStreams); | 
|  | err.registerInput(errInput, closeStreams); | 
|  | } | 
|  |  | 
|  | void cancel() { | 
|  | out.cancel(); | 
|  | err.cancel(); | 
|  | } | 
|  |  | 
|  | void waitForCompletion() throws IOException { | 
|  | out.waitForCompletion(); | 
|  | err.waitForCompletion(); | 
|  | } | 
|  |  | 
|  | ByteArrayOutputStream getAccumulatedOut(){ | 
|  | return out.getAccumulatedOut(); | 
|  | } | 
|  |  | 
|  | ByteArrayOutputStream getAccumulatedErr() { | 
|  | return err.getAccumulatedOut(); | 
|  | } | 
|  |  | 
|  | void logConsumptionStrategy() { | 
|  | // The creation methods guarantee that the consumption strategy is | 
|  | // the same for out and err - doesn't matter whether we call out or err, | 
|  | // let's pick out. | 
|  | out.logConsumptionStrategy(); | 
|  | } | 
|  |  | 
|  | } | 
|  |  | 
|  | /** | 
|  | * This interface describes just one consumer, which consumes the | 
|  | * InputStream provided by {@link #registerInput(InputStream, boolean)}. | 
|  | * Implementations implement different consumption strategies. | 
|  | */ | 
|  | private static interface OutputConsumer { | 
|  | /** | 
|  | * Returns whatever the consumer accumulated internally, or | 
|  | * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate | 
|  | * any output. | 
|  | * | 
|  | * @see AccumulatingConsumer | 
|  | */ | 
|  | ByteArrayOutputStream getAccumulatedOut(); | 
|  |  | 
|  | void logConsumptionStrategy(); | 
|  |  | 
|  | void registerInput(InputStream in, boolean closeConsumer); | 
|  |  | 
|  | void cancel(); | 
|  |  | 
|  | void waitForCompletion() throws IOException; | 
|  | } | 
|  |  | 
|  | /** This consumer sends the input to a stream while consuming it. */ | 
|  | private static class StreamingConsumer extends FutureConsumption { | 
|  | private final OutputStream out; | 
|  |  | 
|  | StreamingConsumer(OutputStream out) { | 
|  | this.out = out; | 
|  | } | 
|  |  | 
|  | @Override | 
|  | public ByteArrayOutputStream getAccumulatedOut() { | 
|  | return CommandResult.NO_OUTPUT_COLLECTED; | 
|  | } | 
|  |  | 
|  | @Override | 
|  | public void logConsumptionStrategy() { | 
|  | logger.atFiner().log("Output will be sent to streams provided by client"); | 
|  | } | 
|  |  | 
|  | @Override protected Runnable createConsumingAndClosingSink(InputStream in, | 
|  | boolean closeConsumer) { | 
|  | return new ClosingSink(in, out, closeConsumer); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * This consumer sends the input to a {@link ByteArrayOutputStream} | 
|  | * while consuming it. This accumulated stream can be obtained by | 
|  | * calling {@link #getAccumulatedOut()}. | 
|  | */ | 
|  | private static class AccumulatingConsumer extends FutureConsumption { | 
|  | private final ByteArrayOutputStream out = new ByteArrayOutputStream(); | 
|  |  | 
|  | @Override | 
|  | public ByteArrayOutputStream getAccumulatedOut() { | 
|  | return out; | 
|  | } | 
|  |  | 
|  | @Override | 
|  | public void logConsumptionStrategy() { | 
|  | logger.atFiner().log("Output will be accumulated (promptly read off) and returned"); | 
|  | } | 
|  |  | 
|  | @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) { | 
|  | return new ClosingSink(in, out); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * A mixin that makes consumers active - this is where we kick of | 
|  | * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions | 
|  | * and wait for the consumers to complete. | 
|  | */ | 
|  | private abstract static class FutureConsumption implements OutputConsumer { | 
|  |  | 
|  | private Future<?> future; | 
|  |  | 
|  | @Override | 
|  | public void registerInput(InputStream in, boolean closeConsumer){ | 
|  | Runnable sink = createConsumingAndClosingSink(in, closeConsumer); | 
|  | future = pool.submit(sink); | 
|  | } | 
|  |  | 
|  | protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close); | 
|  |  | 
|  | @Override | 
|  | public void cancel() { | 
|  | future.cancel(true); | 
|  | } | 
|  |  | 
|  | @Override | 
|  | public void waitForCompletion() throws IOException { | 
|  | try { | 
|  | Uninterruptibles.getUninterruptibly(future); | 
|  | } catch (ExecutionException ee) { | 
|  | // Runnable threw a RuntimeException | 
|  | Throwable nested = ee.getCause(); | 
|  | if (nested instanceof final RuntimeException re) { | 
|  | // The stream sink classes, unfortunately, tunnel IOExceptions | 
|  | // out of run() in a RuntimeException. If that's the case, | 
|  | // unpack and re-throw the IOException. Otherwise, re-throw | 
|  | // this unexpected RuntimeException | 
|  | final Throwable cause = re.getCause(); | 
|  | if (cause instanceof IOException ioException) { | 
|  | throw ioException; | 
|  | } else { | 
|  | throw re; | 
|  | } | 
|  | } else if (nested instanceof OutOfMemoryError) { | 
|  | // OutOfMemoryError does not support exception chaining. | 
|  | throw (OutOfMemoryError) nested; | 
|  | } else if (nested instanceof Error) { | 
|  | throw new Error("unhandled Error in worker thread", ee); | 
|  | } else { | 
|  | throw new RuntimeException("unknown execution problem", ee); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | private static class AccumulatorThreadFactory implements ThreadFactory { | 
|  | private static final AtomicInteger threadInitNumber = new AtomicInteger(0); | 
|  |  | 
|  | @Override | 
|  | public Thread newThread(final Runnable runnable) { | 
|  | final Thread t = | 
|  | new Thread(null, | 
|  | runnable, | 
|  | "Command-Accumulator-Thread-" + threadInitNumber.getAndIncrement()); | 
|  | // Don't let this thread hold up JVM exit | 
|  | t.setDaemon(true); | 
|  | return t; | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * A sink that closes its input stream once its done. | 
|  | */ | 
|  | private static class ClosingSink implements Runnable { | 
|  | private final InputStream in; | 
|  | private final OutputStream out; | 
|  | private final Runnable sink; | 
|  | private final boolean close; | 
|  |  | 
|  | /** | 
|  | * Creates a sink that will pump InputStream <code>in</code> | 
|  | * into OutputStream <code>out</code>. | 
|  | */ | 
|  | ClosingSink(final InputStream in, OutputStream out) { | 
|  | this(in, out, false); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Creates a sink that will read <code>in</code> and discard it. | 
|  | */ | 
|  | ClosingSink(final InputStream in) { | 
|  | this.sink = InputStreamSink.newRunnableSink(in); | 
|  | this.in = in; | 
|  | this.close = false; | 
|  | this.out = null; | 
|  | } | 
|  |  | 
|  | ClosingSink(final InputStream in, OutputStream out, boolean close){ | 
|  | this.sink = InputStreamSink.newRunnableSink(in, out); | 
|  | this.in = in; | 
|  | this.out = out; | 
|  | this.close = close; | 
|  | } | 
|  |  | 
|  |  | 
|  | @Override | 
|  | public void run() { | 
|  | try { | 
|  | sink.run(); | 
|  | } finally { | 
|  | silentClose(in); | 
|  | if (close && out != null) { | 
|  | silentClose(out); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Close the <code>in</code> stream and log a warning if anything happens. | 
|  | */ | 
|  | private static void silentClose(final Closeable closeable) { | 
|  | try { | 
|  | closeable.close(); | 
|  | } catch (IOException ioe) { | 
|  | logger.atWarning().withCause(ioe).log("Unexpected exception while closing input stream"); | 
|  | } | 
|  | } | 
|  |  | 
|  | } |