// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.shell;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This class provides convenience methods for consuming (actively reading)
 * output and error streams with different consumption policies:
 * accumulating ({@link #createAccumulatingConsumers()},
 * and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
 */
final class Consumers {

  private static final Logger logger =
      Logger.getLogger("com.google.devtools.build.lib.shell.Command");

  private Consumers() {}

  private static final ExecutorService pool =
    Executors.newCachedThreadPool(new AccumulatorThreadFactory());

  static OutErrConsumers createAccumulatingConsumers() {
    return new OutErrConsumers(new AccumulatingConsumer(), new AccumulatingConsumer());
  }

  static OutErrConsumers createStreamingConsumers(OutputStream out, OutputStream err) {
    Preconditions.checkNotNull(out);
    Preconditions.checkNotNull(err);
    return new OutErrConsumers(new StreamingConsumer(out), new StreamingConsumer(err));
  }

  static class OutErrConsumers {
    private final OutputConsumer out;
    private final OutputConsumer err;

    private OutErrConsumers(final OutputConsumer out, final OutputConsumer err) {
      this.out = out;
      this.err = err;
    }

    void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams) {
      out.registerInput(outInput, closeStreams);
      err.registerInput(errInput, closeStreams);
    }

    void cancel() {
      out.cancel();
      err.cancel();
    }

    void waitForCompletion() throws IOException {
      out.waitForCompletion();
      err.waitForCompletion();
    }

    ByteArrayOutputStream getAccumulatedOut(){
      return out.getAccumulatedOut();
    }

    ByteArrayOutputStream getAccumulatedErr() {
      return err.getAccumulatedOut();
    }

    void logConsumptionStrategy() {
      // The creation methods guarantee that the consumption strategy is
      // the same for out and err - doesn't matter whether we call out or err,
      // let's pick out.
      out.logConsumptionStrategy();
    }

  }

  /**
   * This interface describes just one consumer, which consumes the
   * InputStream provided by {@link #registerInput(InputStream, boolean)}.
   * Implementations implement different consumption strategies.
   */
  private static interface OutputConsumer {
    /**
     * Returns whatever the consumer accumulated internally, or
     * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate
     * any output.
     *
     * @see AccumulatingConsumer
     */
    ByteArrayOutputStream getAccumulatedOut();

    void logConsumptionStrategy();

    void registerInput(InputStream in, boolean closeConsumer);

    void cancel();

    void waitForCompletion() throws IOException;
  }

  /**
   * This consumer sends the input to a stream while consuming it.
   */
  private static class StreamingConsumer extends FutureConsumption {
    private OutputStream out;

    StreamingConsumer(OutputStream out) {
      this.out = out;
    }

    @Override
    public ByteArrayOutputStream getAccumulatedOut() {
      return CommandResult.NO_OUTPUT_COLLECTED;
    }

    @Override
    public void logConsumptionStrategy() {
      logger.finer("Output will be sent to streams provided by client");
    }

    @Override protected Runnable createConsumingAndClosingSink(InputStream in,
                                                               boolean closeConsumer) {
      return new ClosingSink(in, out, closeConsumer);
    }
  }

  /**
   * This consumer sends the input to a {@link ByteArrayOutputStream}
   * while consuming it. This accumulated stream can be obtained by
   * calling {@link #getAccumulatedOut()}.
   */
  private static class AccumulatingConsumer extends FutureConsumption {
    private ByteArrayOutputStream out = new ByteArrayOutputStream();

    @Override
    public ByteArrayOutputStream getAccumulatedOut() {
      return out;
    }

    @Override
    public void logConsumptionStrategy() {
      logger.finer("Output will be accumulated (promptly read off) and returned");
    }

    @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
      return new ClosingSink(in, out);
    }
  }

  /**
   * This consumer just discards whatever it reads.
   */
  private static class DiscardingConsumer extends FutureConsumption {
    private DiscardingConsumer() {
    }

    @Override
    public ByteArrayOutputStream getAccumulatedOut() {
      return CommandResult.NO_OUTPUT_COLLECTED;
    }

    @Override
    public void logConsumptionStrategy() {
      logger.finer("Output will be ignored");
    }

    @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
      return new ClosingSink(in);
    }
  }

  /**
   * A mixin that makes consumers active - this is where we kick of
   * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions
   * and wait for the consumers to complete.
   */
  private abstract static class FutureConsumption implements OutputConsumer {

    private Future<?> future;

    @Override
    public void registerInput(InputStream in, boolean closeConsumer){
      Runnable sink = createConsumingAndClosingSink(in, closeConsumer);
      future = pool.submit(sink);
    }

    protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close);

    @Override
    public void cancel() {
      future.cancel(true);
    }

    @Override
    public void waitForCompletion() throws IOException {
      try {
        Uninterruptibles.getUninterruptibly(future);
      } catch (ExecutionException ee) {
        // Runnable threw a RuntimeException
        Throwable nested = ee.getCause();
        if (nested instanceof RuntimeException) {
          final RuntimeException re = (RuntimeException) nested;
          // The stream sink classes, unfortunately, tunnel IOExceptions
          // out of run() in a RuntimeException. If that's the case,
          // unpack and re-throw the IOException. Otherwise, re-throw
          // this unexpected RuntimeException
          final Throwable cause = re.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause;
          } else {
            throw re;
          }
        } else if (nested instanceof OutOfMemoryError) {
          // OutOfMemoryError does not support exception chaining.
          throw (OutOfMemoryError) nested;
        } else if (nested instanceof Error) {
          throw new Error("unhandled Error in worker thread", ee);
        } else {
          throw new RuntimeException("unknown execution problem", ee);
        }
      }
    }
  }

  private static class AccumulatorThreadFactory implements ThreadFactory {

    private static AtomicInteger threadInitNumber = new AtomicInteger(0);

    @Override
    public Thread newThread(final Runnable runnable) {
      final Thread t =
        new Thread(null,
                   runnable,
                   "Command-Accumulator-Thread-" + threadInitNumber.getAndIncrement());
      // Don't let this thread hold up JVM exit
      t.setDaemon(true);
      return t;
    }
  }

  /**
   * A sink that closes its input stream once its done.
   */
  private static class ClosingSink implements Runnable {

    private final InputStream in;
    private final OutputStream out;
    private final Runnable sink;
    private final boolean close;

    /**
     * Creates a sink that will pump InputStream <code>in</code>
     * into OutputStream <code>out</code>.
     */
    ClosingSink(final InputStream in, OutputStream out) {
      this(in, out, false);
    }

    /**
     * Creates a sink that will read <code>in</code> and discard it.
     */
    ClosingSink(final InputStream in) {
      this.sink = InputStreamSink.newRunnableSink(in);
      this.in = in;
      this.close = false;
      this.out = null;
    }

    ClosingSink(final InputStream in, OutputStream out, boolean close){
      this.sink = InputStreamSink.newRunnableSink(in, out);
      this.in = in;
      this.out = out;
      this.close = close;
    }


    @Override
    public void run() {
      try {
        sink.run();
      } finally {
        silentClose(in);
        if (close && out != null) {
          silentClose(out);
        }
      }
    }

  }

  /**
   * Close the <code>in</code> stream and log a warning if anything happens.
   */
  private static void silentClose(final Closeable closeable) {
    try {
      closeable.close();
    } catch (IOException ioe) {
      String message = "Unexpected exception while closing input stream";
      logger.log(Level.WARNING, message, ioe);
    }
  }

}
