Bazel client: create a wrapper around Unix pipes

This allows implementing pipe-handling in a
platform-specific way. Windows also supports pipes
but through its own API.

--
MOS_MIGRATED_REVID=139564316
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 8f3151a..9633565 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -236,8 +236,10 @@
   std::mutex cancel_thread_mutex_;
 
   int connect_timeout_secs_;
-  int recv_socket_;  // Socket the cancel thread reads actions from
-  int send_socket_;  // Socket the main thread writes actions to
+
+  // Pipe that the main thread sends actions to and the cancel thread receieves
+  // actions from.
+  blaze_util::IPipe* _pipe;
 
   void CancelThread();
   void SendAction(CancelThreadAction action);
@@ -1404,28 +1406,15 @@
 
   gpr_set_log_function(null_grpc_log_function);
 
-  int fd[2];
-  if (pipe(fd) < 0) {
-    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-         "pipe()");
-  }
-  recv_socket_ = fd[0];
-  send_socket_ = fd[1];
-
-  if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) {
-    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-         "fcntl(F_SETFD, FD_CLOEXEC) failed");
-  }
-
-  if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) {
-    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
-         "fcntl(F_SETFD, FD_CLOEXEC) failed");
+  _pipe = blaze_util::CreatePipe();
+  if (_pipe == NULL) {
+    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "Couldn't create pipe");
   }
 }
 
 GrpcBlazeServer::~GrpcBlazeServer() {
-  close(send_socket_);
-  close(recv_socket_);
+  delete _pipe;
+  _pipe = NULL;
 }
 
 bool GrpcBlazeServer::Connect() {
@@ -1528,8 +1517,9 @@
   bool command_id_received = false;
   while (running) {
     char buf;
-    int bytes_read = read(recv_socket_, &buf, 1);
-    if (bytes_read == -1 && errno == EINTR) {
+
+    int bytes_read = _pipe->Receive(&buf, 1);
+    if (bytes_read < 0 && errno == EINTR) {
       continue;
     } else if (bytes_read != 1) {
       pdie(blaze_exit_code::INTERNAL_ERROR,
@@ -1699,7 +1689,7 @@
 
 void GrpcBlazeServer::SendAction(CancelThreadAction action) {
   char msg = action;
-  if (write(send_socket_, &msg, 1) <= 0) {
+  if (!_pipe->Send(&msg, 1)) {
     sigprintf("\nCould not interrupt server (cannot write to client pipe)\n\n");
   }
 }
diff --git a/src/main/cpp/util/file.h b/src/main/cpp/util/file.h
index dfa08ad..6118eef 100644
--- a/src/main/cpp/util/file.h
+++ b/src/main/cpp/util/file.h
@@ -19,6 +19,18 @@
 
 namespace blaze_util {
 
+class IPipe {
+ public:
+  virtual ~IPipe() {}
+
+  // Sends `size` bytes from `buffer` through the pipe.
+  virtual bool Send(void *buffer, size_t size) = 0;
+
+  // Receives at most `size` bytes into `buffer` from the pipe.
+  // Returns the number of bytes received; sets `errno` upon error.
+  virtual int Receive(void *buffer, size_t size) = 0;
+};
+
 // Returns the part of the path before the final "/".  If there is a single
 // leading "/" in the path, the result will be the leading "/".  If there is
 // no "/" in the path, the result is the empty prefix of the input (i.e., "").
diff --git a/src/main/cpp/util/file_platform.h b/src/main/cpp/util/file_platform.h
index a807c0d..7682e21 100644
--- a/src/main/cpp/util/file_platform.h
+++ b/src/main/cpp/util/file_platform.h
@@ -21,6 +21,10 @@
 
 namespace blaze_util {
 
+class IPipe;
+
+IPipe* CreatePipe();
+
 // Checks each element of the PATH variable for executable. If none is found, ""
 // is returned.  Otherwise, the full path to executable is returned. Can die if
 // looking up PATH fails.
diff --git a/src/main/cpp/util/file_posix.cc b/src/main/cpp/util/file_posix.cc
index b6c37e1..5bb8772 100644
--- a/src/main/cpp/util/file_posix.cc
+++ b/src/main/cpp/util/file_posix.cc
@@ -33,6 +33,53 @@
 using std::pair;
 using std::string;
 
+class PosixPipe : public IPipe {
+ public:
+  PosixPipe(int recv_socket, int send_socket)
+      : _recv_socket(recv_socket), _send_socket(send_socket) {}
+
+  PosixPipe() = delete;
+
+  virtual ~PosixPipe() {
+    close(_recv_socket);
+    close(_send_socket);
+  }
+
+  // Sends `size` bytes from `buffer` through the pipe.
+  bool Send(void* buffer, size_t size) override {
+    return write(_send_socket, buffer, size) == size;
+  }
+
+  // Receives at most `size` bytes into `buffer` from the pipe.
+  // Returns the number of bytes received; sets `errno` upon error.
+  int Receive(void* buffer, size_t size) override {
+    return read(_recv_socket, buffer, size);
+  }
+
+ private:
+  int _recv_socket;
+  int _send_socket;
+};
+
+IPipe* CreatePipe() {
+  int fd[2];
+  if (pipe(fd) < 0) {
+    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "pipe()");
+  }
+
+  if (fcntl(fd[0], F_SETFD, FD_CLOEXEC) == -1) {
+    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+         "fcntl(F_SETFD, FD_CLOEXEC) failed");
+  }
+
+  if (fcntl(fd[1], F_SETFD, FD_CLOEXEC) == -1) {
+    pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+         "fcntl(F_SETFD, FD_CLOEXEC) failed");
+  }
+
+  return new PosixPipe(fd[0], fd[1]);
+}
+
 string Which(const string &executable) {
   char *path_cstr = getenv("PATH");
   if (path_cstr == NULL || path_cstr[0] == '\0') {