| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| |
| #include <algorithm> |
| #include <cstring> |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/string_util.h> |
| |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| #include "src/core/ext/filters/client_channel/health/health_check_client.h" |
| #include "src/core/ext/filters/client_channel/parse_address.h" |
| #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
| #include "src/core/ext/filters/client_channel/subchannel_index.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/connected_channel.h" |
| #include "src/core/lib/debug/stats.h" |
| #include "src/core/lib/gpr/alloc.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/gprpp/mutex_lock.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/iomgr/timer.h" |
| #include "src/core/lib/profiling/timers.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/channel_init.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/service_config.h" |
| #include "src/core/lib/transport/status_metadata.h" |
| #include "src/core/lib/uri/uri_parser.h" |
| |
/* Refcount layout: the low INTERNAL_REF_BITS bits hold internal (weak)
   references; the remaining high bits hold public (strong) references. */
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

/* Default reconnect backoff parameters; overridable via channel args in
   parse_args_for_backoff_values(). */
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
| |
namespace {
// Bundle of state for watching a subchannel's connectivity and health.
// NOTE(review): not referenced anywhere in this file's visible code —
// possibly dead after the ConnectedSubchannelStateWatcher refactor;
// confirm before removing.
struct state_watcher {
  grpc_closure closure;
  grpc_subchannel* subchannel;
  grpc_connectivity_state connectivity_state;
  grpc_connectivity_state last_connectivity_state;
  grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client;
  grpc_closure health_check_closure;
  grpc_connectivity_state health_state;
};
}  // namespace
| |
/* Node in the subchannel's intrusive doubly-linked list of external
   connectivity watchers (rooted at root_external_state_watcher).
   Allocated in grpc_subchannel_notify_on_state_change() and freed in
   on_external_state_watcher_done(). */
typedef struct external_state_watcher {
  grpc_subchannel* subchannel;
  // Caller's pollset_set, added to the subchannel's while watching.
  grpc_pollset_set* pollset_set;
  // User closure to invoke (also the cancellation key).
  grpc_closure* notify;
  // Internal closure wrapping `notify`; does list cleanup first.
  grpc_closure closure;
  struct external_state_watcher* next;
  struct external_state_watcher* prev;
} external_state_watcher;
| |
namespace grpc_core {

// Defined below; forward-declared so grpc_subchannel can hold one.
class ConnectedSubchannelStateWatcher;

}  // namespace grpc_core
| |
/** A subchannel: owns the (re)connection state machine for a single
    target, and publishes a ConnectedSubchannel once a transport is up. */
struct grpc_subchannel {
  grpc_connector* connector;

  /** refcount
      - lower INTERNAL_REF_BITS bits are for internal references:
        these do not keep the subchannel open.
      - upper remaining bits are for public references: these do
        keep the subchannel open */
  gpr_atm ref_pair;

  /** non-transport related channel filters */
  const grpc_channel_filter** filters;
  size_t num_filters;
  /** channel arguments */
  grpc_channel_args* args;

  /** key for deduplication in the global subchannel index */
  grpc_subchannel_key* key;

  /** set during connection */
  grpc_connect_out_args connecting_result;

  /** callback for connection finishing */
  grpc_closure on_connected;

  /** callback for our alarm */
  grpc_closure on_alarm;

  /** pollset_set tracking who's interested in a connection
      being setup */
  grpc_pollset_set* pollset_set;

  /** service name to health-check, from the service config; null if
      health checking is disabled */
  grpc_core::UniquePtr<char> health_check_service_name;

  /** mutex protecting remaining elements */
  gpr_mu mu;

  /** active connection, or null */
  grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
  grpc_core::OrphanablePtr<grpc_core::ConnectedSubchannelStateWatcher>
      connected_subchannel_watcher;

  /** have we seen a disconnection? */
  bool disconnected;
  /** are we connecting */
  bool connecting;

  /** connectivity state tracking: state_tracker reflects the raw
      transport state; state_and_health_tracker folds in health checks */
  grpc_connectivity_state_tracker state_tracker;
  grpc_connectivity_state_tracker state_and_health_tracker;

  /** dummy head of the circular list of external watchers */
  external_state_watcher root_external_state_watcher;

  /** backoff state */
  grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
  grpc_millis next_attempt_deadline;
  grpc_millis min_connect_timeout_ms;

  /** do we have an active alarm? */
  bool have_alarm;
  /** have we started the backoff loop */
  bool backoff_begun;
  // reset_backoff() was called while alarm was pending
  bool retry_immediately;
  /** our alarm */
  grpc_timer alarm;

  /** channelz tracing node; null when channelz is disabled */
  grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
      channelz_subchannel;
};
| |
/** A call on a connected subchannel. The grpc_call_stack is co-allocated
    immediately after this struct (see SUBCHANNEL_CALL_TO_CALL_STACK). */
struct grpc_subchannel_call {
  grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection,
                       const grpc_core::ConnectedSubchannel::CallArgs& args)
      : connection(connection), deadline(args.deadline) {}

  // Connection this call runs on; its ref is released in
  // subchannel_call_destroy().
  grpc_core::ConnectedSubchannel* connection;
  grpc_closure* schedule_closure_after_destroy = nullptr;
  // state needed to support channelz interception of recv trailing metadata.
  grpc_closure recv_trailing_metadata_ready;
  grpc_closure* original_recv_trailing_metadata;
  grpc_metadata_batch* recv_trailing_metadata = nullptr;
  grpc_millis deadline;
};
| |
| static void maybe_start_connecting_locked(grpc_subchannel* c); |
| |
| static const char* subchannel_connectivity_state_change_string( |
| grpc_connectivity_state state) { |
| switch (state) { |
| case GRPC_CHANNEL_IDLE: |
| return "Subchannel state change to IDLE"; |
| case GRPC_CHANNEL_CONNECTING: |
| return "Subchannel state change to CONNECTING"; |
| case GRPC_CHANNEL_READY: |
| return "Subchannel state change to READY"; |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: |
| return "Subchannel state change to TRANSIENT_FAILURE"; |
| case GRPC_CHANNEL_SHUTDOWN: |
| return "Subchannel state change to SHUTDOWN"; |
| } |
| GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
| } |
| |
/* Sets the raw connectivity tracker (not the health-combined one) and,
   when channelz is enabled, records the transition as a trace event.
   Per the _locked suffix, callers hold c->mu. */
static void set_subchannel_connectivity_state_locked(
    grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error,
    const char* reason) {
  if (c->channelz_subchannel != nullptr) {
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            subchannel_connectivity_state_change_string(state)));
  }
  grpc_connectivity_state_set(&c->state_tracker, state, error, reason);
}
| |
| namespace grpc_core { |
| |
// Watches a connected subchannel for connectivity-state changes and,
// when a health-check service name is configured, for health changes.
// Drives teardown + reconnect when the transport fails. Owned by the
// subchannel via connected_subchannel_watcher; Orphan() only stops
// health checking — the object frees itself once its callback refs drop.
class ConnectedSubchannelStateWatcher
    : public InternallyRefCounted<ConnectedSubchannelStateWatcher> {
 public:
  // Must be instantiated while holding c->mu.
  explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c)
      : subchannel_(c) {
    // Steal subchannel ref for connecting.
    GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
    // Start watching for connectivity state changes.
    // Callback uses initial ref to this.
    GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
                      grpc_schedule_on_exec_ctx);
    c->connected_subchannel->NotifyOnStateChange(c->pollset_set,
                                                 &pending_connectivity_state_,
                                                 &on_connectivity_changed_);
    // Start health check if needed.
    grpc_connectivity_state health_state = GRPC_CHANNEL_READY;
    if (c->health_check_service_name != nullptr) {
      health_check_client_ = grpc_core::MakeOrphanable<HealthCheckClient>(
          c->health_check_service_name.get(), c->connected_subchannel,
          c->pollset_set, c->channelz_subchannel);
      GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
                        grpc_schedule_on_exec_ctx);
      Ref().release();  // Ref for health callback tracked manually.
      health_check_client_->NotifyOnHealthChange(&health_state_,
                                                 &on_health_changed_);
      // Until the first health report arrives, the combined tracker
      // stays in CONNECTING instead of READY.
      health_state = GRPC_CHANNEL_CONNECTING;
    }
    // Report initial state.
    set_subchannel_connectivity_state_locked(
        c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected");
    grpc_connectivity_state_set(&c->state_and_health_tracker, health_state,
                                GRPC_ERROR_NONE, "subchannel_connected");
  }

  ~ConnectedSubchannelStateWatcher() {
    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
  }

  // Orphaning stops health checking; the connectivity watch winds down
  // on its own via OnConnectivityChanged.
  void Orphan() override { health_check_client_.reset(); }

 private:
  static void OnConnectivityChanged(void* arg, grpc_error* error) {
    auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
    grpc_subchannel* c = self->subchannel_;
    {
      MutexLock lock(&c->mu);
      switch (self->pending_connectivity_state_) {
        case GRPC_CHANNEL_TRANSIENT_FAILURE:
        case GRPC_CHANNEL_SHUTDOWN: {
          if (!c->disconnected && c->connected_subchannel != nullptr) {
            if (grpc_trace_stream_refcount.enabled()) {
              gpr_log(GPR_INFO,
                      "Connected subchannel %p of subchannel %p has gone into "
                      "%s. Attempting to reconnect.",
                      c->connected_subchannel.get(), c,
                      grpc_connectivity_state_name(
                          self->pending_connectivity_state_));
            }
            // Drop the dead connection (and this watcher's owner slot),
            // then restart the connect loop with a fresh backoff.
            c->connected_subchannel.reset();
            c->connected_subchannel_watcher.reset();
            self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
            set_subchannel_connectivity_state_locked(
                c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
                "reflect_child");
            grpc_connectivity_state_set(&c->state_and_health_tracker,
                                        GRPC_CHANNEL_TRANSIENT_FAILURE,
                                        GRPC_ERROR_REF(error), "reflect_child");
            c->backoff_begun = false;
            c->backoff->Reset();
            maybe_start_connecting_locked(c);
          } else {
            self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
          }
          self->health_check_client_.reset();
          break;
        }
        default: {
          // In principle, this should never happen.  We should not get
          // a callback for READY, because that was the state we started
          // this watch from.  And a connected subchannel should never go
          // from READY to CONNECTING or IDLE.
          self->last_connectivity_state_ = self->pending_connectivity_state_;
          set_subchannel_connectivity_state_locked(
              c, self->pending_connectivity_state_, GRPC_ERROR_REF(error),
              "reflect_child");
          if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
            grpc_connectivity_state_set(&c->state_and_health_tracker,
                                        self->pending_connectivity_state_,
                                        GRPC_ERROR_REF(error), "reflect_child");
          }
          // Re-arm the watch, which re-uses the callback's ref to this.
          c->connected_subchannel->NotifyOnStateChange(
              nullptr, &self->pending_connectivity_state_,
              &self->on_connectivity_changed_);
          self = nullptr;  // So we don't unref below.
        }
      }
    }
    // Don't unref until we've released the lock, because this might
    // cause the subchannel (which contains the lock) to be destroyed.
    if (self != nullptr) self->Unref();
  }

  static void OnHealthChanged(void* arg, grpc_error* error) {
    auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
    if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
      // Health checking shut down: release the manually tracked ref
      // taken in the constructor.
      self->Unref();
      return;
    }
    grpc_subchannel* c = self->subchannel_;
    MutexLock lock(&c->mu);
    // Only propagate health while the transport itself is READY;
    // otherwise the connectivity state takes precedence.
    if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
      grpc_connectivity_state_set(&c->state_and_health_tracker,
                                  self->health_state_, GRPC_ERROR_REF(error),
                                  "health_changed");
    }
    // Re-arm for the next health-state change.
    self->health_check_client_->NotifyOnHealthChange(&self->health_state_,
                                                     &self->on_health_changed_);
  }

  grpc_subchannel* subchannel_;
  grpc_closure on_connectivity_changed_;
  grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
  grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY;
  grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client_;
  grpc_closure on_health_changed_;
  grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING;
};
| |
| } // namespace grpc_core |
| |
/* Convert between a grpc_subchannel_call and the grpc_call_stack laid
   out immediately after it in the same allocation. */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                         sizeof(grpc_subchannel_call)))
/* Inverse of the above. Fixed to reference the macro's own parameter:
   the body previously used `call_stack`, which is not the parameter
   name and only compiled when the caller happened to have a variable of
   that exact name in scope. */
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
  (grpc_subchannel_call*)(((char*)(callstack)) - \
                          GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                              sizeof(grpc_subchannel_call)))
| |
static void on_subchannel_connected(void* subchannel, grpc_error* error);

/* Debug-build ref-counting plumbing: thread file/line/reason through to
   ref_mutate()'s trace logging; compiled out entirely under NDEBUG. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_MUTATE_EXTRA_ARGS \
  GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif
| |
| /* |
| * connection implementation |
| */ |
| |
| static void connection_destroy(void* arg, grpc_error* error) { |
| grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg); |
| grpc_channel_stack_destroy(stk); |
| gpr_free(stk); |
| } |
| |
| /* |
| * grpc_subchannel implementation |
| */ |
| |
/* Final destruction, scheduled from grpc_subchannel_weak_unref() once
   the last ref of any kind is released. Frees everything the subchannel
   owns. */
static void subchannel_destroy(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  // Record destruction in channelz before dropping the node.
  if (c->channelz_subchannel != nullptr) {
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel destroyed"));
    c->channelz_subchannel->MarkSubchannelDestroyed();
    c->channelz_subchannel.reset();
  }
  gpr_free((void*)c->filters);
  c->health_check_service_name.reset();
  grpc_channel_args_destroy(c->args);
  grpc_connectivity_state_destroy(&c->state_tracker);
  grpc_connectivity_state_destroy(&c->state_and_health_tracker);
  grpc_connector_unref(c->connector);
  grpc_pollset_set_destroy(c->pollset_set);
  grpc_subchannel_key_destroy(c->key);
  gpr_mu_destroy(&c->mu);
  gpr_free(c);
}
| |
/* Atomically applies `delta` to the packed ref_pair, using a full
   barrier when `barrier` is nonzero (callers use the barrier form when
   the result gates teardown). Returns the value *before* mutation. The
   REF_MUTATE_EXTRA_ARGS carry file/line/reason for debug tracing. */
static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta,
                          int barrier REF_MUTATE_EXTRA_ARGS) {
  gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
                            : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifndef NDEBUG
  if (grpc_trace_stream_refcount.enabled()) {
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
            purpose, old_val, old_val + delta, reason);
  }
#endif
  return old_val;
}
| |
| grpc_subchannel* grpc_subchannel_ref( |
| grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), |
| 0 REF_MUTATE_PURPOSE("STRONG_REF")); |
| GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); |
| return c; |
| } |
| |
| grpc_subchannel* grpc_subchannel_weak_ref( |
| grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); |
| GPR_ASSERT(old_refs != 0); |
| return c; |
| } |
| |
| grpc_subchannel* grpc_subchannel_ref_from_weak_ref( |
| grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| if (!c) return nullptr; |
| for (;;) { |
| gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); |
| if (old_refs >= (1 << INTERNAL_REF_BITS)) { |
| gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); |
| if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { |
| return c; |
| } |
| } else { |
| return nullptr; |
| } |
| } |
| } |
| |
/* Tears down the subchannel's connection machinery once the last strong
   ref is gone: unregister from the index, mark disconnected, shut down
   any in-flight connect, and drop the active connection + watcher. */
static void disconnect(grpc_subchannel* c) {
  grpc_subchannel_index_unregister(c->key, c);
  gpr_mu_lock(&c->mu);
  GPR_ASSERT(!c->disconnected);
  c->disconnected = true;
  grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                            "Subchannel disconnected"));
  // Released under the lock so concurrent callbacks see a consistent
  // disconnected state.
  c->connected_subchannel.reset();
  c->connected_subchannel_watcher.reset();
  gpr_mu_unlock(&c->mu);
}
| |
/* Drops a strong ref. If it was the last strong ref, disconnects; the
   weak ref added in the same atomic op keeps the object alive through
   disconnect() and is released at the end. */
void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  // add a weak ref and subtract a strong ref (atomically)
  old_refs = ref_mutate(
      c, static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
      1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
  // Prior strong count was exactly one <=> this was the last strong ref.
  if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
    disconnect(c);
  }
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref");
}
| |
/* Drops a weak ref. When the packed count reaches zero (old value was
   exactly 1), destruction is deferred to a closure on the exec_ctx
   rather than performed inline. */
void grpc_subchannel_weak_unref(
    grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  old_refs = ref_mutate(c, -static_cast<gpr_atm>(1),
                        1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
  if (old_refs == 1) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE);
  }
}
| |
/* Fills in backoff options and the minimum connect timeout from channel
   args, starting from the compile-time defaults.
   The testing-only arg "grpc.testing.fixed_reconnect_backoff_ms" pins
   initial/min/max to one value and disables multiplier/jitter — but
   note the order dependence: any of the standard backoff args appearing
   *after* it in the args list clears fixed_reconnect_backoff again. */
static void parse_args_for_backoff_values(
    const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
    grpc_millis* min_connect_timeout_ms) {
  grpc_millis initial_backoff_ms =
      GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
  *min_connect_timeout_ms =
      GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
  grpc_millis max_backoff_ms =
      GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
  bool fixed_reconnect_backoff = false;
  if (args != nullptr) {
    for (size_t i = 0; i < args->num_args; i++) {
      if (0 == strcmp(args->args[i].key,
                      "grpc.testing.fixed_reconnect_backoff_ms")) {
        fixed_reconnect_backoff = true;
        initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
            grpc_channel_arg_get_integer(
                &args->args[i],
                {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
      } else if (0 ==
                 strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        *min_connect_timeout_ms = grpc_channel_arg_get_integer(
            &args->args[i],
            {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
      } else if (0 ==
                 strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        max_backoff_ms = grpc_channel_arg_get_integer(
            &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
      } else if (0 == strcmp(args->args[i].key,
                             GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        initial_backoff_ms = grpc_channel_arg_get_integer(
            &args->args[i],
            {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
      }
    }
  }
  // Fixed backoff => no growth (multiplier 1.0) and no jitter.
  backoff_options->set_initial_backoff(initial_backoff_ms)
      .set_multiplier(fixed_reconnect_backoff
                          ? 1.0
                          : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
      .set_jitter(fixed_reconnect_backoff ? 0.0
                                          : GRPC_SUBCHANNEL_RECONNECT_JITTER)
      .set_max_backoff(max_backoff_ms);
}
| |
| namespace grpc_core { |
| namespace { |
| |
// Extracts healthCheckConfig.serviceName from a parsed service-config
// JSON; service_name stays null when the field is absent or malformed.
struct HealthCheckParams {
  UniquePtr<char> service_name;

  // Callback for ServiceConfig::ParseGlobalParams(): invoked once per
  // top-level field; everything except "healthCheckConfig" is ignored.
  static void Parse(const grpc_json* field, HealthCheckParams* params) {
    if (strcmp(field->key, "healthCheckConfig") == 0) {
      if (field->type != GRPC_JSON_OBJECT) return;
      for (grpc_json* sub_field = field->child; sub_field != nullptr;
           sub_field = sub_field->next) {
        if (sub_field->key == nullptr) return;
        if (strcmp(sub_field->key, "serviceName") == 0) {
          if (params->service_name != nullptr) return;  // Duplicate.
          if (sub_field->type != GRPC_JSON_STRING) return;
          params->service_name.reset(gpr_strdup(sub_field->value));
        }
      }
    }
  }
};
| |
| } // namespace |
| } // namespace grpc_core |
| |
/* Creates a subchannel for the given connector + args, deduplicating
   through the global subchannel index: if an equivalent subchannel is
   already registered, that one is returned instead. */
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
                                        const grpc_subchannel_args* args) {
  grpc_subchannel_key* key = grpc_subchannel_key_create(args);
  grpc_subchannel* c = grpc_subchannel_index_find(key);
  if (c) {
    grpc_subchannel_key_destroy(key);
    return c;
  }

  GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
  c = static_cast<grpc_subchannel*>(gpr_zalloc(sizeof(*c)));
  c->key = key;
  // Start with one strong ref (upper bits of the packed ref_pair).
  gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
  c->connector = connector;
  grpc_connector_ref(c->connector);
  // Copy the caller's filter list so the subchannel owns its own copy.
  c->num_filters = args->filter_count;
  if (c->num_filters > 0) {
    c->filters = static_cast<const grpc_channel_filter**>(
        gpr_malloc(sizeof(grpc_channel_filter*) * c->num_filters));
    memcpy((void*)c->filters, args->filters,
           sizeof(grpc_channel_filter*) * c->num_filters);
  } else {
    c->filters = nullptr;
  }
  c->pollset_set = grpc_pollset_set_create();
  // Extract the target address, giving proxy mappers a chance to
  // rewrite both the address and the channel args.
  grpc_resolved_address* addr =
      static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
  grpc_get_subchannel_address_arg(args->args, addr);
  grpc_resolved_address* new_address = nullptr;
  grpc_channel_args* new_args = nullptr;
  if (grpc_proxy_mappers_map_address(addr, args->args, &new_address,
                                     &new_args)) {
    GPR_ASSERT(new_address != nullptr);
    gpr_free(addr);
    addr = new_address;
  }
  // Replace the address arg with the (possibly remapped) address.
  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
  grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
  gpr_free(addr);
  c->args = grpc_channel_args_copy_and_add_and_remove(
      new_args != nullptr ? new_args : args->args, keys_to_remove,
      GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
  gpr_free(new_arg.value.string);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  // Empty circular list of external state watchers.
  c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
      &c->root_external_state_watcher;
  GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
                    grpc_schedule_on_exec_ctx);
  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
                               "subchannel");
  grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE,
                               "subchannel");
  grpc_core::BackOff::Options backoff_options;
  parse_args_for_backoff_values(args->args, &backoff_options,
                                &c->min_connect_timeout_ms);
  c->backoff.Init(backoff_options);
  gpr_mu_init(&c->mu);

  // Check whether we should enable health checking.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json != nullptr) {
    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
        grpc_core::ServiceConfig::Create(service_config_json);
    if (service_config != nullptr) {
      grpc_core::HealthCheckParams params;
      service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse,
                                        &params);
      c->health_check_service_name = std::move(params.service_name);
    }
  }

  // Create the channelz node if channelz is enabled for this subchannel.
  const grpc_arg* arg =
      grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
  bool channelz_enabled =
      grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
  arg = grpc_channel_args_find(
      c->args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
  const grpc_integer_options options = {
      GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
  size_t channel_tracer_max_memory =
      (size_t)grpc_channel_arg_get_integer(arg, options);
  if (channelz_enabled) {
    c->channelz_subchannel =
        grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
            c, channel_tracer_max_memory);
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel created"));
  }

  // Registration may return a different subchannel if we lost a race.
  return grpc_subchannel_index_register(key, c);
}
| |
/* Returns the subchannel's channelz node, or null if channelz is
   disabled. Borrowed pointer: no ref is transferred. */
grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
    grpc_subchannel* subchannel) {
  return subchannel->channelz_subchannel.get();
}
| |
| intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) { |
| if (subchannel->connected_subchannel != nullptr) { |
| return subchannel->connected_subchannel->socket_uuid(); |
| } else { |
| return 0; |
| } |
| } |
| |
/* Kicks off a connection attempt (caller holds c->mu, per the _locked
   suffix). The attempt deadline is the later of the backoff's next
   attempt time and now + min_connect_timeout_ms, so every attempt gets
   at least the minimum connect timeout. */
static void continue_connect_locked(grpc_subchannel* c) {
  grpc_connect_in_args args;
  args.interested_parties = c->pollset_set;
  const grpc_millis min_deadline =
      c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
  c->next_attempt_deadline = c->backoff->NextAttemptTime();
  args.deadline = std::max(c->next_attempt_deadline, min_deadline);
  args.channel_args = c->args;
  // Both trackers report CONNECTING while the attempt is in flight.
  set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING,
                                           GRPC_ERROR_NONE, "connecting");
  grpc_connectivity_state_set(&c->state_and_health_tracker,
                              GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
                              "connecting");
  // Completion lands in on_subchannel_connected.
  grpc_connector_connect(c->connector, &args, &c->connecting_result,
                         &c->on_connected);
}
| |
| grpc_connectivity_state grpc_subchannel_check_connectivity( |
| grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) { |
| gpr_mu_lock(&c->mu); |
| grpc_connectivity_state_tracker* tracker = |
| inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; |
| grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); |
| gpr_mu_unlock(&c->mu); |
| return state; |
| } |
| |
/* Runs when an external watcher's state change fires (or the watch is
   cancelled): detaches pollset interest, unlinks the watcher from the
   subchannel's list, frees it, then forwards the result to the user's
   closure. */
static void on_external_state_watcher_done(void* arg, grpc_error* error) {
  external_state_watcher* w = static_cast<external_state_watcher*>(arg);
  // Capture the user closure before freeing w below.
  grpc_closure* follow_up = w->notify;
  if (w->pollset_set != nullptr) {
    grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set,
                                     w->pollset_set);
  }
  gpr_mu_lock(&w->subchannel->mu);
  // Unlink from the intrusive doubly-linked watcher list.
  w->next->prev = w->prev;
  w->prev->next = w->next;
  gpr_mu_unlock(&w->subchannel->mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}
| |
/* Fires when the backoff alarm expires or is cancelled. Retries the
   connect on normal expiry; a cancellation triggered by
   grpc_subchannel_reset_backoff (retry_immediately) is rewritten to
   success so the retry happens at once; disconnection aborts. */
static void on_alarm(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  gpr_mu_lock(&c->mu);
  c->have_alarm = false;
  if (c->disconnected) {
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
                                                             &error, 1);
  } else if (c->retry_immediately) {
    c->retry_immediately = false;
    error = GRPC_ERROR_NONE;
  } else {
    // Normal expiry: keep the incoming error (NONE on clean expiry).
    GRPC_ERROR_REF(error);
  }
  if (error == GRPC_ERROR_NONE) {
    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
    continue_connect_locked(c);
    gpr_mu_unlock(&c->mu);
  } else {
    // Cancelled/disconnected: drop the "connecting" weak ref taken in
    // maybe_start_connecting_locked.
    gpr_mu_unlock(&c->mu);
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  GRPC_ERROR_UNREF(error);
}
| |
/* Starts a connection attempt if warranted: not disconnected, not
   already connecting or connected, and at least one watcher is
   interested. The first attempt goes out immediately; later attempts
   wait on the backoff alarm. Caller holds c->mu. */
static void maybe_start_connecting_locked(grpc_subchannel* c) {
  if (c->disconnected) {
    /* Don't try to connect if we're already disconnected */
    return;
  }
  if (c->connecting) {
    /* Already connecting: don't restart */
    return;
  }
  if (c->connected_subchannel != nullptr) {
    /* Already connected: don't restart */
    return;
  }
  if (!grpc_connectivity_state_has_watchers(&c->state_tracker) &&
      !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) {
    /* Nobody is interested in connecting: so don't just yet */
    return;
  }
  c->connecting = true;
  // Held for the duration of the attempt; released by on_alarm or
  // on_subchannel_connected (or stolen by the state watcher).
  GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
  if (!c->backoff_begun) {
    c->backoff_begun = true;
    continue_connect_locked(c);
  } else {
    GPR_ASSERT(!c->have_alarm);
    c->have_alarm = true;
    const grpc_millis time_til_next =
        c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
    if (time_til_next <= 0) {
      gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
    } else {
      gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c,
              time_til_next);
    }
    GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
    grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
  }
}
| |
/* Registers (state != nullptr) or cancels (state == nullptr, matched by
   the `notify` closure) an external connectivity watcher. Registering
   links the watcher into the subchannel's list, merges the caller's
   pollset_set, and may kick off a connection attempt now that someone
   is interested in the state. */
void grpc_subchannel_notify_on_state_change(
    grpc_subchannel* c, grpc_pollset_set* interested_parties,
    grpc_connectivity_state* state, grpc_closure* notify,
    bool inhibit_health_checks) {
  grpc_connectivity_state_tracker* tracker =
      inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
  external_state_watcher* w;
  if (state == nullptr) {
    // Cancellation: find the matching watcher(s) and cancel the watch;
    // cleanup happens in on_external_state_watcher_done.
    gpr_mu_lock(&c->mu);
    for (w = c->root_external_state_watcher.next;
         w != &c->root_external_state_watcher; w = w->next) {
      if (w->notify == notify) {
        grpc_connectivity_state_notify_on_state_change(tracker, nullptr,
                                                       &w->closure);
      }
    }
    gpr_mu_unlock(&c->mu);
  } else {
    w = static_cast<external_state_watcher*>(gpr_malloc(sizeof(*w)));
    w->subchannel = c;
    w->pollset_set = interested_parties;
    w->notify = notify;
    GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w,
                      grpc_schedule_on_exec_ctx);
    if (interested_parties != nullptr) {
      grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties);
    }
    GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
    gpr_mu_lock(&c->mu);
    // Insert at the tail of the circular list.
    w->next = &c->root_external_state_watcher;
    w->prev = w->next->prev;
    w->next->prev = w->prev->next = w;
    grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure);
    maybe_start_connecting_locked(c);
    gpr_mu_unlock(&c->mu);
  }
}
| |
/* Builds a channel stack around the transport produced by the connector
   and publishes it as the subchannel's connected_subchannel. Returns
   false (cleaning up) if stack creation fails or the subchannel was
   disconnected in the meantime. Caller holds c->mu. */
static bool publish_transport_locked(grpc_subchannel* c) {
  /* construct channel stack */
  grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
  grpc_channel_stack_builder_set_channel_arguments(
      builder, c->connecting_result.channel_args);
  grpc_channel_stack_builder_set_transport(builder,
                                           c->connecting_result.transport);

  if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
    grpc_channel_stack_builder_destroy(builder);
    return false;
  }
  grpc_channel_stack* stk;
  grpc_error* error = grpc_channel_stack_builder_finish(
      builder, 0, 1, connection_destroy, nullptr,
      reinterpret_cast<void**>(&stk));
  if (error != GRPC_ERROR_NONE) {
    grpc_transport_destroy(c->connecting_result.transport);
    gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
            grpc_error_string(error));
    GRPC_ERROR_UNREF(error);
    return false;
  }
  intptr_t socket_uuid = c->connecting_result.socket_uuid;
  // Stop tracking connecting_result; the built stack references the
  // transport now.
  memset(&c->connecting_result, 0, sizeof(c->connecting_result));

  if (c->disconnected) {
    // Disconnected while connecting: discard the freshly built stack.
    grpc_channel_stack_destroy(stk);
    gpr_free(stk);
    return false;
  }

  /* publish */
  c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
      stk, c->args, c->channelz_subchannel, socket_uuid));
  gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
          c->connected_subchannel.get(), c);

  // Instantiate state watcher. Will clean itself up.
  c->connected_subchannel_watcher =
      grpc_core::MakeOrphanable<grpc_core::ConnectedSubchannelStateWatcher>(c);

  return true;
}
| |
/* Connector completion callback. On success, publishes the transport
   (the state watcher then takes over the "connecting" weak ref). On
   failure, reports TRANSIENT_FAILURE/UNAVAILABLE on both trackers and
   schedules a retry. */
static void on_subchannel_connected(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;

  // Temporary weak ref to keep c alive past the final unlock below.
  // NOTE(review): taken with tag "on_subchannel_connected" but released
  // with tag "connected" — tags are debug-trace strings only, but the
  // mismatch makes ref traces harder to pair; confirm intent.
  GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
  gpr_mu_lock(&c->mu);
  c->connecting = false;
  if (c->connecting_result.transport != nullptr &&
      publish_transport_locked(c)) {
    /* do nothing, transport was published */
  } else if (c->disconnected) {
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  } else {
    // Connect failed: surface UNAVAILABLE on both trackers.
    set_subchannel_connectivity_state_locked(
        c, GRPC_CHANNEL_TRANSIENT_FAILURE,
        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Connect Failed", &error, 1),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        "connect_failed");
    grpc_connectivity_state_set(
        &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Connect Failed", &error, 1),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        "connect_failed");

    const char* errmsg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Connect failed: %s", errmsg);

    maybe_start_connecting_locked(c);
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  gpr_mu_unlock(&c->mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected");
  grpc_channel_args_destroy(delete_channel_args);
}
| |
| void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { |
| gpr_mu_lock(&subchannel->mu); |
| subchannel->backoff->Reset(); |
| if (subchannel->have_alarm) { |
| subchannel->retry_immediately = true; |
| grpc_timer_cancel(&subchannel->alarm); |
| } else { |
| subchannel->backoff_begun = false; |
| maybe_start_connecting_locked(subchannel); |
| } |
| gpr_mu_unlock(&subchannel->mu); |
| } |
| |
| /* |
| * grpc_subchannel_call implementation |
| */ |
| |
| static void subchannel_call_destroy(void* call, grpc_error* error) { |
| GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0); |
| grpc_subchannel_call* c = static_cast<grpc_subchannel_call*>(call); |
| grpc_core::ConnectedSubchannel* connection = c->connection; |
| grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, |
| c->schedule_closure_after_destroy); |
| connection->Unref(DEBUG_LOCATION, "subchannel_call"); |
| c->~grpc_subchannel_call(); |
| } |
| |
// Registers a closure to be scheduled after the call stack is destroyed
// (see subchannel_call_destroy).  May be set at most once per call, and the
// closure must be non-null.
void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
                                              grpc_closure* closure) {
  GPR_ASSERT(call->schedule_closure_after_destroy == nullptr);
  GPR_ASSERT(closure != nullptr);
  call->schedule_closure_after_destroy = closure;
}
| |
// Adds a ref to the subchannel call (the refcount lives on the embedded
// call stack).  Returns the call for caller convenience.
grpc_subchannel_call* grpc_subchannel_call_ref(
    grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
  return c;
}
| |
// Drops a ref on the subchannel call; when the call stack's refcount
// reaches zero, subchannel_call_destroy (registered at call creation) runs.
void grpc_subchannel_call_unref(
    grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
| |
| // Sets *status based on md_batch and error. |
| static void get_call_status(grpc_subchannel_call* call, |
| grpc_metadata_batch* md_batch, grpc_error* error, |
| grpc_status_code* status) { |
| if (error != GRPC_ERROR_NONE) { |
| grpc_error_get_status(error, call->deadline, status, nullptr, nullptr, |
| nullptr); |
| } else { |
| if (md_batch->idx.named.grpc_status != nullptr) { |
| *status = grpc_get_status_code_from_metadata( |
| md_batch->idx.named.grpc_status->md); |
| } else { |
| *status = GRPC_STATUS_UNKNOWN; |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
// Intercepting callback for recv_trailing_metadata, installed by
// maybe_intercept_recv_trailing_metadata() (and therefore only run when
// channelz is enabled).  Records the call's final status on the
// subchannel's channelz node, then chains to the original callback.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg);
  GPR_ASSERT(call->recv_trailing_metadata != nullptr);
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_metadata_batch* md_batch = call->recv_trailing_metadata;
  // get_call_status() consumes the error ref we pass it.
  get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status);
  grpc_core::channelz::SubchannelNode* channelz_subchannel =
      call->connection->channelz_subchannel();
  GPR_ASSERT(channelz_subchannel != nullptr);
  if (status == GRPC_STATUS_OK) {
    channelz_subchannel->RecordCallSucceeded();
  } else {
    channelz_subchannel->RecordCallFailed();
  }
  // Hand off to the callback the batch originally carried, with its own
  // ref on the error.
  GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata,
                   GRPC_ERROR_REF(error));
}
| |
| // If channelz is enabled, intercept recv_trailing so that we may check the |
| // status and associate it to a subchannel. |
| static void maybe_intercept_recv_trailing_metadata( |
| grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { |
| // only intercept payloads with recv trailing. |
| if (!batch->recv_trailing_metadata) { |
| return; |
| } |
| // only add interceptor is channelz is enabled. |
| if (call->connection->channelz_subchannel() == nullptr) { |
| return; |
| } |
| GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready, |
| recv_trailing_metadata_ready, call, |
| grpc_schedule_on_exec_ctx); |
| // save some state needed for the interception callback. |
| GPR_ASSERT(call->recv_trailing_metadata == nullptr); |
| call->recv_trailing_metadata = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata; |
| call->original_recv_trailing_metadata = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &call->recv_trailing_metadata_ready; |
| } |
| |
| void grpc_subchannel_call_process_op(grpc_subchannel_call* call, |
| grpc_transport_stream_op_batch* batch) { |
| GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); |
| maybe_intercept_recv_trailing_metadata(call, batch); |
| grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); |
| GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); |
| top_elem->filter->start_transport_stream_op_batch(top_elem, batch); |
| } |
| |
| grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> |
| grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { |
| gpr_mu_lock(&c->mu); |
| auto copy = c->connected_subchannel; |
| gpr_mu_unlock(&c->mu); |
| return copy; |
| } |
| |
// Returns the subchannel's index key (borrowed; owned by the subchannel).
const grpc_subchannel_key* grpc_subchannel_get_key(
    const grpc_subchannel* subchannel) {
  return subchannel->key;
}
| |
| void* grpc_connected_subchannel_call_get_parent_data( |
| grpc_subchannel_call* subchannel_call) { |
| grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack(); |
| return (char*)subchannel_call + sizeof(grpc_subchannel_call) + |
| chanstk->call_stack_size; |
| } |
| |
// Returns the call stack embedded immediately after the subchannel call
// object in its arena allocation.
grpc_call_stack* grpc_subchannel_call_get_call_stack(
    grpc_subchannel_call* subchannel_call) {
  return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
| |
| static void grpc_uri_to_sockaddr(const char* uri_str, |
| grpc_resolved_address* addr) { |
| grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); |
| GPR_ASSERT(uri != nullptr); |
| if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); |
| grpc_uri_destroy(uri); |
| } |
| |
| void grpc_get_subchannel_address_arg(const grpc_channel_args* args, |
| grpc_resolved_address* addr) { |
| const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args); |
| memset(addr, 0, sizeof(*addr)); |
| if (*addr_uri_str != '\0') { |
| grpc_uri_to_sockaddr(addr_uri_str, addr); |
| } |
| } |
| |
| const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { |
| const grpc_arg* addr_arg = |
| grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); |
| const char* addr_str = grpc_channel_arg_get_string(addr_arg); |
| GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. |
| return addr_str; |
| } |
| |
| const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { |
| const grpc_arg* addr_arg = |
| grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); |
| const char* addr_str = grpc_channel_arg_get_string(addr_arg); |
| GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. |
| return addr_str; |
| } |
| |
| grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { |
| return grpc_channel_arg_string_create( |
| (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, |
| addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); |
| } |
| |
| namespace grpc_core { |
| |
// Wraps the channel stack of an established connection.  Makes its own copy
// of |args|, moves the |channelz_subchannel| ref into the member, and adopts
// the caller's ref on |channel_stack| (released in the destructor).
ConnectedSubchannel::ConnectedSubchannel(
    grpc_channel_stack* channel_stack, const grpc_channel_args* args,
    grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
        channelz_subchannel,
    intptr_t socket_uuid)
    : RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount),
      channel_stack_(channel_stack),
      args_(grpc_channel_args_copy(args)),
      channelz_subchannel_(std::move(channelz_subchannel)),
      socket_uuid_(socket_uuid) {}
| |
// Releases the copied channel args and the ref on the channel stack that
// was adopted in the constructor.
ConnectedSubchannel::~ConnectedSubchannel() {
  grpc_channel_args_destroy(args_);
  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
| |
| void ConnectedSubchannel::NotifyOnStateChange( |
| grpc_pollset_set* interested_parties, grpc_connectivity_state* state, |
| grpc_closure* closure) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| grpc_channel_element* elem; |
| op->connectivity_state = state; |
| op->on_connectivity_state_change = closure; |
| op->bind_pollset_set = interested_parties; |
| elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
| void ConnectedSubchannel::Ping(grpc_closure* on_initiate, |
| grpc_closure* on_ack) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| grpc_channel_element* elem; |
| op->send_ping.on_initiate = on_initiate; |
| op->send_ping.on_ack = on_ack; |
| elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
// Creates a subchannel call in the caller-provided arena.  On success,
// *call owns a ref to this ConnectedSubchannel, released by
// subchannel_call_destroy when the call stack is torn down.
grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
                                            grpc_subchannel_call** call) {
  const size_t allocation_size =
      GetInitialCallSizeEstimate(args.parent_data_size);
  // Placement-new the call object at the start of one arena block; the call
  // stack (and optional parent data) live in the same allocation after it.
  *call = new (gpr_arena_alloc(args.arena, allocation_size))
      grpc_subchannel_call(this, args);
  grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
  // Take a ref on behalf of the new call object; released in
  // subchannel_call_destroy.
  RefCountedPtr<ConnectedSubchannel> connection =
      Ref(DEBUG_LOCATION, "subchannel_call");
  connection.release();  // Ref is passed to the grpc_subchannel_call object.
  const grpc_call_element_args call_args = {
      callstk,        /* call_stack */
      nullptr,        /* server_transport_data */
      args.context,   /* context */
      args.path,      /* path */
      args.start_time, /* start_time */
      args.deadline,  /* deadline */
      args.arena,     /* arena */
      args.call_combiner /* call_combiner */
  };
  grpc_error* error = grpc_call_stack_init(
      channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    const char* error_string = grpc_error_string(error);
    gpr_log(GPR_ERROR, "error: %s", error_string);
    // NOTE(review): the "subchannel_call" ref taken above is not explicitly
    // released on this failure path; confirm that stack destruction still
    // runs subchannel_call_destroy in this case.
    return error;
  }
  grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
  if (channelz_subchannel_ != nullptr) {
    channelz_subchannel_->RecordCallStarted();
  }
  return GRPC_ERROR_NONE;
}
| |
| size_t ConnectedSubchannel::GetInitialCallSizeEstimate( |
| size_t parent_data_size) const { |
| size_t allocation_size = |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)); |
| if (parent_data_size > 0) { |
| allocation_size += |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + |
| parent_data_size; |
| } else { |
| allocation_size += channel_stack_->call_stack_size; |
| } |
| return allocation_size; |
| } |
| |
| } // namespace grpc_core |