Update from Google.

--
MOE_MIGRATED_REVID=85702957
diff --git a/src/main/java/com/google/devtools/build/lib/shell/Consumers.java b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
new file mode 100644
index 0000000..3ed5b7e
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
@@ -0,0 +1,359 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.shell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class provides convenience methods for consuming (actively reading)
+ * output and error streams with different consumption policies:
+ * discarding ({@link #createDiscardingConsumers()},
+ * accumulating ({@link #createAccumulatingConsumers()},
+ * and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
+ */
+class Consumers {
+
+  private static final Logger log =
+    Logger.getLogger("com.google.devtools.build.lib.shell.Command");
+
+  private Consumers() {}
+
+  private static final ExecutorService pool =
+    Executors.newCachedThreadPool(new AccumulatorThreadFactory());
+
+  static OutErrConsumers createDiscardingConsumers() {
+    return new OutErrConsumers(new DiscardingConsumer(),
+                               new DiscardingConsumer());
+  }
+
+  static OutErrConsumers createAccumulatingConsumers() {
+    return new OutErrConsumers(new AccumulatingConsumer(),
+                               new AccumulatingConsumer());
+  }
+
+  static OutErrConsumers createStreamingConsumers(OutputStream out,
+                                                  OutputStream err) {
+    return new OutErrConsumers(new StreamingConsumer(out),
+                               new StreamingConsumer(err));
+  }
+
+  static class OutErrConsumers {
+
+    private final OutputConsumer out;
+    private final OutputConsumer err;
+
+    private OutErrConsumers(final OutputConsumer out, final OutputConsumer err){
+      this.out = out;
+      this.err = err;
+    }
+
+    void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams){
+      out.registerInput(outInput, closeStreams);
+      err.registerInput(errInput, closeStreams);
+    }
+
+    void cancel() {
+      out.cancel();
+      err.cancel();
+    }
+
+    void waitForCompletion() throws IOException {
+      out.waitForCompletion();
+      err.waitForCompletion();
+    }
+
+    ByteArrayOutputStream getAccumulatedOut(){
+      return out.getAccumulatedOut();
+    }
+
+    ByteArrayOutputStream getAccumulatedErr() {
+      return err.getAccumulatedOut();
+    }
+
+    void logConsumptionStrategy() {
+      // The creation methods guarantee that the consumption strategy is
+      // the same for out and err - doesn't matter whether we call out or err,
+      // let's pick out.
+      out.logConsumptionStrategy();
+    }
+
+  }
+
+  /**
+   * This interface describes just one consumer, which consumes the
+   * InputStream provided by {@link #registerInput(InputStream, boolean)}.
+   * Implementations implement different consumption strategies.
+   */
+  private static interface OutputConsumer {
+    /**
+     * Returns whatever the consumer accumulated internally, or
+     * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate
+     * any output.
+     *
+     * @see AccumulatingConsumer
+     */
+    ByteArrayOutputStream getAccumulatedOut();
+
+    void logConsumptionStrategy();
+
+    void registerInput(InputStream in, boolean closeConsumer);
+
+    void cancel();
+
+    void waitForCompletion() throws IOException;
+  }
+
+  /**
+   * This consumer sends the input to a stream while consuming it.
+   */
+  private static class StreamingConsumer extends FutureConsumption
+                                         implements OutputConsumer {
+    private OutputStream out;
+
+    StreamingConsumer(OutputStream out) {
+      this.out = out;
+    }
+
+    @Override
+    public ByteArrayOutputStream getAccumulatedOut() {
+      return CommandResult.NO_OUTPUT_COLLECTED;
+    }
+
+    @Override
+    public void logConsumptionStrategy() {
+      log.finer("Output will be sent to streams provided by client");
+    }
+
+    @Override protected Runnable createConsumingAndClosingSink(InputStream in,
+                                                               boolean closeConsumer) {
+      return new ClosingSink(in, out, closeConsumer);
+    }
+  }
+
+  /**
+   * This consumer sends the input to a {@link ByteArrayOutputStream}
+   * while consuming it. This accumulated stream can be obtained by
+   * calling {@link #getAccumulatedOut()}.
+   */
+  private static class AccumulatingConsumer extends FutureConsumption
+                                            implements OutputConsumer {
+    private ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    @Override
+    public ByteArrayOutputStream getAccumulatedOut() {
+      return out;
+    }
+
+    @Override
+    public void logConsumptionStrategy() {
+      log.finer("Output will be accumulated (promptly read off) and returned");
+    }
+
+    @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
+      return new ClosingSink(in, out);
+    }
+  }
+
+  /**
+   * This consumer just discards whatever it reads.
+   */
+  private static class DiscardingConsumer extends FutureConsumption
+                                          implements OutputConsumer {
+    private DiscardingConsumer() {
+    }
+
+    @Override
+    public ByteArrayOutputStream getAccumulatedOut() {
+      return CommandResult.NO_OUTPUT_COLLECTED;
+    }
+
+    @Override
+    public void logConsumptionStrategy() {
+      log.finer("Output will be ignored");
+    }
+
+    @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
+      return new ClosingSink(in);
+    }
+  }
+
+  /**
+   * A mixin that makes consumers active - this is where we kick of
+   * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions
+   * and wait for the consumers to complete.
+   */
+  private abstract static class FutureConsumption implements OutputConsumer {
+
+    private Future<?> future;
+
+    @Override
+    public void registerInput(InputStream in, boolean closeConsumer){
+      Runnable sink = createConsumingAndClosingSink(in, closeConsumer);
+      future = pool.submit(sink);
+    }
+
+    protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close);
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    @Override
+    public void waitForCompletion() throws IOException {
+      boolean wasInterrupted = false;
+      try {
+        while (true) {
+          try {
+            future.get();
+            break;
+          } catch (InterruptedException ie) {
+            wasInterrupted = true;
+            // continue waiting
+          } catch (ExecutionException ee) {
+            // Runnable threw a RuntimeException
+            Throwable nested = ee.getCause();
+            if (nested instanceof RuntimeException) {
+              final RuntimeException re = (RuntimeException) nested;
+              // The stream sink classes, unfortunately, tunnel IOExceptions
+              // out of run() in a RuntimeException. If that's the case,
+              // unpack and re-throw the IOException. Otherwise, re-throw
+              // this unexpected RuntimeException
+              final Throwable cause = re.getCause();
+              if (cause instanceof IOException) {
+                throw (IOException) cause;
+              } else {
+                throw re;
+              }
+            } else if (nested instanceof OutOfMemoryError) {
+              // OutOfMemoryError does not support exception chaining.
+              throw (OutOfMemoryError) nested;
+            } else if (nested instanceof Error) {
+              throw new Error("unhandled Error in worker thread", ee);
+            } else {
+              throw new RuntimeException("unknown execution problem", ee);
+            }
+          }
+        }
+      } finally {
+        // Read this for detailed explanation:
+        // http://www-128.ibm.com/developerworks/java/library/j-jtp05236.html
+        if (wasInterrupted) {
+          Thread.currentThread().interrupt(); // preserve interrupted status
+        }
+      }
+    }
+  }
+
+  /**
+   * Factory which produces threads with a 32K stack size.
+   */
+  private static class AccumulatorThreadFactory implements ThreadFactory {
+
+    private static final int THREAD_STACK_SIZE = 32 * 1024;
+
+    private static int threadInitNumber;
+
+    private static synchronized int nextThreadNum() {
+      return threadInitNumber++;
+    }
+
+    @Override
+    public Thread newThread(final Runnable runnable) {
+      final Thread t =
+        new Thread(null,
+                   runnable,
+                   "Command-Accumulator-Thread-" + nextThreadNum(),
+                   THREAD_STACK_SIZE);
+      // Don't let this thread hold up JVM exit
+      t.setDaemon(true);
+      return t;
+    }
+
+  }
+
+  /**
+   * A sink that closes its input stream once its done.
+   */
+  private static class ClosingSink implements Runnable {
+
+    private final InputStream in;
+    private final OutputStream out;
+    private final Runnable sink;
+    private final boolean close;
+
+    /**
+     * Creates a sink that will pump InputStream <code>in</code>
+     * into OutputStream <code>out</code>.
+     */
+    ClosingSink(final InputStream in, OutputStream out) {
+      this(in, out, false);
+    }
+
+    /**
+     * Creates a sink that will read <code>in</code> and discard it.
+     */
+    ClosingSink(final InputStream in) {
+      this.sink = InputStreamSink.newRunnableSink(in);
+      this.in = in;
+      this.close = false;
+      this.out = null;
+    }
+
+    ClosingSink(final InputStream in, OutputStream out, boolean close){
+      this.sink = InputStreamSink.newRunnableSink(in, out);
+      this.in = in;
+      this.out = out;
+      this.close = close;
+    }
+
+
+    @Override
+    public void run() {
+      try {
+        sink.run();
+      } finally {
+        silentClose(in);
+        if (close && out != null) {
+          silentClose(out);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Close the <code>in</code> stream and log a warning if anything happens.
+   */
+  private static void silentClose(final Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException ioe) {
+      String message = "Unexpected exception while closing input stream";
+      log.log(Level.WARNING, message, ioe);
+    }
+  }
+
+}