| /* |
| * |
| * Copyright 2016 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| /// Implementation of the gRPC LB policy. |
| /// |
| /// This policy takes as input a list of resolved addresses, which must |
| /// include at least one balancer address. |
| /// |
| /// An internal channel (\a lb_channel_) is created for the addresses |
| /// that are balancers. This channel behaves just like a regular |
| /// channel that uses pick_first to select from the list of balancer |
| /// addresses. |
| /// |
| /// The first time the policy gets a request for a pick, a ping, or to exit |
| /// the idle state, \a StartPickingLocked() is called. This method is |
| /// responsible for instantiating the internal *streaming* call to the LB |
| /// server (whichever address pick_first chose). The call will be complete |
| /// when either the balancer sends status or when we cancel the call (e.g., |
| /// because we are shutting down). If needed, we retry the call. If we |
| /// received at least one valid message from the server, a new call attempt |
| /// will be made immediately; otherwise, we apply back-off delays between |
| /// attempts. |
| /// |
| /// We maintain an internal round_robin policy instance for distributing |
| /// requests across backends. Whenever we receive a new serverlist from |
| /// the balancer, we update the round_robin policy with the new list of |
| /// addresses. If we cannot communicate with the balancer on startup, |
| /// however, we may enter fallback mode, in which case we will populate |
| /// the RR policy's addresses from the backend addresses returned by the |
| /// resolver. |
| /// |
| /// Once an RR policy instance is in place (and getting updated as described), |
| /// calls for a pick, a ping, or a cancellation will be serviced right |
| /// away by forwarding them to the RR instance. Any time there's no RR |
| /// policy available (i.e., right after the creation of the gRPCLB policy), |
| /// pick and ping requests are added to a list of pending picks and pings |
| /// to be flushed and serviced when the RR policy instance becomes available. |
| /// |
| /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the |
| /// high level design and details. |
| |
| // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when |
| // using that endpoint. Because of various transitive includes in uv.h, |
| // including windows.h on Windows, uv.h must be included before other system |
| // headers. Therefore, sockaddr.h must always be included first. |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/lib/iomgr/sockaddr.h" |
| #include "src/core/lib/iomgr/socket_utils.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <string.h> |
| |
| #include <grpc/byte_buffer_reader.h> |
| #include <grpc/grpc.h> |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/time.h> |
| |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| #include "src/core/ext/filters/client_channel/client_channel_factory.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/parse_address.h" |
| #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" |
| #include "src/core/ext/filters/client_channel/server_address.h" |
| #include "src/core/ext/filters/client_channel/subchannel_index.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/channel_stack.h" |
| #include "src/core/lib/gpr/host_port.h" |
| #include "src/core/lib/gpr/string.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/gprpp/memory.h" |
| #include "src/core/lib/gprpp/mutex_lock.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/sockaddr.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/iomgr/timer.h" |
| #include "src/core/lib/slice/slice_hash_table.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/slice/slice_string_helpers.h" |
| #include "src/core/lib/surface/call.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/channel_init.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| |
| #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
| #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
| #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 |
| #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 |
| #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 |
| |
| #define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token" |
| |
| namespace grpc_core { |
| |
| TraceFlag grpc_lb_glb_trace(false, "glb"); |
| |
| namespace { |
| |
| constexpr char kGrpclb[] = "grpclb"; |
| |
| class GrpcLb : public LoadBalancingPolicy { |
| public: |
| explicit GrpcLb(const Args& args); |
| |
| const char* name() const override { return kGrpclb; } |
| |
| void UpdateLocked(const grpc_channel_args& args, |
| grpc_json* lb_config) override; |
| bool PickLocked(PickState* pick, grpc_error** error) override; |
| void CancelPickLocked(PickState* pick, grpc_error* error) override; |
| void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, |
| uint32_t initial_metadata_flags_eq, |
| grpc_error* error) override; |
| void NotifyOnStateChangeLocked(grpc_connectivity_state* state, |
| grpc_closure* closure) override; |
| grpc_connectivity_state CheckConnectivityLocked( |
| grpc_error** connectivity_error) override; |
| void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; |
| void ExitIdleLocked() override; |
| void ResetBackoffLocked() override; |
| void FillChildRefsForChannelz( |
| channelz::ChildRefsList* child_subchannels, |
| channelz::ChildRefsList* child_channels) override; |
| |
| private: |
| /// Linked list of pending pick requests. It stores all information needed to |
| /// eventually call (Round Robin's) pick() on them. They remain pending |
| /// mainly while waiting for the RR policy to be created. |
| /// |
| /// Note that when a pick is sent to the RR policy, we inject our own |
| /// on_complete callback, so that we can intercept the result before |
| /// invoking the original on_complete callback. This allows us to set the |
| /// LB token metadata and add client_stats to the call context. |
| /// See \a pending_pick_complete() for details. |
| struct PendingPick { |
| // The grpclb instance that created this wrapper. This instance is not |
| // owned; reference counts are untouched. It's used only for logging |
| // purposes. |
| GrpcLb* grpclb_policy; |
| // The original pick. |
| PickState* pick; |
| // Our on_complete closure and the original one. |
| grpc_closure on_complete; |
| grpc_closure* original_on_complete; |
| // Stats for client-side load reporting. |
| RefCountedPtr<GrpcLbClientStats> client_stats; |
| // Next pending pick. |
| PendingPick* next = nullptr; |
| }; |
| |
| /// Contains a call to the LB server and all the data related to the call. |
| class BalancerCallState : public InternallyRefCounted<BalancerCallState> { |
| public: |
| explicit BalancerCallState( |
| RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy); |
| |
| // It's the caller's responsibility to ensure that Orphan() is called from |
| // inside the combiner. |
| void Orphan() override; |
| |
| void StartQuery(); |
| |
| GrpcLbClientStats* client_stats() const { return client_stats_.get(); } |
| |
| bool seen_initial_response() const { return seen_initial_response_; } |
| |
| private: |
| // So Delete() can access our private dtor. |
| template <typename T> |
| friend void grpc_core::Delete(T*); |
| |
| ~BalancerCallState(); |
| |
| GrpcLb* grpclb_policy() const { |
| return static_cast<GrpcLb*>(grpclb_policy_.get()); |
| } |
| |
| void ScheduleNextClientLoadReportLocked(); |
| void SendClientLoadReportLocked(); |
| |
| static bool LoadReportCountersAreZero(grpc_grpclb_request* request); |
| |
| static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); |
| static void ClientLoadReportDoneLocked(void* arg, grpc_error* error); |
| static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
| static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); |
| static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); |
| |
| // The owning LB policy. |
| RefCountedPtr<LoadBalancingPolicy> grpclb_policy_; |
| |
| // The streaming call to the LB server. Always non-NULL. |
| grpc_call* lb_call_ = nullptr; |
| |
| // recv_initial_metadata |
| grpc_metadata_array lb_initial_metadata_recv_; |
| |
| // send_message |
| grpc_byte_buffer* send_message_payload_ = nullptr; |
| grpc_closure lb_on_initial_request_sent_; |
| |
| // recv_message |
| grpc_byte_buffer* recv_message_payload_ = nullptr; |
| grpc_closure lb_on_balancer_message_received_; |
| bool seen_initial_response_ = false; |
| |
| // recv_trailing_metadata |
| grpc_closure lb_on_balancer_status_received_; |
| grpc_metadata_array lb_trailing_metadata_recv_; |
| grpc_status_code lb_call_status_; |
| grpc_slice lb_call_status_details_; |
| |
| // The stats for client-side load reporting associated with this LB call. |
| // Created after the first serverlist is received. |
| RefCountedPtr<GrpcLbClientStats> client_stats_; |
| grpc_millis client_stats_report_interval_ = 0; |
| grpc_timer client_load_report_timer_; |
| bool client_load_report_timer_callback_pending_ = false; |
| bool last_client_load_report_counters_were_zero_ = false; |
| bool client_load_report_is_due_ = false; |
| // The closure used for either the load report timer or the callback for |
| // completion of sending the load report. |
| grpc_closure client_load_report_closure_; |
| }; |
| |
| ~GrpcLb(); |
| |
| void ShutdownLocked() override; |
| |
| // Helper function used in ctor and UpdateLocked(). |
| void ProcessChannelArgsLocked(const grpc_channel_args& args); |
| |
| // Methods for dealing with the balancer channel and call. |
| void StartPickingLocked(); |
| void StartBalancerCallLocked(); |
| static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
| void StartBalancerCallRetryTimerLocked(); |
| static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); |
| static void OnBalancerChannelConnectivityChangedLocked(void* arg, |
| grpc_error* error); |
| |
| // Pending pick methods. |
| static void PendingPickSetMetadataAndContext(PendingPick* pp); |
| PendingPick* PendingPickCreate(PickState* pick); |
| void AddPendingPick(PendingPick* pp); |
| static void OnPendingPickComplete(void* arg, grpc_error* error); |
| |
| // Methods for dealing with the RR policy. |
| void CreateOrUpdateRoundRobinPolicyLocked(); |
| grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); |
| void CreateRoundRobinPolicyLocked(const Args& args); |
| bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, |
| grpc_error** error); |
| void UpdateConnectivityStateFromRoundRobinPolicyLocked( |
| grpc_error* rr_state_error); |
| static void OnRoundRobinConnectivityChangedLocked(void* arg, |
| grpc_error* error); |
| static void OnRoundRobinRequestReresolutionLocked(void* arg, |
| grpc_error* error); |
| |
| // Who the client is trying to communicate with. |
| const char* server_name_ = nullptr; |
| |
| // Current channel args from the resolver. |
| grpc_channel_args* args_ = nullptr; |
| |
| // Internal state. |
| bool started_picking_ = false; |
| bool shutting_down_ = false; |
| grpc_connectivity_state_tracker state_tracker_; |
| |
| // The channel for communicating with the LB server. |
| grpc_channel* lb_channel_ = nullptr; |
| // Mutex to protect the channel to the LB server. This is used when |
| // processing a channelz request. |
| gpr_mu lb_channel_mu_; |
| grpc_connectivity_state lb_channel_connectivity_; |
| grpc_closure lb_channel_on_connectivity_changed_; |
| // Are we already watching the LB channel's connectivity? |
| bool watching_lb_channel_ = false; |
| // Response generator to inject address updates into lb_channel_. |
| RefCountedPtr<FakeResolverResponseGenerator> response_generator_; |
| |
| // The data associated with the current LB call. It holds a ref to this LB |
| // policy. It's initialized every time we query for backends. It's reset to |
| // NULL whenever the current LB call is no longer needed (e.g., the LB policy |
| // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always |
| // contains a non-NULL lb_call_. |
| OrphanablePtr<BalancerCallState> lb_calld_; |
| // Timeout in milliseconds for the LB call. 0 means no deadline. |
| int lb_call_timeout_ms_ = 0; |
| // Balancer call retry state. |
| BackOff lb_call_backoff_; |
| bool retry_timer_callback_pending_ = false; |
| grpc_timer lb_call_retry_timer_; |
| grpc_closure lb_on_call_retry_; |
| |
| // The deserialized response from the balancer. May be nullptr until one |
| // such response has arrived. |
| grpc_grpclb_serverlist* serverlist_ = nullptr; |
| // Index into serverlist for next pick. |
| // If the server at this index is a drop, we return a drop. |
| // Otherwise, we delegate to the RR policy. |
| size_t serverlist_index_ = 0; |
| |
| // Timeout in milliseconds before using fallback backend addresses. |
| // 0 means fallback will not be used. |
| int lb_fallback_timeout_ms_ = 0; |
| // The backend addresses from the resolver. |
| UniquePtr<ServerAddressList> fallback_backend_addresses_; |
| // Fallback timer. |
| bool fallback_timer_callback_pending_ = false; |
| grpc_timer lb_fallback_timer_; |
| grpc_closure lb_on_fallback_; |
| |
| // Pending picks that are waiting on the RR policy's connectivity. |
| PendingPick* pending_picks_ = nullptr; |
| |
| // The RR policy to use for the backends. |
| OrphanablePtr<LoadBalancingPolicy> rr_policy_; |
| grpc_connectivity_state rr_connectivity_state_; |
| grpc_closure on_rr_connectivity_changed_; |
| grpc_closure on_rr_request_reresolution_; |
| }; |
| |
| // |
| // serverlist parsing code |
| // |
| |
| // vtable for LB token channel arg. |
| void* lb_token_copy(void* token) { |
| return token == nullptr |
| ? nullptr |
| : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; |
| } |
| void lb_token_destroy(void* token) { |
| if (token != nullptr) { |
| GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); |
| } |
| } |
| int lb_token_cmp(void* token1, void* token2) { |
| // Always indicate a match, since we don't want this channel arg to |
| // affect the subchannel's key in the index. |
| return 0; |
| } |
| const grpc_arg_pointer_vtable lb_token_arg_vtable = { |
| lb_token_copy, lb_token_destroy, lb_token_cmp}; |
| |
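| // Returns true if \a server is a usable serverlist entry: not a drop, with |
| // a valid port and a 4- or 16-byte IP address. Logs skipped entries when |
| // \a log is true. |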
| bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { |
| if (server->drop) return false; |
| const grpc_grpclb_ip_address* ip = &server->ip_address; |
| if (GPR_UNLIKELY(server->port >> 16 != 0)) { |
| if (log) { |
| gpr_log(GPR_ERROR, |
| "Invalid port '%d' at index %lu of serverlist. Ignoring.", |
| server->port, (unsigned long)idx); |
| } |
| return false; |
| } |
| if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) { |
| if (log) { |
| gpr_log(GPR_ERROR, |
| "Expected IP to be 4 or 16 bytes, got %d at index %lu of " |
| "serverlist. Ignoring", |
| ip->size, (unsigned long)idx); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
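| // Populates \a addr from the binary IP address and port in \a server. |
| // For drop entries, \a addr is left zeroed. |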
| void ParseServer(const grpc_grpclb_server* server, |
| grpc_resolved_address* addr) { |
| memset(addr, 0, sizeof(*addr)); |
| if (server->drop) return; |
| const uint16_t netorder_port = grpc_htons((uint16_t)server->port); |
| /* The addresses are given in binary format (an in_addr or in6_addr struct) |
| * in server->ip_address.bytes. */ |
| const grpc_grpclb_ip_address* ip = &server->ip_address; |
| if (ip->size == 4) { |
| addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in)); |
| grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr); |
| addr4->sin_family = GRPC_AF_INET; |
| memcpy(&addr4->sin_addr, ip->bytes, ip->size); |
| addr4->sin_port = netorder_port; |
| } else if (ip->size == 16) { |
| addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6)); |
| grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr; |
| addr6->sin6_family = GRPC_AF_INET6; |
| memcpy(&addr6->sin6_addr, ip->bytes, ip->size); |
| addr6->sin6_port = netorder_port; |
| } |
| } |
| |
| // Returns addresses extracted from \a serverlist. |
| ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { |
| ServerAddressList addresses; |
| for (size_t i = 0; i < serverlist->num_servers; ++i) { |
| const grpc_grpclb_server* server = serverlist->servers[i]; |
| if (!IsServerValid(serverlist->servers[i], i, false)) continue; |
| // Address processing. |
| grpc_resolved_address addr; |
| ParseServer(server, &addr); |
| // LB token processing. |
| grpc_mdelem lb_token; |
| if (server->has_load_balance_token) { |
| const size_t lb_token_max_length = |
| GPR_ARRAY_SIZE(server->load_balance_token); |
| const size_t lb_token_length = |
| strnlen(server->load_balance_token, lb_token_max_length); |
| grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( |
| server->load_balance_token, lb_token_length); |
| lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); |
| } else { |
| char* uri = grpc_sockaddr_to_uri(&addr); |
| gpr_log(GPR_INFO, |
| "Missing LB token for backend address '%s'. The empty token will " |
| "be used instead", |
| uri); |
| gpr_free(uri); |
| lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY; |
| } |
| // Add address. |
| grpc_arg arg = grpc_channel_arg_pointer_create( |
| const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), |
| (void*)lb_token.payload, &lb_token_arg_vtable); |
| grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); |
| addresses.emplace_back(addr, args); |
| // Clean up. |
| GRPC_MDELEM_UNREF(lb_token); |
| } |
| return addresses; |
| } |
| |
| // |
| // GrpcLb::BalancerCallState |
| // |
| |
| GrpcLb::BalancerCallState::BalancerCallState( |
| RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy) |
| : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace), |
| grpclb_policy_(std::move(parent_grpclb_policy)) { |
| GPR_ASSERT(grpclb_policy_ != nullptr); |
| GPR_ASSERT(!grpclb_policy()->shutting_down_); |
| // Init the LB call. Note that the LB call will progress every time there's |
| // activity in grpclb_policy_->interested_parties(), which is comprised of |
| // the polling entities from client_channel. |
| GPR_ASSERT(grpclb_policy()->server_name_ != nullptr); |
| GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0'); |
| const grpc_millis deadline = |
| grpclb_policy()->lb_call_timeout_ms_ == 0 |
| ? GRPC_MILLIS_INF_FUTURE |
| : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_; |
| lb_call_ = grpc_channel_create_pollset_set_call( |
| grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
| grpclb_policy_->interested_parties(), |
| GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, |
| nullptr, deadline, nullptr); |
| // Init the LB call request payload. |
| grpc_grpclb_request* request = |
| grpc_grpclb_request_create(grpclb_policy()->server_name_); |
| grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); |
| send_message_payload_ = |
| grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
| grpc_slice_unref_internal(request_payload_slice); |
| grpc_grpclb_request_destroy(request); |
| // Init other data associated with the LB call. |
| grpc_metadata_array_init(&lb_initial_metadata_recv_); |
| grpc_metadata_array_init(&lb_trailing_metadata_recv_); |
| GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked, |
| this, grpc_combiner_scheduler(grpclb_policy()->combiner())); |
| GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_, |
| OnBalancerMessageReceivedLocked, this, |
| grpc_combiner_scheduler(grpclb_policy()->combiner())); |
| GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, |
| OnBalancerStatusReceivedLocked, this, |
| grpc_combiner_scheduler(grpclb_policy()->combiner())); |
| } |
| |
| GrpcLb::BalancerCallState::~BalancerCallState() { |
| GPR_ASSERT(lb_call_ != nullptr); |
| grpc_call_unref(lb_call_); |
| grpc_metadata_array_destroy(&lb_initial_metadata_recv_); |
| grpc_metadata_array_destroy(&lb_trailing_metadata_recv_); |
| grpc_byte_buffer_destroy(send_message_payload_); |
| grpc_byte_buffer_destroy(recv_message_payload_); |
| grpc_slice_unref_internal(lb_call_status_details_); |
| } |
| |
| void GrpcLb::BalancerCallState::Orphan() { |
| GPR_ASSERT(lb_call_ != nullptr); |
| // If we are here because grpclb_policy wants to cancel the call, |
| // lb_on_balancer_status_received_ will complete the cancellation and clean |
| // up. Otherwise, we are here because grpclb_policy has to orphan a failed |
| // call, in which case the following cancellation will be a no-op. |
| grpc_call_cancel(lb_call_, nullptr); |
| if (client_load_report_timer_callback_pending_) { |
| grpc_timer_cancel(&client_load_report_timer_); |
| } |
| // Note that the initial ref is held by lb_on_balancer_status_received_ |
| // instead of the caller of this function. So the corresponding unref happens |
| // in lb_on_balancer_status_received_ instead of here. |
| } |
| |
| void GrpcLb::BalancerCallState::StartQuery() { |
| GPR_ASSERT(lb_call_ != nullptr); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", |
| grpclb_policy_.get(), this, lb_call_); |
| } |
| // Create the ops. |
| grpc_call_error call_error; |
| grpc_op ops[3]; |
| memset(ops, 0, sizeof(ops)); |
| // Op: send initial metadata. |
| grpc_op* op = ops; |
| op->op = GRPC_OP_SEND_INITIAL_METADATA; |
| op->data.send_initial_metadata.count = 0; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: send request message. |
| GPR_ASSERT(send_message_payload_ != nullptr); |
| op->op = GRPC_OP_SEND_MESSAGE; |
| op->data.send_message.send_message = send_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent"); |
| self.release(); |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv initial metadata. |
| op = ops; |
| op->op = GRPC_OP_RECV_INITIAL_METADATA; |
| op->data.recv_initial_metadata.recv_initial_metadata = |
| &lb_initial_metadata_recv_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: recv response. |
| op->op = GRPC_OP_RECV_MESSAGE; |
| op->data.recv_message.recv_message = &recv_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| self = Ref(DEBUG_LOCATION, "on_message_received"); |
| self.release(); |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv server status. |
| op = ops; |
| op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
| op->data.recv_status_on_client.trailing_metadata = |
| &lb_trailing_metadata_recv_; |
| op->data.recv_status_on_client.status = &lb_call_status_; |
| op->data.recv_status_on_client.status_details = &lb_call_status_details_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // This callback signals the end of the LB call, so it relies on the initial |
| // ref instead of a new ref. When it's invoked, it's the initial ref that is |
| // unreffed. |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| |
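| // Arms the client load report timer to fire after the reporting interval |
| // requested by the balancer. |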
| void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
| const grpc_millis next_client_load_report_time = |
| ExecCtx::Get()->Now() + client_stats_report_interval_; |
| GRPC_CLOSURE_INIT(&client_load_report_closure_, |
| MaybeSendClientLoadReportLocked, this, |
| grpc_combiner_scheduler(grpclb_policy()->combiner())); |
| grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, |
| &client_load_report_closure_); |
| client_load_report_timer_callback_pending_ = true; |
| } |
| |
| void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( |
| void* arg, grpc_error* error) { |
| BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
| GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); |
| lb_calld->client_load_report_timer_callback_pending_ = false; |
| if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) { |
| lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); |
| return; |
| } |
| // If we've already sent the initial request, then we can go ahead and send |
| // the load report. Otherwise, we need to wait until the initial request has |
| // been sent to send this (see OnInitialRequestSentLocked()). |
| if (lb_calld->send_message_payload_ == nullptr) { |
| lb_calld->SendClientLoadReportLocked(); |
| } else { |
| lb_calld->client_load_report_is_due_ = true; |
| } |
| } |
| |
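| // Returns true if all counters in \a request (including the per-token drop |
| // counts) are zero. |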
| bool GrpcLb::BalancerCallState::LoadReportCountersAreZero( |
| grpc_grpclb_request* request) { |
| GrpcLbClientStats::DroppedCallCounts* drop_entries = |
| static_cast<GrpcLbClientStats::DroppedCallCounts*>( |
| request->client_stats.calls_finished_with_drop.arg); |
| return request->client_stats.num_calls_started == 0 && |
| request->client_stats.num_calls_finished == 0 && |
| request->client_stats.num_calls_finished_with_client_failed_to_send == |
| 0 && |
| request->client_stats.num_calls_finished_known_received == 0 && |
| (drop_entries == nullptr || drop_entries->size() == 0); |
| } |
| |
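| // Sends a client load report on the LB call, unless the counters were zero |
| // in both the previous and the current report, in which case only the next |
| // report is scheduled. |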
| void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { |
| // Construct message payload. |
| GPR_ASSERT(send_message_payload_ == nullptr); |
| grpc_grpclb_request* request = |
| grpc_grpclb_load_report_request_create_locked(client_stats_.get()); |
| // Skip client load report if the counters were all zero in the last |
| // report and they are still zero in this one. |
| if (LoadReportCountersAreZero(request)) { |
| if (last_client_load_report_counters_were_zero_) { |
| grpc_grpclb_request_destroy(request); |
| ScheduleNextClientLoadReportLocked(); |
| return; |
| } |
| last_client_load_report_counters_were_zero_ = true; |
| } else { |
| last_client_load_report_counters_were_zero_ = false; |
| } |
| grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); |
| send_message_payload_ = |
| grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
| grpc_slice_unref_internal(request_payload_slice); |
| grpc_grpclb_request_destroy(request); |
| // Send the report. |
| grpc_op op; |
| memset(&op, 0, sizeof(op)); |
| op.op = GRPC_OP_SEND_MESSAGE; |
| op.data.send_message.send_message = send_message_payload_; |
| GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked, |
| this, grpc_combiner_scheduler(grpclb_policy()->combiner())); |
| grpc_call_error call_error = grpc_call_start_batch_and_execute( |
| lb_call_, &op, 1, &client_load_report_closure_); |
| if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { |
| gpr_log(GPR_ERROR, |
| "[grpclb %p] lb_calld=%p call_error=%d sending client load report", |
| grpclb_policy_.get(), this, call_error); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| } |
| |
| void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg, |
| grpc_error* error) { |
| BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
| GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); |
| grpc_byte_buffer_destroy(lb_calld->send_message_payload_); |
| lb_calld->send_message_payload_ = nullptr; |
| if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) { |
| lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); |
| return; |
| } |
| lb_calld->ScheduleNextClientLoadReportLocked(); |
| } |
| |
| void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, |
| grpc_error* error) { |
| BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
| grpc_byte_buffer_destroy(lb_calld->send_message_payload_); |
| lb_calld->send_message_payload_ = nullptr; |
| // If we attempted to send a client load report before the initial request was |
| // sent (and this lb_calld is still in use), send the load report now. |
| if (lb_calld->client_load_report_is_due_ && |
| lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) { |
| lb_calld->SendClientLoadReportLocked(); |
| lb_calld->client_load_report_is_due_ = false; |
| } |
| lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent"); |
| } |
| |
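| // Called when a message is received on the LB call: either the initial |
| // response (carrying the load reporting interval) or a serverlist update. |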
| void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
| void* arg, grpc_error* error) { |
| BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
| GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); |
| // Null payload means the LB call was cancelled. |
| if (lb_calld != grpclb_policy->lb_calld_.get() || |
| lb_calld->recv_message_payload_ == nullptr) { |
| lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); |
| return; |
| } |
| grpc_byte_buffer_reader bbr; |
| grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_); |
| grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
| grpc_byte_buffer_reader_destroy(&bbr); |
| grpc_byte_buffer_destroy(lb_calld->recv_message_payload_); |
| lb_calld->recv_message_payload_ = nullptr; |
| grpc_grpclb_initial_response* initial_response; |
| grpc_grpclb_serverlist* serverlist; |
| if (!lb_calld->seen_initial_response_ && |
| (initial_response = grpc_grpclb_initial_response_parse(response_slice)) != |
| nullptr) { |
| // Have NOT seen initial response, look for initial response. |
| if (initial_response->has_client_stats_report_interval) { |
| lb_calld->client_stats_report_interval_ = GPR_MAX( |
| GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( |
| &initial_response->client_stats_report_interval)); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Received initial LB response " |
| "message; client load reporting interval = %" PRId64 |
| " milliseconds", |
| grpclb_policy, lb_calld, |
| lb_calld->client_stats_report_interval_); |
| } |
| } else if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Received initial LB response message; " |
| "client load reporting NOT enabled", |
| grpclb_policy, lb_calld); |
| } |
| grpc_grpclb_initial_response_destroy(initial_response); |
| lb_calld->seen_initial_response_ = true; |
| } else if ((serverlist = grpc_grpclb_response_parse_serverlist( |
| response_slice)) != nullptr) { |
| // Have seen initial response, look for serverlist. |
| GPR_ASSERT(lb_calld->lb_call_ != nullptr); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR |
| " servers received", |
| grpclb_policy, lb_calld, serverlist->num_servers); |
| for (size_t i = 0; i < serverlist->num_servers; ++i) { |
| grpc_resolved_address addr; |
| ParseServer(serverlist->servers[i], &addr); |
| char* ipport; |
| grpc_sockaddr_to_string(&ipport, &addr, false); |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s", |
| grpclb_policy, lb_calld, i, ipport); |
| gpr_free(ipport); |
| } |
| } |
| // Start sending client load reports only after we start using the |
| // serverlist returned from the current LB call. |
| if (lb_calld->client_stats_report_interval_ > 0 && |
| lb_calld->client_stats_ == nullptr) { |
| lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); |
| self.release(); |
| lb_calld->ScheduleNextClientLoadReportLocked(); |
| } |
| // Check if the serverlist differs from the previous one. |
| if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Incoming server list identical to " |
| "current, ignoring.", |
| grpclb_policy, lb_calld); |
| } |
| grpc_grpclb_destroy_serverlist(serverlist); |
| } else { // New serverlist. |
| if (grpclb_policy->serverlist_ != nullptr) { |
| // Dispose of the old serverlist. |
| grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); |
| } else { |
| // Dispose of the fallback. |
| grpclb_policy->fallback_backend_addresses_.reset(); |
| if (grpclb_policy->fallback_timer_callback_pending_) { |
| grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); |
| } |
| } |
| // Update the serverlist in the GrpcLb instance. This serverlist |
| // instance will be destroyed either upon the next update or when the |
| // GrpcLb instance is destroyed. |
| grpclb_policy->serverlist_ = serverlist; |
| grpclb_policy->serverlist_index_ = 0; |
| grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); |
| } |
| } else { |
| // No valid initial response or serverlist found. |
| char* response_slice_str = |
| grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); |
| gpr_log(GPR_ERROR, |
| "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. " |
| "Ignoring.", |
| grpclb_policy, lb_calld, response_slice_str); |
| gpr_free(response_slice_str); |
| } |
| grpc_slice_unref_internal(response_slice); |
| if (!grpclb_policy->shutting_down_) { |
| // Keep listening for serverlist updates. |
| grpc_op op; |
| memset(&op, 0, sizeof(op)); |
| op.op = GRPC_OP_RECV_MESSAGE; |
| op.data.recv_message.recv_message = &lb_calld->recv_message_payload_; |
| op.flags = 0; |
| op.reserved = nullptr; |
| // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery(). |
| const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
| lb_calld->lb_call_, &op, 1, |
| &lb_calld->lb_on_balancer_message_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } else { |
| lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown"); |
| } |
| } |
| |
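| // Called when the LB call ends. If this lb_calld is still the current one, |
| // restarts the call, either immediately (if a response was seen) or after |
| // the backoff delay. |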
| void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
| void* arg, grpc_error* error) { |
| BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
| GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); |
| GPR_ASSERT(lb_calld->lb_call_ != nullptr); |
| if (grpc_lb_glb_trace.enabled()) { |
| char* status_details = |
| grpc_slice_to_c_string(lb_calld->lb_call_status_details_); |
| gpr_log(GPR_INFO, |
| "[grpclb %p] lb_calld=%p: Status from LB server received. " |
| "Status = %d, details = '%s', (lb_call: %p), error '%s'", |
| grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details, |
| lb_calld->lb_call_, grpc_error_string(error)); |
| gpr_free(status_details); |
| } |
| grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE); |
| // If this lb_calld is still in use, this call ended because of a failure so |
| // we want to retry connecting. Otherwise, we have deliberately ended this |
| // call and no further action is required. |
| if (lb_calld == grpclb_policy->lb_calld_.get()) { |
| grpclb_policy->lb_calld_.reset(); |
| GPR_ASSERT(!grpclb_policy->shutting_down_); |
| if (lb_calld->seen_initial_response_) { |
| // If we lose connection to the LB server, reset the backoff and restart |
| // the LB call immediately. |
| grpclb_policy->lb_call_backoff_.Reset(); |
| grpclb_policy->StartBalancerCallLocked(); |
| } else { |
| // If this LB call fails establishing any connection to the LB server, |
| // retry later. |
| grpclb_policy->StartBalancerCallRetryTimerLocked(); |
| } |
| } |
| lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended"); |
| } |
| |
| // |
| // helper code for creating balancer channel |
| // |
| |
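| // Returns the subset of \a addresses that are balancers, with the |
| // is_balancer channel arg stripped from each. |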
| ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) { |
| ServerAddressList balancer_addresses; |
| for (size_t i = 0; i < addresses.size(); ++i) { |
| if (addresses[i].IsBalancer()) { |
| // Strip out the is_balancer channel arg, since we don't want to |
| // recursively use the grpclb policy in the channel used to talk to |
| // the balancers. Note that we do NOT strip out the balancer_name |
| // channel arg, since we need that to set the authority correctly |
| // to talk to the balancers. |
| static const char* args_to_remove[] = { |
| GRPC_ARG_ADDRESS_IS_BALANCER, |
| }; |
| balancer_addresses.emplace_back( |
| addresses[i].address(), |
| grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove, |
| GPR_ARRAY_SIZE(args_to_remove))); |
| } |
| } |
| return balancer_addresses; |
| } |
| |
| /* Returns the channel args for the LB channel, used to create a bidirectional |
| * stream for the reception of load balancing updates. |
| * |
| * Inputs: |
| * - \a addresses: corresponding to the balancers. |
| * - \a response_generator: in order to propagate updates from the resolver |
| * above the grpclb policy. |
| * - \a args: other args inherited from the grpclb policy. */ |
| grpc_channel_args* BuildBalancerChannelArgs( |
| const ServerAddressList& addresses, |
| FakeResolverResponseGenerator* response_generator, |
| const grpc_channel_args* args) { |
| ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); |
| // Channel args to remove. |
| static const char* args_to_remove[] = { |
| // LB policy name, since we want to use the default (pick_first) in |
| // the LB channel. |
| GRPC_ARG_LB_POLICY_NAME, |
| // The channel arg for the server URI, since that will be different for |
| // the LB channel than for the parent channel. The client channel |
| // factory will re-add this arg with the right value. |
| GRPC_ARG_SERVER_URI, |
| // The resolved addresses, which will be generated by the name resolver |
| // used in the LB channel. Note that the LB channel will use the fake |
| // resolver, so this won't actually generate a query to DNS (or some |
| // other name service). However, the addresses returned by the fake |
| // resolver will have is_balancer=false, whereas our own addresses have |
| // is_balancer=true. We need the LB channel to return addresses with |
| // is_balancer=false so that it does not wind up recursively using the |
| // grpclb LB policy, as per the special case logic in client_channel.c. |
| GRPC_ARG_SERVER_ADDRESS_LIST, |
| // The fake resolver response generator, because we are replacing it |
| // with the one from the grpclb policy, used to propagate updates to |
| // the LB channel. |
| GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, |
| // The LB channel should use the authority indicated by the target |
| // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args), |
| // as opposed to the authority from the parent channel. |
| GRPC_ARG_DEFAULT_AUTHORITY, |
| // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be |
| // treated as a stand-alone channel and not inherit this argument from the |
| // args of the parent channel. |
| GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, |
| }; |
| // Channel args to add. |
| const grpc_arg args_to_add[] = { |
| // New address list. |
| // Note that we pass these in both when creating the LB channel |
| // and via the fake resolver. The latter is what actually gets used. |
| CreateServerAddressListChannelArg(&balancer_addresses), |
| // The fake resolver response generator, which we use to inject |
| // address updates into the LB channel. |
| grpc_core::FakeResolverResponseGenerator::MakeChannelArg( |
| response_generator), |
| // A channel arg indicating the target is a grpclb load balancer. |
| grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1), |
| // A channel arg indicating this is an internal channel, i.e., it is |
| // owned by components in Core, not by the user application. |
| grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1), |
| }; |
| // Construct channel args. |
| grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( |
| args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, |
| GPR_ARRAY_SIZE(args_to_add)); |
| // Make any necessary modifications for security. |
| return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); |
| } |
| |
| // |
| // ctor and dtor |
| // |
| |
| GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args) |
| : LoadBalancingPolicy(args), |
| response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), |
| lb_call_backoff_( |
| BackOff::Options() |
| .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * |
| 1000) |
| .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) |
| .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) |
| .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * |
| 1000)) { |
| // Initialization. |
| gpr_mu_init(&lb_channel_mu_); |
| grpc_subchannel_index_ref(); |
| GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, |
| &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this, |
| grpc_combiner_scheduler(args.combiner)); |
| GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_, |
| &GrpcLb::OnRoundRobinConnectivityChangedLocked, this, |
| grpc_combiner_scheduler(args.combiner)); |
| GRPC_CLOSURE_INIT(&on_rr_request_reresolution_, |
| &GrpcLb::OnRoundRobinRequestReresolutionLocked, this, |
| grpc_combiner_scheduler(args.combiner)); |
| grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb"); |
| // Record server name. |
| const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
| const char* server_uri = grpc_channel_arg_get_string(arg); |
| GPR_ASSERT(server_uri != nullptr); |
| grpc_uri* uri = grpc_uri_parse(server_uri, true); |
| GPR_ASSERT(uri->path[0] != '\0'); |
| server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] Will use '%s' as the server name for LB request.", |
| this, server_name_); |
| } |
| grpc_uri_destroy(uri); |
| // Record LB call timeout. |
| arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); |
| lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); |
| // Record fallback timeout. |
| arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); |
| lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( |
| arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); |
| // Process channel args. |
| ProcessChannelArgsLocked(*args.args); |
| } |
| |
| GrpcLb::~GrpcLb() { |
| GPR_ASSERT(pending_picks_ == nullptr); |
| gpr_mu_destroy(&lb_channel_mu_); |
| gpr_free((void*)server_name_); |
| grpc_channel_args_destroy(args_); |
| grpc_connectivity_state_destroy(&state_tracker_); |
| if (serverlist_ != nullptr) { |
| grpc_grpclb_destroy_serverlist(serverlist_); |
| } |
| grpc_subchannel_index_unref(); |
| } |
| |
| void GrpcLb::ShutdownLocked() { |
| grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); |
| shutting_down_ = true; |
| lb_calld_.reset(); |
| if (retry_timer_callback_pending_) { |
| grpc_timer_cancel(&lb_call_retry_timer_); |
| } |
| if (fallback_timer_callback_pending_) { |
| grpc_timer_cancel(&lb_fallback_timer_); |
| } |
| rr_policy_.reset(); |
| TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); |
| // We destroy the LB channel here instead of in our destructor because |
| // destroying the channel triggers a last callback to |
| // OnBalancerChannelConnectivityChangedLocked(), and we need to be |
| // alive when that callback is invoked. |
| if (lb_channel_ != nullptr) { |
| gpr_mu_lock(&lb_channel_mu_); |
| grpc_channel_destroy(lb_channel_); |
| lb_channel_ = nullptr; |
| gpr_mu_unlock(&lb_channel_mu_); |
| } |
| grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, |
| GRPC_ERROR_REF(error), "grpclb_shutdown"); |
| // Clear pending picks. |
| PendingPick* pp; |
| while ((pp = pending_picks_) != nullptr) { |
| pending_picks_ = pp->next; |
| pp->pick->connected_subchannel.reset(); |
| // Note: pp is deleted in this callback. |
| GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // |
| // public methods |
| // |
| |
| void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { |
| PendingPick* pp; |
| while ((pp = pending_picks_) != nullptr) { |
| pending_picks_ = pp->next; |
| pp->pick->on_complete = pp->original_on_complete; |
| grpc_error* error = GRPC_ERROR_NONE; |
| if (new_policy->PickLocked(pp->pick, &error)) { |
| // Synchronous return; schedule closure. |
| GRPC_CLOSURE_SCHED(pp->pick->on_complete, error); |
| } |
| Delete(pp); |
| } |
| } |
| |
| // Cancel a specific pending pick. |
| // |
| // A grpclb pick progresses as follows: |
| // - If there's a Round Robin policy (rr_policy_) available, it'll be |
| // handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From |
| // that point onwards, it'll be RR's responsibility. For cancellations, that |
| // implies the pick needs also be cancelled by the RR instance. |
| // - Otherwise, without an RR instance, picks stay pending at this policy's |
| // level (grpclb), inside the pending_picks_ list. To cancel these, |
| // we invoke the completion closure and set the pick's connected |
| // subchannel to nullptr right here. |
| void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) { |
| PendingPick* pp = pending_picks_; |
| pending_picks_ = nullptr; |
| while (pp != nullptr) { |
| PendingPick* next = pp->next; |
| if (pp->pick == pick) { |
| pick->connected_subchannel.reset(); |
| // Note: pp is deleted in this callback. |
| GRPC_CLOSURE_SCHED(&pp->on_complete, |
| GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Pick Cancelled", &error, 1)); |
| } else { |
| pp->next = pending_picks_; |
| pending_picks_ = pp; |
| } |
| pp = next; |
| } |
| if (rr_policy_ != nullptr) { |
| rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error)); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| // Cancel all pending picks. |
| // |
| // A grpclb pick progresses as follows: |
| // - If there's a Round Robin policy (rr_policy_) available, it'll be |
| // handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From |
| // that point onwards, it'll be RR's responsibility. For cancellations, that |
| // implies the pick needs also be cancelled by the RR instance. |
| // - Otherwise, without an RR instance, picks stay pending at this policy's |
| // level (grpclb), inside the pending_picks_ list. To cancel these, |
| // we invoke the completion closure and set the pick's connected |
| // subchannel to nullptr right here. |
| void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, |
| uint32_t initial_metadata_flags_eq, |
| grpc_error* error) { |
| PendingPick* pp = pending_picks_; |
| pending_picks_ = nullptr; |
| while (pp != nullptr) { |
| PendingPick* next = pp->next; |
| if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == |
| initial_metadata_flags_eq) { |
| // Note: pp is deleted in this callback. |
| GRPC_CLOSURE_SCHED(&pp->on_complete, |
| GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Pick Cancelled", &error, 1)); |
| } else { |
| pp->next = pending_picks_; |
| pending_picks_ = pp; |
| } |
| pp = next; |
| } |
| if (rr_policy_ != nullptr) { |
| rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask, |
| initial_metadata_flags_eq, |
| GRPC_ERROR_REF(error)); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| void GrpcLb::ExitIdleLocked() { |
| if (!started_picking_) { |
| StartPickingLocked(); |
| } |
| } |
| |
| void GrpcLb::ResetBackoffLocked() { |
| if (lb_channel_ != nullptr) { |
| grpc_channel_reset_connect_backoff(lb_channel_); |
| } |
| if (rr_policy_ != nullptr) { |
| rr_policy_->ResetBackoffLocked(); |
| } |
| } |
| |
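| // Attempts to complete \a pick via the RR policy if one exists; otherwise |
| // queues the pick as pending (and starts the balancer call if picking has |
| // not started yet). |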
| bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) { |
| PendingPick* pp = PendingPickCreate(pick); |
| bool pick_done = false; |
| if (rr_policy_ != nullptr) { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this, |
| rr_policy_.get()); |
| } |
| pick_done = |
| PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error); |
| } else { // rr_policy_ == NULL |
| if (pick->on_complete == nullptr) { |
| *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "No pick result available but synchronous result required."); |
| pick_done = true; |
| } else { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] No RR policy. Adding to grpclb's pending picks", |
| this); |
| } |
| AddPendingPick(pp); |
| if (!started_picking_) { |
| StartPickingLocked(); |
| } |
| pick_done = false; |
| } |
| } |
| return pick_done; |
| } |
| |
| void GrpcLb::FillChildRefsForChannelz( |
| channelz::ChildRefsList* child_subchannels, |
| channelz::ChildRefsList* child_channels) { |
| // Delegate to the RoundRobin policy to fill the child subchannels. |
| rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); |
| MutexLock lock(&lb_channel_mu_); |
| if (lb_channel_ != nullptr) { |
| grpc_core::channelz::ChannelNode* channel_node = |
| grpc_channel_get_channelz_node(lb_channel_); |
| if (channel_node != nullptr) { |
| child_channels->push_back(channel_node->uuid()); |
| } |
| } |
| } |
| |
| grpc_connectivity_state GrpcLb::CheckConnectivityLocked( |
| grpc_error** connectivity_error) { |
| return grpc_connectivity_state_get(&state_tracker_, connectivity_error); |
| } |
| |
| void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, |
| grpc_closure* notify) { |
| grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, |
| notify); |
| } |
| |
| // Returns the backend addresses extracted from the given addresses. |
| UniquePtr<ServerAddressList> ExtractBackendAddresses( |
| const ServerAddressList& addresses) { |
| void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; |
| grpc_arg arg = grpc_channel_arg_pointer_create( |
| const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, |
| &lb_token_arg_vtable); |
| auto backend_addresses = MakeUnique<ServerAddressList>(); |
| for (size_t i = 0; i < addresses.size(); ++i) { |
| if (!addresses[i].IsBalancer()) { |
| backend_addresses->emplace_back( |
| addresses[i].address(), |
| grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1)); |
| } |
| } |
| return backend_addresses; |
| } |
| |
| void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
| const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); |
| if (addresses == nullptr) { |
| // Ignore this update. |
| gpr_log( |
| GPR_ERROR, |
| "[grpclb %p] No valid LB addresses channel arg in update, ignoring.", |
| this); |
| return; |
| } |
| // Update fallback address list. |
| fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); |
| // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, |
| // since we use this to trigger the client_load_reporting filter. |
| static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; |
| grpc_arg new_arg = grpc_channel_arg_string_create( |
| (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb"); |
| grpc_channel_args_destroy(args_); |
| args_ = grpc_channel_args_copy_and_add_and_remove( |
| &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); |
| // Construct args for balancer channel. |
| grpc_channel_args* lb_channel_args = |
| BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); |
| // Create balancer channel if needed. |
| if (lb_channel_ == nullptr) { |
| char* uri_str; |
| gpr_asprintf(&uri_str, "fake:///%s", server_name_); |
| gpr_mu_lock(&lb_channel_mu_); |
| lb_channel_ = grpc_client_channel_factory_create_channel( |
| client_channel_factory(), uri_str, |
| GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args); |
| gpr_mu_unlock(&lb_channel_mu_); |
| GPR_ASSERT(lb_channel_ != nullptr); |
| gpr_free(uri_str); |
| } |
| // Propagate updates to the LB channel (pick_first) through the fake |
| // resolver. |
| response_generator_->SetResponse(lb_channel_args); |
| grpc_channel_args_destroy(lb_channel_args); |
| } |
| |
| void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { |
| ProcessChannelArgsLocked(args); |
| // Update the existing RR policy. |
| if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked(); |
| // Start watching the LB channel's connectivity (and request a connection), |
| // if not already doing so. |
| if (!watching_lb_channel_) { |
| lb_channel_connectivity_ = grpc_channel_check_connectivity_state( |
| lb_channel_, true /* try to connect */); |
| grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( |
| grpc_channel_get_channel_stack(lb_channel_)); |
| GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
| watching_lb_channel_ = true; |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity"); |
| self.release(); |
| grpc_client_channel_watch_connectivity_state( |
| client_channel_elem, |
| grpc_polling_entity_create_from_pollset_set(interested_parties()), |
| &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_, |
| nullptr); |
| } |
| } |
| |
| // |
| // code for balancer channel and call |
| // |
| |
| void GrpcLb::StartPickingLocked() { |
| // Start a timer to fall back. |
| if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && |
| !fallback_timer_callback_pending_) { |
| grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); |
| self.release(); |
| GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, |
| grpc_combiner_scheduler(combiner())); |
| fallback_timer_callback_pending_ = true; |
| grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
| } |
| started_picking_ = true; |
| StartBalancerCallLocked(); |
| } |
| |
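| // Creates a new BalancerCallState and starts the streaming query to the |
| // balancer. No-op if we are shutting down. |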
| void GrpcLb::StartBalancerCallLocked() { |
| GPR_ASSERT(lb_channel_ != nullptr); |
| if (shutting_down_) return; |
| // Init the LB call data. |
| GPR_ASSERT(lb_calld_ == nullptr); |
| lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)", |
| this, lb_channel_, lb_calld_.get()); |
| } |
| lb_calld_->StartQuery(); |
| } |
| |
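| // Called when the fallback timer fires. If no serverlist has been received |
| // yet, falls back to the backend addresses from the resolver. |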
| void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
| GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
| grpclb_policy->fallback_timer_callback_pending_ = false; |
| // If we receive a serverlist after the timer fires but before this callback |
| // actually runs, don't fall back. |
| if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ && |
| error == GRPC_ERROR_NONE) { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] Falling back to use backends from resolver", |
| grpclb_policy); |
| } |
| GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr); |
| grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); |
| } |
| grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
| } |
| |
| void GrpcLb::StartBalancerCallRetryTimerLocked() { |
| grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); |
| grpc_millis timeout = next_try - ExecCtx::Get()->Now(); |
| if (timeout > 0) { |
| gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.", |
| this, timeout); |
| } else { |
| gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.", |
| this); |
| } |
| } |
| // TODO(roth): We currently track this ref manually. Once the |
| // ClosureRef API is ready, we should pass the RefCountedPtr<> along |
| // with the callback. |
| auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
| self.release(); |
| GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked, |
| this, grpc_combiner_scheduler(combiner())); |
| retry_timer_callback_pending_ = true; |
| grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); |
| } |
| |
| void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { |
| GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
| grpclb_policy->retry_timer_callback_pending_ = false; |
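| // Restart the balancer call only if the timer actually fired (i.e., was not |
| // cancelled), we are not shutting down, and no call is already in flight. |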
| if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE && |
| grpclb_policy->lb_calld_ == nullptr) { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", |
| grpclb_policy); |
| } |
| grpclb_policy->StartBalancerCallLocked(); |
| } |
| grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
| } |
| |
| // Invoked as part of the update process. It continues watching the LB channel |
| // until the channel becomes READY or IDLE, shuts down, or the policy itself is |
| // shutting down. It's invoked even if the LB channel stayed READY throughout |
| // the update (for example, if the update is identical). |
| void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, |
| grpc_error* error) { |
| GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
| if (grpclb_policy->shutting_down_) goto done; |
| // Re-initialize the lb_call. This should also take care of updating the |
| // embedded RR policy. Note that the current RR policy, if any, will stay in |
| // effect until an update from the new lb_call is received. |
| switch (grpclb_policy->lb_channel_connectivity_) { |
| case GRPC_CHANNEL_CONNECTING: |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
| // Keep watching the LB channel. |
| grpc_channel_element* client_channel_elem = |
| grpc_channel_stack_last_element( |
| grpc_channel_get_channel_stack(grpclb_policy->lb_channel_)); |
| GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
| grpc_client_channel_watch_connectivity_state( |
| client_channel_elem, |
| grpc_polling_entity_create_from_pollset_set( |
| grpclb_policy->interested_parties()), |
| &grpclb_policy->lb_channel_connectivity_, |
| &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr); |
| break; |
| } |
| // The LB channel may be IDLE because it was shut down before the update. |
| // Restart the LB call to kick the LB channel into gear. |
| case GRPC_CHANNEL_IDLE: |
| case GRPC_CHANNEL_READY: |
| grpclb_policy->lb_calld_.reset(); |
| if (grpclb_policy->started_picking_) { |
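| // Cancel any pending retry and reset the backoff so that the new |
| // balancer call starts immediately. |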
| if (grpclb_policy->retry_timer_callback_pending_) { |
| grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_); |
| } |
| grpclb_policy->lb_call_backoff_.Reset(); |
| grpclb_policy->StartBalancerCallLocked(); |
| } |
| // fallthrough |
| case GRPC_CHANNEL_SHUTDOWN: |
| done: |
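| // Stop watching the LB channel and drop the ref taken when the watch |
| // was started. |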
| grpclb_policy->watching_lb_channel_ = false; |
| grpclb_policy->Unref(DEBUG_LOCATION, |
| "watch_lb_channel_connectivity_cb_shutdown"); |
| } |
| } |
| |
| // |
| // PendingPick |
| // |
| |
| // Adds lb_token of selected subchannel (address) to the call's initial |
| // metadata. |
| grpc_error* AddLbTokenToInitialMetadata( |
| grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, |
| grpc_metadata_batch* initial_metadata) { |
| GPR_ASSERT(lb_token_mdelem_storage != nullptr); |
| GPR_ASSERT(!GRPC_MDISNULL(lb_token)); |
| return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, |
| lb_token); |
| } |
| |
| // Destroy function used when embedding client stats in call context. |
| void DestroyClientStats(void* arg) { |
| static_cast<GrpcLbClientStats*>(arg)->Unref(); |
| } |
| |
| void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { |
| // If connected_subchannel is nullptr, no pick has been made by the RR |
| // policy (e.g., all addresses failed to connect). There won't be any |
| // LB token available. |
| if (pp->pick->connected_subchannel != nullptr) { |
| const grpc_arg* arg = |
| grpc_channel_args_find(pp->pick->connected_subchannel->args(), |
| GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); |
| if (arg != nullptr) { |
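| // The LB token mdelem is stored in a pointer channel arg on the connected |
| // subchannel; reconstruct the mdelem from that pointer. |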
| grpc_mdelem lb_token = { |
| reinterpret_cast<uintptr_t>(arg->value.pointer.p)}; |
| AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token), |
| &pp->pick->lb_token_mdelem_storage, |
| pp->pick->initial_metadata); |
| } else { |
| gpr_log(GPR_ERROR, |
| "[grpclb %p] No LB token for connected subchannel pick %p", |
| pp->grpclb_policy, pp->pick); |
| abort(); |
| } |
| // Pass on client stats via context. Passes ownership of the reference. |
| if (pp->client_stats != nullptr) { |
| pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = |
| pp->client_stats.release(); |
| pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = |
| DestroyClientStats; |
| } |
| } else { |
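| // No pick was made by the RR policy, so there is nothing to report; drop |
| // our ref to the client stats (if any). |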
| pp->client_stats.reset(); |
| } |
| } |
| |
| // The \a on_complete closure passed as part of the pick is wrapped so that, |
| // when the pick completes, we can add the LB token to the call's initial |
| // metadata and attach the client stats to the call context before invoking |
| // the original closure. |
| void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) { |
| PendingPick* pp = static_cast<PendingPick*>(arg); |
| PendingPickSetMetadataAndContext(pp); |
| GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); |
| Delete(pp); |
| } |
| |
| GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) { |
| PendingPick* pp = New<PendingPick>(); |
| pp->grpclb_policy = this; |
| pp->pick = pick; |
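| // Intercept the pick's on_complete closure so that OnPendingPickComplete() |
| // can set the LB token and client stats before the original closure runs. |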
| GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp, |
| grpc_schedule_on_exec_ctx); |
| pp->original_on_complete = pick->on_complete; |
| pick->on_complete = &pp->on_complete; |
| return pp; |
| } |
| |
| void GrpcLb::AddPendingPick(PendingPick* pp) { |
| pp->next = pending_picks_; |
| pending_picks_ = pp; |
| } |
| |
| // |
| // code for interacting with the RR policy |
| // |
| |
| // Performs a pick over \a rr_policy_. A pick may complete immediately, in |
| // which case its completion callback is never invoked and we must perform |
| // the cleanups that callback would otherwise be responsible for. |
| // If \a force_async is true, we manually schedule the completion callback |
| // even if the pick completes immediately. |
| bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, |
| grpc_error** error) { |
| // Check for drops if we are not using fallback backend addresses. |
| if (serverlist_ != nullptr && serverlist_->num_servers > 0) { |
| // Look at the index into the serverlist to see if we should drop this call. |
| grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; |
| if (serverlist_index_ == serverlist_->num_servers) { |
| serverlist_index_ = 0; // Wrap-around. |
| } |
| if (server->drop) { |
| // Update client load reporting stats to indicate the number of |
| // dropped calls. Note that we have to do this here instead of in |
| // the client_load_reporting filter, because we do not create a |
| // subchannel call (and therefore no client_load_reporting filter) |
| // for dropped calls. |
| if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { |
| lb_calld_->client_stats()->AddCallDroppedLocked( |
| server->load_balance_token); |
| } |
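| // The drop completes the pick: if force_async, schedule the original |
| // on_complete; otherwise, return true to tell the caller that the pick |
| // has already been resolved. |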
| if (force_async) { |
| GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); |
| Delete(pp); |
| return false; |
| } |
| Delete(pp); |
| return true; |
| } |
| } |
| // Set client_stats. |
| if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { |
| pp->client_stats = lb_calld_->client_stats()->Ref(); |
| } |
| // Pick via the RR policy. |
| bool pick_done = rr_policy_->PickLocked(pp->pick, error); |
| if (pick_done) { |
| PendingPickSetMetadataAndContext(pp); |
| if (force_async) { |
| GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); |
| *error = GRPC_ERROR_NONE; |
| pick_done = false; |
| } |
| Delete(pp); |
| } |
| // else, the pending pick will be registered and taken care of by the |
| // pending pick list inside the RR policy. Eventually, |
| // OnPendingPickComplete() will be called, which will (among other |
| // things) add the LB token to the call's initial metadata. |
| return pick_done; |
| } |
| |
| void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { |
| GPR_ASSERT(rr_policy_ == nullptr); |
| rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
| "round_robin", args); |
| if (GPR_UNLIKELY(rr_policy_ == nullptr)) { |
| gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy", |
| this); |
| return; |
| } |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, |
| rr_policy_.get()); |
| } |
| // TODO(roth): We currently track this ref manually. Once the new |
| // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. |
| auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); |
| self.release(); |
| rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_); |
| grpc_error* rr_state_error = nullptr; |
| rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error); |
| // The grpclb policy's connectivity state is a function of the state of the |
| // newly created RR policy. |
| UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error); |
| // Add the grpclb policy's interested_parties pollset_set to that of the |
| // newly created RR policy, so that the RR policy makes progress whenever |
| // there is activity on the grpclb policy, which in turn is tied to the |
| // application's calls. |
| grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(), |
| interested_parties()); |
| // Subscribe to changes to the connectivity of the new RR. |
| // TODO(roth): We currently track this ref manually. Once the new |
| // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. |
| self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed"); |
| self.release(); |
| rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_, |
| &on_rr_connectivity_changed_); |
| rr_policy_->ExitIdleLocked(); |
| // Send pending picks to RR policy. |
| PendingPick* pp; |
| while ((pp = pending_picks_)) { |
| pending_picks_ = pp->next; |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[grpclb %p] Pending pick about to (async) PICK from RR %p", this, |
| rr_policy_.get()); |
| } |
| grpc_error* error = GRPC_ERROR_NONE; |
| PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error); |
| } |
| } |
| |
| grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { |
| ServerAddressList tmp_addresses; |
| ServerAddressList* addresses = &tmp_addresses; |
| bool is_backend_from_grpclb_load_balancer = false; |
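| // Use the most recent serverlist from the balancer if we have one; |
| // otherwise, fall back to the backend addresses from the resolver. |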
| if (serverlist_ != nullptr) { |
| tmp_addresses = ProcessServerlist(serverlist_); |
| is_backend_from_grpclb_load_balancer = true; |
| } else { |
| // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't |
| // received any serverlist from the balancer, we use the fallback backends |
| // returned by the resolver. Note that the fallback backend list may be |
| // empty, in which case the new round_robin policy will keep the requested |
| // picks pending. |
| GPR_ASSERT(fallback_backend_addresses_ != nullptr); |
| addresses = fallback_backend_addresses_.get(); |
| } |
| GPR_ASSERT(addresses != nullptr); |
| // Replace the server address list in the channel args that we pass down to |
| // the subchannel. |
| static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; |
| grpc_arg args_to_add[3] = { |
| CreateServerAddressListChannelArg(addresses), |
| // A channel arg indicating if the target is a backend inferred from a |
| // grpclb load balancer. |
| grpc_channel_arg_integer_create( |
| const_cast<char*>( |
| GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), |
| is_backend_from_grpclb_load_balancer), |
| }; |
| size_t num_args_to_add = 2; |
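| // For backends coming from the balancer, also inhibit client-side health |
| // checking (the balancer is assumed to account for backend health). |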
| if (is_backend_from_grpclb_load_balancer) { |
| args_to_add[2] = grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); |
| ++num_args_to_add; |
| } |
| grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( |
| args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, |
| num_args_to_add); |
| return args; |
| } |
| |
| void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { |
| if (shutting_down_) return; |
| grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked(); |
| GPR_ASSERT(args != nullptr); |
| if (rr_policy_ != nullptr) { |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, |
| rr_policy_.get()); |
| } |
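| // round_robin does not take an LB config, hence the nullptr. |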
| rr_policy_->UpdateLocked(*args, nullptr); |
| } else { |
| LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.combiner = combiner(); |
| lb_policy_args.client_channel_factory = client_channel_factory(); |
| lb_policy_args.args = args; |
| CreateRoundRobinPolicyLocked(lb_policy_args); |
| } |
| grpc_channel_args_destroy(args); |
| } |
| |
| void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg, |
| grpc_error* error) { |
| GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
| if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) { |
| grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested"); |
| return; |
| } |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log( |
| GPR_INFO, |
| "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", |
| grpclb_policy, grpclb_policy->rr_policy_.get()); |
| } |
| // If we are talking to a balancer, we expect to get updated addresses from |
| // the balancer, so we can ignore the re-resolution request from the RR |
| // policy. Otherwise, handle the re-resolution request using the |
| // grpclb policy's original re-resolution closure. |
| if (grpclb_policy->lb_calld_ == nullptr || |
| !grpclb_policy->lb_calld_->seen_initial_response()) { |
| grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE); |
| } |
| // Give back the wrapper closure to the RR policy. |
| grpclb_policy->rr_policy_->SetReresolutionClosureLocked( |
| &grpclb_policy->on_rr_request_reresolution_); |
| } |
| |
| void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked( |
| grpc_error* rr_state_error) { |
| const grpc_connectivity_state curr_glb_state = |
| grpc_connectivity_state_check(&state_tracker_); |
| /* The new connectivity status is a function of the previous one and the new |
| * input coming from the status of the RR policy. |
| * |
| * current state (grpclb's) |
| * | |
| * v || I | C | R | TF | SD | <- new state (RR's) |
| * ===++====+=====+=====+======+======+ |
| * I || I | C | R | [I] | [I] | |
| * ---++----+-----+-----+------+------+ |
| * C || I | C | R | [C] | [C] | |
| * ---++----+-----+-----+------+------+ |
| * R || I | C | R | [R] | [R] | |
| * ---++----+-----+-----+------+------+ |
| * TF || I | C | R | [TF] | [TF] | |
| * ---++----+-----+-----+------+------+ |
| * SD || NA | NA | NA | NA | NA | (*) |
| * ---++----+-----+-----+------+------+ |
| * |
| * A [STATE] indicates that the old RR policy is kept. In those cases, STATE |
| * is the current state of grpclb, which is left untouched. |
| * |
| * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to |
| * the previous RR instance. |
| * |
| * Note that the status is never updated to SHUTDOWN as a result of calling |
| * this function. Only the policy's shutdown path sets that state. |
| * |
| * (*) This function must not be called while the policy is shutting down. */ |
| GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); |
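| // An error is expected if and only if the new RR state is TRANSIENT_FAILURE |
| // or SHUTDOWN. |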
| switch (rr_connectivity_state_) { |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: |
| case GRPC_CHANNEL_SHUTDOWN: |
| GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); |
| break; |
| case GRPC_CHANNEL_IDLE: |
| case GRPC_CHANNEL_CONNECTING: |
| case GRPC_CHANNEL_READY: |
| GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); |
| } |
| if (grpc_lb_glb_trace.enabled()) { |
| gpr_log( |
| GPR_INFO, |
| "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", |
| this, grpc_connectivity_state_name(rr_connectivity_state_), |
| rr_policy_.get()); |
| } |
| grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_, |
| rr_state_error, |
| "update_lb_connectivity_status_locked"); |
| } |
| |
| void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg, |
| grpc_error* error) { |
| GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
| if (grpclb_policy->shutting_down_) { |
| grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed"); |
| return; |
| } |
| grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked( |
| GRPC_ERROR_REF(error)); |
| // Resubscribe. Reuse the "on_rr_connectivity_changed" ref. |
| grpclb_policy->rr_policy_->NotifyOnStateChangeLocked( |
| &grpclb_policy->rr_connectivity_state_, |
| &grpclb_policy->on_rr_connectivity_changed_); |
| } |
| |
| // |
| // factory |
| // |
| |
| class GrpcLbFactory : public LoadBalancingPolicyFactory { |
| public: |
| OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
| const LoadBalancingPolicy::Args& args) const override { |
| // Check that the resolved addresses include at least one balancer address. |
| const ServerAddressList* addresses = |
| FindServerAddressListChannelArg(args.args); |
| if (addresses == nullptr) return nullptr; |
| bool found_balancer = false; |
| for (size_t i = 0; i < addresses->size(); ++i) { |
| if ((*addresses)[i].IsBalancer()) { |
| found_balancer = true; |
| break; |
| } |
| } |
| if (!found_balancer) return nullptr; |
| return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args)); |
| } |
| |
| const char* name() const override { return kGrpclb; } |
| }; |
| |
| } // namespace |
| |
| } // namespace grpc_core |
| |
| // |
| // Plugin registration |
| // |
| |
| namespace { |
| |
| // Only add client_load_reporting filter if the grpclb LB policy is used. |
| bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder, |
| void* arg) { |
| const grpc_channel_args* args = |
| grpc_channel_stack_builder_get_channel_arguments(builder); |
| const grpc_arg* channel_arg = |
| grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); |
| if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING && |
| strcmp(channel_arg->value.string, "grpclb") == 0) { |
| return grpc_channel_stack_builder_append_filter( |
| builder, (const grpc_channel_filter*)arg, nullptr, nullptr); |
| } |
| return true; |
| } |
| |
| } // namespace |
| |
| void grpc_lb_policy_grpclb_init() { |
| grpc_core::LoadBalancingPolicyRegistry::Builder:: |
| RegisterLoadBalancingPolicyFactory( |
| grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( |
| grpc_core::New<grpc_core::GrpcLbFactory>())); |
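| // Register the client_load_reporting filter for subchannels; it is only |
| // appended when the grpclb policy is in use (see |
| // maybe_add_client_load_reporting_filter() above). |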
| grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, |
| GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
| maybe_add_client_load_reporting_filter, |
| (void*)&grpc_client_load_reporting_filter); |
| } |
| |
| void grpc_lb_policy_grpclb_shutdown() {} |