Bazel client: create a wrapper around Unix pipes This allows implementing pipe-handling in a platform-specific way. Windows also supports pipes but through its own API. -- MOS_MIGRATED_REVID=139564316
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 8f3151a..9633565 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc
@@ -236,8 +236,10 @@ std::mutex cancel_thread_mutex_; int connect_timeout_secs_; - int recv_socket_; // Socket the cancel thread reads actions from - int send_socket_; // Socket the main thread writes actions to + + // Pipe that the main thread sends actions to and the cancel thread receieves + // actions from. + blaze_util::IPipe* _pipe; void CancelThread(); void SendAction(CancelThreadAction action); @@ -1404,28 +1406,15 @@ gpr_set_log_function(null_grpc_log_function); - int fd[2]; - if (pipe(fd) < 0) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "pipe()"); - } - recv_socket_ = fd[0]; - send_socket_ = fd[1]; - - if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "fcntl(F_SETFD, FD_CLOEXEC) failed"); - } - - if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "fcntl(F_SETFD, FD_CLOEXEC) failed"); + _pipe = blaze_util::CreatePipe(); + if (_pipe == NULL) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "Couldn't create pipe"); } } GrpcBlazeServer::~GrpcBlazeServer() { - close(send_socket_); - close(recv_socket_); + delete _pipe; + _pipe = NULL; } bool GrpcBlazeServer::Connect() { @@ -1528,8 +1517,9 @@ bool command_id_received = false; while (running) { char buf; - int bytes_read = read(recv_socket_, &buf, 1); - if (bytes_read == -1 && errno == EINTR) { + + int bytes_read = _pipe->Receive(&buf, 1); + if (bytes_read < 0 && errno == EINTR) { continue; } else if (bytes_read != 1) { pdie(blaze_exit_code::INTERNAL_ERROR, @@ -1699,7 +1689,7 @@ void GrpcBlazeServer::SendAction(CancelThreadAction action) { char msg = action; - if (write(send_socket_, &msg, 1) <= 0) { + if (!_pipe->Send(&msg, 1)) { sigprintf("\nCould not interrupt server (cannot write to client pipe)\n\n"); } }
diff --git a/src/main/cpp/util/file.h b/src/main/cpp/util/file.h index dfa08ad..6118eef 100644 --- a/src/main/cpp/util/file.h +++ b/src/main/cpp/util/file.h
@@ -19,6 +19,18 @@ namespace blaze_util { +class IPipe { + public: + virtual ~IPipe() {} + + // Sends `size` bytes from `buffer` through the pipe. + virtual bool Send(void *buffer, size_t size) = 0; + + // Receives at most `size` bytes into `buffer` from the pipe. + // Returns the number of bytes received; sets `errno` upon error. + virtual int Receive(void *buffer, size_t size) = 0; +}; + // Returns the part of the path before the final "/". If there is a single // leading "/" in the path, the result will be the leading "/". If there is // no "/" in the path, the result is the empty prefix of the input (i.e., "").
diff --git a/src/main/cpp/util/file_platform.h b/src/main/cpp/util/file_platform.h index a807c0d..7682e21 100644 --- a/src/main/cpp/util/file_platform.h +++ b/src/main/cpp/util/file_platform.h
@@ -21,6 +21,10 @@ namespace blaze_util { +class IPipe; + +IPipe* CreatePipe(); + // Checks each element of the PATH variable for executable. If none is found, "" // is returned. Otherwise, the full path to executable is returned. Can die if // looking up PATH fails.
diff --git a/src/main/cpp/util/file_posix.cc b/src/main/cpp/util/file_posix.cc index b6c37e1..5bb8772 100644 --- a/src/main/cpp/util/file_posix.cc +++ b/src/main/cpp/util/file_posix.cc
@@ -33,6 +33,53 @@ using std::pair; using std::string; +class PosixPipe : public IPipe { + public: + PosixPipe(int recv_socket, int send_socket) + : _recv_socket(recv_socket), _send_socket(send_socket) {} + + PosixPipe() = delete; + + virtual ~PosixPipe() { + close(_recv_socket); + close(_send_socket); + } + + // Sends `size` bytes from `buffer` through the pipe. + bool Send(void* buffer, size_t size) override { + return write(_send_socket, buffer, size) == size; + } + + // Receives at most `size` bytes into `buffer` from the pipe. + // Returns the number of bytes received; sets `errno` upon error. + int Receive(void* buffer, size_t size) override { + return read(_recv_socket, buffer, size); + } + + private: + int _recv_socket; + int _send_socket; +}; + +IPipe* CreatePipe() { + int fd[2]; + if (pipe(fd) < 0) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "pipe()"); + } + + if (fcntl(fd[0], F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + + if (fcntl(fd[1], F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + + return new PosixPipe(fd[0], fd[1]); +} + string Which(const string &executable) { char *path_cstr = getenv("PATH"); if (path_cstr == NULL || path_cstr[0] == '\0') {