// blob: cefd4c6d08be71ecdffc19999832f3729c9c235d [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.shell;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class provides convenience methods for consuming (actively reading)
* output and error streams with different consumption policies:
* accumulating ({@link #createAccumulatingConsumers()})
* and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
*/
final class Consumers {
/** Logger used by the nested consumers and by {@link #silentClose(Closeable)}. */
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
/** Not instantiable: this class only offers static factory methods. */
private Consumers() {}
/**
* Cached thread pool to which every registered input stream's sink is submitted.
* Its threads are created by {@link AccumulatorThreadFactory}.
*/
private static final ExecutorService pool =
Executors.newCachedThreadPool(new AccumulatorThreadFactory());
/**
 * Creates a pair of consumers that each collect everything they read into their own in-memory
 * buffer.
 *
 * @return a consumer pair whose buffers can be retrieved with
 *     {@link OutErrConsumers#getAccumulatedOut()} and {@link OutErrConsumers#getAccumulatedErr()}
 */
static OutErrConsumers createAccumulatingConsumers() {
  final OutputConsumer outConsumer = new AccumulatingConsumer();
  final OutputConsumer errConsumer = new AccumulatingConsumer();
  return new OutErrConsumers(outConsumer, errConsumer);
}
/**
 * Creates a pair of consumers that forward what they read to the given streams instead of
 * buffering it.
 *
 * @param out destination for the out consumer; must not be null
 * @param err destination for the err consumer; must not be null
 * @return a consumer pair writing to {@code out} and {@code err}
 * @throws NullPointerException if either stream is null
 */
static OutErrConsumers createStreamingConsumers(OutputStream out, OutputStream err) {
  // Validate both arguments before building anything, so a null argument never leaves a
  // half-constructed pair behind.
  final OutputStream outDestination = Preconditions.checkNotNull(out);
  final OutputStream errDestination = Preconditions.checkNotNull(err);
  final OutputConsumer outConsumer = new StreamingConsumer(outDestination);
  final OutputConsumer errConsumer = new StreamingConsumer(errDestination);
  return new OutErrConsumers(outConsumer, errConsumer);
}
/**
 * Bundles the consumer of an output stream with the consumer of an error stream so that both can
 * be registered, cancelled and awaited together.
 */
static class OutErrConsumers {
  private final OutputConsumer out;
  private final OutputConsumer err;

  private OutErrConsumers(final OutputConsumer out, final OutputConsumer err) {
    this.out = out;
    this.err = err;
  }

  /**
   * Hands each consumer the stream it should read from.
   *
   * @param outInput stream read by the out consumer
   * @param errInput stream read by the err consumer
   * @param closeStreams passed unchanged to both consumers as their close flag
   */
  void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams) {
    out.registerInput(outInput, closeStreams);
    err.registerInput(errInput, closeStreams);
  }

  /** Cancels both consumers, out first. */
  void cancel() {
    out.cancel();
    err.cancel();
  }

  /**
   * Waits until both consumers have finished.
   *
   * <p>The err consumer is awaited even when the out consumer failed, so that no consumer is
   * left running unobserved once this method returns or throws. If both fail, the failure of the
   * out consumer is thrown and the failure of the err consumer is attached to it as a suppressed
   * exception. {@link Error}s are propagated immediately without further waiting.
   *
   * @throws IOException if a consumer failed with an I/O problem
   */
  void waitForCompletion() throws IOException {
    try {
      out.waitForCompletion();
    } catch (IOException | RuntimeException outFailure) {
      try {
        err.waitForCompletion();
      } catch (IOException | RuntimeException errFailure) {
        // Keep the first failure as the primary one, but do not lose the second.
        outFailure.addSuppressed(errFailure);
      }
      throw outFailure;
    }
    err.waitForCompletion();
  }

  /** Returns whatever the out consumer reports as its accumulated output. */
  ByteArrayOutputStream getAccumulatedOut() {
    return out.getAccumulatedOut();
  }

  /** Returns whatever the err consumer reports as its accumulated output. */
  ByteArrayOutputStream getAccumulatedErr() {
    return err.getAccumulatedOut();
  }

  /** Logs the consumption strategy in use. */
  void logConsumptionStrategy() {
    // The creation methods guarantee that the consumption strategy is the same for out and
    // err, so it does not matter which one is asked; out is used.
    out.logConsumptionStrategy();
  }
}
/**
* This interface describes just one consumer, which consumes the
* InputStream provided by {@link #registerInput(InputStream, boolean)}.
* Implementations implement different consumption strategies.
*/
private static interface OutputConsumer {
/**
* Returns whatever the consumer accumulated internally, or
* {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate
* any output.
*
* @see AccumulatingConsumer
*/
ByteArrayOutputStream getAccumulatedOut();
/** Logs a description of the consumption strategy this consumer implements. */
void logConsumptionStrategy();
/**
* Gives the consumer the stream it should read from.
*
* @param in the stream to consume
* @param closeConsumer close flag for the consumer's destination. NOTE(review): of the
* implementations in this file only {@link StreamingConsumer} forwards this flag to its
* sink; {@link AccumulatingConsumer} ignores it.
*/
void registerInput(InputStream in, boolean closeConsumer);
/** Cancels the consumption started by {@link #registerInput(InputStream, boolean)}. */
void cancel();
/**
* Waits for the consumption started by {@link #registerInput(InputStream, boolean)} to finish.
*
* @throws IOException if consuming the stream failed with an I/O problem
*/
void waitForCompletion() throws IOException;
}
/**
 * Consumer that forwards everything it reads to an {@link OutputStream} supplied at construction
 * time. It keeps no copy of the data itself.
 */
private static class StreamingConsumer extends FutureConsumption {
  private final OutputStream destination;

  StreamingConsumer(OutputStream out) {
    this.destination = out;
  }

  @Override
  public void logConsumptionStrategy() {
    logger.atFiner().log("Output will be sent to streams provided by client");
  }

  /** Always returns {@link CommandResult#NO_OUTPUT_COLLECTED}: nothing is buffered here. */
  @Override
  public ByteArrayOutputStream getAccumulatedOut() {
    return CommandResult.NO_OUTPUT_COLLECTED;
  }

  /**
   * Builds the sink that copies {@code in} to the destination stream; {@code closeConsumer} is
   * handed to the sink as its close flag for the destination.
   */
  @Override
  protected Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
    final Runnable pump = new ClosingSink(in, destination, closeConsumer);
    return pump;
  }
}
/**
 * Consumer that collects everything it reads in a private {@link ByteArrayOutputStream}, which
 * callers obtain through {@link #getAccumulatedOut()}.
 */
private static class AccumulatingConsumer extends FutureConsumption {
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

  @Override
  public void logConsumptionStrategy() {
    logger.atFiner().log("Output will be accumulated (promptly read off) and returned");
  }

  /** Returns the buffer itself, not a copy. */
  @Override
  public ByteArrayOutputStream getAccumulatedOut() {
    return buffer;
  }

  /**
   * Builds the sink that copies {@code in} into the buffer. The {@code closeConsumer} flag is
   * not used: the two-argument {@link ClosingSink} constructor never closes its output.
   */
  @Override
  public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
    final Runnable pump = new ClosingSink(in, buffer);
    return pump;
  }
}
/**
 * A mixin that makes consumers active - this is where we kick off multithreading
 * ({@link #registerInput(InputStream, boolean)}), cancel actions and wait for the consumers to
 * complete.
 *
 * <p>The {@code future} field is only assigned by {@code registerInput}; {@link #cancel()} and
 * {@link #waitForCompletion()} dereference it and therefore must be called after registration.
 */
private abstract static class FutureConsumption implements OutputConsumer {
  private Future<?> future;

  /** Creates the runnable that consumes {@code in}; it is run on the shared pool. */
  protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close);

  /** Builds the sink for {@code in} and submits it to the shared thread pool. */
  @Override
  public void registerInput(InputStream in, boolean closeConsumer) {
    future = pool.submit(createConsumingAndClosingSink(in, closeConsumer));
  }

  /** Cancels the submitted sink, allowing a running worker to be interrupted. */
  @Override
  public void cancel() {
    future.cancel(true);
  }

  /**
   * Waits for the submitted sink to finish and translates a failure of the worker into an
   * exception thrown on the calling thread.
   *
   * @throws IOException if the worker failed with a RuntimeException whose cause is an
   *     IOException
   */
  @Override
  public void waitForCompletion() throws IOException {
    try {
      Uninterruptibles.getUninterruptibly(future);
    } catch (ExecutionException ee) {
      final Throwable failure = ee.getCause();

      if (failure instanceof RuntimeException) {
        // The stream sink classes, unfortunately, tunnel IOExceptions out of run() in a
        // RuntimeException. Unpack and re-throw such an IOException; any other
        // RuntimeException is unexpected and is re-thrown as is.
        final Throwable tunneled = failure.getCause();
        if (tunneled instanceof IOException) {
          throw (IOException) tunneled;
        }
        throw (RuntimeException) failure;
      }

      if (failure instanceof OutOfMemoryError) {
        // OutOfMemoryError does not support exception chaining.
        throw (OutOfMemoryError) failure;
      }

      if (failure instanceof Error) {
        throw new Error("unhandled Error in worker thread", ee);
      }

      throw new RuntimeException("unknown execution problem", ee);
    }
  }
}
/**
 * Thread factory that produces daemon threads named {@code Command-Accumulator-Thread-N}, where
 * {@code N} comes from a counter shared by all instances of this factory.
 */
private static class AccumulatorThreadFactory implements ThreadFactory {
  private static final AtomicInteger threadInitNumber = new AtomicInteger(0);

  /**
   * Wraps {@code runnable} in a new, not yet started daemon thread.
   *
   * @param runnable the task the thread will run
   * @return the new thread
   */
  @Override
  public Thread newThread(final Runnable runnable) {
    final int sequence = threadInitNumber.getAndIncrement();
    final Thread worker = new Thread(runnable, "Command-Accumulator-Thread-" + sequence);
    // Don't let this thread hold up JVM exit
    worker.setDaemon(true);
    return worker;
  }
}
/**
 * A sink that closes its input stream once it is done and, when asked to, its output stream as
 * well.
 */
private static class ClosingSink implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Runnable sink;
  private final boolean close;

  /**
   * Creates a sink that will pump InputStream <code>in</code> into OutputStream
   * <code>out</code>. The output stream is left open afterwards.
   */
  ClosingSink(final InputStream in, OutputStream out) {
    this(in, out, false);
  }

  /** Creates a sink that will read <code>in</code> and discard it. */
  ClosingSink(final InputStream in) {
    this.in = in;
    this.out = null;
    this.close = false;
    this.sink = InputStreamSink.newRunnableSink(in);
  }

  /**
   * Creates a sink that will pump <code>in</code> into <code>out</code>, closing
   * <code>out</code> afterwards if <code>close</code> is true.
   */
  ClosingSink(final InputStream in, OutputStream out, boolean close) {
    this.in = in;
    this.out = out;
    this.close = close;
    this.sink = InputStreamSink.newRunnableSink(in, out);
  }

  /** Runs the underlying sink, then closes the streams whether or not the sink succeeded. */
  @Override
  public void run() {
    try {
      sink.run();
    } finally {
      silentClose(in);
      final boolean closeOutput = close && out != null;
      if (closeOutput) {
        silentClose(out);
      }
    }
  }
}
/**
 * Closes the given stream and logs a warning, rather than propagating, if closing fails with an
 * {@link IOException}.
 *
 * <p>This helper is called for input streams as well as output streams, so the log message does
 * not name a direction.
 *
 * @param closeable the stream to close
 */
private static void silentClose(final Closeable closeable) {
  try {
    closeable.close();
  } catch (IOException ioe) {
    logger.atWarning().withCause(ioe).log("Unexpected exception while closing stream");
  }
}
}