| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <stdbool.h> |
| #include <stdio.h> |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/sync.h> |
| |
| #include "src/core/ext/filters/client_channel/backup_poller.h" |
| #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
| #include "src/core/ext/filters/client_channel/request_routing.h" |
| #include "src/core/ext/filters/client_channel/resolver_registry.h" |
| #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" |
| #include "src/core/ext/filters/client_channel/retry_throttle.h" |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| #include "src/core/ext/filters/deadline/deadline_filter.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/connected_channel.h" |
| #include "src/core/lib/channel/status_util.h" |
| #include "src/core/lib/gpr/string.h" |
| #include "src/core/lib/gprpp/inlined_vector.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/iomgr.h" |
| #include "src/core/lib/iomgr/polling_entity.h" |
| #include "src/core/lib/profiling/timers.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/slice/slice_string_helpers.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/metadata.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/service_config.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| #include "src/core/lib/transport/status_metadata.h" |
| |
| using grpc_core::internal::ClientChannelMethodParams; |
| using grpc_core::internal::ClientChannelMethodParamsTable; |
| using grpc_core::internal::ProcessedResolverResult; |
| using grpc_core::internal::ServerRetryThrottleData; |
| |
| /* Client channel implementation */ |
| |
| // By default, we buffer 256 KiB per RPC for retries. |
| // TODO(roth): Do we have any data to suggest a better value? |
| #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10) |
| |
| // This value was picked arbitrarily. It can be changed if there is |
| // any even moderately compelling reason to do so. |
| #define RETRY_BACKOFF_JITTER 0.2 |
| |
| grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); |
| |
| /************************************************************************* |
| * CHANNEL-WIDE FUNCTIONS |
| */ |
| |
| struct external_connectivity_watcher; |
| |
| typedef struct client_channel_channel_data { |
| grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router; |
| |
| bool deadline_checking_enabled; |
| bool enable_retries; |
| size_t per_rpc_retry_buffer_size; |
| |
| /** combiner protecting all variables below in this data structure */ |
| grpc_combiner* combiner; |
| /** retry throttle data */ |
| grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; |
| /** maps method names to method_parameters structs */ |
| grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table; |
| /** owning stack */ |
| grpc_channel_stack* owning_stack; |
| /** interested parties (owned) */ |
| grpc_pollset_set* interested_parties; |
| |
| /* external_connectivity_watcher_list head is guarded by its own mutex, since |
| * counts need to be grabbed immediately without polling on a cq */ |
| gpr_mu external_connectivity_watcher_list_mu; |
| struct external_connectivity_watcher* external_connectivity_watcher_list_head; |
| |
| /* the following properties are guarded by a mutex since APIs require them |
| to be instantaneously available */ |
| gpr_mu info_mu; |
| grpc_core::UniquePtr<char> info_lb_policy_name; |
| /** service config in JSON form */ |
| grpc_core::UniquePtr<char> info_service_config_json; |
| } channel_data; |
| |
| // Synchronous callback from chand->request_router to process a resolver |
| // result update. |
| static bool process_resolver_result_locked(void* arg, |
| const grpc_channel_args& args, |
| const char** lb_policy_name, |
| grpc_json** lb_policy_config) { |
| channel_data* chand = static_cast<channel_data*>(arg); |
| ProcessedResolverResult resolver_result(args, chand->enable_retries); |
| grpc_core::UniquePtr<char> service_config_json = |
| resolver_result.service_config_json(); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", |
| chand, service_config_json.get()); |
| } |
| // Update channel state. |
| chand->retry_throttle_data = resolver_result.retry_throttle_data(); |
| chand->method_params_table = resolver_result.method_params_table(); |
| // Swap out the data used by cc_get_channel_info(). |
| gpr_mu_lock(&chand->info_mu); |
| chand->info_lb_policy_name = resolver_result.lb_policy_name(); |
| const bool service_config_changed = |
| ((service_config_json == nullptr) != |
| (chand->info_service_config_json == nullptr)) || |
| (service_config_json != nullptr && |
| strcmp(service_config_json.get(), |
| chand->info_service_config_json.get()) != 0); |
| chand->info_service_config_json = std::move(service_config_json); |
| gpr_mu_unlock(&chand->info_mu); |
| // Return results. |
| *lb_policy_name = chand->info_lb_policy_name.get(); |
| *lb_policy_config = resolver_result.lb_policy_config(); |
| return service_config_changed; |
| } |
| |
| static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { |
| grpc_transport_op* op = static_cast<grpc_transport_op*>(arg); |
| grpc_channel_element* elem = |
| static_cast<grpc_channel_element*>(op->handler_private.extra_arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| |
| if (op->on_connectivity_state_change != nullptr) { |
| chand->request_router->NotifyOnConnectivityStateChange( |
| op->connectivity_state, op->on_connectivity_state_change); |
| op->on_connectivity_state_change = nullptr; |
| op->connectivity_state = nullptr; |
| } |
| |
| if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { |
| if (chand->request_router->lb_policy() == nullptr) { |
| grpc_error* error = |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); |
| GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); |
| GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); |
| } else { |
| grpc_error* error = GRPC_ERROR_NONE; |
| grpc_core::LoadBalancingPolicy::PickState pick_state; |
| // Pick must return synchronously, because pick_state.on_complete is null. |
| GPR_ASSERT( |
| chand->request_router->lb_policy()->PickLocked(&pick_state, &error)); |
| if (pick_state.connected_subchannel != nullptr) { |
| pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, |
| op->send_ping.on_ack); |
| } else { |
| if (error == GRPC_ERROR_NONE) { |
| error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "LB policy dropped call on ping"); |
| } |
| GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); |
| GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); |
| } |
| op->bind_pollset = nullptr; |
| } |
| op->send_ping.on_initiate = nullptr; |
| op->send_ping.on_ack = nullptr; |
| } |
| |
| if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
| chand->request_router->ShutdownLocked(op->disconnect_with_error); |
| } |
| |
| if (op->reset_connect_backoff) { |
| chand->request_router->ResetConnectionBackoffLocked(); |
| } |
| |
| GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); |
| GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); |
| } |
| |
| static void cc_start_transport_op(grpc_channel_element* elem, |
| grpc_transport_op* op) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| |
| GPR_ASSERT(op->set_accept_stream == false); |
| if (op->bind_pollset != nullptr) { |
| grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); |
| } |
| |
| op->handler_private.extra_arg = elem; |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, |
| op, grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| |
| static void cc_get_channel_info(grpc_channel_element* elem, |
| const grpc_channel_info* info) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| gpr_mu_lock(&chand->info_mu); |
| if (info->lb_policy_name != nullptr) { |
| *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); |
| } |
| if (info->service_config_json != nullptr) { |
| *info->service_config_json = |
| gpr_strdup(chand->info_service_config_json.get()); |
| } |
| gpr_mu_unlock(&chand->info_mu); |
| } |
| |
| /* Constructor for channel_data */ |
| static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, |
| grpc_channel_element_args* args) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| GPR_ASSERT(args->is_last); |
| GPR_ASSERT(elem->filter == &grpc_client_channel_filter); |
| // Initialize data members. |
| chand->combiner = grpc_combiner_create(); |
| gpr_mu_init(&chand->info_mu); |
| gpr_mu_init(&chand->external_connectivity_watcher_list_mu); |
| |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| chand->external_connectivity_watcher_list_head = nullptr; |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| |
| chand->owning_stack = args->channel_stack; |
| chand->deadline_checking_enabled = |
| grpc_deadline_checking_enabled(args->channel_args); |
| chand->interested_parties = grpc_pollset_set_create(); |
| grpc_client_channel_start_backup_polling(chand->interested_parties); |
| // Record max per-RPC retry buffer size. |
| const grpc_arg* arg = grpc_channel_args_find( |
| args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); |
| chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( |
| arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); |
| // Record enable_retries. |
| arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); |
| chand->enable_retries = grpc_channel_arg_get_bool(arg, true); |
| // Record client channel factory. |
| arg = grpc_channel_args_find(args->channel_args, |
| GRPC_ARG_CLIENT_CHANNEL_FACTORY); |
| if (arg == nullptr) { |
| return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "Missing client channel factory in args for client channel filter"); |
| } |
| if (arg->type != GRPC_ARG_POINTER) { |
| return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "client channel factory arg must be a pointer"); |
| } |
| grpc_client_channel_factory* client_channel_factory = |
| static_cast<grpc_client_channel_factory*>(arg->value.pointer.p); |
| // Get server name to resolve, using proxy mapper if needed. |
| arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); |
| if (arg == nullptr) { |
| return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "Missing server uri in args for client channel filter"); |
| } |
| if (arg->type != GRPC_ARG_STRING) { |
| return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "server uri arg must be a string"); |
| } |
| char* proxy_name = nullptr; |
| grpc_channel_args* new_args = nullptr; |
| grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, |
| &proxy_name, &new_args); |
| // Instantiate request router. |
| grpc_client_channel_factory_ref(client_channel_factory); |
| grpc_error* error = GRPC_ERROR_NONE; |
| chand->request_router.Init( |
| chand->owning_stack, chand->combiner, client_channel_factory, |
| chand->interested_parties, &grpc_client_channel_trace, |
| process_resolver_result_locked, chand, |
| proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */, |
| new_args != nullptr ? new_args : args->channel_args, &error); |
| gpr_free(proxy_name); |
| grpc_channel_args_destroy(new_args); |
| return error; |
| } |
| |
| /* Destructor for channel_data */ |
| static void cc_destroy_channel_elem(grpc_channel_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| chand->request_router.Destroy(); |
| // TODO(roth): Once we convert the filter API to C++, there will no |
| // longer be any need to explicitly reset these smart pointer data members. |
| chand->info_lb_policy_name.reset(); |
| chand->info_service_config_json.reset(); |
| chand->retry_throttle_data.reset(); |
| chand->method_params_table.reset(); |
| grpc_client_channel_stop_backup_polling(chand->interested_parties); |
| grpc_pollset_set_destroy(chand->interested_parties); |
| GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); |
| gpr_mu_destroy(&chand->info_mu); |
| gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); |
| } |
| |
| /************************************************************************* |
| * PER-CALL FUNCTIONS |
| */ |
| |
| // Max number of batches that can be pending on a call at any given |
| // time. This includes one batch for each of the following ops: |
| // recv_initial_metadata |
| // send_initial_metadata |
| // recv_message |
| // send_message |
| // recv_trailing_metadata |
| // send_trailing_metadata |
| #define MAX_PENDING_BATCHES 6 |
| |
| // Retry support: |
| // |
| // In order to support retries, we act as a proxy for stream op batches. |
| // When we get a batch from the surface, we add it to our list of pending |
| // batches, and we then use those batches to construct separate "child" |
| // batches to be started on the subchannel call. When the child batches |
| // return, we then decide which pending batches have been completed and |
| // schedule their callbacks accordingly. If a subchannel call fails and |
| // we want to retry it, we do a new pick and start again, constructing |
| // new "child" batches for the new subchannel call. |
| // |
| // Note that retries are committed when receiving data from the server |
| // (except for Trailers-Only responses). However, there may be many |
| // send ops started before receiving any data, so we may have already |
| // completed some number of send ops (and returned the completions up to |
| // the surface) by the time we realize that we need to retry. To deal |
| // with this, we cache data for send ops, so that we can replay them on a |
| // different subchannel call even after we have completed the original |
| // batches. |
| // |
| // There are two sets of data to maintain: |
| // - In call_data (in the parent channel), we maintain a list of pending |
| // ops and cached data for send ops. |
| // - In the subchannel call, we maintain state to indicate what ops have |
| // already been sent down to that call. |
| // |
| // When constructing the "child" batches, we compare those two sets of |
| // data to see which batches need to be sent to the subchannel call. |
| |
| // TODO(roth): In subsequent PRs: |
| // - add support for transparent retries (including initial metadata) |
| // - figure out how to record stats in census for retries |
| // (census filter is on top of this one) |
| // - add census stats for retries |
| |
| namespace { |
| |
| struct call_data; |
| |
| // State used for starting a retryable batch on a subchannel call. |
| // This provides its own grpc_transport_stream_op_batch and other data |
| // structures needed to populate the ops in the batch. |
| // We allocate one struct on the arena for each attempt at starting a |
| // batch on a given subchannel call. |
| struct subchannel_batch_data { |
| subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, |
| bool set_on_complete); |
| // All dtor code must be added in `destroy`. This is because we may |
| // call closures in `subchannel_batch_data` after they are unrefed by |
| // `batch_data_unref`, and msan would complain about accessing this class |
| // after calling dtor. As a result we cannot call the `dtor` in |
| // `batch_data_unref`. |
| // TODO(soheil): We should try to call the dtor in `batch_data_unref`. |
| ~subchannel_batch_data() { destroy(); } |
| void destroy(); |
| |
| gpr_refcount refs; |
| grpc_call_element* elem; |
| grpc_subchannel_call* subchannel_call; // Holds a ref. |
| // The batch to use in the subchannel call. |
| // Its payload field points to subchannel_call_retry_state.batch_payload. |
| grpc_transport_stream_op_batch batch; |
| // For intercepting on_complete. |
| grpc_closure on_complete; |
| }; |
| |
| // Retry state associated with a subchannel call. |
| // Stored in the parent_data of the subchannel call object. |
| struct subchannel_call_retry_state { |
| explicit subchannel_call_retry_state(grpc_call_context_element* context) |
| : batch_payload(context), |
| started_send_initial_metadata(false), |
| completed_send_initial_metadata(false), |
| started_send_trailing_metadata(false), |
| completed_send_trailing_metadata(false), |
| started_recv_initial_metadata(false), |
| completed_recv_initial_metadata(false), |
| started_recv_trailing_metadata(false), |
| completed_recv_trailing_metadata(false), |
| retry_dispatched(false) {} |
| |
| // subchannel_batch_data.batch.payload points to this. |
| grpc_transport_stream_op_batch_payload batch_payload; |
| // For send_initial_metadata. |
| // Note that we need to make a copy of the initial metadata for each |
| // subchannel call instead of just referring to the copy in call_data, |
| // because filters in the subchannel stack will probably add entries, |
| // so we need to start in a pristine state for each attempt of the call. |
| grpc_linked_mdelem* send_initial_metadata_storage; |
| grpc_metadata_batch send_initial_metadata; |
| // For send_message. |
| grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> |
| send_message; |
| // For send_trailing_metadata. |
| grpc_linked_mdelem* send_trailing_metadata_storage; |
| grpc_metadata_batch send_trailing_metadata; |
| // For intercepting recv_initial_metadata. |
| grpc_metadata_batch recv_initial_metadata; |
| grpc_closure recv_initial_metadata_ready; |
| bool trailing_metadata_available = false; |
| // For intercepting recv_message. |
| grpc_closure recv_message_ready; |
| grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; |
| // For intercepting recv_trailing_metadata. |
| grpc_metadata_batch recv_trailing_metadata; |
| grpc_transport_stream_stats collect_stats; |
| grpc_closure recv_trailing_metadata_ready; |
| // These fields indicate which ops have been started and completed on |
| // this subchannel call. |
| size_t started_send_message_count = 0; |
| size_t completed_send_message_count = 0; |
| size_t started_recv_message_count = 0; |
| size_t completed_recv_message_count = 0; |
| bool started_send_initial_metadata : 1; |
| bool completed_send_initial_metadata : 1; |
| bool started_send_trailing_metadata : 1; |
| bool completed_send_trailing_metadata : 1; |
| bool started_recv_initial_metadata : 1; |
| bool completed_recv_initial_metadata : 1; |
| bool started_recv_trailing_metadata : 1; |
| bool completed_recv_trailing_metadata : 1; |
| // State for callback processing. |
| subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; |
| grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; |
| subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; |
| grpc_error* recv_message_error = GRPC_ERROR_NONE; |
| subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; |
| // NOTE: Do not move this next to the metadata bitfields above. That would |
| // save space but will also result in a data race because compiler will |
| // generate a 2 byte store which overwrites the meta-data fields upon |
| // setting this field. |
| bool retry_dispatched : 1; |
| }; |
| |
| // Pending batches stored in call data. |
| struct pending_batch { |
| // The pending batch. If nullptr, this slot is empty. |
| grpc_transport_stream_op_batch* batch; |
| // Indicates whether payload for send ops has been cached in call data. |
| bool send_ops_cached; |
| }; |
| |
| /** Call data. Holds a pointer to grpc_subchannel_call and the |
| associated machinery to create such a pointer. |
| Handles queueing of stream ops until a call object is ready, waiting |
| for initial metadata before trying to create a call object, |
| and handling cancellation gracefully. */ |
| struct call_data { |
| call_data(grpc_call_element* elem, const channel_data& chand, |
| const grpc_call_element_args& args) |
| : deadline_state(elem, args.call_stack, args.call_combiner, |
| GPR_LIKELY(chand.deadline_checking_enabled) |
| ? args.deadline |
| : GRPC_MILLIS_INF_FUTURE), |
| path(grpc_slice_ref_internal(args.path)), |
| call_start_time(args.start_time), |
| deadline(args.deadline), |
| arena(args.arena), |
| owning_call(args.call_stack), |
| call_combiner(args.call_combiner), |
| pending_send_initial_metadata(false), |
| pending_send_message(false), |
| pending_send_trailing_metadata(false), |
| enable_retries(chand.enable_retries), |
| retry_committed(false), |
| last_attempt_got_server_pushback(false) {} |
| |
| ~call_data() { |
| if (GPR_LIKELY(subchannel_call != nullptr)) { |
| GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, |
| "client_channel_destroy_call"); |
| } |
| grpc_slice_unref_internal(path); |
| GRPC_ERROR_UNREF(cancel_error); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { |
| GPR_ASSERT(pending_batches[i].batch == nullptr); |
| } |
| if (have_request) { |
| request.Destroy(); |
| } |
| } |
| |
| // State for handling deadlines. |
| // The code in deadline_filter.c requires this to be the first field. |
| // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state |
| // and this struct both independently store pointers to the call stack |
| // and call combiner. If/when we have time, find a way to avoid this |
| // without breaking the grpc_deadline_state abstraction. |
| grpc_deadline_state deadline_state; |
| |
| grpc_slice path; // Request path. |
| gpr_timespec call_start_time; |
| grpc_millis deadline; |
| gpr_arena* arena; |
| grpc_call_stack* owning_call; |
| grpc_call_combiner* call_combiner; |
| |
| grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; |
| grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; |
| |
| grpc_subchannel_call* subchannel_call = nullptr; |
| |
| // Set when we get a cancel_stream op. |
| grpc_error* cancel_error = GRPC_ERROR_NONE; |
| |
| grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request; |
| bool have_request = false; |
| grpc_closure pick_closure; |
| |
| grpc_polling_entity* pollent = nullptr; |
| |
| // Batches are added to this list when received from above. |
| // They are removed when we are done handling the batch (i.e., when |
| // either we have invoked all of the batch's callbacks or we have |
| // passed the batch down to the subchannel call and are not |
| // intercepting any of its callbacks). |
| pending_batch pending_batches[MAX_PENDING_BATCHES] = {}; |
| bool pending_send_initial_metadata : 1; |
| bool pending_send_message : 1; |
| bool pending_send_trailing_metadata : 1; |
| |
| // Retry state. |
| bool enable_retries : 1; |
| bool retry_committed : 1; |
| bool last_attempt_got_server_pushback : 1; |
| int num_attempts_completed = 0; |
| size_t bytes_buffered_for_retry = 0; |
| grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff; |
| grpc_timer retry_timer; |
| |
| // The number of pending retriable subchannel batches containing send ops. |
| // We hold a ref to the call stack while this is non-zero, since replay |
| // batches may not complete until after all callbacks have been returned |
| // to the surface, and we need to make sure that the call is not destroyed |
| // until all of these batches have completed. |
| // Note that we actually only need to track replay batches, but it's |
| // easier to track all batches with send ops. |
| int num_pending_retriable_subchannel_send_batches = 0; |
| |
| // Cached data for retrying send ops. |
| // send_initial_metadata |
| bool seen_send_initial_metadata = false; |
| grpc_linked_mdelem* send_initial_metadata_storage = nullptr; |
| grpc_metadata_batch send_initial_metadata; |
| uint32_t send_initial_metadata_flags; |
| gpr_atm* peer_string; |
| // send_message |
| // When we get a send_message op, we replace the original byte stream |
| // with a CachingByteStream that caches the slices to a local buffer for |
| // use in retries. |
| // Note: We inline the cache for the first 3 send_message ops and use |
| // dynamic allocation after that. This number was essentially picked |
| // at random; it could be changed in the future to tune performance. |
| grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; |
| // send_trailing_metadata |
| bool seen_send_trailing_metadata = false; |
| grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; |
| grpc_metadata_batch send_trailing_metadata; |
| }; |
| |
| } // namespace |
| |
| // Forward declarations. |
| static void retry_commit(grpc_call_element* elem, |
| subchannel_call_retry_state* retry_state); |
| static void start_internal_recv_trailing_metadata(grpc_call_element* elem); |
| static void on_complete(void* arg, grpc_error* error); |
| static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); |
| static void start_pick_locked(void* arg, grpc_error* ignored); |
| |
| // |
| // send op data caching |
| // |
| |
| // Caches data for send ops so that it can be retried later, if not |
| // already cached. |
| static void maybe_cache_send_ops_for_batch(call_data* calld, |
| pending_batch* pending) { |
| if (pending->send_ops_cached) return; |
| pending->send_ops_cached = true; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| // Save a copy of metadata for send_initial_metadata ops. |
| if (batch->send_initial_metadata) { |
| calld->seen_send_initial_metadata = true; |
| GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); |
| grpc_metadata_batch* send_initial_metadata = |
| batch->payload->send_initial_metadata.send_initial_metadata; |
| calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( |
| calld->arena, |
| sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); |
| grpc_metadata_batch_copy(send_initial_metadata, |
| &calld->send_initial_metadata, |
| calld->send_initial_metadata_storage); |
| calld->send_initial_metadata_flags = |
| batch->payload->send_initial_metadata.send_initial_metadata_flags; |
| calld->peer_string = batch->payload->send_initial_metadata.peer_string; |
| } |
| // Set up cache for send_message ops. |
| if (batch->send_message) { |
| grpc_core::ByteStreamCache* cache = |
| static_cast<grpc_core::ByteStreamCache*>( |
| gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); |
| new (cache) grpc_core::ByteStreamCache( |
| std::move(batch->payload->send_message.send_message)); |
| calld->send_messages.push_back(cache); |
| } |
| // Save metadata batch for send_trailing_metadata ops. |
| if (batch->send_trailing_metadata) { |
| calld->seen_send_trailing_metadata = true; |
| GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); |
| grpc_metadata_batch* send_trailing_metadata = |
| batch->payload->send_trailing_metadata.send_trailing_metadata; |
| calld->send_trailing_metadata_storage = |
| (grpc_linked_mdelem*)gpr_arena_alloc( |
| calld->arena, |
| sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); |
| grpc_metadata_batch_copy(send_trailing_metadata, |
| &calld->send_trailing_metadata, |
| calld->send_trailing_metadata_storage); |
| } |
| } |
| |
| // Frees cached send_initial_metadata. |
| static void free_cached_send_initial_metadata(channel_data* chand, |
| call_data* calld) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, |
| calld); |
| } |
| grpc_metadata_batch_destroy(&calld->send_initial_metadata); |
| } |
| |
| // Frees cached send_message at index idx. |
| static void free_cached_send_message(channel_data* chand, call_data* calld, |
| size_t idx) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", |
| chand, calld, idx); |
| } |
| calld->send_messages[idx]->Destroy(); |
| } |
| |
| // Frees cached send_trailing_metadata. |
| static void free_cached_send_trailing_metadata(channel_data* chand, |
| call_data* calld) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: destroying calld->send_trailing_metadata", |
| chand, calld); |
| } |
| grpc_metadata_batch_destroy(&calld->send_trailing_metadata); |
| } |
| |
| // Frees cached send ops that have already been completed after |
| // committing the call. |
| static void free_cached_send_op_data_after_commit( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (retry_state->completed_send_initial_metadata) { |
| free_cached_send_initial_metadata(chand, calld); |
| } |
| for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { |
| free_cached_send_message(chand, calld, i); |
| } |
| if (retry_state->completed_send_trailing_metadata) { |
| free_cached_send_trailing_metadata(chand, calld); |
| } |
| } |
| |
| // Frees cached send ops that were completed by the completed batch in |
| // batch_data. Used when batches are completed after the call is committed. |
| static void free_cached_send_op_data_for_completed_batch( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (batch_data->batch.send_initial_metadata) { |
| free_cached_send_initial_metadata(chand, calld); |
| } |
| if (batch_data->batch.send_message) { |
| free_cached_send_message(chand, calld, |
| retry_state->completed_send_message_count - 1); |
| } |
| if (batch_data->batch.send_trailing_metadata) { |
| free_cached_send_trailing_metadata(chand, calld); |
| } |
| } |
| |
| // |
| // pending_batches management |
| // |
| |
| // Returns the index into calld->pending_batches to be used for batch. |
| static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { |
| // Note: It is important the send_initial_metadata be the first entry |
| // here, since the code in pick_subchannel_locked() assumes it will be. |
| if (batch->send_initial_metadata) return 0; |
| if (batch->send_message) return 1; |
| if (batch->send_trailing_metadata) return 2; |
| if (batch->recv_initial_metadata) return 3; |
| if (batch->recv_message) return 4; |
| if (batch->recv_trailing_metadata) return 5; |
| GPR_UNREACHABLE_CODE(return (size_t)-1); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void pending_batches_add(grpc_call_element* elem, |
| grpc_transport_stream_op_batch* batch) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| const size_t idx = get_batch_index(batch); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, |
| calld, idx); |
| } |
| pending_batch* pending = &calld->pending_batches[idx]; |
| GPR_ASSERT(pending->batch == nullptr); |
| pending->batch = batch; |
| pending->send_ops_cached = false; |
| if (calld->enable_retries) { |
| // Update state in calld about pending batches. |
| // Also check if the batch takes us over the retry buffer limit. |
| // Note: We don't check the size of trailing metadata here, because |
| // gRPC clients do not send trailing metadata. |
| if (batch->send_initial_metadata) { |
| calld->pending_send_initial_metadata = true; |
| calld->bytes_buffered_for_retry += grpc_metadata_batch_size( |
| batch->payload->send_initial_metadata.send_initial_metadata); |
| } |
| if (batch->send_message) { |
| calld->pending_send_message = true; |
| calld->bytes_buffered_for_retry += |
| batch->payload->send_message.send_message->length(); |
| } |
| if (batch->send_trailing_metadata) { |
| calld->pending_send_trailing_metadata = true; |
| } |
| if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > |
| chand->per_rpc_retry_buffer_size)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: exceeded retry buffer size, committing", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| calld->subchannel_call == nullptr |
| ? nullptr |
| : static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| retry_commit(elem, retry_state); |
| // If we are not going to retry and have not yet started, pretend |
| // retries are disabled so that we don't bother with retry overhead. |
| if (calld->num_attempts_completed == 0) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: disabling retries before first attempt", |
| chand, calld); |
| } |
| calld->enable_retries = false; |
| } |
| } |
| } |
| } |
| |
| static void pending_batch_clear(call_data* calld, pending_batch* pending) { |
| if (calld->enable_retries) { |
| if (pending->batch->send_initial_metadata) { |
| calld->pending_send_initial_metadata = false; |
| } |
| if (pending->batch->send_message) { |
| calld->pending_send_message = false; |
| } |
| if (pending->batch->send_trailing_metadata) { |
| calld->pending_send_trailing_metadata = false; |
| } |
| } |
| pending->batch = nullptr; |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_transport_stream_op_batch_finish_with_failure( |
| batch, GRPC_ERROR_REF(error), calld->call_combiner); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| // If yield_call_combiner is true, assumes responsibility for yielding |
| // the call combiner. |
| static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, |
| bool yield_call_combiner) { |
| GPR_ASSERT(error != GRPC_ERROR_NONE); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| size_t num_batches = 0; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| if (calld->pending_batches[i].batch != nullptr) ++num_batches; |
| } |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", |
| elem->channel_data, calld, num_batches, grpc_error_string(error)); |
| } |
| grpc_core::CallCombinerClosureList closures; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr) { |
| batch->handler_private.extra_arg = calld; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| fail_pending_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), |
| "pending_batches_fail"); |
| pending_batch_clear(calld, pending); |
| } |
| } |
| if (yield_call_combiner) { |
| closures.RunClosures(calld->call_combiner); |
| } else { |
| closures.RunClosuresWithoutYielding(calld->call_combiner); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void resume_pending_batch_in_call_combiner(void* arg, |
| grpc_error* ignored) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| grpc_subchannel_call* subchannel_call = |
| static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(subchannel_call, batch); |
| } |
| |
| // This is called via the call combiner, so access to calld is synchronized. |
| static void pending_batches_resume(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (calld->enable_retries) { |
| start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE); |
| return; |
| } |
| // Retries not enabled; send down batches as-is. |
| if (grpc_client_channel_trace.enabled()) { |
| size_t num_batches = 0; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| if (calld->pending_batches[i].batch != nullptr) ++num_batches; |
| } |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting %" PRIuPTR |
| " pending batches on subchannel_call=%p", |
| chand, calld, num_batches, calld->subchannel_call); |
| } |
| grpc_core::CallCombinerClosureList closures; |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr) { |
| batch->handler_private.extra_arg = calld->subchannel_call; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| resume_pending_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, |
| "pending_batches_resume"); |
| pending_batch_clear(calld, pending); |
| } |
| } |
| // Note: This will release the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| } |
| |
| static void maybe_clear_pending_batch(grpc_call_element* elem, |
| pending_batch* pending) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| // We clear the pending batch if all of its callbacks have been |
| // scheduled and reset to nullptr. |
| if (batch->on_complete == nullptr && |
| (!batch->recv_initial_metadata || |
| batch->payload->recv_initial_metadata.recv_initial_metadata_ready == |
| nullptr) && |
| (!batch->recv_message || |
| batch->payload->recv_message.recv_message_ready == nullptr) && |
| (!batch->recv_trailing_metadata || |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == |
| nullptr)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, |
| calld); |
| } |
| pending_batch_clear(calld, pending); |
| } |
| } |
| |
| // Returns a pointer to the first pending batch for which predicate(batch) |
| // returns true, or null if not found. |
| template <typename Predicate> |
| static pending_batch* pending_batch_find(grpc_call_element* elem, |
| const char* log_message, |
| Predicate predicate) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch != nullptr && predicate(batch)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, |
| calld, log_message, i); |
| } |
| return pending; |
| } |
| } |
| return nullptr; |
| } |
| |
| // |
| // retry code |
| // |
| |
| // Commits the call so that no further retry attempts will be performed. |
| static void retry_commit(grpc_call_element* elem, |
| subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (calld->retry_committed) return; |
| calld->retry_committed = true; |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); |
| } |
| if (retry_state != nullptr) { |
| free_cached_send_op_data_after_commit(elem, retry_state); |
| } |
| } |
| |
| // Starts a retry after appropriate back-off. |
| static void do_retry(grpc_call_element* elem, |
| subchannel_call_retry_state* retry_state, |
| grpc_millis server_pushback_ms) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| GPR_ASSERT(calld->method_params != nullptr); |
| const ClientChannelMethodParams::RetryPolicy* retry_policy = |
| calld->method_params->retry_policy(); |
| GPR_ASSERT(retry_policy != nullptr); |
| // Reset subchannel call and connected subchannel. |
| if (calld->subchannel_call != nullptr) { |
| GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, |
| "client_channel_call_retry"); |
| calld->subchannel_call = nullptr; |
| } |
| if (calld->have_request) { |
| calld->have_request = false; |
| calld->request.Destroy(); |
| } |
| // Compute backoff delay. |
| grpc_millis next_attempt_time; |
| if (server_pushback_ms >= 0) { |
| next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; |
| calld->last_attempt_got_server_pushback = true; |
| } else { |
| if (calld->num_attempts_completed == 1 || |
| calld->last_attempt_got_server_pushback) { |
| calld->retry_backoff.Init( |
| grpc_core::BackOff::Options() |
| .set_initial_backoff(retry_policy->initial_backoff) |
| .set_multiplier(retry_policy->backoff_multiplier) |
| .set_jitter(RETRY_BACKOFF_JITTER) |
| .set_max_backoff(retry_policy->max_backoff)); |
| calld->last_attempt_got_server_pushback = false; |
| } |
| next_attempt_time = calld->retry_backoff->NextAttemptTime(); |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand, |
| calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); |
| } |
| // Schedule retry after computed delay. |
| GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, |
| grpc_combiner_scheduler(chand->combiner)); |
| grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); |
| // Update bookkeeping. |
| if (retry_state != nullptr) retry_state->retry_dispatched = true; |
| } |
| |
| // Returns true if the call is being retried. |
| static bool maybe_retry(grpc_call_element* elem, |
| subchannel_batch_data* batch_data, |
| grpc_status_code status, |
| grpc_mdelem* server_pushback_md) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // Get retry policy. |
| if (calld->method_params == nullptr) return false; |
| const ClientChannelMethodParams::RetryPolicy* retry_policy = |
| calld->method_params->retry_policy(); |
| if (retry_policy == nullptr) return false; |
| // If we've already dispatched a retry from this call, return true. |
| // This catches the case where the batch has multiple callbacks |
| // (i.e., it includes either recv_message or recv_initial_metadata). |
| subchannel_call_retry_state* retry_state = nullptr; |
| if (batch_data != nullptr) { |
| retry_state = static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| if (retry_state->retry_dispatched) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, |
| calld); |
| } |
| return true; |
| } |
| } |
| // Check status. |
| if (GPR_LIKELY(status == GRPC_STATUS_OK)) { |
| if (calld->retry_throttle_data != nullptr) { |
| calld->retry_throttle_data->RecordSuccess(); |
| } |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); |
| } |
| return false; |
| } |
| // Status is not OK. Check whether the status is retryable. |
| if (!retry_policy->retryable_status_codes.Contains(status)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: status %s not configured as retryable", chand, |
| calld, grpc_status_code_to_string(status)); |
| } |
| return false; |
| } |
| // Record the failure and check whether retries are throttled. |
| // Note that it's important for this check to come after the status |
| // code check above, since we should only record failures whose statuses |
| // match the configured retryable status codes, so that we don't count |
| // things like failures due to malformed requests (INVALID_ARGUMENT). |
| // Conversely, it's important for this to come before the remaining |
| // checks, so that we don't fail to record failures due to other factors. |
| if (calld->retry_throttle_data != nullptr && |
| !calld->retry_throttle_data->RecordFailure()) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); |
| } |
| return false; |
| } |
| // Check whether the call is committed. |
| if (calld->retry_committed) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, |
| calld); |
| } |
| return false; |
| } |
| // Check whether we have retries remaining. |
| ++calld->num_attempts_completed; |
| if (calld->num_attempts_completed >= retry_policy->max_attempts) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, |
| calld, retry_policy->max_attempts); |
| } |
| return false; |
| } |
| // If the call was cancelled from the surface, don't retry. |
| if (calld->cancel_error != GRPC_ERROR_NONE) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: call cancelled from surface, not retrying", |
| chand, calld); |
| } |
| return false; |
| } |
| // Check server push-back. |
| grpc_millis server_pushback_ms = -1; |
| if (server_pushback_md != nullptr) { |
| // If the value is "-1" or any other unparseable string, we do not retry. |
| uint32_t ms; |
| if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: not retrying due to server push-back", |
| chand, calld); |
| } |
| return false; |
| } else { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", |
| chand, calld, ms); |
| } |
| server_pushback_ms = (grpc_millis)ms; |
| } |
| } |
| do_retry(elem, retry_state, server_pushback_ms); |
| return true; |
| } |
| |
| // |
| // subchannel_batch_data |
| // |
| |
| namespace { |
| |
| subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, |
| call_data* calld, int refcount, |
| bool set_on_complete) |
| : elem(elem), |
| subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, |
| "batch_data_create")) { |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| batch.payload = &retry_state->batch_payload; |
| gpr_ref_init(&refs, refcount); |
| if (set_on_complete) { |
| GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this, |
| grpc_schedule_on_exec_ctx); |
| batch.on_complete = &on_complete; |
| } |
| GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); |
| } |
| |
| void subchannel_batch_data::destroy() { |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data(subchannel_call)); |
| if (batch.send_initial_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); |
| } |
| if (batch.send_trailing_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); |
| } |
| if (batch.recv_initial_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); |
| } |
| if (batch.recv_trailing_metadata) { |
| grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); |
| } |
| GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); |
| } |
| |
| } // namespace |
| |
| // Creates a subchannel_batch_data object on the call's arena with the |
| // specified refcount. If set_on_complete is true, the batch's |
| // on_complete callback will be set to point to on_complete(); |
| // otherwise, the batch's on_complete callback will be null. |
| static subchannel_batch_data* batch_data_create(grpc_call_element* elem, |
| int refcount, |
| bool set_on_complete) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_batch_data* batch_data = |
| new (gpr_arena_alloc(calld->arena, sizeof(*batch_data))) |
| subchannel_batch_data(elem, calld, refcount, set_on_complete); |
| return batch_data; |
| } |
| |
| static void batch_data_unref(subchannel_batch_data* batch_data) { |
| if (gpr_unref(&batch_data->refs)) { |
| batch_data->destroy(); |
| } |
| } |
| |
| // |
| // recv_initial_metadata callback handling |
| // |
| |
| // Invokes recv_initial_metadata_ready for a subchannel batch. |
| static void invoke_recv_initial_metadata_callback(void* arg, |
| grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| // Find pending batch. |
| pending_batch* pending = pending_batch_find( |
| batch_data->elem, "invoking recv_initial_metadata_ready for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_initial_metadata && |
| batch->payload->recv_initial_metadata |
| .recv_initial_metadata_ready != nullptr; |
| }); |
| GPR_ASSERT(pending != nullptr); |
| // Return metadata. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| grpc_metadata_batch_move( |
| &retry_state->recv_initial_metadata, |
| pending->batch->payload->recv_initial_metadata.recv_initial_metadata); |
| // Update bookkeeping. |
| // Note: Need to do this before invoking the callback, since invoking |
| // the callback will result in yielding the call combiner. |
| grpc_closure* recv_initial_metadata_ready = |
| pending->batch->payload->recv_initial_metadata |
| .recv_initial_metadata_ready; |
| pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = |
| nullptr; |
| maybe_clear_pending_batch(batch_data->elem, pending); |
| batch_data_unref(batch_data); |
| // Invoke callback. |
| GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); |
| } |
| |
| // Intercepts recv_initial_metadata_ready callback for retries. |
| // Commits the call and returns the initial metadata up the stack. |
| static void recv_initial_metadata_ready(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| grpc_call_element* elem = batch_data->elem; |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", |
| chand, calld, grpc_error_string(error)); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| retry_state->completed_recv_initial_metadata = true; |
| // If a retry was already dispatched, then we're not going to use the |
| // result of this recv_initial_metadata op, so do nothing. |
| if (retry_state->retry_dispatched) { |
| GRPC_CALL_COMBINER_STOP( |
| calld->call_combiner, |
| "recv_initial_metadata_ready after retry dispatched"); |
| return; |
| } |
| // If we got an error or a Trailers-Only response and have not yet gotten |
| // the recv_trailing_metadata_ready callback, then defer propagating this |
| // callback back to the surface. We can evaluate whether to retry when |
| // recv_trailing_metadata comes back. |
| if (GPR_UNLIKELY((retry_state->trailing_metadata_available || |
| error != GRPC_ERROR_NONE) && |
| !retry_state->completed_recv_trailing_metadata)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: deferring recv_initial_metadata_ready " |
| "(Trailers-Only)", |
| chand, calld); |
| } |
| retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; |
| retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); |
| if (!retry_state->started_recv_trailing_metadata) { |
| // recv_trailing_metadata not yet started by application; start it |
| // ourselves to get status. |
| start_internal_recv_trailing_metadata(elem); |
| } else { |
| GRPC_CALL_COMBINER_STOP( |
| calld->call_combiner, |
| "recv_initial_metadata_ready trailers-only or error"); |
| } |
| return; |
| } |
| // Received valid initial metadata, so commit the call. |
| retry_commit(elem, retry_state); |
| // Invoke the callback to return the result to the surface. |
| // Manually invoking a callback function; it does not take ownership of error. |
| invoke_recv_initial_metadata_callback(batch_data, error); |
| } |
| |
| // |
| // recv_message callback handling |
| // |
| |
| // Invokes recv_message_ready for a subchannel batch. |
| static void invoke_recv_message_callback(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| // Find pending op. |
| pending_batch* pending = pending_batch_find( |
| batch_data->elem, "invoking recv_message_ready for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_message && |
| batch->payload->recv_message.recv_message_ready != nullptr; |
| }); |
| GPR_ASSERT(pending != nullptr); |
| // Return payload. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| *pending->batch->payload->recv_message.recv_message = |
| std::move(retry_state->recv_message); |
| // Update bookkeeping. |
| // Note: Need to do this before invoking the callback, since invoking |
| // the callback will result in yielding the call combiner. |
| grpc_closure* recv_message_ready = |
| pending->batch->payload->recv_message.recv_message_ready; |
| pending->batch->payload->recv_message.recv_message_ready = nullptr; |
| maybe_clear_pending_batch(batch_data->elem, pending); |
| batch_data_unref(batch_data); |
| // Invoke callback. |
| GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); |
| } |
| |
| // Intercepts recv_message_ready callback for retries. |
| // Commits the call and returns the message up the stack. |
| static void recv_message_ready(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| grpc_call_element* elem = batch_data->elem; |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", |
| chand, calld, grpc_error_string(error)); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| ++retry_state->completed_recv_message_count; |
| // If a retry was already dispatched, then we're not going to use the |
| // result of this recv_message op, so do nothing. |
| if (retry_state->retry_dispatched) { |
| GRPC_CALL_COMBINER_STOP(calld->call_combiner, |
| "recv_message_ready after retry dispatched"); |
| return; |
| } |
| // If we got an error or the payload was nullptr and we have not yet gotten |
| // the recv_trailing_metadata_ready callback, then defer propagating this |
| // callback back to the surface. We can evaluate whether to retry when |
| // recv_trailing_metadata comes back. |
| if (GPR_UNLIKELY( |
| (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && |
| !retry_state->completed_recv_trailing_metadata)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: deferring recv_message_ready (nullptr " |
| "message and recv_trailing_metadata pending)", |
| chand, calld); |
| } |
| retry_state->recv_message_ready_deferred_batch = batch_data; |
| retry_state->recv_message_error = GRPC_ERROR_REF(error); |
| if (!retry_state->started_recv_trailing_metadata) { |
| // recv_trailing_metadata not yet started by application; start it |
| // ourselves to get status. |
| start_internal_recv_trailing_metadata(elem); |
| } else { |
| GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null"); |
| } |
| return; |
| } |
| // Received a valid message, so commit the call. |
| retry_commit(elem, retry_state); |
| // Invoke the callback to return the result to the surface. |
| // Manually invoking a callback function; it does not take ownership of error. |
| invoke_recv_message_callback(batch_data, error); |
| } |
| |
| // |
| // recv_trailing_metadata handling |
| // |
| |
| // Sets *status and *server_pushback_md based on md_batch and error. |
| // Only sets *server_pushback_md if server_pushback_md != nullptr. |
| static void get_call_status(grpc_call_element* elem, |
| grpc_metadata_batch* md_batch, grpc_error* error, |
| grpc_status_code* status, |
| grpc_mdelem** server_pushback_md) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (error != GRPC_ERROR_NONE) { |
| grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, |
| nullptr); |
| } else { |
| GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); |
| *status = |
| grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); |
| if (server_pushback_md != nullptr && |
| md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { |
| *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Adds recv_trailing_metadata_ready closure to closures. |
| static void add_closure_for_recv_trailing_metadata_ready( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| grpc_error* error, grpc_core::CallCombinerClosureList* closures) { |
| // Find pending batch. |
| pending_batch* pending = pending_batch_find( |
| elem, "invoking recv_trailing_metadata for", |
| [](grpc_transport_stream_op_batch* batch) { |
| return batch->recv_trailing_metadata && |
| batch->payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready != nullptr; |
| }); |
| // If we generated the recv_trailing_metadata op internally via |
| // start_internal_recv_trailing_metadata(), then there will be no |
| // pending batch. |
| if (pending == nullptr) { |
| GRPC_ERROR_UNREF(error); |
| return; |
| } |
| // Return metadata. |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| grpc_metadata_batch_move( |
| &retry_state->recv_trailing_metadata, |
| pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); |
| // Add closure. |
| closures->Add(pending->batch->payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready, |
| error, "recv_trailing_metadata_ready for pending batch"); |
| // Update bookkeeping. |
| pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| |
| // Adds any necessary closures for deferred recv_initial_metadata and |
| // recv_message callbacks to closures. |
| static void add_closures_for_deferred_recv_callbacks( |
| subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| if (batch_data->batch.recv_trailing_metadata) { |
| // Add closure for deferred recv_initial_metadata_ready. |
| if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != |
| nullptr)) { |
| GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, |
| invoke_recv_initial_metadata_callback, |
| retry_state->recv_initial_metadata_ready_deferred_batch, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&retry_state->recv_initial_metadata_ready, |
| retry_state->recv_initial_metadata_error, |
| "resuming recv_initial_metadata_ready"); |
| retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; |
| } |
| // Add closure for deferred recv_message_ready. |
| if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != |
| nullptr)) { |
| GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, |
| invoke_recv_message_callback, |
| retry_state->recv_message_ready_deferred_batch, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&retry_state->recv_message_ready, |
| retry_state->recv_message_error, |
| "resuming recv_message_ready"); |
| retry_state->recv_message_ready_deferred_batch = nullptr; |
| } |
| } |
| } |
| |
| // Returns true if any op in the batch was not yet started. |
| // Only looks at send ops, since recv ops are always started immediately. |
| static bool pending_batch_is_unstarted( |
| pending_batch* pending, call_data* calld, |
| subchannel_call_retry_state* retry_state) { |
| if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { |
| return false; |
| } |
| if (pending->batch->send_initial_metadata && |
| !retry_state->started_send_initial_metadata) { |
| return true; |
| } |
| if (pending->batch->send_message && |
| retry_state->started_send_message_count < calld->send_messages.size()) { |
| return true; |
| } |
| if (pending->batch->send_trailing_metadata && |
| !retry_state->started_send_trailing_metadata) { |
| return true; |
| } |
| return false; |
| } |
| |
| // For any pending batch containing an op that has not yet been started, |
| // adds the pending batch's completion closures to closures. |
| static void add_closures_to_fail_unstarted_pending_batches( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| grpc_error* error, grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| if (pending_batch_is_unstarted(pending, calld, retry_state)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failing unstarted pending batch at index " |
| "%" PRIuPTR, |
| chand, calld, i); |
| } |
| closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), |
| "failing on_complete for pending batch"); |
| pending->batch->on_complete = nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Runs necessary closures upon completion of a call attempt. |
| static void run_closures_for_completed_call(subchannel_batch_data* batch_data, |
| grpc_error* error) { |
| grpc_call_element* elem = batch_data->elem; |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| // Construct list of closures to execute. |
| grpc_core::CallCombinerClosureList closures; |
| // First, add closure for recv_trailing_metadata_ready. |
| add_closure_for_recv_trailing_metadata_ready( |
| elem, batch_data, GRPC_ERROR_REF(error), &closures); |
| // If there are deferred recv_initial_metadata_ready or recv_message_ready |
| // callbacks, add them to closures. |
| add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); |
| // Add closures to fail any pending batches that have not yet been started. |
| add_closures_to_fail_unstarted_pending_batches( |
| elem, retry_state, GRPC_ERROR_REF(error), &closures); |
| // Don't need batch_data anymore. |
| batch_data_unref(batch_data); |
| // Schedule all of the closures identified above. |
| // Note: This will release the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Intercepts recv_trailing_metadata_ready callback for retries. |
| // Commits the call and returns the trailing metadata up the stack. |
| static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| grpc_call_element* elem = batch_data->elem; |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", |
| chand, calld, grpc_error_string(error)); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| retry_state->completed_recv_trailing_metadata = true; |
| // Get the call's status and check for server pushback metadata. |
| grpc_status_code status = GRPC_STATUS_OK; |
| grpc_mdelem* server_pushback_md = nullptr; |
| grpc_metadata_batch* md_batch = |
| batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; |
| get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, |
| &server_pushback_md); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, |
| calld, grpc_status_code_to_string(status)); |
| } |
| // Check if we should retry. |
| if (maybe_retry(elem, batch_data, status, server_pushback_md)) { |
| // Unref batch_data for deferred recv_initial_metadata_ready or |
| // recv_message_ready callbacks, if any. |
| if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { |
| batch_data_unref(batch_data); |
| GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); |
| } |
| if (retry_state->recv_message_ready_deferred_batch != nullptr) { |
| batch_data_unref(batch_data); |
| GRPC_ERROR_UNREF(retry_state->recv_message_error); |
| } |
| batch_data_unref(batch_data); |
| return; |
| } |
| // Not retrying, so commit the call. |
| retry_commit(elem, retry_state); |
| // Run any necessary closures. |
| run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); |
| } |
| |
| // |
| // on_complete callback handling |
| // |
| |
| // Adds the on_complete closure for the pending batch completed in |
| // batch_data to closures. |
| static void add_closure_for_completed_pending_batch( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state, grpc_error* error, |
| grpc_core::CallCombinerClosureList* closures) { |
| pending_batch* pending = pending_batch_find( |
| elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { |
| // Match the pending batch with the same set of send ops as the |
| // subchannel batch we've just completed. |
| return batch->on_complete != nullptr && |
| batch_data->batch.send_initial_metadata == |
| batch->send_initial_metadata && |
| batch_data->batch.send_message == batch->send_message && |
| batch_data->batch.send_trailing_metadata == |
| batch->send_trailing_metadata; |
| }); |
| // If batch_data is a replay batch, then there will be no pending |
| // batch to complete. |
| if (pending == nullptr) { |
| GRPC_ERROR_UNREF(error); |
| return; |
| } |
| // Add closure. |
| closures->Add(pending->batch->on_complete, error, |
| "on_complete for pending batch"); |
| pending->batch->on_complete = nullptr; |
| maybe_clear_pending_batch(elem, pending); |
| } |
| |
| // If there are any cached ops to replay or pending ops to start on the |
| // subchannel call, adds a closure to closures to invoke |
| // start_retriable_subchannel_batches(). |
| static void add_closures_for_replay_or_pending_send_ops( |
| grpc_call_element* elem, subchannel_batch_data* batch_data, |
| subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| bool have_pending_send_message_ops = |
| retry_state->started_send_message_count < calld->send_messages.size(); |
| bool have_pending_send_trailing_metadata_op = |
| calld->seen_send_trailing_metadata && |
| !retry_state->started_send_trailing_metadata; |
| if (!have_pending_send_message_ops && |
| !have_pending_send_trailing_metadata_op) { |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch == nullptr || pending->send_ops_cached) continue; |
| if (batch->send_message) have_pending_send_message_ops = true; |
| if (batch->send_trailing_metadata) { |
| have_pending_send_trailing_metadata_op = true; |
| } |
| } |
| } |
| if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting next batch for pending send op(s)", |
| chand, calld); |
| } |
| GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, |
| start_retriable_subchannel_batches, elem, |
| grpc_schedule_on_exec_ctx); |
| closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, |
| "starting next batch for send_* op(s)"); |
| } |
| } |
| |
| // Callback used to intercept on_complete from subchannel calls. |
| // Called only when retries are enabled. |
| static void on_complete(void* arg, grpc_error* error) { |
| subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); |
| grpc_call_element* elem = batch_data->elem; |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); |
| gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", |
| chand, calld, grpc_error_string(error), batch_str); |
| gpr_free(batch_str); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| batch_data->subchannel_call)); |
| // Update bookkeeping in retry_state. |
| if (batch_data->batch.send_initial_metadata) { |
| retry_state->completed_send_initial_metadata = true; |
| } |
| if (batch_data->batch.send_message) { |
| ++retry_state->completed_send_message_count; |
| } |
| if (batch_data->batch.send_trailing_metadata) { |
| retry_state->completed_send_trailing_metadata = true; |
| } |
| // If the call is committed, free cached data for send ops that we've just |
| // completed. |
| if (calld->retry_committed) { |
| free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); |
| } |
| // Construct list of closures to execute. |
| grpc_core::CallCombinerClosureList closures; |
| // If a retry was already dispatched, that means we saw |
| // recv_trailing_metadata before this, so we do nothing here. |
| // Otherwise, invoke the callback to return the result to the surface. |
| if (!retry_state->retry_dispatched) { |
| // Add closure for the completed pending batch, if any. |
| add_closure_for_completed_pending_batch(elem, batch_data, retry_state, |
| GRPC_ERROR_REF(error), &closures); |
| // If needed, add a callback to start any replay or pending send ops on |
| // the subchannel call. |
| if (!retry_state->completed_recv_trailing_metadata) { |
| add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, |
| &closures); |
| } |
| } |
| // Track number of pending subchannel send batches and determine if this |
| // was the last one. |
| --calld->num_pending_retriable_subchannel_send_batches; |
| const bool last_send_batch_complete = |
| calld->num_pending_retriable_subchannel_send_batches == 0; |
| // Don't need batch_data anymore. |
| batch_data_unref(batch_data); |
| // Schedule all of the closures identified above. |
| // Note: This yeilds the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| // If this was the last subchannel send batch, unref the call stack. |
| if (last_send_batch_complete) { |
| GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); |
| } |
| } |
| |
| // |
| // subchannel batch construction |
| // |
| |
| // Helper function used to start a subchannel batch in the call combiner. |
| static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { |
| grpc_transport_stream_op_batch* batch = |
| static_cast<grpc_transport_stream_op_batch*>(arg); |
| grpc_subchannel_call* subchannel_call = |
| static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(subchannel_call, batch); |
| } |
| |
| // Adds a closure to closures that will execute batch in the call combiner. |
| static void add_closure_for_subchannel_batch( |
| grpc_call_element* elem, grpc_transport_stream_op_batch* batch, |
| grpc_core::CallCombinerClosureList* closures) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| batch->handler_private.extra_arg = calld->subchannel_call; |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
| start_batch_in_call_combiner, batch, |
| grpc_schedule_on_exec_ctx); |
| if (grpc_client_channel_trace.enabled()) { |
| char* batch_str = grpc_transport_stream_op_batch_string(batch); |
| gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, |
| calld, batch_str); |
| gpr_free(batch_str); |
| } |
| closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, |
| "start_subchannel_batch"); |
| } |
| |
| // Adds retriable send_initial_metadata op to batch_data. |
| static void add_retriable_send_initial_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| // Maps the number of retries to the corresponding metadata value slice. |
| static const grpc_slice* retry_count_strings[] = { |
| &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4}; |
| // We need to make a copy of the metadata batch for each attempt, since |
| // the filters in the subchannel stack may modify this batch, and we don't |
| // want those modifications to be passed forward to subsequent attempts. |
| // |
| // If we've already completed one or more attempts, add the |
| // grpc-retry-attempts header. |
| retry_state->send_initial_metadata_storage = |
| static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( |
| calld->arena, sizeof(grpc_linked_mdelem) * |
| (calld->send_initial_metadata.list.count + |
| (calld->num_attempts_completed > 0)))); |
| grpc_metadata_batch_copy(&calld->send_initial_metadata, |
| &retry_state->send_initial_metadata, |
| retry_state->send_initial_metadata_storage); |
| if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named |
| .grpc_previous_rpc_attempts != nullptr)) { |
| grpc_metadata_batch_remove(&retry_state->send_initial_metadata, |
| retry_state->send_initial_metadata.idx.named |
| .grpc_previous_rpc_attempts); |
| } |
| if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { |
| grpc_mdelem retry_md = grpc_mdelem_create( |
| GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, |
| *retry_count_strings[calld->num_attempts_completed - 1], nullptr); |
| grpc_error* error = grpc_metadata_batch_add_tail( |
| &retry_state->send_initial_metadata, |
| &retry_state->send_initial_metadata_storage[calld->send_initial_metadata |
| .list.count], |
| retry_md); |
| if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
| gpr_log(GPR_ERROR, "error adding retry metadata: %s", |
| grpc_error_string(error)); |
| GPR_ASSERT(false); |
| } |
| } |
| retry_state->started_send_initial_metadata = true; |
| batch_data->batch.send_initial_metadata = true; |
| batch_data->batch.payload->send_initial_metadata.send_initial_metadata = |
| &retry_state->send_initial_metadata; |
| batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = |
| calld->send_initial_metadata_flags; |
| batch_data->batch.payload->send_initial_metadata.peer_string = |
| calld->peer_string; |
| } |
| |
| // Adds retriable send_message op to batch_data. |
| static void add_retriable_send_message_op( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", |
| chand, calld, retry_state->started_send_message_count); |
| } |
| grpc_core::ByteStreamCache* cache = |
| calld->send_messages[retry_state->started_send_message_count]; |
| ++retry_state->started_send_message_count; |
| retry_state->send_message.Init(cache); |
| batch_data->batch.send_message = true; |
| batch_data->batch.payload->send_message.send_message.reset( |
| retry_state->send_message.get()); |
| } |
| |
| // Adds retriable send_trailing_metadata op to batch_data. |
| static void add_retriable_send_trailing_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| // We need to make a copy of the metadata batch for each attempt, since |
| // the filters in the subchannel stack may modify this batch, and we don't |
| // want those modifications to be passed forward to subsequent attempts. |
| retry_state->send_trailing_metadata_storage = |
| static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( |
| calld->arena, sizeof(grpc_linked_mdelem) * |
| calld->send_trailing_metadata.list.count)); |
| grpc_metadata_batch_copy(&calld->send_trailing_metadata, |
| &retry_state->send_trailing_metadata, |
| retry_state->send_trailing_metadata_storage); |
| retry_state->started_send_trailing_metadata = true; |
| batch_data->batch.send_trailing_metadata = true; |
| batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = |
| &retry_state->send_trailing_metadata; |
| } |
| |
| // Adds retriable recv_initial_metadata op to batch_data. |
| static void add_retriable_recv_initial_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| retry_state->started_recv_initial_metadata = true; |
| batch_data->batch.recv_initial_metadata = true; |
| grpc_metadata_batch_init(&retry_state->recv_initial_metadata); |
| batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = |
| &retry_state->recv_initial_metadata; |
| batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = |
| &retry_state->trailing_metadata_available; |
| GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, |
| recv_initial_metadata_ready, batch_data, |
| grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = |
| &retry_state->recv_initial_metadata_ready; |
| } |
| |
| // Adds retriable recv_message op to batch_data. |
| static void add_retriable_recv_message_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| ++retry_state->started_recv_message_count; |
| batch_data->batch.recv_message = true; |
| batch_data->batch.payload->recv_message.recv_message = |
| &retry_state->recv_message; |
| GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, |
| batch_data, grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_message.recv_message_ready = |
| &retry_state->recv_message_ready; |
| } |
| |
| // Adds retriable recv_trailing_metadata op to batch_data. |
| static void add_retriable_recv_trailing_metadata_op( |
| call_data* calld, subchannel_call_retry_state* retry_state, |
| subchannel_batch_data* batch_data) { |
| retry_state->started_recv_trailing_metadata = true; |
| batch_data->batch.recv_trailing_metadata = true; |
| grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); |
| batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = |
| &retry_state->recv_trailing_metadata; |
| batch_data->batch.payload->recv_trailing_metadata.collect_stats = |
| &retry_state->collect_stats; |
| GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, |
| recv_trailing_metadata_ready, batch_data, |
| grpc_schedule_on_exec_ctx); |
| batch_data->batch.payload->recv_trailing_metadata |
| .recv_trailing_metadata_ready = |
| &retry_state->recv_trailing_metadata_ready; |
| } |
| |
| // Helper function used to start a recv_trailing_metadata batch. This |
| // is used in the case where a recv_initial_metadata or recv_message |
| // op fails in a way that we know the call is over but when the application |
| // has not yet started its own recv_trailing_metadata op. |
| static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: call failed but recv_trailing_metadata not " |
| "started; starting it internally", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| // Create batch_data with 2 refs, since this batch will be unreffed twice: |
| // once for the recv_trailing_metadata_ready callback when the subchannel |
| // batch returns, and again when we actually get a recv_trailing_metadata |
| // op from the surface. |
| subchannel_batch_data* batch_data = |
| batch_data_create(elem, 2, false /* set_on_complete */); |
| add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); |
| retry_state->recv_trailing_metadata_internal_batch = batch_data; |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); |
| } |
| |
| // If there are any cached send ops that need to be replayed on the |
| // current subchannel call, creates and returns a new subchannel batch |
| // to replay those ops. Otherwise, returns nullptr. |
| static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| subchannel_batch_data* replay_batch_data = nullptr; |
| // send_initial_metadata. |
| if (calld->seen_send_initial_metadata && |
| !retry_state->started_send_initial_metadata && |
| !calld->pending_send_initial_metadata) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_initial_metadata op", |
| chand, calld); |
| } |
| replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); |
| add_retriable_send_initial_metadata_op(calld, retry_state, |
| replay_batch_data); |
| } |
| // send_message. |
| // Note that we can only have one send_message op in flight at a time. |
| if (retry_state->started_send_message_count < calld->send_messages.size() && |
| retry_state->started_send_message_count == |
| retry_state->completed_send_message_count && |
| !calld->pending_send_message) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_message op", |
| chand, calld); |
| } |
| if (replay_batch_data == nullptr) { |
| replay_batch_data = |
| batch_data_create(elem, 1, true /* set_on_complete */); |
| } |
| add_retriable_send_message_op(elem, retry_state, replay_batch_data); |
| } |
| // send_trailing_metadata. |
| // Note that we only add this op if we have no more send_message ops |
| // to start, since we can't send down any more send_message ops after |
| // send_trailing_metadata. |
| if (calld->seen_send_trailing_metadata && |
| retry_state->started_send_message_count == calld->send_messages.size() && |
| !retry_state->started_send_trailing_metadata && |
| !calld->pending_send_trailing_metadata) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: replaying previously completed " |
| "send_trailing_metadata op", |
| chand, calld); |
| } |
| if (replay_batch_data == nullptr) { |
| replay_batch_data = |
| batch_data_create(elem, 1, true /* set_on_complete */); |
| } |
| add_retriable_send_trailing_metadata_op(calld, retry_state, |
| replay_batch_data); |
| } |
| return replay_batch_data; |
| } |
| |
| // Adds subchannel batches for pending batches to batches, updating |
| // *num_batches as needed. |
| static void add_subchannel_batches_for_pending_batches( |
| grpc_call_element* elem, subchannel_call_retry_state* retry_state, |
| grpc_core::CallCombinerClosureList* closures) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
| pending_batch* pending = &calld->pending_batches[i]; |
| grpc_transport_stream_op_batch* batch = pending->batch; |
| if (batch == nullptr) continue; |
| // Skip any batch that either (a) has already been started on this |
| // subchannel call or (b) we can't start yet because we're still |
| // replaying send ops that need to be completed first. |
| // TODO(roth): Note that if any one op in the batch can't be sent |
| // yet due to ops that we're replaying, we don't start any of the ops |
| // in the batch. This is probably okay, but it could conceivably |
| // lead to increased latency in some cases -- e.g., we could delay |
| // starting a recv op due to it being in the same batch with a send |
| // op. If/when we revamp the callback protocol in |
| // transport_stream_op_batch, we may be able to fix this. |
| if (batch->send_initial_metadata && |
| retry_state->started_send_initial_metadata) { |
| continue; |
| } |
| if (batch->send_message && retry_state->completed_send_message_count < |
| retry_state->started_send_message_count) { |
| continue; |
| } |
| // Note that we only start send_trailing_metadata if we have no more |
| // send_message ops to start, since we can't send down any more |
| // send_message ops after send_trailing_metadata. |
| if (batch->send_trailing_metadata && |
| (retry_state->started_send_message_count + batch->send_message < |
| calld->send_messages.size() || |
| retry_state->started_send_trailing_metadata)) { |
| continue; |
| } |
| if (batch->recv_initial_metadata && |
| retry_state->started_recv_initial_metadata) { |
| continue; |
| } |
| if (batch->recv_message && retry_state->completed_recv_message_count < |
| retry_state->started_recv_message_count) { |
| continue; |
| } |
| if (batch->recv_trailing_metadata && |
| retry_state->started_recv_trailing_metadata) { |
| // If we previously completed a recv_trailing_metadata op |
| // initiated by start_internal_recv_trailing_metadata(), use the |
| // result of that instead of trying to re-start this op. |
| if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != |
| nullptr))) { |
| // If the batch completed, then trigger the completion callback |
| // directly, so that we return the previously returned results to |
| // the application. Otherwise, just unref the internally |
| // started subchannel batch, since we'll propagate the |
| // completion when it completes. |
| if (retry_state->completed_recv_trailing_metadata) { |
| // Batches containing recv_trailing_metadata always succeed. |
| closures->Add( |
| &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, |
| "re-executing recv_trailing_metadata_ready to propagate " |
| "internally triggered result"); |
| } else { |
| batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); |
| } |
| retry_state->recv_trailing_metadata_internal_batch = nullptr; |
| } |
| continue; |
| } |
| // If we're not retrying, just send the batch as-is. |
| if (calld->method_params == nullptr || |
| calld->method_params->retry_policy() == nullptr || |
| calld->retry_committed) { |
| add_closure_for_subchannel_batch(elem, batch, closures); |
| pending_batch_clear(calld, pending); |
| continue; |
| } |
| // Create batch with the right number of callbacks. |
| const bool has_send_ops = batch->send_initial_metadata || |
| batch->send_message || |
| batch->send_trailing_metadata; |
| const int num_callbacks = has_send_ops + batch->recv_initial_metadata + |
| batch->recv_message + |
| batch->recv_trailing_metadata; |
| subchannel_batch_data* batch_data = batch_data_create( |
| elem, num_callbacks, has_send_ops /* set_on_complete */); |
| // Cache send ops if needed. |
| maybe_cache_send_ops_for_batch(calld, pending); |
| // send_initial_metadata. |
| if (batch->send_initial_metadata) { |
| add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); |
| } |
| // send_message. |
| if (batch->send_message) { |
| add_retriable_send_message_op(elem, retry_state, batch_data); |
| } |
| // send_trailing_metadata. |
| if (batch->send_trailing_metadata) { |
| add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); |
| } |
| // recv_initial_metadata. |
| if (batch->recv_initial_metadata) { |
| // recv_flags is only used on the server side. |
| GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); |
| add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); |
| } |
| // recv_message. |
| if (batch->recv_message) { |
| add_retriable_recv_message_op(calld, retry_state, batch_data); |
| } |
| // recv_trailing_metadata. |
| if (batch->recv_trailing_metadata) { |
| add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); |
| } |
| add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); |
| // Track number of pending subchannel send batches. |
| // If this is the first one, take a ref to the call stack. |
| if (batch->send_initial_metadata || batch->send_message || |
| batch->send_trailing_metadata) { |
| if (calld->num_pending_retriable_subchannel_send_batches == 0) { |
| GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); |
| } |
| ++calld->num_pending_retriable_subchannel_send_batches; |
| } |
| } |
| } |
| |
| // Constructs and starts whatever subchannel batches are needed on the |
| // subchannel call. |
| static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", |
| chand, calld); |
| } |
| subchannel_call_retry_state* retry_state = |
| static_cast<subchannel_call_retry_state*>( |
| grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)); |
| // Construct list of closures to execute, one for each pending batch. |
| grpc_core::CallCombinerClosureList closures; |
| // Replay previously-returned send_* ops if needed. |
| subchannel_batch_data* replay_batch_data = |
| maybe_create_subchannel_batch_for_replay(elem, retry_state); |
| if (replay_batch_data != nullptr) { |
| add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, |
| &closures); |
| // Track number of pending subchannel send batches. |
| // If this is the first one, take a ref to the call stack. |
| if (calld->num_pending_retriable_subchannel_send_batches == 0) { |
| GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); |
| } |
| ++calld->num_pending_retriable_subchannel_send_batches; |
| } |
| // Now add pending batches. |
| add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); |
| // Start batches on subchannel call. |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting %" PRIuPTR |
| " retriable batches on subchannel_call=%p", |
| chand, calld, closures.size(), calld->subchannel_call); |
| } |
| // Note: This will yield the call combiner. |
| closures.RunClosures(calld->call_combiner); |
| } |
| |
| // |
| // LB pick |
| // |
| |
| static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| const size_t parent_data_size = |
| calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; |
| const grpc_core::ConnectedSubchannel::CallArgs call_args = { |
| calld->pollent, // pollent |
| calld->path, // path |
| calld->call_start_time, // start_time |
| calld->deadline, // deadline |
| calld->arena, // arena |
| calld->request->pick()->subchannel_call_context, // context |
| calld->call_combiner, // call_combiner |
| parent_data_size // parent_data_size |
| }; |
| grpc_error* new_error = |
| calld->request->pick()->connected_subchannel->CreateCall( |
| call_args, &calld->subchannel_call); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", |
| chand, calld, calld->subchannel_call, grpc_error_string(new_error)); |
| } |
| if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { |
| new_error = grpc_error_add_child(new_error, error); |
| pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
| } else { |
| if (parent_data_size > 0) { |
| new (grpc_connected_subchannel_call_get_parent_data( |
| calld->subchannel_call)) |
| subchannel_call_retry_state( |
| calld->request->pick()->subchannel_call_context); |
| } |
| pending_batches_resume(elem); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Invoked when a pick is completed, on both success or failure. |
| static void pick_done(void* arg, grpc_error* error) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) { |
| // Failed to create subchannel. |
| // If there was no error, this is an LB policy drop, in which case |
| // we return an error; otherwise, we may retry. |
| grpc_status_code status = GRPC_STATUS_OK; |
| grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, |
| nullptr); |
| if (error == GRPC_ERROR_NONE || !calld->enable_retries || |
| !maybe_retry(elem, nullptr /* batch_data */, status, |
| nullptr /* server_pushback_md */)) { |
| grpc_error* new_error = |
| error == GRPC_ERROR_NONE |
| ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "Call dropped by load balancing policy") |
| : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Failed to create subchannel", &error, 1); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: failed to create subchannel: error=%s", |
| chand, calld, grpc_error_string(new_error)); |
| } |
| pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
| } |
| } else { |
| /* Create call on subchannel. */ |
| create_subchannel_call(elem, GRPC_ERROR_REF(error)); |
| } |
| } |
| |
| // If the channel is in TRANSIENT_FAILURE and the call is not |
| // wait_for_ready=true, fails the call and returns true. |
| static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; |
| if (chand->request_router->GetConnectivityState() == |
| GRPC_CHANNEL_TRANSIENT_FAILURE && |
| (batch->payload->send_initial_metadata.send_initial_metadata_flags & |
| GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { |
| pending_batches_fail( |
| elem, |
| grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "channel is in state TRANSIENT_FAILURE"), |
| GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), |
| true /* yield_call_combiner */); |
| return true; |
| } |
| return false; |
| } |
| |
| // Applies service config to the call. Must be invoked once we know |
| // that the resolver has returned results to the channel. |
| static void apply_service_config_to_call_locked(grpc_call_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", |
| chand, calld); |
| } |
| if (chand->retry_throttle_data != nullptr) { |
| calld->retry_throttle_data = chand->retry_throttle_data->Ref(); |
| } |
| if (chand->method_params_table != nullptr) { |
| calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( |
| *chand->method_params_table, calld->path); |
| if (calld->method_params != nullptr) { |
| // If the deadline from the service config is shorter than the one |
| // from the client API, reset the deadline timer. |
| if (chand->deadline_checking_enabled && |
| calld->method_params->timeout() != 0) { |
| const grpc_millis per_method_deadline = |
| grpc_timespec_to_millis_round_up(calld->call_start_time) + |
| calld->method_params->timeout(); |
| if (per_method_deadline < calld->deadline) { |
| calld->deadline = per_method_deadline; |
| grpc_deadline_state_reset(elem, calld->deadline); |
| } |
| } |
| // If the service config set wait_for_ready and the application |
| // did not explicitly set it, use the value from the service config. |
| uint32_t* send_initial_metadata_flags = |
| &calld->pending_batches[0] |
| .batch->payload->send_initial_metadata |
| .send_initial_metadata_flags; |
| if (GPR_UNLIKELY( |
| calld->method_params->wait_for_ready() != |
| ClientChannelMethodParams::WAIT_FOR_READY_UNSET && |
| !(*send_initial_metadata_flags & |
| GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { |
| if (calld->method_params->wait_for_ready() == |
| ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { |
| *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
| } else { |
| *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
| } |
| } |
| } |
| } |
| // If no retry policy, disable retries. |
| // TODO(roth): Remove this when adding support for transparent retries. |
| if (calld->method_params == nullptr || |
| calld->method_params->retry_policy() == nullptr) { |
| calld->enable_retries = false; |
| } |
| } |
| |
| // Invoked once resolver results are available. |
| static bool maybe_apply_service_config_to_call_locked(void* arg) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| // Only get service config data on the first attempt. |
| if (GPR_LIKELY(calld->num_attempts_completed == 0)) { |
| apply_service_config_to_call_locked(elem); |
| // Check this after applying service config, since it may have |
| // affected the call's wait_for_ready value. |
| if (fail_call_if_in_transient_failure(elem)) return false; |
| } |
| return true; |
| } |
| |
| static void start_pick_locked(void* arg, grpc_error* ignored) { |
| grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| GPR_ASSERT(!calld->have_request); |
| GPR_ASSERT(calld->subchannel_call == nullptr); |
| // Normally, we want to do this check until after we've processed the |
| // service config, so that we can honor the wait_for_ready setting in |
| // the service config. However, if the channel is in TRANSIENT_FAILURE |
| // and we don't have an LB policy at this point, that means that the |
| // resolver has returned a failure, so we're not going to get a service |
| // config right away. In that case, we fail the call now based on the |
| // wait_for_ready value passed in from the application. |
| if (chand->request_router->lb_policy() == nullptr && |
| fail_call_if_in_transient_failure(elem)) { |
| return; |
| } |
| // If this is a retry, use the send_initial_metadata payload that |
| // we've cached; otherwise, use the pending batch. The |
| // send_initial_metadata batch will be the first pending batch in the |
| // list, as set by get_batch_index() above. |
| // TODO(roth): What if the LB policy needs to add something to the |
| // call's initial metadata, and then there's a retry? We don't want |
| // the new metadata to be added twice. We might need to somehow |
| // allocate the subchannel batch earlier so that we can give the |
| // subchannel's copy of the metadata batch (which is copied for each |
| // attempt) to the LB policy instead the one from the parent channel. |
| grpc_metadata_batch* initial_metadata = |
| calld->seen_send_initial_metadata |
| ? &calld->send_initial_metadata |
| : calld->pending_batches[0] |
| .batch->payload->send_initial_metadata.send_initial_metadata; |
| uint32_t* initial_metadata_flags = |
| calld->seen_send_initial_metadata |
| ? &calld->send_initial_metadata_flags |
| : &calld->pending_batches[0] |
| .batch->payload->send_initial_metadata |
| .send_initial_metadata_flags; |
| GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, |
| grpc_schedule_on_exec_ctx); |
| calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent, |
| initial_metadata, initial_metadata_flags, |
| maybe_apply_service_config_to_call_locked, elem, |
| &calld->pick_closure); |
| calld->have_request = true; |
| chand->request_router->RouteCallLocked(calld->request.get()); |
| } |
| |
| // |
| // filter call vtable functions |
| // |
| |
| static void cc_start_transport_stream_op_batch( |
| grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { |
| GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0); |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
| grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); |
| } |
| // If we've previously been cancelled, immediately fail any new batches. |
| if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s", |
| chand, calld, grpc_error_string(calld->cancel_error)); |
| } |
| // Note: This will release the call combiner. |
| grpc_transport_stream_op_batch_finish_with_failure( |
| batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); |
| return; |
| } |
| // Handle cancellation. |
| if (GPR_UNLIKELY(batch->cancel_stream)) { |
| // Stash a copy of cancel_error in our call data, so that we can use |
| // it for subsequent operations. This ensures that if the call is |
| // cancelled before any batches are passed down (e.g., if the deadline |
| // is in the past when the call starts), we can return the right |
| // error to the caller when the first batch does get passed down. |
| GRPC_ERROR_UNREF(calld->cancel_error); |
| calld->cancel_error = |
| GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand, |
| calld, grpc_error_string(calld->cancel_error)); |
| } |
| // If we do not have a subchannel call (i.e., a pick has not yet |
| // been started), fail all pending batches. Otherwise, send the |
| // cancellation down to the subchannel call. |
| if (calld->subchannel_call == nullptr) { |
| pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error), |
| false /* yield_call_combiner */); |
| // Note: This will release the call combiner. |
| grpc_transport_stream_op_batch_finish_with_failure( |
| batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); |
| } else { |
| // Note: This will release the call combiner. |
| grpc_subchannel_call_process_op(calld->subchannel_call, batch); |
| } |
| return; |
| } |
| // Add the batch to the pending list. |
| pending_batches_add(elem, batch); |
| // Check if we've already gotten a subchannel call. |
| // Note that once we have completed the pick, we do not need to enter |
| // the channel combiner, which is more efficient (especially for |
| // streaming calls). |
| if (calld->subchannel_call != nullptr) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, |
| calld, calld->subchannel_call); |
| } |
| pending_batches_resume(elem); |
| return; |
| } |
| // We do not yet have a subchannel call. |
| // For batches containing a send_initial_metadata op, enter the channel |
| // combiner to start a pick. |
| if (GPR_LIKELY(batch->send_initial_metadata)) { |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", |
| chand, calld); |
| } |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, |
| elem, grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } else { |
| // For all other batches, release the call combiner. |
| if (grpc_client_channel_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "chand=%p calld=%p: saved batch, yielding call combiner", chand, |
| calld); |
| } |
| GRPC_CALL_COMBINER_STOP(calld->call_combiner, |
| "batch does not include send_initial_metadata"); |
| } |
| } |
| |
| /* Constructor for call_data */ |
| static grpc_error* cc_init_call_elem(grpc_call_element* elem, |
| const grpc_call_element_args* args) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| new (elem->call_data) call_data(elem, *chand, *args); |
| return GRPC_ERROR_NONE; |
| } |
| |
| /* Destructor for call_data */ |
| static void cc_destroy_call_elem(grpc_call_element* elem, |
| const grpc_call_final_info* final_info, |
| grpc_closure* then_schedule_closure) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| if (GPR_LIKELY(calld->subchannel_call != nullptr)) { |
| grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, |
| then_schedule_closure); |
| then_schedule_closure = nullptr; |
| } |
| calld->~call_data(); |
| GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); |
| } |
| |
| static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, |
| grpc_polling_entity* pollent) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| calld->pollent = pollent; |
| } |
| |
| /************************************************************************* |
| * EXPORTED SYMBOLS |
| */ |
| |
| const grpc_channel_filter grpc_client_channel_filter = { |
| cc_start_transport_stream_op_batch, |
| cc_start_transport_op, |
| sizeof(call_data), |
| cc_init_call_elem, |
| cc_set_pollset_or_pollset_set, |
| cc_destroy_call_elem, |
| sizeof(channel_data), |
| cc_init_channel_elem, |
| cc_destroy_channel_elem, |
| cc_get_channel_info, |
| "client-channel", |
| }; |
| |
| void grpc_client_channel_set_channelz_node( |
| grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| chand->request_router->set_channelz_node(node); |
| } |
| |
| void grpc_client_channel_populate_child_refs( |
| grpc_channel_element* elem, |
| grpc_core::channelz::ChildRefsList* child_subchannels, |
| grpc_core::channelz::ChildRefsList* child_channels) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| if (chand->request_router->lb_policy() != nullptr) { |
| chand->request_router->lb_policy()->FillChildRefsForChannelz( |
| child_subchannels, child_channels); |
| } |
| } |
| |
| static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { |
| channel_data* chand = static_cast<channel_data*>(arg); |
| chand->request_router->ExitIdleLocked(); |
| GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); |
| } |
| |
| grpc_connectivity_state grpc_client_channel_check_connectivity_state( |
| grpc_channel_element* elem, int try_to_connect) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| grpc_connectivity_state out = chand->request_router->GetConnectivityState(); |
| if (out == GRPC_CHANNEL_IDLE && try_to_connect) { |
| GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, |
| grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| return out; |
| } |
| |
| typedef struct external_connectivity_watcher { |
| channel_data* chand; |
| grpc_polling_entity pollent; |
| grpc_closure* on_complete; |
| grpc_closure* watcher_timer_init; |
| grpc_connectivity_state* state; |
| grpc_closure my_closure; |
| struct external_connectivity_watcher* next; |
| } external_connectivity_watcher; |
| |
| static external_connectivity_watcher* lookup_external_connectivity_watcher( |
| channel_data* chand, grpc_closure* on_complete) { |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr && w->on_complete != on_complete) { |
| w = w->next; |
| } |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return w; |
| } |
| |
| static void external_connectivity_watcher_list_append( |
| channel_data* chand, external_connectivity_watcher* w) { |
| GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); |
| |
| gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); |
| GPR_ASSERT(!w->next); |
| w->next = chand->external_connectivity_watcher_list_head; |
| chand->external_connectivity_watcher_list_head = w; |
| gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); |
| } |
| |
| static void external_connectivity_watcher_list_remove( |
| channel_data* chand, external_connectivity_watcher* to_remove) { |
| GPR_ASSERT( |
| lookup_external_connectivity_watcher(chand, to_remove->on_complete)); |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| if (to_remove == chand->external_connectivity_watcher_list_head) { |
| chand->external_connectivity_watcher_list_head = to_remove->next; |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return; |
| } |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr) { |
| if (w->next == to_remove) { |
| w->next = w->next->next; |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| return; |
| } |
| w = w->next; |
| } |
| GPR_UNREACHABLE_CODE(return ); |
| } |
| |
| int grpc_client_channel_num_external_connectivity_watchers( |
| grpc_channel_element* elem) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| int count = 0; |
| |
| gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
| external_connectivity_watcher* w = |
| chand->external_connectivity_watcher_list_head; |
| while (w != nullptr) { |
| count++; |
| w = w->next; |
| } |
| gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
| |
| return count; |
| } |
| |
| static void on_external_watch_complete_locked(void* arg, grpc_error* error) { |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(arg); |
| grpc_closure* follow_up = w->on_complete; |
| grpc_polling_entity_del_from_pollset_set(&w->pollent, |
| w->chand->interested_parties); |
| GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| external_connectivity_watcher_list_remove(w->chand, w); |
| gpr_free(w); |
| GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); |
| } |
| |
| static void watch_connectivity_state_locked(void* arg, |
| grpc_error* error_ignored) { |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(arg); |
| external_connectivity_watcher* found = nullptr; |
| if (w->state != nullptr) { |
| external_connectivity_watcher_list_append(w->chand, w); |
| // An assumption is being made that the closure is scheduled on the exec ctx |
| // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately. |
| GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); |
| GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, |
| grpc_combiner_scheduler(w->chand->combiner)); |
| w->chand->request_router->NotifyOnConnectivityStateChange(w->state, |
| &w->my_closure); |
| } else { |
| GPR_ASSERT(w->watcher_timer_init == nullptr); |
| found = lookup_external_connectivity_watcher(w->chand, w->on_complete); |
| if (found) { |
| GPR_ASSERT(found->on_complete == w->on_complete); |
| found->chand->request_router->NotifyOnConnectivityStateChange( |
| nullptr, &found->my_closure); |
| } |
| grpc_polling_entity_del_from_pollset_set(&w->pollent, |
| w->chand->interested_parties); |
| GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| gpr_free(w); |
| } |
| } |
| |
| void grpc_client_channel_watch_connectivity_state( |
| grpc_channel_element* elem, grpc_polling_entity pollent, |
| grpc_connectivity_state* state, grpc_closure* closure, |
| grpc_closure* watcher_timer_init) { |
| channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
| external_connectivity_watcher* w = |
| static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w))); |
| w->chand = chand; |
| w->pollent = pollent; |
| w->on_complete = closure; |
| w->state = state; |
| w->watcher_timer_init = watcher_timer_init; |
| grpc_polling_entity_add_to_pollset_set(&w->pollent, |
| chand->interested_parties); |
| GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, |
| "external_connectivity_watcher"); |
| GRPC_CLOSURE_SCHED( |
| GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, |
| grpc_combiner_scheduler(chand->combiner)), |
| GRPC_ERROR_NONE); |
| } |
| |
| grpc_subchannel_call* grpc_client_channel_get_subchannel_call( |
| grpc_call_element* elem) { |
| call_data* calld = static_cast<call_data*>(elem->call_data); |
| return calld->subchannel_call; |
| } |