| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h" |
| |
| namespace grpc { |
| namespace load_reporter { |
| |
| void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) { |
| GPR_ASSERT(handler_function_ != nullptr); |
| GPR_ASSERT(handler_ != nullptr); |
| handler_function_(std::move(handler_), ok); |
| } |
| |
| LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl( |
| std::unique_ptr<ServerCompletionQueue> cq) |
| : cq_(std::move(cq)) { |
| thread_ = std::unique_ptr<::grpc_core::Thread>( |
| new ::grpc_core::Thread("server_load_reporting", Work, this)); |
| std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr; |
| #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE) |
| cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl()); |
| #endif |
| load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter( |
| kFeedbackSampleWindowSeconds, |
| std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()), |
| std::move(cpu_stats_provider))); |
| } |
| |
| LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() { |
| // We will reach here after the server starts shutting down. |
| shutdown_ = true; |
| { |
| std::unique_lock<std::mutex> lock(cq_shutdown_mu_); |
| cq_->Shutdown(); |
| } |
| if (next_fetch_and_sample_alarm_ != nullptr) |
| next_fetch_and_sample_alarm_->Cancel(); |
| thread_->Join(); |
| } |
| |
| void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() { |
| auto next_fetch_and_sample_time = |
| gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
| gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000, |
| GPR_TIMESPAN)); |
| { |
| std::unique_lock<std::mutex> lock(cq_shutdown_mu_); |
| if (shutdown_) return; |
| // TODO(juanlishen): Improve the Alarm implementation to reuse a single |
| // instance for multiple events. |
| next_fetch_and_sample_alarm_.reset(new Alarm); |
| next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time, |
| this); |
| } |
| gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this); |
| } |
| |
| void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) { |
| if (!ok) { |
| gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this); |
| return; |
| } |
| gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this); |
| load_reporter_->FetchAndSample(); |
| ScheduleNextFetchAndSample(); |
| } |
| |
| void LoadReporterAsyncServiceImpl::Work(void* arg) { |
| LoadReporterAsyncServiceImpl* service = |
| reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg); |
| service->FetchAndSample(true /* ok */); |
| // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need |
| // to figure out why cq is not ready after service starts. |
| gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
| gpr_time_from_seconds(1, GPR_TIMESPAN))); |
| ReportLoadHandler::CreateAndStart(service->cq_.get(), service, |
| service->load_reporter_.get()); |
| void* tag; |
| bool ok; |
| while (true) { |
| if (!service->cq_->Next(&tag, &ok)) { |
| // The completion queue is shutting down. |
| GPR_ASSERT(service->shutdown_); |
| break; |
| } |
| if (tag == service) { |
| service->FetchAndSample(ok); |
| } else { |
| auto* next_step = static_cast<CallableTag*>(tag); |
| next_step->Run(ok); |
| } |
| } |
| } |
| |
| void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart( |
| ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service, |
| LoadReporter* load_reporter) { |
| std::shared_ptr<ReportLoadHandler> handler = |
| std::make_shared<ReportLoadHandler>(cq, service, load_reporter); |
| ReportLoadHandler* p = handler.get(); |
| { |
| std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); |
| if (service->shutdown_) return; |
| p->on_done_notified_ = |
| CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p, |
| std::placeholders::_1, std::placeholders::_2), |
| handler); |
| p->next_inbound_ = |
| CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(handler)); |
| p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_); |
| service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq, |
| &p->next_inbound_); |
| } |
| } |
| |
| LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler( |
| ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service, |
| LoadReporter* load_reporter) |
| : cq_(cq), |
| service_(service), |
| load_reporter_(load_reporter), |
| stream_(&ctx_), |
| call_status_(WAITING_FOR_DELIVERY) {} |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| if (ok) { |
| call_status_ = DELIVERED; |
| } else { |
| // AsyncNotifyWhenDone() needs to be called before the call starts, but the |
| // tag will not pop out if the call never starts ( |
| // https://github.com/grpc/grpc/issues/10136). So we need to manually |
| // release the ownership of the handler in this case. |
| GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); |
| } |
| if (!ok || shutdown_) { |
| // The value of ok being false means that the server is shutting down. |
| Shutdown(std::move(self), "OnRequestDelivered"); |
| return; |
| } |
| // Spawn a new handler instance to serve the next new client. Every handler |
| // instance will deallocate itself when it's done. |
| CreateAndStart(cq_, service_, load_reporter_); |
| { |
| std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
| if (service_->shutdown_) { |
| lock.release()->unlock(); |
| Shutdown(std::move(self), "OnRequestDelivered"); |
| return; |
| } |
| next_inbound_ = |
| CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(self)); |
| stream_.Read(&request_, &next_inbound_); |
| } |
| // LB ID is unique for each load reporting stream. |
| lb_id_ = load_reporter_->GenerateLbId(); |
| gpr_log(GPR_INFO, |
| "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). " |
| "Start reading the initial request...", |
| service_, lb_id_.c_str(), this); |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| if (!ok || shutdown_) { |
| if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) { |
| // The client may have half-closed the stream or the stream is broken. |
| gpr_log(GPR_INFO, |
| "[LRS %p] Failed reading the initial request from the stream " |
| "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).", |
| service_, lb_id_.c_str(), this, static_cast<int>(done_notified_), |
| static_cast<int>(is_cancelled_)); |
| } |
| Shutdown(std::move(self), "OnReadDone"); |
| return; |
| } |
| // We only receive one request, which is the initial request. |
| if (call_status_ < INITIAL_REQUEST_RECEIVED) { |
| if (!request_.has_initial_request()) { |
| Shutdown(std::move(self), "OnReadDone+initial_request_not_found"); |
| } else { |
| call_status_ = INITIAL_REQUEST_RECEIVED; |
| const auto& initial_request = request_.initial_request(); |
| load_balanced_hostname_ = initial_request.load_balanced_hostname(); |
| load_key_ = initial_request.load_key(); |
| load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_, |
| load_key_); |
| const auto& load_report_interval = initial_request.load_report_interval(); |
| load_report_interval_ms_ = |
| static_cast<uint64_t>(load_report_interval.seconds() * 1000 + |
| load_report_interval.nanos() / 1000); |
| gpr_log( |
| GPR_INFO, |
| "[LRS %p] Initial request received. Start load reporting (load " |
| "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...", |
| service_, load_balanced_hostname_.c_str(), load_report_interval_ms_, |
| lb_id_.c_str(), this); |
| SendReport(self, true /* ok */); |
| // Expect this read to fail. |
| { |
| std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
| if (service_->shutdown_) { |
| lock.release()->unlock(); |
| Shutdown(std::move(self), "OnReadDone"); |
| return; |
| } |
| next_inbound_ = |
| CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(self)); |
| stream_.Read(&request_, &next_inbound_); |
| } |
| } |
| } else { |
| // Another request received! This violates the spec. |
| gpr_log(GPR_ERROR, |
| "[LRS %p] Another request received (lb_id_: %s, handler: %p).", |
| service_, lb_id_.c_str(), this); |
| Shutdown(std::move(self), "OnReadDone+second_request"); |
| } |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| if (!ok || shutdown_) { |
| Shutdown(std::move(self), "ScheduleNextReport"); |
| return; |
| } |
| auto next_report_time = gpr_time_add( |
| gpr_now(GPR_CLOCK_MONOTONIC), |
| gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN)); |
| { |
| std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
| if (service_->shutdown_) { |
| lock.release()->unlock(); |
| Shutdown(std::move(self), "ScheduleNextReport"); |
| return; |
| } |
| next_outbound_ = |
| CallableTag(std::bind(&ReportLoadHandler::SendReport, this, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(self)); |
| // TODO(juanlishen): Improve the Alarm implementation to reuse a single |
| // instance for multiple events. |
| next_report_alarm_.reset(new Alarm); |
| next_report_alarm_->Set(cq_, next_report_time, &next_outbound_); |
| } |
| gpr_log(GPR_DEBUG, |
| "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).", |
| service_, lb_id_.c_str(), this); |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| if (!ok || shutdown_) { |
| Shutdown(std::move(self), "SendReport"); |
| return; |
| } |
| ::grpc::lb::v1::LoadReportResponse response; |
| auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_); |
| response.mutable_load()->Swap(&loads); |
| auto feedback = load_reporter_->GenerateLoadBalancingFeedback(); |
| response.mutable_load_balancing_feedback()->Swap(&feedback); |
| if (call_status_ < INITIAL_RESPONSE_SENT) { |
| auto initial_response = response.mutable_initial_response(); |
| initial_response->set_load_balancer_id(lb_id_); |
| initial_response->set_implementation_id( |
| ::grpc::lb::v1::InitialLoadReportResponse::CPP); |
| initial_response->set_server_version(kVersion); |
| call_status_ = INITIAL_RESPONSE_SENT; |
| } |
| { |
| std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
| if (service_->shutdown_) { |
| lock.release()->unlock(); |
| Shutdown(std::move(self), "SendReport"); |
| return; |
| } |
| next_outbound_ = |
| CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(self)); |
| stream_.Write(response, &next_outbound_); |
| gpr_log(GPR_INFO, |
| "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads " |
| "count: %d)...", |
| service_, lb_id_.c_str(), this, response.load().size()); |
| } |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| GPR_ASSERT(ok); |
| done_notified_ = true; |
| if (ctx_.IsCancelled()) { |
| is_cancelled_ = true; |
| } |
| gpr_log(GPR_INFO, |
| "[LRS %p] Load reporting call is notified done (handler: %p, " |
| "is_cancelled: %d).", |
| service_, this, static_cast<int>(is_cancelled_)); |
| Shutdown(std::move(self), "OnDoneNotified"); |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown( |
| std::shared_ptr<ReportLoadHandler> self, const char* reason) { |
| if (!shutdown_) { |
| gpr_log(GPR_INFO, |
| "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, " |
| "reason: %s).", |
| service_, lb_id_.c_str(), this, reason); |
| shutdown_ = true; |
| if (call_status_ >= INITIAL_REQUEST_RECEIVED) { |
| load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_); |
| next_report_alarm_->Cancel(); |
| } |
| } |
| // OnRequestDelivered() may be called after OnDoneNotified(), so we need to |
| // try to Finish() every time we are in Shutdown(). |
| if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) { |
| std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
| if (!service_->shutdown_) { |
| on_finish_done_ = |
| CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this, |
| std::placeholders::_1, std::placeholders::_2), |
| std::move(self)); |
| // TODO(juanlishen): Maybe add a message proto for the client to |
| // explicitly cancel the stream so that we can return OK status in such |
| // cases. |
| stream_.Finish(Status::CANCELLED, &on_finish_done_); |
| call_status_ = FINISH_CALLED; |
| } |
| } |
| } |
| |
| void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone( |
| std::shared_ptr<ReportLoadHandler> self, bool ok) { |
| if (ok) { |
| gpr_log(GPR_INFO, |
| "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).", |
| service_, lb_id_.c_str(), this); |
| } |
| } |
| |
| } // namespace load_reporter |
| } // namespace grpc |