// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package com.google.devtools.build.lib.bazel.rules.ninja.file;

import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Parallel file processing implementation. See the documentation of {@link
 * #processFile(ReadableByteChannel, BlockParameters, Supplier, ListeningExecutorService,
 * SeparatorFinder)}.
 */
public class ParallelFileProcessing {
  private final ReadableByteChannel channel;
  private final BlockParameters parameters;
  private final Supplier<DeclarationConsumer> tokenConsumerFactory;
  private final ListeningExecutorService executorService;
  private final SeparatorFinder predicate;

  private ParallelFileProcessing(
      ReadableByteChannel channel,
      BlockParameters parameters,
      Supplier<DeclarationConsumer> tokenConsumerFactory,
      ListeningExecutorService executorService,
      SeparatorFinder predicate) {
    this.channel = channel;
    this.parameters = parameters;
    this.tokenConsumerFactory = tokenConsumerFactory;
    this.executorService = executorService;
    this.predicate = predicate;
  }

  /**
   * Processes a file in parallel: the contents of {@code channel} are read into a sequence of
   * direct {@link ByteBuffer}s. Each buffer is split into fragments, which are tokenized in
   * parallel by {@link FileFragmentSplitter}, using the {@code predicate}. Pieces of tokens that
   * lie on the bounds of buffer fragments are assembled by {@link DeclarationAssembler}. The
   * resulting tokens (each of which can be further parsed independently) are passed to {@link
   * DeclarationConsumer}s created by {@code tokenConsumerFactory}.
   *
   * <p>The main ideas behind this implementation are:
   *
   * <p>1) using NIO with direct buffer allocation for reading from the file (a quote from
   * ByteBuffer's javadoc: "Given a direct byte buffer, the Java virtual machine will make a best
   * effort to perform native I/O operations directly upon it. That is, it will attempt to avoid
   * copying the buffer's content to (or from) an intermediate buffer before (or after) each
   * invocation of one of the underlying operating system's native I/O operations.")
   *
   * <p>2) exploiting parallel processing, since splitting into tokens and parsing them can be done
   * largely independently.
   *
   * <p>3) not creating additional copies of character buffers just to hold tokens before they are
   * turned into specific objects.
   *
   * <p>4) for absorbing the results, it is possible either to create a consumer for each tokenizer
   * and avoid synchronization, summarizing the results after all parallel work is done, or to use
   * just one shared consumer with synchronized data structures.
   *
   * <p>Please see a comment about performance test results: {@link
   * com.google.devtools.build.lib.bazel.rules.ninja.ParallelFileProcessingTest}.
   *
   * @param channel open {@link ReadableByteChannel} to the file to process. The lifetime of the
   *     channel is outside the scope of this method; the channel is not closed by this method.
   * @param parameters {@link BlockParameters} with sizes of read and tokenize blocks
   * @param tokenConsumerFactory factory of {@link DeclarationConsumer}s for further processing /
   *     parsing; called once for the assembler and once per tokenized fragment
   * @param executorService executorService to use for parallel tokenization tasks
   * @param predicate predicate for separating tokens
   * @throws GenericParsingException thrown by further processing in the consumers created by
   *     {@code tokenConsumerFactory}
   * @throws IOException thrown by file reading
   * @throws InterruptedException if interrupted while waiting for the parallel tokenization results
   */
  public static void processFile(
      ReadableByteChannel channel,
      BlockParameters parameters,
      Supplier<DeclarationConsumer> tokenConsumerFactory,
      ListeningExecutorService executorService,
      SeparatorFinder predicate)
      throws GenericParsingException, IOException, InterruptedException {
    new ParallelFileProcessing(
            channel, parameters, tokenConsumerFactory, executorService, predicate)
        .processFileImpl();
  }

  /**
   * Reads the channel block by block and submits each block for tokenization. When all blocks are
   * processed, it hands the collected fragments to the {@link DeclarationAssembler}.
   */
  private void processFileImpl() throws InterruptedException, IOException, GenericParsingException {
    if (parameters.getReadBlockSize() == 0) {
      // The file is empty: there is nothing to tokenize or assemble.
      return;
    }
    DeclarationAssembler assembler =
        new DeclarationAssembler(tokenConsumerFactory.get(), predicate);

    CollectingListFuture<List<FileFragment>, GenericParsingException> future =
        new CollectingListFuture<>(GenericParsingException.class);
    int offset = 0;
    boolean keepReading = true;
    while (keepReading) {
      // A fresh buffer for every read block: fragments of earlier buffers are still referenced by
      // tokenization tasks that may be running, so a buffer must never be reused.
      ByteBuffer bb = ByteBuffer.allocateDirect(parameters.getReadBlockSize());
      keepReading = readFromChannel(channel, bb);
      if (bb.position() > 0) {
        bb.flip();
        tokenizeFragments(bb.asReadOnlyBuffer(), offset, future);
        offset += bb.limit();
      }
    }
    List<FileFragment> fragments =
        future.getResult().stream().flatMap(List::stream).collect(Collectors.toList());

    assembler.wrapUp(fragments);
  }

  /**
   * Reads from {@code ch} into {@code bb} until one of these holds: at least {@link
   * BlockParameters#getMinReadBlockSize()} bytes are buffered, the buffer is full, or the end of
   * the stream is reached. At least one read is always attempted.
   *
   * @return false if the end of the stream was reached, true if more data may follow
   */
  private boolean readFromChannel(ReadableByteChannel ch, ByteBuffer bb) throws IOException {
    // Always attempt at least one read. The minimum read block size can be 0 (e.g. an empty file
    // whose read block size was later raised via setReadBlockSize). A plain while loop would then
    // never read and never observe end-of-stream, so processFileImpl would loop forever.
    // Stopping on a full buffer avoids spinning on read() calls that can only return 0.
    do {
      if (ch.read(bb) < 0) {
        return false;
      }
    } while (bb.position() < parameters.getMinReadBlockSize() && bb.hasRemaining());
    return true;
  }

  /**
   * Splits the contents of {@code bb} (from 0 to its limit) into pieces of roughly {@link
   * BlockParameters#getTokenizeBlockSize()} bytes. It submits one {@link FileFragmentSplitter} task
   * per piece to the executor and registers the resulting futures in {@code future}.
   *
   * @param bb buffer with the contents to tokenize; position 0, limit = end of data
   * @param offset offset of the beginning of {@code bb} within the file
   * @param future collector for the results of the submitted tokenization tasks
   */
  private void tokenizeFragments(
      ByteBuffer bb,
      int offset,
      CollectingListFuture<List<FileFragment>, GenericParsingException> future) {
    int limit = bb.limit();
    int blockSize = parameters.getTokenizeBlockSize();
    int from = 0;
    while (from < limit) {
      // Equivalent to min(limit, from + blockSize), but cannot overflow int.
      int to = from + Math.min(blockSize, limit - from);
      if (limit - to < BlockParameters.MIN_TOKENIZE_BLOCK_SIZE) {
        // Do not create the last block too small, rather join it with the previous block.
        to = limit;
      }
      DeclarationConsumer consumer = tokenConsumerFactory.get();
      FileFragment fragment = new FileFragment(bb, offset, from, to);
      FileFragmentSplitter tokenizer = new FileFragmentSplitter(fragment, consumer, predicate);
      future.add(executorService.submit(tokenizer));
      from = to;
    }
  }

  /** Sizes of blocks for reading from file and parsing for {@link ParallelFileProcessing}. */
  public static class BlockParameters {

    private static final int READ_BLOCK_SIZE = 25 * 1024 * 1024;
    private static final int MIN_READ_BLOCK_SIZE = 10 * 1024 * 1024;
    private static final int TOKENIZE_BLOCK_SIZE = 1024 * 1024;
    private static final int MIN_TOKENIZE_BLOCK_SIZE = 100;

    /** Size of the reading buffer. */
    private int readBlockSize;
    /**
     * Minimum size of the reading buffer. read() calls are repeated until the reading buffer holds
     * at least minReadBlockSize bytes (or the stream ends). Only after that are the contents
     * passed for tokenization.
     */
    private int minReadBlockSize;
    /**
     * Size of the piece for the tokenization task. The read buffer is split into pieces of
     * tokenizeBlockSize size, which are tokenized in parallel.
     */
    private int tokenizeBlockSize;

    /**
     * Computes block sizes suitable for a file of the given size. For an empty file the read block
     * size is 0, and {@link ParallelFileProcessing#processFile} returns without reading.
     *
     * @param fileSize size of the file we are going to parse
     */
    public BlockParameters(long fileSize) {
      readBlockSize = (int) Math.min(READ_BLOCK_SIZE, fileSize);
      minReadBlockSize = Math.min(MIN_READ_BLOCK_SIZE, (int) Math.ceil((double) fileSize / 2));
      tokenizeBlockSize =
          Math.max(MIN_TOKENIZE_BLOCK_SIZE, Math.min(TOKENIZE_BLOCK_SIZE, minReadBlockSize / 4));
    }

    /** Returns the size of the reading buffer. */
    public int getReadBlockSize() {
      return readBlockSize;
    }

    /**
     * Overrides the read block size and, if needed, reduces the minimum read block size and the
     * tokenize block size so that they stay consistent with it. Those two sizes are never
     * increased by this method. Non-positive values are ignored.
     *
     * @param readBlockSize new size of the reading buffer
     * @return this object, for chaining
     */
    public BlockParameters setReadBlockSize(int readBlockSize) {
      if (readBlockSize > 0) {
        this.readBlockSize = readBlockSize;
        minReadBlockSize = Math.min(minReadBlockSize, (int) Math.ceil((double) readBlockSize / 2));
        tokenizeBlockSize =
            Math.max(MIN_TOKENIZE_BLOCK_SIZE, Math.min(tokenizeBlockSize, minReadBlockSize / 4));
      }
      return this;
    }

    /** Returns the size of the pieces the read buffer is split into for parallel tokenization. */
    public int getTokenizeBlockSize() {
      return tokenizeBlockSize;
    }

    /** Returns the number of bytes to accumulate in the read buffer before tokenizing it. */
    public int getMinReadBlockSize() {
      return minReadBlockSize;
    }
  }
}
