// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.skyframe;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Threadpool manager for {@link ParallelEvaluator}. Wraps a {@link QuiescingExecutor} and keeps
 * track of pending nodes.
 */
class NodeEntryVisitor {
  static final ErrorClassifier NODE_ENTRY_VISITOR_ERROR_CLASSIFIER =
      new ErrorClassifier() {
        @Override
        protected ErrorClassification classifyException(Exception e) {
          if (e instanceof SchedulerException) {
            return ErrorClassification.CRITICAL;
          }
          if (e instanceof RuntimeException) {
            // We treat non-SchedulerException RuntimeExceptions as more severe than
            // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the
            // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation
            // control flow.
            return ErrorClassification.CRITICAL_AND_LOG;
          }
          return ErrorClassification.NOT_CRITICAL;
        }
      };

  private final QuiescingExecutor quiescingExecutor;
  private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
  private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
  private final DirtyTrackingProgressReceiver progressReceiver;
  /**
   * Function that allows this visitor to execute the appropriate {@link Runnable} when given a
   * {@link SkyKey} to evaluate.
   */
  private final RunnableMaker runnableMaker;

  NodeEntryVisitor(
      QuiescingExecutor quiescingExecutor,
      DirtyTrackingProgressReceiver progressReceiver,
      RunnableMaker runnableMaker) {
    this.quiescingExecutor = quiescingExecutor;
    this.progressReceiver = progressReceiver;
    this.runnableMaker = runnableMaker;
  }

  void waitForCompletion() throws InterruptedException {
    quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
  }

  /**
   * Enqueue {@code key} for evaluation, at {@code evaluationPriority} if this visitor is using a
   * priority queue.
   *
   * <p>{@code evaluationPriority} is used to minimize evaluation "sprawl": inefficiencies coming
   * from incompletely evaluating many nodes, versus focusing on finishing the evaluation of nodes
   * that have already started evaluating. Sprawl can be expensive because an incompletely evaluated
   * node keeps state in Skyframe, and often in external caches, that uses memory.
   *
   * <p>In general, {@code evaluationPriority} should be maximal ({@link Integer#MAX_VALUE}) when
   * restarting a node that has already started evaluation, and minimal when enqueueing a node that
   * no other tasks depend on. Setting {@code evaluationPriority} to the same value for all children
   * of a parent has good results experimentally, since it prioritizes batches of work that can be
   * used together. Similarly, prioritizing deeper nodes (depth-first search of the evaluation
   * graph) also has good results experimentally, since it minimizes sprawl.
   */
  void enqueueEvaluation(SkyKey key, int evaluationPriority) {
    if (preventNewEvaluations.get()) {
      // If an error happens in nokeep_going mode, we still want to mark these nodes as inflight,
      // otherwise cleanup will not happen properly.
      progressReceiver.enqueueAfterError(key);
      return;
    }
    progressReceiver.enqueueing(key);
    quiescingExecutor.execute(runnableMaker.make(key, evaluationPriority));
  }

  /**
   * Registers a listener with all passed futures that causes the node to be re-enqueued when all
   * futures are completed.
   */
  void registerExternalDeps(
      SkyKey skyKey, NodeEntry entry, List<ListenableFuture<?>> externalDeps) {
    // Generally speaking, there is no ordering guarantee for listeners registered with a single
    // listenable future. If we used a listener here, there would be a potential race condition
    // between re-enqueuing the key and notifying the quiescing executor, in which case the executor
    // could shut down even though the work is not done yet. That would be bad.
    //
    // However, the whenAllComplete + run API guarantees that the Runnable is run before the
    // returned future completes, i.e., before the quiescing executor is notified.
    ListenableFuture<?> future =
        Futures.whenAllComplete(externalDeps)
            .run(
                () -> {
                  if (entry.signalDep(entry.getVersion(), null)) {
                    enqueueEvaluation(skyKey, Integer.MAX_VALUE);
                  }
                },
                MoreExecutors.directExecutor());
    quiescingExecutor.dependOnFuture(future);
  }

  /**
   * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
   * request a halt.
   *
   * <p>If called from within node evaluation, the caller may use the return value to determine
   * whether it is responsible for throwing an exception to halt evaluation at the executor level.
   */
  boolean preventNewEvaluations() {
    return preventNewEvaluations.compareAndSet(false, true);
  }

  void noteCrash(RuntimeException e) {
    crashes.add(e);
  }

  Collection<RuntimeException> getCrashes() {
    return crashes;
  }

  @VisibleForTesting
  CountDownLatch getExceptionLatchForTestingOnly() {
    return quiescingExecutor.getExceptionLatchForTestingOnly();
  }
}
