Various cleanups and refactorings in the client:

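- Made the control flow much simpler: the client now tries to connect to an
  existing server once at startup, and ConnectToServer()/KillRunningServerIfAny()
  are replaced by Connected() checks and StartServerAndConnect()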
- Added some documentation about the interplay of the client and the server
- Abstracted POSIX mechanisms (e.g. server startup and process killing) out of
  blaze.cc so that they can be implemented properly on Windows
- Added assertions that the methods on BlazeServer are only called in the
  state (connected or disconnected) they require
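
A minimal sketch of the connected/disconnected contract those assertions
enforce. `FakeServer` is a hypothetical stand-in, not the real `BlazeServer`:
it assumes `Connect()` always succeeds and models only the `connected_`
bookkeeping added in this change.

```cpp
#include <cassert>

// Hypothetical stand-in for BlazeServer that models only the connected_
// bookkeeping. Each method asserts the state it requires.
class FakeServer {
 public:
  // Whether there is an active connection to a server.
  bool Connected() const { return connected_; }

  // Only valid while disconnected. Assumes the connection always succeeds.
  bool Connect() {
    assert(!connected_);
    connected_ = true;
    return connected_;
  }

  // Only valid while connected; leaves the object disconnected.
  void Disconnect() {
    assert(connected_);
    connected_ = false;
  }

  // Only valid while connected; once the server is killed there is nothing
  // left to talk to, so the object ends up disconnected as well.
  void KillRunningServer() {
    assert(connected_);
    connected_ = false;
  }

 private:
  bool connected_ = false;
};
```

Callers test `Connected()` instead of calling `Connect()` a second time, which
is the pattern `StartStandalone()` and `EnsureCorrectRunningVersion()` follow
after this change. Note that `assert()` is compiled out under `NDEBUG`, so these
checks only fire in debug builds.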

Polish for #930.
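
The startup wait in StartServerAndConnect() retries `Connect()` and, between
attempts, asks the `BlazeServerStartup` object whether the spawned process is
still alive. A sketch of that loop, assuming both checks are passed in as
callbacks; `WaitForServer` and `StartupResult` are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical outcome of waiting for a freshly spawned server.
enum class StartupResult { kConnected, kServerDied, kTimedOut };

// Retries `try_connect` up to `max_attempts` times, sleeping `interval`
// between attempts and giving up early if `is_still_alive` reports that the
// spawned process has exited.
StartupResult WaitForServer(const std::function<bool()>& try_connect,
                            const std::function<bool()>& is_still_alive,
                            std::chrono::milliseconds interval,
                            int max_attempts) {
  assert(max_attempts > 0);
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (try_connect()) {
      return StartupResult::kConnected;
    }
    std::this_thread::sleep_for(interval);
    if (!is_still_alive()) {
      return StartupResult::kServerDied;
    }
  }
  return StartupResult::kTimedOut;
}
```

With a 100 ms interval and 600 attempts, the loop gives up after roughly 60
seconds. Returning an enum instead of exiting keeps the failure policy
(printing the JVM log, calling die()) in the caller.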

--
MOS_MIGRATED_REVID=121256601
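
The PID-file handling described in the blaze.cc comment can be sketched with
plain POSIX calls. This assumes the PID file holds a single decimal PID.
`ReadPidFile`, `WaitForDeath` and `KillWithEscalation` are hypothetical
helpers, and unlike AfUnixBlazeServer::KillRunningServer(), the SIGKILL step
here targets only the process, not its whole process group.

```cpp
#include <cassert>
#include <cerrno>
#include <fstream>
#include <string>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

// Hypothetical helper: returns the PID stored in `path`, or -1 if the file is
// missing, unparsable, or holds a non-positive value.
pid_t ReadPidFile(const std::string& path) {
  std::ifstream in(path);
  long pid = -1;
  if (!(in >> pid) || pid <= 0) {
    return -1;
  }
  return static_cast<pid_t>(pid);
}

// Polls every 10ms for up to `timeout_ms` until `pid` no longer exists.
bool WaitForDeath(pid_t pid, int timeout_ms) {
  for (int waited = 0; waited <= timeout_ms; waited += 10) {
    // Reap the process if it is our own child; otherwise it would linger as a
    // zombie that kill(pid, 0) still reports as existing.
    waitpid(pid, nullptr, WNOHANG);
    if (kill(pid, 0) == -1 && errno == ESRCH) {
      return true;
    }
    struct timespec ts = {0, 10 * 1000 * 1000};  // 10ms
    nanosleep(&ts, nullptr);
  }
  return false;
}

// Sends SIGTERM, then SIGKILL if the process is still around after the
// timeout. Returns true once the process is gone.
bool KillWithEscalation(pid_t pid, int timeout_ms) {
  assert(pid > 0);  // never signal 0 or -1, which target whole groups
  kill(pid, SIGTERM);
  if (WaitForDeath(pid, timeout_ms)) {
    return true;
  }
  kill(pid, SIGKILL);
  return WaitForDeath(pid, timeout_ms);
}
```

A caller would treat any result `<= 0` from ReadPidFile() as "no server",
mirroring the `> 0` checks this change introduces, and only then call
KillWithEscalation(). The waitpid() call matters only when the target is the
caller's own child; otherwise it fails harmlessly with ECHILD.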
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 4fdc75f..5a69658 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -98,19 +98,101 @@
 
 static void WriteFileToStreamOrDie(FILE *stream, const char *file_name);
 static string BuildServerRequest();
+static int GetServerPid(const string &server_dir);
 
-// Implementation of the communication protocol with the server process.
-// This object has two states: connected and disconnected. Most of the methods
-// can only be called in one of these states.
+// The following is a treatise on how the interaction between the client and the
+// server works.
+//
+// First, the client unconditionally acquires an flock() lock on
+// $OUTPUT_BASE/lock, then checks whether it has already extracted itself by
+// looking for the directory it extracts itself to (install base + a checksum).
+// If it is missing, the client does the extraction. Care is taken that this is
+// atomic so that Blaze clients in multiple output bases do not clash.
+//
+// Then the client tries to connect to the currently executing server and kills
+// it if at least one of the following conditions is true:
+//
+// - The server is of the wrong version (as determined by the
+//   $OUTPUT_BASE/install symlink)
+// - The server has different startup options than the client wants
+// - The client wants to run the command in batch mode
+//
+// Then, if needed, the client adjusts the install link to indicate which
+// version of the server it is running.
+//
+// In batch mode, the client then simply executes the server while taking care
+// that the output base lock is kept until it finishes.
+//
+// In server mode, the client starts up a server if needed, then sends the
+// command to the server and streams back stdout and stderr. If an AF_UNIX
+// socket is used, the output base lock is held until the command finishes. If
+// gRPC is used, the lock is released after the command is sent to the server
+// (the server implements its own locking mechanism).
+
+// Synchronization between the client and the server is a little precarious
+// because the client needs to know the PID of the server, which is not
+// available through a Java API. Since we don't have JNI on Windows at the
+// moment, the server can't just report it over the communication channel.
+// Thus, a PID file is used, but care needs to be taken that the contents of
+// this PID file are correct.
+//
+// Upon server startup, the PID file is written before the client spawns the
+// server. Thus, when the client can connect, it can be certain that the PID
+// file is up to date.
+//
+// Upon server shutdown, the PID file is deleted using a server shutdown hook.
+// However, this happens *after* the server has stopped listening, so it's
+// possible that by then a new client has started up a new server and written a
+// new PID file. To cope with this, when the client starts up a new server, it
+// reads the contents of the PID file and kills the process indicated in it (it
+// could do with a bit more care, since PIDs can be reused, but for now, we just
+// believe the PID file).
+//
+// Some more interesting scenarios:
+//
+// - The server receives a kill signal and does not get a chance to delete
+//   the PID file: the client cannot connect, reads the PID file, kills the
+//   process indicated in it and starts up a new server.
+//
+// - The server has stopped accepting connections but hasn't quit yet and a new
+//   client comes along: the new client will kill the server based on the
+//   PID file before a new server is started up.
+//
+// - The communication method is changed between AF_UNIX and gRPC: the client
+//   will find that the server is not responsive and will kill it based on its
+//   PID.
+//
+// Alternative implementations:
+//
+// - Don't deal with PIDs at all. This would make it impossible for the client
+//   to deliver a SIGKILL to the server after three SIGINTs. It would only be
+//   possible with gRPC anyway.
+//
+// - Have the server check that the PID file contains the correct PID
+//   before deleting it: there is a window of time between checking the file
+//   and deleting it in which a new server can overwrite the PID file. The
+//   output base lock cannot be acquired, either, because when starting up a
+//   new server, the client already holds it.
+//
+// - Delete the PID file before the server stops accepting connections: then a
+//   client could come along after the PID file is deleted but before the
+//   server stops accepting connections. It would also not be resilient against
+//   a dead server that left a PID file around.
 class BlazeServer {
  protected:
-  BlazeLock blaze_lock;
+  BlazeLock blaze_lock_;
+  bool connected_;
 
  public:
   virtual ~BlazeServer() {}
 
+  // Acquire a lock for the server running in this output base. Returns the
+  // number of milliseconds spent waiting for the lock.
   uint64_t AcquireLock();
 
+  // Whether there is an active connection to a server.
+  bool Connected() const { return connected_; }
+
   // Connect to the server. Returns if the connection was successful. Only
   // call this when this object is in disconnected state. If it returns true,
   // this object will be in connected state.
@@ -128,10 +210,11 @@
 
   // Disconnects and kills an existing server. Only call this when this object
   // is in connected state.
-  virtual void KillRunningServer(int server_pid) = 0;
+  virtual void KillRunningServer() = 0;
 
   // Cancel the currently running command. If there is no command currently
-  // running, the result is unspecified.
+  // running, the result is unspecified. When called, this object must be in
+  // connected state.
   virtual void Cancel() = 0;
 };
 
@@ -151,13 +234,13 @@
   globals->restart_reason = NO_RESTART;
 }
 
-
 uint64_t BlazeServer::AcquireLock() {
   return blaze::AcquireLock(
       globals->options.output_base, globals->options.batch,
-      globals->options.block_for_lock, &blaze_lock);
+      globals->options.block_for_lock, &blaze_lock_);
 }
 
+// Communication method that uses an AF_UNIX socket and a custom protocol.
 class AfUnixBlazeServer : public BlazeServer {
  public:
   AfUnixBlazeServer();
@@ -166,13 +249,15 @@
   bool Connect() override;
   void Disconnect() override;
   unsigned int Communicate() override;
-  void KillRunningServer(int server_pid) override;
+  void KillRunningServer() override;
   void Cancel() override;
 
  private:
   int server_socket_;
 };
 
+// Communication method that uses gRPC on a socket bound to localhost. More
+// documentation is in command_server.proto.
 class GrpcBlazeServer : public BlazeServer {
  public:
   GrpcBlazeServer();
@@ -181,7 +266,7 @@
   bool Connect() override;
   void Disconnect() override;
   unsigned int Communicate() override;
-  void KillRunningServer(int server_pid) override;
+  void KillRunningServer() override;
   void Cancel() override;
 
  private:
@@ -551,7 +636,7 @@
 
 // Starts the Blaze server.  Returns a readable fd connected to the server.
 // This is currently used only to detect liveness.
-static int StartServer() {
+static void StartServer(BlazeServerStartup** server_startup) {
   vector<string> jvm_args_vector = GetArgumentArray();
   string argument_string = GetArgumentString(jvm_args_vector);
   string server_dir = globals->options.output_base + "/server";
@@ -574,19 +659,19 @@
   // we can still print errors to the terminal.
   GoToWorkspace();
 
-  return ExecuteDaemon(exe, jvm_args_vector, globals->jvm_log_file.c_str(),
-                       server_dir);
+  ExecuteDaemon(exe, jvm_args_vector, globals->jvm_log_file.c_str(),
+                server_dir, server_startup);
 }
 
-static bool KillRunningServerIfAny(BlazeServer *server);
-
 // Replace this process with blaze in standalone/batch mode.
 // The batch mode blaze process handles the command and exits.
 //
 // This function passes the commands array to the blaze process.
 // This array should start with a command ("build", "info", etc.).
 static void StartStandalone(BlazeServer* server) {
-  KillRunningServerIfAny(server);
+  if (server->Connected()) {
+    server->KillRunningServer();
+  }
 
   // Wall clock time since process startup.
   globals->startup_time = ProcessClock() / 1000000LL;
@@ -629,9 +714,12 @@
 
 AfUnixBlazeServer::AfUnixBlazeServer() {
   server_socket_ = -1;
+  connected_ = false;
 }
 
 bool AfUnixBlazeServer::Connect() {
+  assert(!connected_);
+
   if (server_socket_ == -1) {
     server_socket_ = socket(PF_UNIX, SOCK_STREAM, 0);
     if (server_socket_ == -1)  {
@@ -656,7 +744,18 @@
     free(resolved_path);
     sockaddr *paddr = reinterpret_cast<sockaddr *>(&addr);
     int result = connect(server_socket_, paddr, sizeof addr);
-    return result == 0;
+    connected_ = result == 0;
+    if (connected_) {
+      string server_dir = globals->options.output_base + "/server";
+      globals->server_pid = GetServerPid(server_dir);
+      if (globals->server_pid <= 0) {
+        pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+             "can't get PID of existing server (server dir=%s)",
+             server_dir.c_str());
+      }
+    }
+
+    return connected_;
   } else if (errno == ENOENT) {  // No socket means no server to connect to
     errno = ECONNREFUSED;
     return false;
@@ -668,7 +767,10 @@
 }
 
 void AfUnixBlazeServer::Disconnect() {
+  assert(connected_);
+
   close(server_socket_);
+  connected_ = false;
   server_socket_ = -1;
 }
 
@@ -736,6 +838,8 @@
 }
 
 unsigned int AfUnixBlazeServer::Communicate() {
+  assert(connected_);
+
   const string request = BuildServerRequest();
 
   // Send request (Request is written in a single chunk.)
@@ -839,6 +943,7 @@
 }
 
 void AfUnixBlazeServer::Cancel() {
+  assert(connected_);
   kill(globals->server_pid, SIGINT);
 }
 
@@ -898,10 +1003,8 @@
   return result;
 }
 
-// Connects to the Blaze server, returning the socket, or -1 if no
-// server is running and !start.  If start, attempts to start a new
-// server, and exits on failure.
-static bool ConnectToServer(BlazeServer *server, bool start) {
+// Starts up a new server and connects to it. Exits if that fails.
+static void StartServerAndConnect(BlazeServer *server) {
   string server_dir = globals->options.output_base + "/server";
 
   // The server dir has the socket, so we don't allow access by other
@@ -913,70 +1016,49 @@
 
   string socket_file = blaze_util::JoinPath(server_dir, "server.socket");
 
-  globals->server_pid = 0;
-  if (server->Connect()) {
-    globals->server_pid = GetServerPid(server_dir);
-    if (globals->server_pid == -1) {
-      pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-           "can't get PID of existing server (server dir=%s)",
-           server_dir.c_str());
-    }
-    return true;
+  // If we couldn't connect to the server, check if there is still a PID file
+  // and if so, kill the server that wrote it. This can happen e.g. if the
+  // server is in a GC pause and therefore cannot respond to ping requests.
+  // Having two server instances running in the same output base would be a
+  // disaster.
+  int server_pid = GetServerPid(server_dir);
+  if (server_pid > 0) {
+    fprintf(stderr,
+            "Found non-responsive server process (pid=%d). Killing it.\n",
+            server_pid);
+    KillServerProcess(server_pid, globals->options.output_base);
   }
 
-  if (start) {
-    // If we couldn't connect to the server check if there is still a PID file
-    // and if so, kill the server that wrote it. This can happen e.g. if the
-    // server is in a GC pause and therefore cannot respond to ping requests and
-    // having two server instances running in the same output base is a
-    // disaster.
-    int server_pid = GetServerPid(server_dir);
-    if (server_pid >= 0) {
-      killpg(server_pid, SIGKILL);
-    }
+  SetScheduling(globals->options.batch_cpu_scheduling,
+                globals->options.io_nice_level);
 
-    SetScheduling(globals->options.batch_cpu_scheduling,
-                  globals->options.io_nice_level);
-
-    int fd = StartServer();
-    if (fd != -1 && fcntl(fd, F_SETFL, O_NONBLOCK | fcntl(fd, F_GETFL))) {
-      pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-           "Failed: fcntl to enable O_NONBLOCK on pipe");
-    }
-    // Give the server one minute to start up.
-    for (int ii = 0; ii < 600; ++ii) {
-      // 60s; enough time to connect with debugger
-      if (server->Connect()) {
-        if (ii) {
-          fputc('\n', stderr);
-          fflush(stderr);
-        }
-        globals->server_pid = GetServerPid(server_dir);
-        if (globals->server_pid == -1) {
-          pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-               "can't get pid of fresh server from connection (dir=%s)",
-               server_dir.c_str());
-        }
-        return true;
+  BlazeServerStartup* server_startup;
+  StartServer(&server_startup);
+  // Give the server one minute to start up.
+  for (int ii = 0; ii < 600; ++ii) {
+    // 60s; enough time to connect with debugger
+    if (server->Connect()) {
+      if (ii) {
+        fputc('\n', stderr);
+        fflush(stderr);
       }
-      fputc('.', stderr);
-      fflush(stderr);
-      poll(NULL, 0, 1000);  // sleep 100ms.  (usleep(3) is obsolete.)
-      char c;
-      if (fd != -1 && (read(fd, &c, 1) != -1 || errno != EAGAIN)) {
-        fprintf(stderr, "\nunexpected pipe read status: %s\n"
-            "Server presumed dead. Now printing '%s':\n",
-            strerror(errno), globals->jvm_log_file.c_str());
-        WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str());
-        exit(blaze_exit_code::INTERNAL_ERROR);
-      }
+      delete server_startup;
+      return;
     }
-    die(blaze_exit_code::INTERNAL_ERROR,
-        "\nError: couldn't connect to server at '%s' after 60 seconds.",
-        socket_file.c_str());
-    // The if never falls through here.
+    fputc('.', stderr);
+    fflush(stderr);
+    poll(NULL, 0, 100);  // sleep 100ms.  (usleep(3) is obsolete.)
+    if (!server_startup->IsStillAlive()) {
+      fprintf(stderr, "\nunexpected pipe read status: %s\n"
+          "Server presumed dead. Now printing '%s':\n",
+          strerror(errno), globals->jvm_log_file.c_str());
+      WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str());
+      exit(blaze_exit_code::INTERNAL_ERROR);
+    }
   }
-  return false;
+  die(blaze_exit_code::INTERNAL_ERROR,
+      "\nError: couldn't connect to server at '%s' after 60 seconds.",
+      socket_file.c_str());
 }
 
 // Poll until the given process denoted by pid goes away. Return false if this
@@ -994,27 +1076,33 @@
   return false;
 }
 
-// Kills the specified running Blaze server.
-void AfUnixBlazeServer::KillRunningServer(pid_t server_pid) {
+// Kills the running Blaze server. First we send a SIGTERM, and if that does
+// not kill the process, a SIGKILL to its whole process group.
+void AfUnixBlazeServer::KillRunningServer() {
+  assert(connected_);
+  assert(globals->server_pid > 0);
+
   close(server_socket_);
   server_socket_ = -1;
   fprintf(stderr, "Sending SIGTERM to previous %s server (pid=%d)... ",
-          globals->options.GetProductName().c_str(), server_pid);
+          globals->options.GetProductName().c_str(), globals->server_pid);
   fflush(stderr);
-  kill(server_pid, SIGTERM);
-  if (WaitForServerDeath(server_pid, 10)) {
+  kill(globals->server_pid, SIGTERM);
+  if (WaitForServerDeath(globals->server_pid, 10)) {
     fprintf(stderr, "done.\n");
+    connected_ = false;
     return;
   }
 
   // If the previous attempt did not suceeded, kill the whole group.
   fprintf(stderr,
           "Sending SIGKILL to previous %s server process group (pid=%d)... ",
-          globals->options.GetProductName().c_str(), server_pid);
+          globals->options.GetProductName().c_str(), globals->server_pid);
   fflush(stderr);
-  killpg(server_pid, SIGKILL);
-  if (WaitForServerDeath(server_pid, 10)) {
+  killpg(globals->server_pid, SIGKILL);
+  if (WaitForServerDeath(globals->server_pid, 10)) {
     fprintf(stderr, "killed.\n");
+    connected_ = false;
     return;
   }
 
@@ -1022,15 +1110,6 @@
   pdie(blaze_exit_code::INTERNAL_ERROR, "SIGKILL unsuccessful after 10s");
 }
 
-// Kills the running Blaze server, if any.  Finds the pid from the socket.
-static bool KillRunningServerIfAny(BlazeServer* server) {
-  if (ConnectToServer(server, /*start=*/false)) {
-    server->KillRunningServer(globals->server_pid);
-    return true;
-  }
-  return false;
-}
-
 // Calls fsync() on the file (or directory) specified in 'file_path'.
 // pdie()'s if syncing fails.
 static void SyncFile(const char *file_path) {
@@ -1281,7 +1360,7 @@
       continue;
     }
 
-    if (args1[i] !=args2[i]) {
+    if (args1[i] != args2[i]) {
       return true;
     }
 
@@ -1296,7 +1375,7 @@
 
 // Kills the running Blaze server, if any, if the startup options do not match.
 static void KillRunningServerIfDifferentStartupOptions(BlazeServer* server) {
-  if (!ConnectToServer(server, /*start=*/false)) {
+  if (!server->Connected()) {
     return;
   }
 
@@ -1319,9 +1398,7 @@
             "WARNING: Running %s server needs to be killed, because the "
             "startup options are different.\n",
             globals->options.GetProductName().c_str());
-    server->KillRunningServer(globals->server_pid);
-  } else {
-    server->Disconnect();
+    server->KillRunningServer();
   }
 }
 
@@ -1340,9 +1417,11 @@
   bool ok = ReadDirectorySymlink(installation_path.c_str(), &prev_installation);
   if (!ok || !CompareAbsolutePaths(
           prev_installation, globals->options.install_base)) {
-    if (KillRunningServerIfAny(server)) {
-      globals->restart_reason = NEW_VERSION;
+    if (server->Connected()) {
+      server->KillRunningServer();
     }
+
+    globals->restart_reason = NEW_VERSION;
     UnlinkPath(installation_path.c_str());
     if (!SymlinkDirectories(globals->options.install_base.c_str(),
                             installation_path.c_str())) {
@@ -1444,7 +1523,10 @@
 // shuts down the client (by exit or signal).
 static ATTRIBUTE_NORETURN void SendServerRequest(BlazeServer* server) {
   while (true) {
-    ConnectToServer(server, /*start=*/true);
+    if (!server->Connected()) {
+      StartServerAndConnect(server);
+    }
+
     // Check for deleted server cwd:
     string server_cwd = GetProcessCWD(globals->server_pid);
     // TODO(bazel-team): Is this check even necessary? If someone deletes or
@@ -1474,7 +1556,7 @@
         fprintf(stderr, "Server's cwd moved or deleted (%s).\n",
                 server_cwd.c_str());
       }
-      server->KillRunningServer(globals->server_pid);
+      server->KillRunningServer();
     } else {
       break;
     }
@@ -1777,6 +1859,7 @@
   EnsureFiniteStackLimit();
 
   ExtractData(self_path);
+  blaze_server->Connect();
   EnsureCorrectRunningVersion(blaze_server);
   KillRunningServerIfDifferentStartupOptions(blaze_server);
 
@@ -1795,9 +1878,12 @@
 
 GrpcBlazeServer::GrpcBlazeServer() {
   gpr_set_log_function(null_grpc_log_function);
+  connected_ = false;
 }
 
 bool GrpcBlazeServer::Connect() {
+  assert(!connected_);
+
   std::string server_dir = globals->options.output_base + "/server";
   std::string port;
   std::string ipv4_prefix = "127.0.0.1:";
@@ -1842,7 +1928,15 @@
     return false;
   }
 
+  globals->server_pid = GetServerPid(server_dir);
+  if (globals->server_pid <= 0) {
+    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+         "can't get PID of existing server (server dir=%s)",
+         server_dir.c_str());
+  }
+
   this->client_ = std::move(client);
+  connected_ = true;
   return true;
 }
 
@@ -1901,7 +1995,11 @@
   }
 }
 
-void GrpcBlazeServer::KillRunningServer(int server_pid) {
+// This waits indefinitely until the server shuts down.
+void GrpcBlazeServer::KillRunningServer() {
+  assert(connected_);
+  assert(globals->server_pid > 0);
+
   grpc::ClientContext context;
   command_server::RunRequest request;
   command_server::RunResponse response;
@@ -1915,11 +2013,15 @@
 
   while (reader->Read(&response)) {}
 
-  // Send a SIGKILL for good measure
-  killpg(server_pid, SIGKILL);
+  // Kill the server process for good measure.
+  KillServerProcess(globals->server_pid, globals->options.output_base);
+
+  connected_ = false;
 }
 
 unsigned int GrpcBlazeServer::Communicate() {
+  assert(connected_);
+
   vector<string> arg_vector;
   string command = globals->option_processor.GetCommand();
   if (command != "") {
@@ -1945,7 +2047,7 @@
   // Release the server lock because the gRPC handles concurrent clients just
   // fine. Note that this may result in two "waiting for other client" messages
   // (one during server startup and one emitted by the server)
-  blaze::ReleaseLock(&blaze_lock);
+  blaze::ReleaseLock(&blaze_lock_);
 
   {
     std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
@@ -1996,12 +2098,17 @@
 }
 
 void GrpcBlazeServer::Disconnect() {
+  assert(connected_);
+
   client_.reset();
   request_cookie_ = "";
   response_cookie_ = "";
+  connected_ = false;
 }
 
 void GrpcBlazeServer::Cancel() {
+  assert(connected_);
+
   std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
   // Wake up the cancellation thread and tell it to issue its RPC
   cancel_thread_action_ = CANCEL;