// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.Digest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputPrefetcher;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

/**
 * Stages output files that are stored remotely to the local filesystem.
 *
 * <p>This is necessary for remote caching/execution when {@code
 * --experimental_remote_download_outputs=minimal} is specified.
 */
class RemoteActionInputFetcher implements ActionInputPrefetcher {

  private static final Logger logger = Logger.getLogger(RemoteActionInputFetcher.class.getName());

  private final Object lock = new Object();

  /** Set of successfully downloaded output files. */
  @GuardedBy("lock")
  private final Set<Path> downloadedPaths = new HashSet<>();

  @VisibleForTesting
  @GuardedBy("lock")
  final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();

  private final RemoteCache remoteCache;
  private final Path execRoot;
  private final Context ctx;

  RemoteActionInputFetcher(RemoteCache remoteCache, Path execRoot, Context ctx) {
    this.remoteCache = Preconditions.checkNotNull(remoteCache);
    this.execRoot = Preconditions.checkNotNull(execRoot);
    this.ctx = Preconditions.checkNotNull(ctx);
  }

  /**
   * Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under
   * their path in the output base.
   *
   * <p>This method blocks until all downloads have finished.
   *
   * <p>This method is safe to be called concurrently from spawn runners before running any local
   * spawn.
   */
  @Override
  public void prefetchFiles(
      Iterable<? extends ActionInput> inputs, MetadataProvider metadataProvider)
      throws IOException, InterruptedException {
    try (SilentCloseable c =
        Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "stage remote inputs")) {
      Map<Path, ListenableFuture<Void>> downloadsToWaitFor = new HashMap<>();
      for (ActionInput input : inputs) {
        if (input instanceof VirtualActionInput) {
          VirtualActionInput paramFileActionInput = (VirtualActionInput) input;
          Path outputPath = execRoot.getRelative(paramFileActionInput.getExecPath());
          outputPath.getParentDirectory().createDirectoryAndParents();
          try (OutputStream out = outputPath.getOutputStream()) {
            paramFileActionInput.writeTo(out);
          }
        } else {
          FileArtifactValue metadata = metadataProvider.getMetadata(input);
          if (metadata == null || !metadata.isRemote()) {
            continue;
          }

          Path path = execRoot.getRelative(input.getExecPath());
          synchronized (lock) {
            if (downloadedPaths.contains(path)) {
              continue;
            }
            ListenableFuture<Void> download = downloadFileAsync(path, metadata);
            downloadsToWaitFor.putIfAbsent(path, download);
          }
        }
      }

      IOException ioException = null;
      InterruptedException interruptedException = null;
      for (Map.Entry<Path, ListenableFuture<Void>> entry : downloadsToWaitFor.entrySet()) {
        try {
          Utils.getFromFuture(entry.getValue());
        } catch (IOException e) {
          if (e instanceof CacheNotFoundException) {
            e =
                new IOException(
                    String.format(
                        "Failed to fetch file with hash '%s' because it does not exist remotely."
                            + " --experimental_remote_outputs=minimal does not work if"
                            + " your remote cache evicts files during builds.",
                        ((CacheNotFoundException) e).getMissingDigest().getHash()));
          }
          ioException = ioException == null ? e : ioException;
        } catch (InterruptedException e) {
          interruptedException = interruptedException == null ? e : interruptedException;
        }
      }

      if (interruptedException != null) {
        throw interruptedException;
      }
      if (ioException != null) {
        throw ioException;
      }
    }
  }

  ImmutableSet<Path> downloadedFiles() {
    synchronized (lock) {
      return ImmutableSet.copyOf(downloadedPaths);
    }
  }

  void downloadFile(Path path, FileArtifactValue metadata)
      throws IOException, InterruptedException {
    try {
      downloadFileAsync(path, metadata).get();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new IOException(e.getCause());
    }
  }

  private ListenableFuture<Void> downloadFileAsync(Path path, FileArtifactValue metadata)
      throws IOException {
    synchronized (lock) {
      if (downloadedPaths.contains(path)) {
        return Futures.immediateFuture(null);
      }

      ListenableFuture<Void> download = downloadsInProgress.get(path);
      if (download == null) {
        Context prevCtx = ctx.attach();
        try {
          Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
          download = remoteCache.downloadFile(path, digest);
          downloadsInProgress.put(path, download);
          Futures.addCallback(
              download,
              new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void v) {
                  synchronized (lock) {
                    downloadsInProgress.remove(path);
                    downloadedPaths.add(path);
                  }

                  try {
                    path.chmod(0755);
                  } catch (IOException e) {
                    logger.log(Level.WARNING, "Failed to chmod 755 on " + path, e);
                  }
                }

                @Override
                public void onFailure(Throwable t) {
                  synchronized (lock) {
                    downloadsInProgress.remove(path);
                  }
                  try {
                    path.delete();
                  } catch (IOException e) {
                    logger.log(
                        Level.WARNING,
                        "Failed to delete output file after incomplete download: " + path,
                        e);
                  }
                }
              },
              MoreExecutors.directExecutor());
        } finally {
          ctx.detach(prevCtx);
        }
      }
      return download;
    }
  }
}
