blob: d6ff74ec7f7745675a1aa5cb0676ccc17947346c [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
namespace {
//
// pick_first LB policy
//
constexpr char kPickFirst[] = "pick_first";
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
const char* name() const override { return kPickFirst; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override;
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override;
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
private:
~PickFirst();
class PickFirstSubchannelList;
class PickFirstSubchannelData
: public SubchannelData<PickFirstSubchannelList,
PickFirstSubchannelData> {
public:
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
void CheckConnectivityStateAndStartWatchingLocked();
};
class PickFirstSubchannelList
: public SubchannelList<PickFirstSubchannelList,
PickFirstSubchannelData> {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
client_channel_factory, args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
}
~PickFirstSubchannelList() {
PickFirst* p = static_cast<PickFirst*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
};
// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
private:
PickFirst* pf_;
};
void ShutdownLocked() override;
void StartPickingLocked();
void UpdateChildRefsLocked();
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Have we started picking?
bool started_picking_ = false;
// Are we shut down?
bool shutdown_ = false;
// List of picks that are waiting on connectivity.
PickState* pending_picks_ = nullptr;
// Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
/// Lock and data used to capture snapshots of this channels child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
UpdateLocked(*args.args, args.lb_config);
grpc_subchannel_index_ref();
}
PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
grpc_connectivity_state_destroy(&state_tracker_);
grpc_subchannel_index_unref();
}
void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
void PickFirst::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
shutdown_ = true;
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) {
PickState* pp = pending_picks_;
pending_picks_ = nullptr;
while (pp != nullptr) {
PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pp->next = pending_picks_;
pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
PickState* pick = pending_picks_;
pending_picks_ = nullptr;
while (pick != nullptr) {
PickState* next = pick->next;
if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pick->next = pending_picks_;
pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
void PickFirst::StartPickingLocked() {
started_picking_ = true;
if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
}
void PickFirst::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
void PickFirst::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
return true;
}
pick->next = pending_picks_;
pending_picks_ = pick;
if (!started_picking_) {
StartPickingLocked();
}
return false;
}
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
grpc_closure* notify) {
grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
void PickFirst::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels_to_fill,
channelz::ChildRefsList* ignored) {
MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}
void PickFirst::UpdateChildRefsLocked() {
channelz::ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
void PickFirst::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
if (addresses == nullptr) {
if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"pf_update_missing");
} else {
// otherwise, keep using the current subchannel list (ignore this update).
gpr_log(GPR_ERROR,
"No valid LB addresses channel arg for Pick First %p update, "
"ignoring.",
this);
}
return;
}
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->size());
}
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, *addresses, combiner(),
client_channel_factory(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
}
// If one of the subchannels in the new list is already in state
// READY, then select it immediately. This can happen when the
// currently selected subchannel is also present in the update. It
// can also happen if one of the subchannels in the update is already
// in the subchannel index because it's in use by another channel.
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_error* error = GRPC_ERROR_NONE;
grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error);
GRPC_ERROR_UNREF(error);
if (state == GRPC_CHANNEL_READY) {
subchannel_list_ = std::move(subchannel_list);
sd->ProcessUnselectedReadyLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
// Make sure that subsequent calls to ExitIdleLocked() don't cause
// us to start watching a subchannel other than the one we've
// selected.
started_picking_ = true;
return;
}
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
} else {
// We do have a selected subchannel, so keep using it until one of
// the subchannels in the new list reports READY.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
this, latest_pending_subchannel_list_.get(),
subchannel_list.get());
}
}
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
}
}
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
AutoChildRefsUpdater guard(p);
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p connectivity changed for selected subchannel", p);
}
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr;
StopConnectivityWatchLocked();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
error != GRPC_ERROR_NONE
? GRPC_ERROR_REF(error)
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"selected subchannel not ready; switching to pending "
"update"),
"selected_not_ready+switch_to_update");
} else {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected subchannel goes bad, request a re-resolution. We also
// set the channel state to IDLE and reset started_picking_. The reason
// is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
// reception we don't want to connect to the re-resolved backends until
// we leave the IDLE state.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
p->started_picking_ = false;
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
RenewConnectivityWatchLocked();
}
}
GRPC_ERROR_UNREF(error);
return;
}
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
PickFirstSubchannelData* sd = this;
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "exhausted_subchannels");
}
sd->CheckConnectivityStateAndStartWatchingLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
// Renew notification.
RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
GRPC_ERROR_UNREF(error);
}
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
// Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "subchannel_ready");
p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
p->selected_->subchannel());
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
grpc_error* error = GRPC_ERROR_NONE;
if (p->selected_ != this &&
CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
// We must process the READY subchannel before we start watching it.
// Otherwise, we won't know it's READY because we will be waiting for its
// connectivity state to change from READY.
ProcessUnselectedReadyLocked();
}
GRPC_ERROR_UNREF(error);
StartConnectivityWatchLocked();
}
//
// factory
//
class PickFirstFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
}
const char* name() const override { return kPickFirst; }
};
} // namespace
} // namespace grpc_core
void grpc_lb_policy_pick_first_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<grpc_core::PickFirstFactory>()));
}
void grpc_lb_policy_pick_first_shutdown() {}