blob: 3bcb33ef11c5f38f98fdf37afe505bfa7ecdef13 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/** Round Robin Policy.
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
* respecting the relative order of the addresses provided upon creation or
* updates. Note however that updates will start picking from the beginning of
* the updated list. */
#include <grpc/support/port_platform.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
namespace grpc_core {
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
namespace {
//
// round_robin LB policy
//
constexpr char kRoundRobin[] = "round_robin";
class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
const char* name() const override { return kRoundRobin; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override;
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override;
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
private:
~RoundRobin();
// Forward declaration.
class RoundRobinSubchannelList;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RoundRobinSubchannelData
: public SubchannelData<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
}
void UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error);
private:
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
};
// A list of subchannels.
class RoundRobinSubchannelList
: public SubchannelList<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
RoundRobinSubchannelList(
RoundRobin* policy, TraceFlag* tracer,
const ServerAddressList& addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
client_channel_factory, args),
last_ready_index_(num_subchannels() - 1) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
}
~RoundRobinSubchannelList() {
GRPC_ERROR_UNREF(last_transient_failure_error_);
RoundRobin* p = static_cast<RoundRobin*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
// Starts watching the subchannels in this list.
void StartWatchingLocked();
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
// transient_failure_error is the error that is reported when
// new_state is TRANSIENT_FAILURE.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state,
grpc_error* transient_failure_error);
// If this subchannel list is the RR policy's current subchannel
// list, updates the RR policy's connectivity state based on the
// subchannel list's state counters.
void MaybeUpdateRoundRobinConnectivityStateLocked();
// Updates the RR policy's overall state based on the counters of
// subchannels in each state.
void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
size_t GetNextReadySubchannelIndexLocked();
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
private:
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
size_t last_ready_index_; // Index into list of last pick.
};
// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
private:
RoundRobin* rr_;
};
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
bool shutdown_ = false;
/** List of picks that are waiting on connectivity */
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args, args.lb_config);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
subchannel_list_->num_subchannels());
}
grpc_subchannel_index_ref();
}
RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
grpc_connectivity_state_destroy(&state_tracker_);
grpc_subchannel_index_unref();
}
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
void RoundRobin::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
}
shutdown_ = true;
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) {
PickState* pp = pending_picks_;
pending_picks_ = nullptr;
while (pp != nullptr) {
PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pp->next = pending_picks_;
pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
PickState* pick = pending_picks_;
pending_picks_ = nullptr;
while (pick != nullptr) {
PickState* next = pick->next;
if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pick->next = pending_picks_;
pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
void RoundRobin::StartPickingLocked() {
started_picking_ = true;
subchannel_list_->StartWatchingLocked();
}
void RoundRobin::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
void RoundRobin::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
bool RoundRobin::DoPickLocked(PickState* pick) {
const size_t next_ready_index =
subchannel_list_->GetNextReadySubchannelIndexLocked();
if (next_ready_index < subchannel_list_->num_subchannels()) {
/* readily available, report right away */
RoundRobinSubchannelData* sd =
subchannel_list_->subchannel(next_ready_index);
GPR_ASSERT(sd->connected_subchannel() != nullptr);
pick->connected_subchannel = sd->connected_subchannel()->Ref();
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
this, sd->subchannel(), pick->connected_subchannel.get(),
sd->subchannel_list(), next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
return true;
}
return false;
}
void RoundRobin::DrainPendingPicksLocked() {
PickState* pick;
while ((pick = pending_picks_)) {
pending_picks_ = pick->next;
GPR_ASSERT(DoPickLocked(pick));
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
}
GPR_ASSERT(!shutdown_);
if (subchannel_list_ != nullptr) {
if (DoPickLocked(pick)) return true;
}
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
return true;
}
/* no pick currently available. Save for later in list of pending picks */
pick->next = pending_picks_;
pending_picks_ = pick;
if (!started_picking_) {
StartPickingLocked();
}
return false;
}
void RoundRobin::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels_to_fill,
channelz::ChildRefsList* ignored) {
MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}
void RoundRobin::UpdateChildRefsLocked() {
channelz::ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE
// state.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked(&error);
if (state != GRPC_CHANNEL_IDLE) {
subchannel(i)->UpdateConnectivityStateLocked(state, error);
}
}
// Now set the LB policy's state based on the subchannels' states.
UpdateRoundRobinStateFromSubchannelStateCountsLocked();
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
}
}
}
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state,
grpc_error* transient_failure_error) {
GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (old_state == GRPC_CHANNEL_CONNECTING) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
if (new_state == GRPC_CHANNEL_READY) {
++num_ready_;
} else if (new_state == GRPC_CHANNEL_CONNECTING) {
++num_connecting_;
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++num_transient_failure_;
}
GRPC_ERROR_UNREF(last_transient_failure_error_);
last_transient_failure_error_ = transient_failure_error;
}
// Sets the RR policy's connectivity state based on the current
// subchannel list.
void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
* CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
if (num_ready_ > 0) {
/* 1) READY */
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
} else if (num_connecting_ > 0) {
/* 2) CONNECTING */
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
} else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&p->state_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(last_transient_failure_error_),
"rr_exhausted_subchannels");
}
}
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because
// any previous update would have been shut down already and
// therefore we would not be receiving a notification for them.
GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
GPR_ASSERT(!shutting_down());
if (grpc_lb_round_robin_trace.enabled()) {
const size_t old_num_subchannels =
p->subchannel_list_ != nullptr
? p->subchannel_list_->num_subchannels()
: 0;
gpr_log(GPR_INFO,
"[RR %p] phasing out subchannel list %p (size %" PRIuPTR
") in favor of %p (size %" PRIuPTR ")",
p, p->subchannel_list_.get(), old_num_subchannels, this,
num_subchannels());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Drain pending picks.
p->DrainPendingPicksLocked();
}
// Update the RR policy's connectivity state if needed.
MaybeUpdateRoundRobinConnectivityStateLocked();
}
void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_INFO,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
grpc_connectivity_state_name(last_connectivity_state_),
grpc_connectivity_state_name(connectivity_state));
}
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state, error);
last_connectivity_state_ = connectivity_state;
}
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
p, subchannel());
}
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
// Update state counters.
UpdateConnectivityStateLocked(connectivity_state, error);
// Update overall state and renew notification.
subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
RenewConnectivityWatchLocked();
}
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state, or p->subchannel_list->num_subchannels if no
* subchannel is READY.
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
size_t
RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] getting next ready subchannel (out of %" PRIuPTR
"), last_ready_index=%" PRIuPTR,
policy(), num_subchannels(), last_ready_index_);
}
for (size_t i = 0; i < num_subchannels(); ++i) {
const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_INFO,
"[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
": state=%s",
policy(), subchannel(index)->subchannel(), this, index,
grpc_connectivity_state_name(
subchannel(index)->connectivity_state()));
}
if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
" of subchannel_list %p",
policy(), subchannel(index)->subchannel(), index, this);
}
return index;
}
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
}
return num_subchannels();
}
// Sets last_ready_index_ to last_ready_index.
void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
size_t last_ready_index) {
GPR_ASSERT(last_ready_index < num_subchannels());
last_ready_index_ = last_ready_index;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
" (SC %p, CSC %p)",
policy(), last_ready_index,
subchannel(last_ready_index)->subchannel(),
subchannel(last_ready_index)->connected_subchannel());
}
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
grpc_closure* notify) {
grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
void RoundRobin::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
if (addresses == nullptr) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
if (subchannel_list_ == nullptr) {
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
return;
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, addresses->size());
}
// Replace latest_pending_subchannel_list_.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, *addresses, combiner(),
client_channel_factory(), args);
// If we haven't started picking yet or the new list is empty,
// immediately promote the new list to the current list.
if (!started_picking_ ||
latest_pending_subchannel_list_->num_subchannels() == 0) {
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else {
// If we've started picking, start watching the new list.
latest_pending_subchannel_list_->StartWatchingLocked();
}
}
//
// factory
//
class RoundRobinFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
}
const char* name() const override { return kRoundRobin; }
};
} // namespace
} // namespace grpc_core
void grpc_lb_policy_round_robin_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<grpc_core::RoundRobinFactory>()));
}
void grpc_lb_policy_round_robin_shutdown() {}