blob: 44aebd2f9d91bd925d0dd1f5ce8a03488e2119ed [file] [log] [blame]
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <memory>
#include <mutex>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>
#include "pb_decode.h"
#include "pb_encode.h"
#include "src/core/ext/filters/client_channel/health/health.pb.h"
#include "src/cpp/server/health/default_health_check_service.h"
namespace grpc {
//
// DefaultHealthCheckService
//
// Creates the health database with an entry for the empty service name
// ("" = the server as a whole) already reporting SERVING.
DefaultHealthCheckService::DefaultHealthCheckService() {
  ServiceData& default_service = services_map_[""];
  default_service.SetServingStatus(SERVING);
}
// Records the serving status of `service_name`, creating its entry if it does
// not exist yet. After Shutdown() the entry is always forced to NOT_SERVING,
// whatever `serving` says.
void DefaultHealthCheckService::SetServingStatus(
    const grpc::string& service_name, bool serving) {
  std::unique_lock<std::mutex> lock(mu_);
  // Even after shutdown we still write the entry (as NOT_SERVING), in case
  // service_name is not in the map yet.
  const ServingStatus new_status =
      (serving && !shutdown_) ? SERVING : NOT_SERVING;
  services_map_[service_name].SetServingStatus(new_status);
}
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
std::unique_lock<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(status);
}
}
// Marks the database as shut down and flips every known service to
// NOT_SERVING. Calling it again is a no-op.
void DefaultHealthCheckService::Shutdown() {
  std::unique_lock<std::mutex> lock(mu_);
  if (!shutdown_) {
    shutdown_ = true;
    for (auto& entry : services_map_) {
      entry.second.SetServingStatus(NOT_SERVING);
    }
  }
}
// Returns the stored status for `service_name`, or NOT_FOUND when the map has
// no entry for it. Does not create an entry.
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
    const grpc::string& service_name) const {
  std::lock_guard<std::mutex> lock(mu_);
  const auto found = services_map_.find(service_name);
  return found == services_map_.end() ? NOT_FOUND
                                      : found->second.GetServingStatus();
}
// Subscribes `handler` to updates for `service_name`, creating the entry if
// needed, and immediately sends it the service's current status. Runs under
// mu_.
void DefaultHealthCheckService::RegisterCallHandler(
    const grpc::string& service_name,
    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
  std::unique_lock<std::mutex> lock(mu_);
  ServiceData& entry = services_map_[service_name];
  // The entry takes its own copy of the reference; `handler` stays valid.
  entry.AddCallHandler(handler);
  const ServingStatus current_status = entry.GetServingStatus();
  HealthCheckServiceImpl::CallHandler* const raw_handler = handler.get();
  raw_handler->SendHealth(std::move(handler), current_status);
}
// Detaches `handler` from `service_name`. If ServiceData::Unused() then
// reports the entry as unused, the entry is dropped from the map. Unknown
// service names are ignored.
void DefaultHealthCheckService::UnregisterCallHandler(
    const grpc::string& service_name,
    const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
  std::unique_lock<std::mutex> lock(mu_);
  auto entry = services_map_.find(service_name);
  if (entry != services_map_.end()) {
    entry->second.RemoveCallHandler(handler);
    if (entry->second.Unused()) services_map_.erase(entry);
  }
}
// Creates the async service implementation, which drains `cq`. The database
// keeps ownership in impl_ and returns a non-owning pointer. May be called
// only once (asserted).
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(
    std::unique_ptr<ServerCompletionQueue> cq) {
  GPR_ASSERT(impl_ == nullptr);
  HealthCheckServiceImpl* const created =
      new HealthCheckServiceImpl(this, std::move(cq));
  impl_.reset(created);
  return created;
}
//
// DefaultHealthCheckService::ServiceData
//
// Stores `status` and pushes it to every watcher currently attached to this
// service.
void DefaultHealthCheckService::ServiceData::SetServingStatus(
    ServingStatus status) {
  status_ = status;
  for (const auto& watcher : call_handlers_) {
    // Each send receives its own copy of the shared_ptr.
    watcher->SendHealth(watcher, status);
  }
}
// Adds `handler` to this service's watcher set, taking over the passed
// reference.
void DefaultHealthCheckService::ServiceData::AddCallHandler(
    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
  call_handlers_.emplace(std::move(handler));
}
// Removes `handler` from this service's watcher set, if present.
void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
    const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
  const auto found = call_handlers_.find(handler);
  if (found != call_handlers_.end()) call_handlers_.erase(found);
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl
//
namespace {
// Fully-qualified method paths of the grpc.health.v1.Health service's
// Check (unary) and Watch (server-streaming) RPCs.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
// Builds the async health service. It registers Check() and Watch() with no
// synchronous handlers and creates, but does not start, the thread that will
// drain `cq` (see StartServingThread()).
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
    DefaultHealthCheckService* database,
    std::unique_ptr<ServerCompletionQueue> cq)
    : database_(database), cq_(std::move(cq)) {
  // Registration order fixes the method indices: Check() is 0 and Watch() is
  // 1, matching the indices the call handlers pass when requesting calls.
  AddMethod(new internal::RpcServiceMethod(
      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
  AddMethod(new internal::RpcServiceMethod(
      kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
  thread_.reset(
      new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
}
// Tears down the async service. It sets shutdown_, shuts down cq_ while
// holding cq_shutdown_mu_, then joins the serving thread. Handlers check
// shutdown_ under cq_shutdown_mu_ before starting any new op on cq_, so no
// op can be started concurrently with cq_->Shutdown().
// NOTE(review): shutdown_ is written here without cq_shutdown_mu_ while other
// code reads it under that mutex; this is race-free only if shutdown_ is an
// atomic in the header — confirm.
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
  // We will reach here after the server starts shutting down.
  shutdown_ = true;
  {
    std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
    cq_->Shutdown();
  }
  // Serve() exits once cq_->Next() reports the queue is shut down.
  thread_->Join();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
// Request the calls we're interested in.
// We do this before starting the serving thread, so that we know it's
// done before server startup is complete.
CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
// Start serving thread.
thread_->Start();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service =
reinterpret_cast<HealthCheckServiceImpl*>(arg);
void* tag;
bool ok;
while (true) {
if (!service->cq_->Next(&tag, &ok)) {
// The completion queue is shutting down.
GPR_ASSERT(service->shutdown_);
break;
}
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
}
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, grpc::string* service_name) {
std::vector<Slice> slices;
if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
size_t request_size = 0;
grpc_health_v1_HealthCheckRequest request_struct;
request_struct.has_service = false;
if (slices.size() == 1) {
request_bytes = const_cast<uint8_t*>(slices[0].begin());
request_size = slices[0].size();
} else if (slices.size() > 1) {
request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
copy_to += slices[i].size();
}
}
pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
bool decode_status = pb_decode(
&istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
if (slices.size() > 1) {
gpr_free(request_bytes);
}
if (!decode_status) return false;
*service_name = request_struct.has_service ? request_struct.service : "";
return true;
}
// Serializes a grpc.health.v1.HealthCheckResponse for `status` into
// `*response` as a single slice. NOT_FOUND maps to SERVICE_UNKNOWN, SERVING to
// SERVING, and anything else to NOT_SERVING. Returns false, leaving
// `*response` untouched, if nanopb fails to encode.
bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
    ServingStatus status, ByteBuffer* response) {
  grpc_health_v1_HealthCheckResponse response_struct;
  response_struct.has_status = true;
  response_struct.status =
      status == NOT_FOUND
          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
          : status == SERVING
                ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
                : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
  // First pass: a zeroed ostream has no buffer or callback, so nanopb only
  // counts bytes. This gives the exact size to allocate.
  pb_ostream_t ostream;
  memset(&ostream, 0, sizeof(ostream));
  if (!pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
                 &response_struct)) {
    return false;
  }
  grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
  ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
                                   GRPC_SLICE_LENGTH(response_slice));
  bool encode_status = pb_encode(
      &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
  if (!encode_status) {
    // No Slice wrapper owns the allocation yet; release it here so it does
    // not leak.
    grpc_slice_unref(response_slice);
    return false;
  }
  Slice encoded_response(response_slice, Slice::STEAL_REF);
  ByteBuffer response_buffer(&encoded_response, 1);
  response->Swap(&response_buffer);
  return true;
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
//
// Allocates a new CheckCallHandler and asks the service to deliver the next
// incoming Check() call (method index 0) to it, with OnCallReceived as the
// completion callback. The CallableTag stored in next_ takes over `self`, so
// the handler stays alive until that tag runs. If the service is already
// shutting down, no request is made and the handler is freed when `self`
// goes out of scope.
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    CreateAndStart(ServerCompletionQueue* cq,
                   DefaultHealthCheckService* database,
                   HealthCheckServiceImpl* service) {
  std::shared_ptr<CallHandler> self =
      std::make_shared<CheckCallHandler>(cq, database, service);
  CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
  {
    // Holding cq_shutdown_mu_ keeps the destructor from shutting down cq
    // between the shutdown_ check and the request below.
    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
    if (service->shutdown_) return;
    // Request a Check() call.
    handler->next_ =
        CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
                               &handler->writer_, cq, cq, &handler->next_);
  }
}
DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// The value of ok being false means that the server is shutting down.
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
grpc::string service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
} else {
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, "could not encode response");
}
}
// Send response.
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
if (status.ok()) {
writer_.Finish(response, status, &next_);
} else {
writer_.FinishWithError(status, &next_);
}
}
}
}
// Runs when Finish for a Check() call completes. Logs success and drops this
// callback's reference to the handler.
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
            service_, this);
  }
  // Explicit release of the reference (kept explicit to appease clang-tidy).
  self.reset();
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
// Allocates a new WatchCallHandler, then (under cq_shutdown_mu_, and only if
// the service is not shutting down) does two things:
//   1. registers on_done_notified_ with AsyncNotifyWhenDone(), holding one
//      reference to the handler;
//   2. requests the next incoming Watch() call (method index 1) with next_ as
//      its tag, holding a second reference.
// If the service is already shutting down, nothing is requested and the
// handler is freed when `self` goes out of scope.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    CreateAndStart(ServerCompletionQueue* cq,
                   DefaultHealthCheckService* database,
                   HealthCheckServiceImpl* service) {
  std::shared_ptr<CallHandler> self =
      std::make_shared<WatchCallHandler>(cq, database, service);
  WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
  {
    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
    if (service->shutdown_) return;
    // Request AsyncNotifyWhenDone().
    // This must happen before the call is requested; the tag gets its own
    // copy of `self`.
    handler->on_done_notified_ =
        CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    self /* copies ref */);
    handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
    // Request a Watch() call.
    handler->next_ =
        CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
                                         &handler->stream_, cq, cq,
                                         &handler->next_);
  }
}
DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
// Runs when a Watch() call arrives (ok == true) or when the pending request
// is cancelled because the server is shutting down (ok == false). On success
// it re-arms a fresh handler, decodes the requested service name, and hands
// `self` to the database. The database registers the handler and sends it
// the current status. A request that fails to decode is finished with
// INVALID_ARGUMENT instead.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
  if (!ok) {
    // Server shutting down.
    //
    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
    // tag will not pop out if the call never starts (
    // https://github.com/grpc/grpc/issues/10136). So we need to manually
    // release the ownership of the handler in this case.
    // NOTE: the ReleaseHandler() call lives inside GPR_ASSERT, so the assert
    // must not be compiled out.
    GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
    return;
  }
  // Spawn a new handler instance to serve the next new client. Every handler
  // instance will deallocate itself when it's done.
  CreateAndStart(cq_, database_, service_);
  // Parse request.
  if (!service_->DecodeRequest(request_, &service_name_)) {
    SendFinish(std::move(self),
               Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
    return;
  }
  // Register the call for updates to the service.
  gpr_log(GPR_DEBUG,
          "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
          service_, service_name_.c_str(), this);
  database_->RegisterCallHandler(service_name_, std::move(self));
}
// Sends `status` to the watching client. Only one Write is outstanding at a
// time: if one is already in flight, only the most recent status is kept in
// pending_status_, and OnSendHealthDone() sends it later.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
  std::unique_lock<std::mutex> lock(send_mu_);
  if (!send_in_flight_) {
    // Nothing outstanding: start a send right away.
    SendHealthLocked(std::move(self), status);
    return;
  }
  // A send is already in flight; overwrite any earlier pending status.
  pending_status_ = status;
}
// Starts writing `status` to the stream. The caller must hold send_mu_ (both
// call sites, SendHealth() and OnSendHealthDone(), do). It marks a send as
// in flight, encodes the response, and then, under cq_shutdown_mu_, does one
// of three things: finishes with CANCELLED if the service is shutting down,
// finishes with INTERNAL if encoding failed, or otherwise writes the response
// with next_ (OnSendHealthDone) as the tag, which takes over `self`.
// On both Finish paths send_in_flight_ stays true, so later SendHealth()
// calls only record pending_status_.
// NOTE(review): on the shutdown_ path this still starts stream_.Finish() with
// a tag on the service's completion queue, which the destructor may already
// have shut down — confirm this is safe.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
  send_in_flight_ = true;
  // Construct response.
  ByteBuffer response;
  bool success = service_->EncodeResponse(status, &response);
  // Grab shutdown lock and send response.
  std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
  if (service_->shutdown_) {
    SendFinishLocked(std::move(self), Status::CANCELLED);
    return;
  }
  if (!success) {
    SendFinishLocked(std::move(self),
                     Status(StatusCode::INTERNAL, "could not encode response"));
    return;
  }
  next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
                                std::placeholders::_1, std::placeholders::_2),
                      std::move(self));
  stream_.Write(response, &next_);
}
// Runs when a Write started by SendHealthLocked() completes. A failed write
// (ok == false) finishes the stream with CANCELLED. Otherwise it clears the
// in-flight flag and, if SendHealth() recorded a newer status in the
// meantime, immediately starts sending that one. NOT_FOUND is used as the
// "nothing pending" marker for pending_status_.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (!ok) {
    SendFinish(std::move(self), Status::CANCELLED);
    return;
  }
  std::unique_lock<std::mutex> lock(send_mu_);
  send_in_flight_ = false;
  // If we got a new status since we started the last send, start a
  // new send for it.
  if (pending_status_ != NOT_FOUND) {
    auto status = pending_status_;
    pending_status_ = NOT_FOUND;
    SendHealthLocked(std::move(self), status);
  }
}
// Finishes the Watch() stream with `status`. It does nothing if a Finish has
// already been started or the service is shutting down.
// finish_called_ is set by SendFinishLocked(), which always runs with
// cq_shutdown_mu_ held (here and in SendHealthLocked()). Checking the flag
// under that same mutex closes the window in which a SendHealthLocked() ->
// SendFinishLocked() on a SetServingStatus() caller's thread could start a
// Finish between this check and the call below. That would give two
// concurrent Finish ops on one stream.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
  std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
  if (finish_called_ || service_->shutdown_) return;
  SendFinishLocked(std::move(self), status);
}
// Starts stream_.Finish(status) with on_finish_done_ (OnFinishDone) as the
// tag. The tag takes over `self`, so the handler stays alive until it runs.
// The caller must hold service_->cq_shutdown_mu_.
// This is idempotent: both SendFinish() and SendHealthLocked() can reach
// here, and without the guard a second call would reassign on_finish_done_
// while the first Finish is still pending and issue a second Finish on the
// same stream.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
  if (finish_called_) return;
  on_finish_done_ =
      CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
                            std::placeholders::_1, std::placeholders::_2),
                  std::move(self));
  stream_.Finish(status, &on_finish_done_);
  finish_called_ = true;
}
// Runs when Finish for a Watch() stream completes. Logs success and drops
// this callback's reference to the handler.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_DEBUG,
            "[HCS %p] Health watch call finished (service_name: \"%s\", "
            "handler: %p).",
            service_, service_name_.c_str(), this);
  }
  // Explicit release of the reference (kept explicit to appease clang-tidy).
  self.reset();
}
// Runs when the AsyncNotifyWhenDone() tag fires, i.e. the Watch() call is
// over (including client cancellation). It detaches the handler from the
// database so it stops receiving status updates, then finishes the stream
// with CANCELLED. SendFinish() skips this if a Finish was already started or
// the service is shutting down.
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
  GPR_ASSERT(ok);
  gpr_log(GPR_DEBUG,
          "[HCS %p] Health watch call is notified done (handler: %p, "
          "is_cancelled: %d).",
          service_, this, static_cast<int>(ctx_.IsCancelled()));
  // Unregister first so no further SendHealth() calls arrive from the
  // database for this handler.
  database_->UnregisterCallHandler(service_name_, self);
  SendFinish(std::move(self), Status::CANCELLED);
}
} // namespace grpc