| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H |
| #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <atomic> |
| #include <chrono> |
| #include <deque> |
| #include <vector> |
| |
| #include <grpc/support/log.h> |
| #include <grpcpp/impl/codegen/config.h> |
| |
| #include "src/cpp/server/load_reporter/load_data_store.h" |
| #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" |
| |
| #include "opencensus/stats/stats.h" |
| |
| namespace grpc { |
| namespace load_reporter { |
| |
| // The interface to get the Census stats. Abstracted for mocking. |
| class CensusViewProvider { |
| public: |
| // Maps from the view name to the view data. |
| using ViewDataMap = |
| std::unordered_map<grpc::string, ::opencensus::stats::ViewData>; |
| // Maps from the view name to the view descriptor. |
| using ViewDescriptorMap = |
| std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>; |
| |
| CensusViewProvider(); |
| virtual ~CensusViewProvider() = default; |
| |
| // Fetches the view data accumulated since last fetching, and returns it as a |
| // map from the view name to the view data. |
| virtual ViewDataMap FetchViewData() = 0; |
| |
| // A helper function that gets a row with the input tag values from the view |
| // data. Only used when we know that row must exist because we have seen a row |
| // with the same tag values in a related view data. Several ViewData's are |
| // considered related if their views are based on the measures that are always |
| // recorded at the same time. |
| static double GetRelatedViewDataRowDouble( |
| const ViewDataMap& view_data_map, const char* view_name, |
| size_t view_name_len, const std::vector<grpc::string>& tag_values); |
| static uint64_t GetRelatedViewDataRowInt( |
| const ViewDataMap& view_data_map, const char* view_name, |
| size_t view_name_len, const std::vector<grpc::string>& tag_values); |
| |
| protected: |
| const ViewDescriptorMap& view_descriptor_map() const { |
| return view_descriptor_map_; |
| } |
| |
| private: |
| ViewDescriptorMap view_descriptor_map_; |
| // Tag keys. |
| ::opencensus::stats::TagKey tag_key_token_; |
| ::opencensus::stats::TagKey tag_key_host_; |
| ::opencensus::stats::TagKey tag_key_user_id_; |
| ::opencensus::stats::TagKey tag_key_status_; |
| ::opencensus::stats::TagKey tag_key_metric_name_; |
| }; |
| |
| // The default implementation fetches the real stats from Census. |
| class CensusViewProviderDefaultImpl : public CensusViewProvider { |
| public: |
| CensusViewProviderDefaultImpl(); |
| |
| ViewDataMap FetchViewData() override; |
| |
| private: |
| std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_; |
| }; |
| |
| // The interface to get the CPU stats. Abstracted for mocking. |
| class CpuStatsProvider { |
| public: |
| // The used and total amounts of CPU usage. |
| using CpuStatsSample = std::pair<uint64_t, uint64_t>; |
| |
| virtual ~CpuStatsProvider() = default; |
| |
| // Gets the cumulative used CPU and total CPU resource. |
| virtual CpuStatsSample GetCpuStats() = 0; |
| }; |
| |
| // The default implementation reads CPU jiffies from the system to calculate CPU |
| // utilization. |
| class CpuStatsProviderDefaultImpl : public CpuStatsProvider { |
| public: |
| CpuStatsSample GetCpuStats() override; |
| }; |
| |
| // Maintains all the load data and load reporting streams. |
| class LoadReporter { |
| public: |
| // TODO(juanlishen): Allow config for providers from users. |
| LoadReporter(uint32_t feedback_sample_window_seconds, |
| std::unique_ptr<CensusViewProvider> census_view_provider, |
| std::unique_ptr<CpuStatsProvider> cpu_stats_provider) |
| : feedback_sample_window_seconds_(feedback_sample_window_seconds), |
| census_view_provider_(std::move(census_view_provider)), |
| cpu_stats_provider_(std::move(cpu_stats_provider)) { |
| // Append the initial record so that the next real record can have a base. |
| AppendNewFeedbackRecord(0, 0); |
| } |
| |
| // Fetches the latest data from Census and merge it into the data store. |
| // Also adds a new sample to the LB feedback sliding window. |
| // Thread-unsafe. (1). The access to the load data store and feedback records |
| // has locking. (2). The access to the Census view provider and CPU stats |
| // provider lacks locking, but we only access these two members in this method |
| // (in testing, we also access them when setting up expectation). So the |
| // invocations of this method must be serialized. |
| void FetchAndSample(); |
| |
| // Generates a report for that host and balancer. The report contains |
| // all the stats data accumulated between the last report (i.e., the last |
| // consumption) and the last fetch from Census (i.e., the last production). |
| // Thread-safe. |
| ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads( |
| const grpc::string& hostname, const grpc::string& lb_id); |
| |
| // The feedback is calculated from the stats data recorded in the sliding |
| // window. Outdated records are discarded. |
| // Thread-safe. |
| ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback(); |
| |
| // Wrapper around LoadDataStore::ReportStreamCreated. |
| // Thread-safe. |
| void ReportStreamCreated(const grpc::string& hostname, |
| const grpc::string& lb_id, |
| const grpc::string& load_key); |
| |
| // Wrapper around LoadDataStore::ReportStreamClosed. |
| // Thread-safe. |
| void ReportStreamClosed(const grpc::string& hostname, |
| const grpc::string& lb_id); |
| |
| // Generates a unique LB ID of length kLbIdLength. Returns an empty string |
| // upon failure. Thread-safe. |
| grpc::string GenerateLbId(); |
| |
| // Accessors only for testing. |
| CensusViewProvider* census_view_provider() { |
| return census_view_provider_.get(); |
| } |
| CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); } |
| |
| private: |
| struct LoadBalancingFeedbackRecord { |
| std::chrono::system_clock::time_point end_time; |
| uint64_t rpcs; |
| uint64_t errors; |
| uint64_t cpu_usage; |
| uint64_t cpu_limit; |
| |
| LoadBalancingFeedbackRecord( |
| const std::chrono::system_clock::time_point& end_time, uint64_t rpcs, |
| uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit) |
| : end_time(end_time), |
| rpcs(rpcs), |
| errors(errors), |
| cpu_usage(cpu_usage), |
| cpu_limit(cpu_limit) {} |
| }; |
| |
| // Finds the view data about starting call from the view_data_map and merges |
| // the data to the load data store. |
| void ProcessViewDataCallStart( |
| const CensusViewProvider::ViewDataMap& view_data_map); |
| // Finds the view data about ending call from the view_data_map and merges the |
| // data to the load data store. |
| void ProcessViewDataCallEnd( |
| const CensusViewProvider::ViewDataMap& view_data_map); |
| // Finds the view data about the customized call metrics from the |
| // view_data_map and merges the data to the load data store. |
| void ProcessViewDataOtherCallMetrics( |
| const CensusViewProvider::ViewDataMap& view_data_map); |
| |
| bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record, |
| std::chrono::system_clock::time_point now) { |
| return record.end_time > now - feedback_sample_window_seconds_; |
| } |
| |
| void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors); |
| |
| // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches |
| // it to the load. |
| void AttachOrphanLoadId(::grpc::lb::v1::Load* load, |
| const PerBalancerStore& per_balancer_store); |
| |
| std::atomic<int64_t> next_lb_id_{0}; |
| const std::chrono::seconds feedback_sample_window_seconds_; |
| std::mutex feedback_mu_; |
| std::deque<LoadBalancingFeedbackRecord> feedback_records_; |
| // TODO(juanlishen): Lock in finer grain. Locking the whole store may be |
| // too expensive. |
| std::mutex store_mu_; |
| LoadDataStore load_data_store_; |
| std::unique_ptr<CensusViewProvider> census_view_provider_; |
| std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; |
| }; |
| |
| } // namespace load_reporter |
| } // namespace grpc |
| |
| #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H |