// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.server;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod;
import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
import com.google.devtools.build.lib.unix.LocalClientSocket;
import com.google.devtools.build.lib.unix.LocalServerSocket;
import com.google.devtools.build.lib.unix.LocalSocketAddress;
import com.google.devtools.build.lib.unix.NativePosixFiles;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ThreadUtils;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.util.io.StreamMultiplexer;
import com.google.devtools.build.lib.vfs.Path;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/**
 * An RPCServer server is a Java object that sits and waits for RPC requests
 * (the sit-and-wait is implemented in {@link #serve()}).  These requests
 * arrive via UNIX file sockets. The RPCServer then calls the application
 * (which implements ServerCommand) to handle the request. (Since the Blaze
 * server may need to stat hundreds of directories during initialization, this
 * is a significant speedup.)  The server thread will terminate after idling
 * for a user-specified time.
 *
 * Note: If you are contemplating to call into the RPCServer from
 * within Java, consider using the {@link RPCService} class instead.
 */
// TODO(bazel-team): Signal handling.
// TODO(bazel-team): Gives clients status information when the server is busy. One
// way to do this is to put the server status in a file (pid, the current
// target, etc) in the server directory. Alternatively, we can have a separate
// thread taking care of the server socket and put the information into socket
// handshakes.
// TODO(bazel-team): Use Reporter for server-side messages.
public final class AfUnixServer extends RPCServer {
  private final Clock clock;
  private final RPCService rpcService;
  private final LocalServerSocket serverSocket;
  private final long maxIdleMillis;
  private final long statusCheckMillis;
  private final Path serverDirectory;
  private final Path workspaceDir;
  private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName());
  private volatile boolean lameDuck;

  private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute.
  private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0');

  /**
   * Create a new server instance. After creating the server, you can start it
   * by calling the {@link #serve()} method.
   *
   * @param clock The clock to take time measurements
   * @param rpcService The underlying service object, which takes
   *                           care of dispatching to the {@link ServerCommand}
   *                           instances, as requests arrive.
   * @param maxIdleMillis      The maximum time the server will wait idly.
   * @param statusCheckPeriodMillis How long to wait between system status checks.
   * @param serverDirectory    Directory to put file socket and pid files, etc.
   * @param workspaceDir The workspace. Used solely to ensure it persists.
   * @throws IOException
   */
  public AfUnixServer(Clock clock, RPCService rpcService,
      long maxIdleMillis, long statusCheckPeriodMillis,
      Path serverDirectory, Path workspaceDir)
      throws IOException {
    super(serverDirectory);
    this.clock = clock;
    this.rpcService = rpcService;
    this.maxIdleMillis = maxIdleMillis;
    this.statusCheckMillis = statusCheckPeriodMillis;
    this.serverDirectory = serverDirectory;
    this.workspaceDir = workspaceDir;

    this.serverSocket = openServerSocket();
    serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis));
    lameDuck = false;
  }

  /**
   * Create a new server instance. After creating the server, you can start it
   * by calling the {@link #serve()} method.
   *
   * @param clock The clock to take time measurements
   * @param rpcService The underlying service object, which takes
   *                           care of dispatching to the {@link ServerCommand}
   *                           instances, as requests arrive.
   * @param maxIdleMillis      The maximum time the server will wait idly.
   * @param serverDirectory    Directory to put file socket and pid files, etc.
   * @param workspaceDir       The workspace. Used solely to ensure it persists.
   * @throws IOException
   */
  public AfUnixServer(Clock clock, RPCService rpcService,
      long maxIdleMillis, Path serverDirectory, Path workspaceDir)
      throws IOException {
    this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS,
        serverDirectory, workspaceDir);
  }


  private final AtomicBoolean inAction = new AtomicBoolean(false);
  private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
  private final AtomicLong cmdNum = new AtomicLong();
  private final Thread mainThread = Thread.currentThread();
  private final Object interruptLock = new Object();

  @Override
  public void interrupt() {
    // Only interrupt during actions - otherwise we may end up setting the interrupt bit
    // at the end of a build and responding to it at the beginning of the subsequent build.
    synchronized (interruptLock) {
      if (allowingInterrupt.get()) {
        mainThread.interrupt();
      }
    }

    if (inAction.get()) {
      Runnable interruptWatcher =
          new Runnable() {
            @Override
            public void run() {
              try {
                long originalCmd = cmdNum.get();
                Thread.sleep(10 * 1000);
                if (inAction.get() && cmdNum.get() == originalCmd) {
                  // We're still operating on the same command.
                  // Interrupt took too long.
                  ThreadUtils.warnAboutSlowInterrupt();
                }
              } catch (InterruptedException e) {
                // Ignore.
              }
            }
          };
      Thread interruptWatcherThread =
          new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
      interruptWatcherThread.setDaemon(true);
      interruptWatcherThread.start();
    }
  }

  /**
   * Wait on a socket for business (answer requests). Note that this
   * method won't return until the server shuts down.
   */
  @Override
  public void serve() {
    try {
      while (!lameDuck) {
        try {
          IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir);
          idleChecker.idle();
          RequestIo requestIo;

          long startTime = clock.currentTimeMillis();
          while (true) {
            try {
              allowingInterrupt.set(true);
              Socket socket = serverSocket.accept();
              long firstContactTime = clock.currentTimeMillis();
              requestIo = new RequestIo(socket, firstContactTime);
              break;
            } catch (SocketTimeoutException e) {
              long idleTime = clock.currentTimeMillis() - startTime;
              if (lameDuck) {
                closeServerSocket();
                return;
              } else if (idleTime > maxIdleMillis
                  || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) {
                enterLameDuck();
              }
            }
          }
          idleChecker.busy();


          List<String> request = null;
          try {
            request = extractRequest(requestIo);
            cmdNum.incrementAndGet();
            inAction.set(true);
            if (request != null) {
              executeRequest(request, requestIo);
            }
          } finally {
            inAction.set(false);
            // Don't reset interruption unless we executed a request. Otherwise this is just a
            // ping from the client verifying our existence, in which case we should retain the
            // interrupt status for the subsequent request.
            if (request != null) {
              synchronized (interruptLock) {
                allowingInterrupt.set(false);
                Thread.interrupted(); // clears thread interrupted status
              }
            }
            requestIo.shutdown();
            switch (rpcService.getShutdown()) {
              case NONE:
                break;

              case CLEAN:
                return;

              case EXPUNGE:
                disableShutdownHooks();
                return;
            }
          }
        } catch (EOFException e) {
          LOG.info("Connection to the client lost: "
                         + e.getMessage());
        } catch (IOException e) {
          // Something else happened. Print a stack trace for debugging.
          printStack(e);
        }
      }
    } finally {
      rpcService.shutdown(ShutdownMethod.CLEAN);
      LOG.info("Logging finished");
    }
  }

  private void closeServerSocket() {
    LOG.info("Closing serverSocket.");
    try {
      serverSocket.close();
    } catch (IOException e) {
      printStack(e);
    }

    if (!lameDuck) {
      try {
        getSocketPath().delete();
      } catch (IOException e) {
        printStack(e);
      }
    }
  }

  /**
   * Allow one last request to be serviced.
   */
  private void enterLameDuck() {
    lameDuck = true;
    try {
      getSocketPath().delete();
    } catch (IOException e) {
      e.printStackTrace();
    }
    serverSocket.setSoTimeout(1);
  }

  /**
   * Returns the path of the socket file to be used.
   */
  public Path getSocketPath() {
    return serverDirectory.getRelative("server.socket");
  }

  /**
   * Ensures no other server is running for the current socket file.  This
   * guarantees that no two servers are running against the same output
   * directory.
   *
   * @throws IOException if another server holds the lock for the socket file.
   */
  public static void ensureExclusiveAccess(Path socketFile) throws IOException {
    LocalSocketAddress address =
        new LocalSocketAddress(socketFile.getPathFile());
    if (socketFile.exists()) {
      try {
        new LocalClientSocket(address).close();
      } catch (IOException e) {
        // The previous server process is dead--unlink the file:
        socketFile.delete();
        return;
      }
      // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message
      // and add it to the message.
      throw new IOException("Socket file " + socketFile.getPathString()
                            + " is locked by another server");
    }
  }

  /**
   * Opens a UNIX local server socket.
   * @throws IOException if the socket file is used by another server or can
   * not be made exclusive.
   */
  private LocalServerSocket openServerSocket() throws IOException {
    // This is the "well known" socket path via which the server is found...
    Path socketFile = getSocketPath();

    // ...but it may have a name that's too long for AF_UNIX, in which case we
    // make it a symlink to /tmp/something.  This typically only happens in
    // tests where the --output_base is beneath a very deep temp dir.
    // (All this extra complexity is just used in tests... *sigh*).
    if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX
      Path socketLink = socketFile;
      String tmpDirDefault = System.getenv("TMPDIR");
      if (tmpDirDefault == null
          || tmpDirDefault.length() > 104 - "/blaze-4294967296/server.socket".length()) {
        // Default for unset TMPDIR, or if TMPDIR is so that the resulting
        // path would be too long.
        tmpDirDefault = "/tmp";
      }
      String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", tmpDirDefault);
      socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)).
          getRelative("server.socket");
      LOG.info("Using symlinked socket at " + socketFile);

      socketLink.delete(); // Remove stale symlink, if any.
      socketLink.createSymbolicLink(socketFile);

      deleteAtExit(socketLink, /*deleteParent=*/false);
      deleteAtExit(socketFile, /*deleteParent=*/true);
    } else {
      deleteAtExit(socketFile, /*deleteParent=*/false);
    }

    ensureExclusiveAccess(socketFile);


    LocalServerSocket serverSocket = new LocalServerSocket();
    serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile()));
    NativePosixFiles.chmod(socketFile.getPathFile(), 0600);  // Lock it down.
    serverSocket.listen(/*backlog=*/50);
    return serverSocket;
  }

  // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a
  // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it
  // succeeds.
  private static Path createTempSocketDirectory(Path tempDir) {
    Random random = new Random();
    while (true) {
      Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt()));
      try {
        if (socketDir.createDirectory()) {
          // Make sure it's private; unfortunately, createDirectory() doesn't take a mode
          // argument.
          socketDir.chmod(0700);
          return socketDir; // Created.
        }
        // Already existed; try again.
      } catch (IOException e) {
        // Failed; try again.
      }
    }
  }

  /**
   * Read a string in platform default encoding and split it into a list of
   * NUL-separated words.
   *
   * <p>Blaze consistently uses the platform default encoding (defined in
   * blaze.cc) to interface with Unix APIs.
   */
  private static List<String> readRequest(InputStream input) throws IOException {
    byte[] sizeBuffer = new byte[4];
    ByteStreams.readFully(input, sizeBuffer);
    int size = ((sizeBuffer[0] & 0xff) << 24)
        + ((sizeBuffer[1] & 0xff) << 16)
        + ((sizeBuffer[2] & 0xff) << 8)
        + (sizeBuffer[3] & 0xff);
    byte[] inputBytes = new byte[size];
    ByteStreams.readFully(input, inputBytes);

    String s = new String(inputBytes, Charset.defaultCharset());
    return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
  }

  private static List<String> extractRequest(RequestIo requestIo) throws IOException {
    List<String> request = readRequest(requestIo.in);
    if (request == null) {
      LOG.info("Short-circuiting empty request");
      return null;
    }
    return request;
  }

  private void executeRequest(List<String> request, RequestIo requestIo) {
    Preconditions.checkNotNull(request);
    int exitStatus = 2;
    try {
      exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr,
              requestIo.firstContactTime);
      LOG.info("Finished executing request");
    } catch (UnknownCommandException e) {
      requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage());
      LOG.severe("SERVER ERROR: " + e.getMessage());
    } catch (Exception e) {
      // Stacktrace for unknown exception.
      StringWriter trace = new StringWriter();
      e.printStackTrace(new PrintWriter(trace, true));
      requestIo.requestOutErr.printErr("SERVER ERROR: " + trace);
      LOG.severe("SERVER ERROR: " + trace);
    }

    if (rpcService.getShutdown() != ShutdownMethod.NONE) {
      // In case of shutdown, disable the listening socket *before* we write
      // the last part of the response.  Otherwise, a sufficiently fast client
      // could read the response and exit, and a new client could make a
      // connection to this server, which is still in the listening state, even
      // though it is about to shut down imminently.
      closeServerSocket();
    }

    requestIo.writeExitStatus(exitStatus);
  }

  /**
   * Because it's a little complicated, this class factors out all the IO Hook
   * up we need per request, that is, in
   * {@link AfUnixServer#executeRequest(List, RequestIo)}.
   * It's unfortunately complicated, so it's explained here.
   */
  private static class RequestIo {

    // Used by the client code
    private final InputStream in;
    private final OutErr requestOutErr;
    private final OutputStream controlChannel;

    // just used by this class to keep the state around
    private final Socket requestSocket;
    private final OutputStream requestOut;
    private final long firstContactTime;

    RequestIo(Socket requestSocket, long firstContactTime) throws IOException {
      this.requestSocket = requestSocket;
      this.firstContactTime = firstContactTime;
      this.in = requestSocket.getInputStream();
      this.requestOut = requestSocket.getOutputStream();

      // We encode the response sent to the client with a multiplexer so
      // we can send three streams (out / err / control) over one wire stream
      // (requestOut).
      StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut);

      // We'll be writing control messages (exit code + out of date message)
      // to this control channel.
      controlChannel = multiplexer.createControl();

      // This is the outErr part of the multiplexed output.
      requestOutErr = OutErr.create(multiplexer.createStdout(),
                                    multiplexer.createStderr());
      // We hook up System.out / System.err to our IO object. Stuff written to
      // System.out / System.err will show up on the user's screen, prefixed
      // with "System.out "/"System.err ".
      requestOutErr.addSystemOutErrAsSource();
    }

    public void writeExitStatus(int exitStatus) {
      // Make sure to flush the output / error streams prior to writing the exit status.
      // The client may stop reading that direction of the socket immediately upon reading the
      // exit code.
      flushOutErr();
      try {
        controlChannel.write((exitStatus >> 24) & 0xff);
        controlChannel.write((exitStatus >> 16) & 0xff);
        controlChannel.write((exitStatus >> 8) & 0xff);
        controlChannel.write(exitStatus & 0xff);
        controlChannel.flush();
        LOG.info("" + exitStatus);
      } catch (IOException ignored) {
        // This exception is historically ignored.
      }
    }

    private void flushOutErr() {
      try {
        requestOutErr.getOutputStream().flush();
      } catch (IOException e) {
        printStack(e);
      }
      try {
        requestOutErr.getErrorStream().flush();
      } catch (IOException e) {
        printStack(e);
      }
    }

    public void shutdown() {
      try {
        requestOut.close();
      } catch (IOException e) {
        printStack(e);
      }
      try {
        in.close();
      } catch (IOException e) {
        printStack(e);
      }
      try {
        requestSocket.close();
      } catch (IOException e) {
        printStack(e);
      }
    }
  }

  /**
   * Creates and returns a new RPC server.
   * Use {@link AfUnixServer#serve()} to start the server.
   *
   * @param appCommand The application's ServerCommand implementation.
   * @param serverDirectory The directory for server-related files. The caller
   * must ensure the directory has been created.
   * @param workspaceDir The workspace, used solely to ensure it persists.
   * @param maxIdleSeconds The idle time in seconds after which the rpc
   * server will die unless it receives a request.
   */
  public static AfUnixServer newServerWith(Clock clock,
                                        ServerCommand appCommand,
                                        Path serverDirectory,
                                        Path workspaceDir,
                                        int maxIdleSeconds)
      throws IOException {
    // Creates and starts the RPC server.
    RPCService service = new RPCService(appCommand);

    return new AfUnixServer(clock, service, maxIdleSeconds * 1000L,
                         serverDirectory, workspaceDir);
  }

}
