| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/cpp/ext/filters/census/client_filter.h" |
| |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "opencensus/stats/stats.h" |
| #include "src/core/lib/surface/call.h" |
| #include "src/cpp/ext/filters/census/grpc_plugin.h" |
| #include "src/cpp/ext/filters/census/measures.h" |
| |
| namespace grpc { |
| |
| constexpr uint32_t CensusClientCallData::kMaxTraceContextLen; |
| constexpr uint32_t CensusClientCallData::kMaxTagsLen; |
| |
| namespace { |
| |
| void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { |
| if (b->idx.named.grpc_server_stats_bin != nullptr) { |
| ServerStatsDeserialize( |
| reinterpret_cast<const char*>(GRPC_SLICE_START_PTR( |
| GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))), |
| GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)), |
| elapsed_time); |
| grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin); |
| } |
| } |
| |
| } // namespace |
| |
| void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, |
| grpc_error* error) { |
| grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); |
| CensusClientCallData* calld = |
| reinterpret_cast<CensusClientCallData*>(elem->call_data); |
| GPR_ASSERT(calld != nullptr); |
| if (error == GRPC_ERROR_NONE) { |
| GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr); |
| FilterTrailingMetadata(calld->recv_trailing_metadata_, |
| &calld->elapsed_time_); |
| } |
| GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, |
| GRPC_ERROR_REF(error)); |
| } |
| |
| void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, |
| grpc_error* error) { |
| grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); |
| CensusClientCallData* calld = |
| reinterpret_cast<CensusClientCallData*>(elem->call_data); |
| CensusChannelData* channeld = |
| reinterpret_cast<CensusChannelData*>(elem->channel_data); |
| GPR_ASSERT(calld != nullptr); |
| GPR_ASSERT(channeld != nullptr); |
| // Stream messages are no longer valid after receiving trailing metadata. |
| if ((*calld->recv_message_) != nullptr) { |
| calld->recv_message_count_++; |
| } |
| GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); |
| } |
| |
| void CensusClientCallData::StartTransportStreamOpBatch( |
| grpc_call_element* elem, TransportStreamOpBatch* op) { |
| if (op->send_initial_metadata() != nullptr) { |
| census_context* ctxt = op->get_census_context(); |
| GenerateClientContext( |
| qualified_method_, &context_, |
| (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt)); |
| size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_, |
| kMaxTraceContextLen); |
| if (tracing_len > 0) { |
| GRPC_LOG_IF_ERROR( |
| "census grpc_filter", |
| grpc_metadata_batch_add_tail( |
| op->send_initial_metadata()->batch(), &tracing_bin_, |
| grpc_mdelem_from_slices( |
| GRPC_MDSTR_GRPC_TRACE_BIN, |
| grpc_slice_from_copied_buffer(tracing_buf_, tracing_len)))); |
| } |
| grpc_slice tags = grpc_empty_slice(); |
| // TODO: Add in tagging serialization. |
| size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); |
| if (encoded_tags_len > 0) { |
| GRPC_LOG_IF_ERROR( |
| "census grpc_filter", |
| grpc_metadata_batch_add_tail( |
| op->send_initial_metadata()->batch(), &stats_bin_, |
| grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags))); |
| } |
| } |
| |
| if (op->send_message() != nullptr) { |
| ++sent_message_count_; |
| } |
| if (op->recv_message() != nullptr) { |
| recv_message_ = op->op()->payload->recv_message.recv_message; |
| initial_on_done_recv_message_ = |
| op->op()->payload->recv_message.recv_message_ready; |
| op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; |
| } |
| if (op->recv_trailing_metadata() != nullptr) { |
| recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); |
| initial_on_done_recv_trailing_metadata_ = |
| op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
| op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &on_done_recv_trailing_metadata_; |
| } |
| // Call next op. |
| grpc_call_next_op(elem, op->op()); |
| } |
| |
| grpc_error* CensusClientCallData::Init(grpc_call_element* elem, |
| const grpc_call_element_args* args) { |
| path_ = grpc_slice_ref_internal(args->path); |
| start_time_ = absl::Now(); |
| method_ = GetMethod(&path_); |
| qualified_method_ = absl::StrCat("Sent.", method_); |
| GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, |
| grpc_schedule_on_exec_ctx); |
| GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_, |
| OnDoneRecvTrailingMetadataCb, elem, |
| grpc_schedule_on_exec_ctx); |
| return GRPC_ERROR_NONE; |
| } |
| |
| void CensusClientCallData::Destroy(grpc_call_element* elem, |
| const grpc_call_final_info* final_info, |
| grpc_closure* then_call_closure) { |
| const uint64_t request_size = GetOutgoingDataSize(final_info); |
| const uint64_t response_size = GetIncomingDataSize(final_info); |
| double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_); |
| ::opencensus::stats::Record( |
| {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)}, |
| {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)}, |
| {RpcClientRoundtripLatency(), latency_ms}, |
| {RpcClientServerLatency(), |
| ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}, |
| {RpcClientSentMessagesPerRpc(), sent_message_count_}, |
| {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, |
| {{ClientMethodTagKey(), method_}, |
| {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}}); |
| grpc_slice_unref_internal(path_); |
| context_.EndSpan(); |
| } |
| |
| } // namespace grpc |