blob: aa8441f17b099f68b9b3e055427ef3608138dd4d [file] [log] [blame]
//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/avl/avl.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/tls.h"
// a map of subchannel_key --> subchannel, used for detecting connections
// to the same destination in order to share them
static grpc_avl g_subchannel_index;
static gpr_mu g_mu;
static gpr_refcount g_refcount;
struct grpc_subchannel_key {
grpc_subchannel_args args;
};
static bool g_force_creation = false;
static grpc_subchannel_key* create_key(
const grpc_subchannel_args* args,
grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
grpc_subchannel_key* k =
static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
k->args.filter_count = args->filter_count;
if (k->args.filter_count > 0) {
k->args.filters = static_cast<const grpc_channel_filter**>(
gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count));
memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters),
args->filters, sizeof(*k->args.filters) * k->args.filter_count);
} else {
k->args.filters = nullptr;
}
k->args.args = copy_channel_args(args->args);
return k;
}
grpc_subchannel_key* grpc_subchannel_key_create(
const grpc_subchannel_args* args) {
return create_key(args, grpc_channel_args_normalize);
}
static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
return create_key(&k->args, grpc_channel_args_copy);
}
int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
const grpc_subchannel_key* b) {
// To pretend the keys are different, return a non-zero value.
if (GPR_UNLIKELY(g_force_creation)) return 1;
int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
a->args.filter_count * sizeof(*a->args.filters));
if (c != 0) return c;
}
return grpc_channel_args_compare(a->args.args, b->args.args);
}
void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters));
grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
gpr_free(k);
}
static void sck_avl_destroy(void* p, void* unused) {
grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p));
}
static void* sck_avl_copy(void* p, void* unused) {
return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p));
}
static long sck_avl_compare(void* a, void* b, void* unused) {
return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a),
static_cast<grpc_subchannel_key*>(b));
}
static void scv_avl_destroy(void* p, void* unused) {
GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index");
}
static void* scv_avl_copy(void* p, void* unused) {
GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index");
return p;
}
static const grpc_avl_vtable subchannel_avl_vtable = {
sck_avl_destroy, // destroy_key
sck_avl_copy, // copy_key
sck_avl_compare, // compare_keys
scv_avl_destroy, // destroy_value
scv_avl_copy // copy_value
};
void grpc_subchannel_index_init(void) {
g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
gpr_ref_init(&g_refcount, 1);
}
void grpc_subchannel_index_shutdown(void) {
// TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
// To solve that, we should force polling to flush any pending callbacks, then
// shutdown safely.
grpc_subchannel_index_unref();
}
void grpc_subchannel_index_unref(void) {
if (gpr_unref(&g_refcount)) {
gpr_mu_destroy(&g_mu);
grpc_avl_unref(g_subchannel_index, nullptr);
}
}
void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
// Lock, and take a reference to the subchannel index.
// We don't need to do the search under a lock as avl's are immutable.
gpr_mu_lock(&g_mu);
grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
gpr_mu_unlock(&g_mu);
grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
(grpc_subchannel*)grpc_avl_get(index, key, nullptr), "index_find");
grpc_avl_unref(index, nullptr);
return c;
}
grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
grpc_subchannel* constructed) {
grpc_subchannel* c = nullptr;
bool need_to_unref_constructed = false;
while (c == nullptr) {
need_to_unref_constructed = false;
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
gpr_mu_unlock(&g_mu);
// - Check to see if a subchannel already exists
c = static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr));
if (c != nullptr) {
c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
}
if (c != nullptr) {
// yes -> we're done
need_to_unref_constructed = true;
} else {
// no -> update the avl and compare/swap
grpc_avl updated = grpc_avl_add(
grpc_avl_ref(index, nullptr), subchannel_key_copy(key),
GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), nullptr);
// it may happen (but it's expected to be unlikely)
// that some other thread has changed the index:
// compare/swap here to check that, and retry as necessary
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
GPR_SWAP(grpc_avl, updated, g_subchannel_index);
c = constructed;
}
gpr_mu_unlock(&g_mu);
grpc_avl_unref(updated, nullptr);
}
grpc_avl_unref(index, nullptr);
}
if (need_to_unref_constructed) {
GRPC_SUBCHANNEL_UNREF(constructed, "index_register");
}
return c;
}
void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
grpc_subchannel* constructed) {
bool done = false;
while (!done) {
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
gpr_mu_unlock(&g_mu);
// Check to see if this key still refers to the previously
// registered subchannel
grpc_subchannel* c =
static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr));
if (c != constructed) {
grpc_avl_unref(index, nullptr);
break;
}
// compare and swap the update (some other thread may have
// mutated the index behind us)
grpc_avl updated =
grpc_avl_remove(grpc_avl_ref(index, nullptr), key, nullptr);
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
GPR_SWAP(grpc_avl, updated, g_subchannel_index);
done = true;
}
gpr_mu_unlock(&g_mu);
grpc_avl_unref(updated, nullptr);
grpc_avl_unref(index, nullptr);
}
}
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
g_force_creation = force_creation;
}