blob: b4cb07f0f92f971a5f1c77acecd5c40642056328 [file] [log] [blame]
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"
//
// grpc_deadline_state
//
// The on_complete callback used when sending a cancel_error batch down the
// filter stack. Yields the call combiner when the batch returns.
static void yield_call_combiner(void* arg, grpc_error* ignored) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
"got on_complete from cancel_stream batch");
GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
}
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void send_cancel_op_in_call_combiner(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
deadline_state, grpc_schedule_on_exec_ctx));
batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
elem->filter->start_transport_stream_op_batch(elem, batch);
}
// Timer callback.
static void timer_callback(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
if (error != GRPC_ERROR_CANCELLED) {
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
grpc_call_combiner_cancel(deadline_state->call_combiner,
GRPC_ERROR_REF(error));
GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
send_cancel_op_in_call_combiner, elem,
grpc_schedule_on_exec_ctx);
GRPC_CALL_COMBINER_START(deadline_state->call_combiner,
&deadline_state->timer_callback, error,
"deadline exceeded -- sending cancel_stream op");
} else {
GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
}
}
// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_call_element* elem,
grpc_millis deadline) {
if (deadline == GRPC_MILLIS_INF_FUTURE) {
return;
}
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
grpc_closure* closure = nullptr;
switch (deadline_state->timer_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
// If we've already created and destroyed a timer, we always create a
// new closure: we have no other guarantee that the inlined closure is
// not in use (it may hold a pending call to timer_callback)
closure =
GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
break;
case GRPC_DEADLINE_STATE_INITIAL:
deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
closure =
GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
elem, grpc_schedule_on_exec_ctx);
break;
}
GPR_ASSERT(closure != nullptr);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(&deadline_state->timer, deadline, closure);
}
// Cancels the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
grpc_timer_cancel(&deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
// OR in STATE_FINISHED (again nothing to cancel)
}
}
// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
// Invoke the original callback.
GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
deadline_state->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
: elem(elem), deadline(deadline) {}
~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
bool in_call_combiner = false;
grpc_call_element* elem;
grpc_millis deadline;
grpc_closure closure;
};
static void start_timer_after_init(void* arg, grpc_error* error) {
struct start_timer_after_init_state* state =
static_cast<struct start_timer_after_init_state*>(arg);
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(state->elem->call_data);
if (!state->in_call_combiner) {
// We are initially called without holding the call combiner, so we
// need to bounce ourselves into it.
state->in_call_combiner = true;
GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
GRPC_ERROR_REF(error),
"scheduling deadline timer");
return;
}
grpc_core::Delete(state);
GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
"done scheduling deadline timer");
}
grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
grpc_call_stack* call_stack,
grpc_call_combiner* call_combiner,
grpc_millis deadline)
: call_stack(call_stack), call_combiner(call_combiner) {
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
if (deadline != GRPC_MILLIS_INF_FUTURE) {
// When the deadline passes, we indicate the failure by sending down
// an op with cancel_error set. However, we can't send down any ops
// until after the call stack is fully initialized. If we start the
// timer here, we have no guarantee that the timer won't pop before
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
struct start_timer_after_init_state* state =
grpc_core::New<start_timer_after_init_state>(elem, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
}
}
grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
void grpc_deadline_state_reset(grpc_call_element* elem,
grpc_millis new_deadline) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
cancel_timer_if_needed(deadline_state);
start_timer_if_needed(elem, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
if (op->cancel_stream) {
cancel_timer_if_needed(deadline_state);
} else {
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
//
// filter code
//
// Constructor for channel_data. Used for both client and server filters.
static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;
}
// Destructor for channel_data. Used for both client and server filters.
static void destroy_channel_elem(grpc_channel_element* elem) {}
// Call data used for both client and server filter.
typedef struct base_call_data {
grpc_deadline_state deadline_state;
} base_call_data;
// Additional call data used only for the server filter.
typedef struct server_call_data {
base_call_data base; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
grpc_metadata_batch* recv_initial_metadata;
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) grpc_deadline_state(
elem, args->call_stack, args->call_combiner, args->deadline);
return GRPC_ERROR_NONE;
}
// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
grpc_deadline_state* deadline_state =
static_cast<grpc_deadline_state*>(elem->call_data);
deadline_state->~grpc_deadline_state();
}
// Method for starting a call op for client filter.
static void client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
// Chain to next filter.
grpc_call_next_op(elem, op);
}
// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
// Method for starting a call op for server filter.
static void server_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
if (op->cancel_stream) {
cancel_timer_if_needed(&calld->base.deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our
// own callback into that hook.
if (op->recv_initial_metadata) {
calld->next_recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
// the timer.
// Note that we trigger this on recv_trailing_metadata, even though
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
grpc_call_next_op(elem, op);
}
const grpc_channel_filter grpc_client_deadline_filter = {
client_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(base_call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
grpc_channel_next_get_info,
"deadline",
};
const grpc_channel_filter grpc_server_deadline_filter = {
server_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(server_call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
grpc_channel_next_get_info,
"deadline",
};
bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
return grpc_channel_arg_get_bool(
grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
!grpc_channel_args_want_minimal_stack(channel_args));
}
static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
void* arg) {
return grpc_deadline_checking_enabled(
grpc_channel_stack_builder_get_channel_arguments(builder))
? grpc_channel_stack_builder_prepend_filter(
builder, static_cast<const grpc_channel_filter*>(arg),
nullptr, nullptr)
: true;
}
void grpc_deadline_filter_init(void) {
grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
grpc_channel_init_register_stage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
}
void grpc_deadline_filter_shutdown(void) {}