blob: 824db07fbfdd0059bf411fd4689bef04bb7db138 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_server.h"
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server) {
grpc_tcp_server* s =
static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
s->so_reuseport = grpc_is_socket_reuse_port_supported();
s->expand_wildcard_addrs = false;
for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
(args->args[i].value.integer != 0);
} else {
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
} else {
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
}
}
}
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = false;
s->shutdown_starting.head = nullptr;
s->shutdown_starting.tail = nullptr;
s->shutdown_complete = shutdown_complete;
s->on_accept_cb = nullptr;
s->on_accept_cb_arg = nullptr;
s->head = nullptr;
s->tail = nullptr;
s->nports = 0;
s->channel_args = grpc_channel_args_copy(args);
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
}
static void finish_shutdown(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
while (s->head) {
grpc_tcp_listener* sp = s->head;
s->head = sp->next;
gpr_free(sp);
}
grpc_channel_args_destroy(s->channel_args);
gpr_free(s);
}
static void destroyed_port(void* server, grpc_error* error) {
grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
} else {
GPR_ASSERT(s->destroyed_ports < s->nports);
gpr_mu_unlock(&s->mu);
}
}
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
static void deactivated_all_ports(grpc_tcp_server* s) {
/* delete ALL the things */
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
if (s->head) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
"tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
}
}
static void tcp_server_destroy(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(
sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
deactivated_all_ports(s);
}
}
/* event manager callback when reads are ready */
static void on_read(void* arg, grpc_error* err) {
grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
grpc_pollset* read_notifier_pollset;
if (err != GRPC_ERROR_NONE) {
goto error;
}
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
grpc_resolved_address addr;
char* addr_str;
char* name;
memset(&addr, 0, sizeof(addr));
addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
/* Note: If we ever decide to return this address to the user, remember to
strip off the ::ffff:0.0.0.0/96 prefix first. */
int fd = grpc_accept4(sp->fd, &addr, 1, 1);
if (fd < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
return;
default:
gpr_mu_lock(&sp->server->mu);
if (!sp->server->shutdown_listeners) {
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
} else {
/* if we have shutdown listeners, accept4 could fail, and we
needn't notify users */
}
gpr_mu_unlock(&sp->server->mu);
goto error;
}
}
grpc_set_socket_no_sigpipe_if_possible(fd);
addr_str = grpc_sockaddr_to_uri(&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1)) %
sp->server->pollset_count];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
// Create acceptor.
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = sp->fd_index;
sp->server->on_accept_cb(
sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
}
GPR_UNREACHABLE_CODE(return );
error:
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(sp->server);
} else {
gpr_mu_unlock(&sp->server->mu);
}
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
unsigned port_index,
int requested_port,
int* out_port) {
grpc_resolved_address wild4;
grpc_resolved_address wild6;
unsigned fd_index = 0;
grpc_dualstack_mode dsmode;
grpc_tcp_listener* sp = nullptr;
grpc_tcp_listener* sp2 = nullptr;
grpc_error* v6_err = GRPC_ERROR_NONE;
grpc_error* v4_err = GRPC_ERROR_NONE;
*out_port = -1;
if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
out_port);
}
grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
/* Try listening on IPv6 first. */
if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
&dsmode, &sp)) == GRPC_ERROR_NONE) {
++fd_index;
requested_port = *out_port = sp->port;
if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
return GRPC_ERROR_NONE;
}
}
/* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
grpc_sockaddr_set_port(&wild4, requested_port);
if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
&dsmode, &sp2)) == GRPC_ERROR_NONE) {
*out_port = sp2->port;
if (sp != nullptr) {
sp2->is_sibling = 1;
sp->sibling = sp2;
}
}
if (*out_port > 0) {
if (v6_err != GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"Failed to add :: listener, "
"the environment may not support IPv6: %s",
grpc_error_string(v6_err));
GRPC_ERROR_UNREF(v6_err);
}
if (v4_err != GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"Failed to add 0.0.0.0 listener, "
"the environment may not support IPv4: %s",
grpc_error_string(v4_err));
GRPC_ERROR_UNREF(v4_err);
}
return GRPC_ERROR_NONE;
} else {
grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to add any wildcard listeners");
GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
root_err = grpc_error_add_child(root_err, v6_err);
root_err = grpc_error_add_child(root_err, v4_err);
return root_err;
}
}
static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
grpc_tcp_listener* sp = nullptr;
char* addr_str;
char* name;
grpc_error* err;
for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
l->fd_index += count;
}
for (unsigned i = 0; i < count; i++) {
int fd = -1;
int port = -1;
grpc_dualstack_mode dsmode;
err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
&fd);
if (err != GRPC_ERROR_NONE) return err;
err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
true, &port);
if (err != GRPC_ERROR_NONE) return err;
listener->server->nports++;
grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
sp->next = listener->next;
listener->next = sp;
/* sp (the new listener) is a sibling of 'listener' (the original
listener). */
sp->is_sibling = 1;
sp->sibling = listener->sibling;
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
sp->fd_index = listener->fd_index + count - i;
GPR_ASSERT(sp->emfd);
while (listener->server->tail->next != nullptr) {
listener->server->tail = listener->server->tail->next;
}
gpr_free(addr_str);
gpr_free(name);
}
return GRPC_ERROR_NONE;
}
static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* out_port) {
grpc_tcp_listener* sp;
grpc_resolved_address sockname_temp;
grpc_resolved_address addr6_v4mapped;
int requested_port = grpc_sockaddr_get_port(addr);
unsigned port_index = 0;
grpc_dualstack_mode dsmode;
grpc_error* err;
*out_port = -1;
if (s->tail != nullptr) {
port_index = s->tail->port_index + 1;
}
grpc_unlink_if_unix_domain_socket(addr);
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (requested_port == 0) {
for (sp = s->head; sp; sp = sp->next) {
sockname_temp.len =
static_cast<socklen_t>(sizeof(struct sockaddr_storage));
if (0 ==
getsockname(sp->fd,
reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
&sockname_temp.len)) {
int used_port = grpc_sockaddr_get_port(&sockname_temp);
if (used_port > 0) {
memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
grpc_sockaddr_set_port(&sockname_temp, used_port);
requested_port = used_port;
addr = &sockname_temp;
break;
}
}
}
}
if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
return add_wildcard_addrs_to_server(s, port_index, requested_port,
out_port);
}
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
GRPC_ERROR_NONE) {
*out_port = sp->port;
}
return err;
}
/* Return listener at port_index or NULL. Should only be called with s->mu
locked. */
static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
unsigned port_index) {
unsigned num_ports = 0;
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
if (!sp->is_sibling) {
if (++num_ports > port_index) {
return sp;
}
}
}
return nullptr;
}
unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
unsigned num_fds = 0;
gpr_mu_lock(&s->mu);
grpc_tcp_listener* sp = get_port_index(s, port_index);
for (; sp; sp = sp->sibling) {
++num_fds;
}
gpr_mu_unlock(&s->mu);
return num_fds;
}
static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
unsigned fd_index) {
gpr_mu_lock(&s->mu);
grpc_tcp_listener* sp = get_port_index(s, port_index);
for (; sp; sp = sp->sibling, --fd_index) {
if (fd_index == 0) {
gpr_mu_unlock(&s->mu);
return sp->fd;
}
}
gpr_mu_unlock(&s->mu);
return -1;
}
static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
void* on_accept_cb_arg) {
size_t i;
grpc_tcp_listener* sp;
GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb);
GPR_ASSERT(s->active_ports == 0);
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets;
s->pollset_count = pollset_count;
sp = s->head;
while (sp != nullptr) {
if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
pollset_count > 1) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
}
} else {
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
}
}
gpr_mu_unlock(&s->mu);
}
grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
gpr_ref_non_zero(&s->refs);
return s;
}
static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
grpc_closure* shutdown_starting) {
gpr_mu_lock(&s->mu);
grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
static void tcp_server_unref(grpc_tcp_server* s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(s);
gpr_mu_lock(&s->mu);
GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(s);
}
}
static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(sp->emfd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
}
}
gpr_mu_unlock(&s->mu);
}
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_create,
tcp_server_start,
tcp_server_add_port,
tcp_server_port_fd_count,
tcp_server_port_fd,
tcp_server_ref,
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */