| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H |
| #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <memory> |
| #include <set> |
| #include <unordered_map> |
| |
| #include <grpc/support/log.h> |
| #include <grpcpp/impl/codegen/config.h> |
| |
| #include "src/cpp/server/load_reporter/constants.h" |
| |
| namespace grpc { |
| namespace load_reporter { |
| |
| // The load data storage is organized in hierarchy. The LoadDataStore is the |
| // top-level data store. In LoadDataStore, for each host we keep a |
| // PerHostStore, in which for each balancer we keep a PerBalancerStore. Each |
| // PerBalancerStore maintains a map of load records, mapping from LoadRecordKey |
| // to LoadRecordValue. The LoadRecordValue contains a map of customized call |
| // metrics, mapping from a call metric name to the CallMetricValue. |
| |
| // The value of a customized call metric. |
| class CallMetricValue { |
| public: |
| explicit CallMetricValue(uint64_t num_calls = 0, |
| double total_metric_value = 0) |
| : num_calls_(num_calls), total_metric_value_(total_metric_value) {} |
| |
| void MergeFrom(CallMetricValue other) { |
| num_calls_ += other.num_calls_; |
| total_metric_value_ += other.total_metric_value_; |
| } |
| |
| // Getters. |
| uint64_t num_calls() const { return num_calls_; } |
| double total_metric_value() const { return total_metric_value_; } |
| |
| private: |
| // The number of calls that finished with this metric. |
| uint64_t num_calls_ = 0; |
| // The sum of metric values across all the calls that finished with this |
| // metric. |
| double total_metric_value_ = 0; |
| }; |
| |
| // The key of a load record. |
| class LoadRecordKey { |
| public: |
| LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id, |
| grpc::string client_ip_hex) |
| : lb_id_(std::move(lb_id)), |
| lb_tag_(std::move(lb_tag)), |
| user_id_(std::move(user_id)), |
| client_ip_hex_(std::move(client_ip_hex)) {} |
| |
| // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. |
| LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id); |
| |
| grpc::string ToString() const { |
| return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + |
| ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + |
| "]"; |
| } |
| |
| bool operator==(const LoadRecordKey& other) const { |
| return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ && |
| user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_; |
| } |
| |
| // Gets the client IP bytes in network order (i.e., big-endian). |
| grpc::string GetClientIpBytes() const; |
| |
| // Getters. |
| const grpc::string& lb_id() const { return lb_id_; } |
| const grpc::string& lb_tag() const { return lb_tag_; } |
| const grpc::string& user_id() const { return user_id_; } |
| const grpc::string& client_ip_hex() const { return client_ip_hex_; } |
| |
| struct Hasher { |
| void hash_combine(size_t* seed, const grpc::string& k) const { |
| *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) + |
| (*seed >> 2); |
| } |
| |
| size_t operator()(const LoadRecordKey& k) const { |
| size_t h = 0; |
| hash_combine(&h, k.lb_id_); |
| hash_combine(&h, k.lb_tag_); |
| hash_combine(&h, k.user_id_); |
| hash_combine(&h, k.client_ip_hex_); |
| return h; |
| } |
| }; |
| |
| private: |
| grpc::string lb_id_; |
| grpc::string lb_tag_; |
| grpc::string user_id_; |
| grpc::string client_ip_hex_; |
| }; |
| |
| // The value of a load record. |
| class LoadRecordValue { |
| public: |
| explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0, |
| uint64_t error_count = 0, uint64_t bytes_sent = 0, |
| uint64_t bytes_recv = 0, uint64_t latency_ms = 0) |
| : start_count_(start_count), |
| ok_count_(ok_count), |
| error_count_(error_count), |
| bytes_sent_(bytes_sent), |
| bytes_recv_(bytes_recv), |
| latency_ms_(latency_ms) {} |
| |
| LoadRecordValue(grpc::string metric_name, uint64_t num_calls, |
| double total_metric_value); |
| |
| void MergeFrom(const LoadRecordValue& other) { |
| start_count_ += other.start_count_; |
| ok_count_ += other.ok_count_; |
| error_count_ += other.error_count_; |
| bytes_sent_ += other.bytes_sent_; |
| bytes_recv_ += other.bytes_recv_; |
| latency_ms_ += other.latency_ms_; |
| for (const auto& p : other.call_metrics_) { |
| const grpc::string& key = p.first; |
| const CallMetricValue& value = p.second; |
| call_metrics_[key].MergeFrom(value); |
| } |
| } |
| |
| int64_t GetNumCallsInProgressDelta() const { |
| return static_cast<int64_t>(start_count_ - ok_count_ - error_count_); |
| } |
| |
| grpc::string ToString() const { |
| return "[start_count_=" + grpc::to_string(start_count_) + |
| ", ok_count_=" + grpc::to_string(ok_count_) + |
| ", error_count_=" + grpc::to_string(error_count_) + |
| ", bytes_sent_=" + grpc::to_string(bytes_sent_) + |
| ", bytes_recv_=" + grpc::to_string(bytes_recv_) + |
| ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " + |
| grpc::to_string(call_metrics_.size()) + " other call metric(s)]"; |
| } |
| |
| bool InsertCallMetric(const grpc::string& metric_name, |
| const CallMetricValue& metric_value) { |
| return call_metrics_.insert({metric_name, metric_value}).second; |
| } |
| |
| // Getters. |
| uint64_t start_count() const { return start_count_; } |
| uint64_t ok_count() const { return ok_count_; } |
| uint64_t error_count() const { return error_count_; } |
| uint64_t bytes_sent() const { return bytes_sent_; } |
| uint64_t bytes_recv() const { return bytes_recv_; } |
| uint64_t latency_ms() const { return latency_ms_; } |
| const std::unordered_map<grpc::string, CallMetricValue>& call_metrics() |
| const { |
| return call_metrics_; |
| } |
| |
| private: |
| uint64_t start_count_ = 0; |
| uint64_t ok_count_ = 0; |
| uint64_t error_count_ = 0; |
| uint64_t bytes_sent_ = 0; |
| uint64_t bytes_recv_ = 0; |
| uint64_t latency_ms_ = 0; |
| std::unordered_map<grpc::string, CallMetricValue> call_metrics_; |
| }; |
| |
| // Stores the data associated with a particular LB ID. |
| class PerBalancerStore { |
| public: |
| using LoadRecordMap = |
| std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>; |
| |
| PerBalancerStore(grpc::string lb_id, grpc::string load_key) |
| : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {} |
| |
| // Merge a load record with the given key and value if the store is not |
| // suspended. |
| void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value); |
| |
| // Suspend this store, so that no detailed load data will be recorded. |
| void Suspend(); |
| // Resume this store from suspension. |
| void Resume(); |
| // Is this store suspended or not? |
| bool IsSuspended() const { return suspended_; } |
| |
| bool IsNumCallsInProgressChangedSinceLastReport() const { |
| return num_calls_in_progress_ != last_reported_num_calls_in_progress_; |
| } |
| |
| uint64_t GetNumCallsInProgressForReport(); |
| |
| grpc::string ToString() { |
| return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ + |
| "]"; |
| } |
| |
| void ClearLoadRecordMap() { load_record_map_.clear(); } |
| |
| // Getters. |
| const grpc::string& lb_id() const { return lb_id_; } |
| const grpc::string& load_key() const { return load_key_; } |
| const LoadRecordMap& load_record_map() const { return load_record_map_; } |
| |
| private: |
| grpc::string lb_id_; |
| // TODO(juanlishen): Use bytestring protobuf type? |
| grpc::string load_key_; |
| LoadRecordMap load_record_map_; |
| uint64_t num_calls_in_progress_ = 0; |
| uint64_t last_reported_num_calls_in_progress_ = 0; |
| bool suspended_ = false; |
| }; |
| |
| // Stores the data associated with a particular host. |
| class PerHostStore { |
| public: |
| // When a report stream is created, a PerBalancerStore is created for the |
| // LB ID (guaranteed unique) associated with that stream. If it is the only |
| // active store, adopt all the orphaned stores. If it is the first created |
| // store, adopt the store of kInvalidLbId. |
| void ReportStreamCreated(const grpc::string& lb_id, |
| const grpc::string& load_key); |
| |
| // When a report stream is closed, the PerBalancerStores assigned to the |
| // associate LB ID need to be re-assigned to other active balancers, |
| // ideally with the same load key. If there is no active balancer, we have |
| // to suspend those stores and drop the incoming load data until they are |
| // resumed. |
| void ReportStreamClosed(const grpc::string& lb_id); |
| |
| // Returns null if not found. Caller doesn't own the returned store. |
| PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const; |
| |
| // Returns null if lb_id is not found. The returned pointer points to the |
| // underlying data structure, which is not owned by the caller. |
| const std::set<PerBalancerStore*>* GetAssignedStores( |
| const grpc::string& lb_id) const; |
| |
| private: |
| // Creates a PerBalancerStore for the given LB ID, assigns the store to |
| // itself, and records the LB ID to the load key. |
| void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key); |
| |
| void AssignOrphanedStore(PerBalancerStore* orphaned_store, |
| const grpc::string& new_receiver); |
| |
| std::unordered_map<grpc::string, std::set<grpc::string>> |
| load_key_to_receiving_lb_ids_; |
| |
| // Key: LB ID. The key set includes all the LB IDs that have been |
| // allocated for reporting streams so far. |
| // Value: the unique pointer to the PerBalancerStore of the LB ID. |
| std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>> |
| per_balancer_stores_; |
| |
| // Key: LB ID. The key set includes the LB IDs of the balancers that are |
| // currently receiving report. |
| // Value: the set of raw pointers to the PerBalancerStores assigned to the LB |
| // ID. Note that the sets in assigned_stores_ form a division of the value set |
| // of per_balancer_stores_. |
| std::unordered_map<grpc::string, std::set<PerBalancerStore*>> |
| assigned_stores_; |
| }; |
| |
| // Thread-unsafe two-level bookkeeper of all the load data. |
| // Note: We never remove any store objects from this class, as per the |
| // current spec. That's because premature removal of the store objects |
| // may lead to loss of critical information, e.g., mapping from lb_id to |
| // load_key, and the number of in-progress calls. Such loss will cause |
| // information inconsistency when the balancer is re-connected. Keeping |
| // all the stores should be fine for PerHostStore, since we assume there |
| // should only be a few hostnames. But it's a potential problem for |
| // PerBalancerStore. |
| class LoadDataStore { |
| public: |
| // Returns null if not found. Caller doesn't own the returned store. |
| PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname, |
| const grpc::string& lb_id) const; |
| |
| // Returns null if hostname or lb_id is not found. The returned pointer points |
| // to the underlying data structure, which is not owned by the caller. |
| const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname, |
| const string& lb_id); |
| |
| // If a PerBalancerStore can be found by the hostname and LB ID in |
| // LoadRecordKey, the load data will be merged to that store. Otherwise, |
| // only track the number of the in-progress calls for this unknown LB ID. |
| void MergeRow(const grpc::string& hostname, const LoadRecordKey& key, |
| const LoadRecordValue& value); |
| |
| // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated |
| // with some received load data but unknown to this load data store)? |
| bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const { |
| return unknown_balancer_id_trackers_.find(lb_id) != |
| unknown_balancer_id_trackers_.end(); |
| } |
| |
| // Wrapper around PerHostStore::ReportStreamCreated. |
| void ReportStreamCreated(const grpc::string& hostname, |
| const grpc::string& lb_id, |
| const grpc::string& load_key); |
| |
| // Wrapper around PerHostStore::ReportStreamClosed. |
| void ReportStreamClosed(const grpc::string& hostname, |
| const grpc::string& lb_id); |
| |
| private: |
| // Buffered data that was fetched from Census but hasn't been sent to |
| // balancer. We need to keep this data ourselves because Census will |
| // delete the data once it's returned. |
| std::unordered_map<grpc::string, PerHostStore> per_host_stores_; |
| |
| // Tracks the number of in-progress calls for each unknown LB ID. |
| std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_; |
| }; |
| |
| } // namespace load_reporter |
| } // namespace grpc |
| |
| #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H |