Update bundled gRPC to 1.18.0 and nanopb to 0.3.6
And take the opportunity to align third_party/grpc/BUILD closer with
gRPC upstream's BUILD file (by adopting the same intermediate targets)
to make importing easier next time.
The MSYS2 patch is no longer needed judging by discussion in
https://groups.google.com/forum/#!msg/grpc-io/gd6sIuo6rjQ/eyEFarhABgAJ
Fixes #2804.
diff --git a/third_party/grpc/src/cpp/server/async_generic_service.cc b/third_party/grpc/src/cpp/server/async_generic_service.cc
index 6b9ea53..3061376 100644
--- a/third_party/grpc/src/cpp/server/async_generic_service.cc
+++ b/third_party/grpc/src/cpp/server/async_generic_service.cc
@@ -1,39 +1,24 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/generic/async_generic_service.h>
+#include <grpcpp/generic/async_generic_service.h>
-#include <grpc++/server.h>
+#include <grpcpp/server.h>
namespace grpc {
diff --git a/third_party/grpc/src/cpp/server/channel_argument_option.cc b/third_party/grpc/src/cpp/server/channel_argument_option.cc
new file mode 100644
index 0000000..4d6be90
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/channel_argument_option.cc
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/impl/channel_argument_option.h>
+
+namespace grpc {
+
+std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
+ const grpc::string& name, const grpc::string& value) {
+ class StringOption final : public ServerBuilderOption {
+ public:
+ StringOption(const grpc::string& name, const grpc::string& value)
+ : name_(name), value_(value) {}
+
+ virtual void UpdateArguments(ChannelArguments* args) override {
+ args->SetString(name_, value_);
+ }
+ virtual void UpdatePlugins(
+ std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {}
+
+ private:
+ const grpc::string name_;
+ const grpc::string value_;
+ };
+ return std::unique_ptr<ServerBuilderOption>(new StringOption(name, value));
+}
+
+std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
+ const grpc::string& name, int value) {
+ class IntOption final : public ServerBuilderOption {
+ public:
+ IntOption(const grpc::string& name, int value)
+ : name_(name), value_(value) {}
+
+ virtual void UpdateArguments(ChannelArguments* args) override {
+ args->SetInt(name_, value_);
+ }
+ virtual void UpdatePlugins(
+ std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {}
+
+ private:
+ const grpc::string name_;
+ const int value_;
+ };
+ return std::unique_ptr<ServerBuilderOption>(new IntOption(name, value));
+}
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/channelz/channelz_service.cc b/third_party/grpc/src/cpp/server/channelz/channelz_service.cc
new file mode 100644
index 0000000..c44a9ac
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+namespace grpc {
+
+Status ChannelzService::GetTopChannels(
+ ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
+ channelz::v1::GetTopChannelsResponse* response) {
+ char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_top_channels returned null");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServers(
+ ServerContext* unused, const channelz::v1::GetServersRequest* request,
+ channelz::v1::GetServersResponse* response) {
+ char* json_str = grpc_channelz_get_servers(request->start_server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_servers returned null");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServer(ServerContext* unused,
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) {
+ char* json_str = grpc_channelz_get_server(request->server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server returned null");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServerSockets(
+ ServerContext* unused, const channelz::v1::GetServerSocketsRequest* request,
+ channelz::v1::GetServerSocketsResponse* response) {
+ char* json_str = grpc_channelz_get_server_sockets(
+ request->server_id(), request->start_socket_id(), request->max_results());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server_sockets returned null");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetChannel(
+ ServerContext* unused, const channelz::v1::GetChannelRequest* request,
+ channelz::v1::GetChannelResponse* response) {
+ char* json_str = grpc_channelz_get_channel(request->channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetSubchannel(
+ ServerContext* unused, const channelz::v1::GetSubchannelRequest* request,
+ channelz::v1::GetSubchannelResponse* response) {
+ char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND,
+ "No object found for that SubchannelId");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetSocket(ServerContext* unused,
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) {
+ char* json_str = grpc_channelz_get_socket(request->socket_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
+ }
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/channelz/channelz_service.h b/third_party/grpc/src/cpp/server/channelz/channelz_service.h
new file mode 100644
index 0000000..b4a66ba
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/channelz/channelz_service.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/grpcpp.h>
+#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
+
+namespace grpc {
+
+class ChannelzService final : public channelz::v1::Channelz::Service {
+ private:
+ // implementation of GetTopChannels rpc
+ Status GetTopChannels(
+ ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
+ channelz::v1::GetTopChannelsResponse* response) override;
+ // implementation of GetServers rpc
+ Status GetServers(ServerContext* unused,
+ const channelz::v1::GetServersRequest* request,
+ channelz::v1::GetServersResponse* response) override;
+ // implementation of GetServer rpc
+ Status GetServer(ServerContext* unused,
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) override;
+ // implementation of GetServerSockets rpc
+ Status GetServerSockets(
+ ServerContext* unused,
+ const channelz::v1::GetServerSocketsRequest* request,
+ channelz::v1::GetServerSocketsResponse* response) override;
+ // implementation of GetChannel rpc
+ Status GetChannel(ServerContext* unused,
+ const channelz::v1::GetChannelRequest* request,
+ channelz::v1::GetChannelResponse* response) override;
+ // implementation of GetSubchannel rpc
+ Status GetSubchannel(ServerContext* unused,
+ const channelz::v1::GetSubchannelRequest* request,
+ channelz::v1::GetSubchannelResponse* response) override;
+ // implementation of GetSocket rpc
+ Status GetSocket(ServerContext* unused,
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) override;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
diff --git a/third_party/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/third_party/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
new file mode 100644
index 0000000..b93e5b5
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/channelz_service_plugin.h>
+#include <grpcpp/impl/server_builder_plugin.h>
+#include <grpcpp/impl/server_initializer.h>
+#include <grpcpp/server.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+namespace grpc {
+namespace channelz {
+namespace experimental {
+
+class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
+ public:
+ ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
+
+ grpc::string name() override { return "channelz_service"; }
+
+ void InitServer(grpc::ServerInitializer* si) override {
+ si->RegisterService(channelz_service_);
+ }
+
+ void Finish(grpc::ServerInitializer* si) override {}
+
+ void ChangeArguments(const grpc::string& name, void* value) override {}
+
+ bool has_sync_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_synchronous_methods();
+ }
+ return false;
+ }
+
+ bool has_async_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_async_methods();
+ }
+ return false;
+ }
+
+ private:
+ std::shared_ptr<grpc::ChannelzService> channelz_service_;
+};
+
+static std::unique_ptr< ::grpc::ServerBuilderPlugin>
+CreateChannelzServicePlugin() {
+ return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
+ new ChannelzServicePlugin());
+}
+
+void InitChannelzService() {
+ static bool already_here = false;
+ if (already_here) return;
+ already_here = true;
+ ::grpc::ServerBuilder::InternalAddPluginFactory(&CreateChannelzServicePlugin);
+}
+
+} // namespace experimental
+} // namespace channelz
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/create_default_thread_pool.cc b/third_party/grpc/src/cpp/server/create_default_thread_pool.cc
index f3b07ec..8ca3e32 100644
--- a/third_party/grpc/src/cpp/server/create_default_thread_pool.cc
+++ b/third_party/grpc/src/cpp/server/create_default_thread_pool.cc
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -38,13 +23,22 @@
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
namespace grpc {
+namespace {
-ThreadPoolInterface* CreateDefaultThreadPool() {
+ThreadPoolInterface* CreateDefaultThreadPoolImpl() {
int cores = gpr_cpu_num_cores();
if (!cores) cores = 4;
return new DynamicThreadPool(cores);
}
+CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl;
+
+} // namespace
+
+ThreadPoolInterface* CreateDefaultThreadPool() { return g_ctp_impl(); }
+
+void SetCreateThreadPool(CreateThreadPoolFunc func) { g_ctp_impl = func; }
+
} // namespace grpc
#endif // !GRPC_CUSTOM_DEFAULT_THREAD_POOL
diff --git a/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc b/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc
index 4b226c2..ef99d64 100644
--- a/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc
+++ b/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc
@@ -1,55 +1,46 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-
#include "src/cpp/server/dynamic_thread_pool.h"
+#include <mutex>
+
+#include <grpc/support/log.h>
+
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
+
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
: pool_(pool),
- thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
- this)) {}
-DynamicThreadPool::DynamicThread::~DynamicThread() {
- thd_->join();
- thd_.reset();
+ thd_("grpcpp_dynamic_pool",
+ [](void* th) {
+ static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
+ },
+ this) {
+ thd_.Start();
}
+DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->ThreadFunc();
// Now that we have killed ourselves, we should reduce the thread count
- grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
+ std::unique_lock<std::mutex> lock(pool_->mu_);
pool_->nthreads_--;
// Move ourselves to dead list
pool_->dead_threads_.push_back(this);
@@ -62,7 +53,7 @@
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
@@ -91,7 +82,7 @@
nthreads_(0),
threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
- grpc::lock_guard<grpc::mutex> lock(mu_);
+ std::lock_guard<std::mutex> lock(mu_);
nthreads_++;
new DynamicThread(this);
}
@@ -104,7 +95,7 @@
}
DynamicThreadPool::~DynamicThreadPool() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
shutdown_ = true;
cv_.notify_all();
while (nthreads_ != 0) {
@@ -114,7 +105,7 @@
}
void DynamicThreadPool::Add(const std::function<void()>& callback) {
- grpc::lock_guard<grpc::mutex> lock(mu_);
+ std::lock_guard<std::mutex> lock(mu_);
// Add works to the callbacks list
callbacks_.push(callback);
// Increase pool size or notify as needed
diff --git a/third_party/grpc/src/cpp/server/dynamic_thread_pool.h b/third_party/grpc/src/cpp/server/dynamic_thread_pool.h
index 5ba7533..5df8cf2 100644
--- a/third_party/grpc/src/cpp/server/dynamic_thread_pool.h
+++ b/third_party/grpc/src/cpp/server/dynamic_thread_pool.h
@@ -1,57 +1,43 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
+#include <condition_variable>
#include <list>
#include <memory>
+#include <mutex>
#include <queue>
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-#include <grpc++/support/config.h>
+#include <grpcpp/support/config.h>
+#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
-class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface {
+class DynamicThreadPool final : public ThreadPoolInterface {
public:
explicit DynamicThreadPool(int reserve_threads);
~DynamicThreadPool();
- void Add(const std::function<void()>& callback) GRPC_OVERRIDE;
+ void Add(const std::function<void()>& callback) override;
private:
class DynamicThread {
@@ -61,12 +47,12 @@
private:
DynamicThreadPool* pool_;
- std::unique_ptr<grpc::thread> thd_;
+ grpc_core::Thread thd_;
void ThreadFunc();
};
- grpc::mutex mu_;
- grpc::condition_variable cv_;
- grpc::condition_variable shutdown_cv_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ std::condition_variable shutdown_cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
int reserve_threads_;
diff --git a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc
deleted file mode 100644
index 2bdc44b..0000000
--- a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-#include "src/cpp/server/fixed_size_thread_pool.h"
-
-namespace grpc {
-
-void FixedSizeThreadPool::ThreadFunc() {
- for (;;) {
- // Wait until work is available or we are shutting down.
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_ && callbacks_.empty()) {
- cv_.wait(lock);
- }
- // Drain callbacks before considering shutdown to ensure all work
- // gets completed.
- if (!callbacks_.empty()) {
- auto cb = callbacks_.front();
- callbacks_.pop();
- lock.unlock();
- cb();
- } else if (shutdown_) {
- return;
- }
- }
-}
-
-FixedSizeThreadPool::FixedSizeThreadPool(int num_threads) : shutdown_(false) {
- for (int i = 0; i < num_threads; i++) {
- threads_.push_back(
- new grpc::thread(&FixedSizeThreadPool::ThreadFunc, this));
- }
-}
-
-FixedSizeThreadPool::~FixedSizeThreadPool() {
- {
- grpc::lock_guard<grpc::mutex> lock(mu_);
- shutdown_ = true;
- cv_.notify_all();
- }
- for (auto t = threads_.begin(); t != threads_.end(); t++) {
- (*t)->join();
- delete *t;
- }
-}
-
-void FixedSizeThreadPool::Add(const std::function<void()>& callback) {
- grpc::lock_guard<grpc::mutex> lock(mu_);
- callbacks_.push(callback);
- cv_.notify_one();
-}
-
-} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h
deleted file mode 100644
index 394ae58..0000000
--- a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
-#define GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
-
-#include <queue>
-#include <vector>
-
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-#include <grpc++/support/config.h>
-
-#include "src/cpp/server/thread_pool_interface.h"
-
-namespace grpc {
-
-class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface {
- public:
- explicit FixedSizeThreadPool(int num_threads);
- ~FixedSizeThreadPool();
-
- void Add(const std::function<void()>& callback) GRPC_OVERRIDE;
-
- private:
- grpc::mutex mu_;
- grpc::condition_variable cv_;
- bool shutdown_;
- std::queue<std::function<void()>> callbacks_;
- std::vector<grpc::thread*> threads_;
-
- void ThreadFunc();
-};
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
diff --git a/third_party/grpc/src/cpp/server/health/default_health_check_service.cc b/third_party/grpc/src/cpp/server/health/default_health_check_service.cc
new file mode 100644
index 0000000..44aebd2
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -0,0 +1,499 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <memory>
+#include <mutex>
+
+#include <grpc/slice.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/method_handler_impl.h>
+
+#include "pb_decode.h"
+#include "pb_encode.h"
+#include "src/core/ext/filters/client_channel/health/health.pb.h"
+#include "src/cpp/server/health/default_health_check_service.h"
+
+namespace grpc {
+
+//
+// DefaultHealthCheckService
+//
+
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_[""].SetServingStatus(SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(
+ const grpc::string& service_name, bool serving) {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (shutdown_) {
+ // Set to NOT_SERVING in case service_name is not in the map.
+ serving = false;
+ }
+ services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ const ServingStatus status = serving ? SERVING : NOT_SERVING;
+ std::unique_lock<std::mutex> lock(mu_);
+ if (shutdown_) {
+ return;
+ }
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(status);
+ }
+}
+
+void DefaultHealthCheckService::Shutdown() {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(NOT_SERVING);
+ }
+}
+
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
+ const grpc::string& service_name) const {
+ std::lock_guard<std::mutex> lock(mu_);
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) {
+ return NOT_FOUND;
+ }
+ const ServiceData& service_data = it->second;
+ return service_data.GetServingStatus();
+}
+
+void DefaultHealthCheckService::RegisterCallHandler(
+ const grpc::string& service_name,
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ std::unique_lock<std::mutex> lock(mu_);
+ ServiceData& service_data = services_map_[service_name];
+ service_data.AddCallHandler(handler /* copies ref */);
+ HealthCheckServiceImpl::CallHandler* h = handler.get();
+ h->SendHealth(std::move(handler), service_data.GetServingStatus());
+}
+
+void DefaultHealthCheckService::UnregisterCallHandler(
+ const grpc::string& service_name,
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ std::unique_lock<std::mutex> lock(mu_);
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) return;
+ ServiceData& service_data = it->second;
+ service_data.RemoveCallHandler(handler);
+ if (service_data.Unused()) {
+ services_map_.erase(it);
+ }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq) {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
+ return impl_.get();
+}
+
+//
+// DefaultHealthCheckService::ServiceData
+//
+
+void DefaultHealthCheckService::ServiceData::SetServingStatus(
+ ServingStatus status) {
+ status_ = status;
+ for (auto& call_handler : call_handlers_) {
+ call_handler->SendHealth(call_handler /* copies ref */, status);
+ }
+}
+
+void DefaultHealthCheckService::ServiceData::AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ call_handlers_.insert(std::move(handler));
+}
+
+void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ call_handlers_.erase(handler);
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl
+//
+
+namespace {
+const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
+const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
+} // namespace
+
+DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
+ DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : database_(database), cq_(std::move(cq)) {
+ // Add Check() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
+ // Add Watch() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
+ // Create serving thread.
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ cq_->Shutdown();
+ }
+ thread_->Join();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
+ // Request the calls we're interested in.
+ // We do this before starting the serving thread, so that we know it's
+ // done before server startup is complete.
+ CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
+ WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
+ // Start serving thread.
+ thread_->Start();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
+ HealthCheckServiceImpl* service =
+ reinterpret_cast<HealthCheckServiceImpl*>(arg);
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+}
+
+bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
+ const ByteBuffer& request, grpc::string* service_name) {
+ std::vector<Slice> slices;
+ if (!request.Dump(&slices).ok()) return false;
+ uint8_t* request_bytes = nullptr;
+ size_t request_size = 0;
+ grpc_health_v1_HealthCheckRequest request_struct;
+ request_struct.has_service = false;
+ if (slices.size() == 1) {
+ request_bytes = const_cast<uint8_t*>(slices[0].begin());
+ request_size = slices[0].size();
+ } else if (slices.size() > 1) {
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
+ uint8_t* copy_to = request_bytes;
+ for (size_t i = 0; i < slices.size(); i++) {
+ memcpy(copy_to, slices[i].begin(), slices[i].size());
+ copy_to += slices[i].size();
+ }
+ }
+ pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
+ bool decode_status = pb_decode(
+ &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
+ if (slices.size() > 1) {
+ gpr_free(request_bytes);
+ }
+ if (!decode_status) return false;
+ *service_name = request_struct.has_service ? request_struct.service : "";
+ return true;
+}
+
+bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
+ ServingStatus status, ByteBuffer* response) {
+ grpc_health_v1_HealthCheckResponse response_struct;
+ response_struct.has_status = true;
+ response_struct.status =
+ status == NOT_FOUND
+ ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
+ : status == SERVING
+ ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
+ : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
+ pb_ostream_t ostream;
+ memset(&ostream, 0, sizeof(ostream));
+ pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
+ &response_struct);
+ grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
+ ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
+ GRPC_SLICE_LENGTH(response_slice));
+ bool encode_status = pb_encode(
+ &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
+ if (!encode_status) return false;
+ Slice encoded_response(response_slice, Slice::STEAL_REF);
+ ByteBuffer response_buffer(&encoded_response, 1);
+ response->Swap(&response_buffer);
+ return true;
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<CheckCallHandler>(cq, database, service);
+ CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ // Request a Check() call.
+ handler->next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
+ &handler->writer_, cq, cq, &handler->next_);
+ }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // The value of ok being false means that the server is shutting down.
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Process request.
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
+ this);
+ grpc::string service_name;
+ grpc::Status status = Status::OK;
+ ByteBuffer response;
+ if (!service_->DecodeRequest(request_, &service_name)) {
+ status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
+ } else {
+ ServingStatus serving_status = database_->GetServingStatus(service_name);
+ if (serving_status == NOT_FOUND) {
+ status = Status(StatusCode::NOT_FOUND, "service name unknown");
+ } else if (!service_->EncodeResponse(serving_status, &response)) {
+ status = Status(StatusCode::INTERNAL, "could not encode response");
+ }
+ }
+ // Send response.
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (!service_->shutdown_) {
+ next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ if (status.ok()) {
+ writer_.Finish(response, status, &next_);
+ } else {
+ writer_.FinishWithError(status, &next_);
+ }
+ }
+ }
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
+ service_, this);
+ }
+ self.reset(); // To appease clang-tidy.
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<WatchCallHandler>(cq, database, service);
+ WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ // Request AsyncNotifyWhenDone().
+ handler->on_done_notified_ =
+ CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ self /* copies ref */);
+ handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
+ // Request a Watch() call.
+ handler->next_ =
+ CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
+ &handler->stream_, cq, cq,
+ &handler->next_);
+ }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // Server shutting down.
+ //
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Parse request.
+ if (!service_->DecodeRequest(request_, &service_name_)) {
+ SendFinish(std::move(self),
+ Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
+ return;
+ }
+ // Register the call for updates to the service.
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
+ service_, service_name_.c_str(), this);
+ database_->RegisterCallHandler(service_name_, std::move(self));
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
+ std::unique_lock<std::mutex> lock(send_mu_);
+ // If there's already a send in flight, cache the new status, and
+ // we'll start a new send for it when the one in flight completes.
+ if (send_in_flight_) {
+ pending_status_ = status;
+ return;
+ }
+ // Start a send.
+ SendHealthLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
+ send_in_flight_ = true;
+ // Construct response.
+ ByteBuffer response;
+ bool success = service_->EncodeResponse(status, &response);
+ // Grab shutdown lock and send response.
+ std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ SendFinishLocked(std::move(self), Status::CANCELLED);
+ return;
+ }
+ if (!success) {
+ SendFinishLocked(std::move(self),
+ Status(StatusCode::INTERNAL, "could not encode response"));
+ return;
+ }
+ next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ SendFinish(std::move(self), Status::CANCELLED);
+ return;
+ }
+ std::unique_lock<std::mutex> lock(send_mu_);
+ send_in_flight_ = false;
+ // If we got a new status since we started the last send, start a
+ // new send for it.
+ if (pending_status_ != NOT_FOUND) {
+ auto status = pending_status_;
+ pending_status_ = NOT_FOUND;
+ SendHealthLocked(std::move(self), status);
+ }
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
+ if (finish_called_) return;
+ std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) return;
+ SendFinishLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
+ on_finish_done_ =
+ CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Finish(status, &on_finish_done_);
+ finish_called_ = true;
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call finished (service_name: \"%s\", "
+ "handler: %p).",
+ service_, service_name_.c_str(), this);
+ }
+ self.reset(); // To appease clang-tidy.
+}
+
+// TODO(roth): This method currently assumes that there will be only one
+// thread polling the cq and invoking the corresponding callbacks. If
+// that changes, we will need to add synchronization here.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(ctx_.IsCancelled()));
+ database_->UnregisterCallHandler(service_name_, self);
+ SendFinish(std::move(self), Status::CANCELLED);
+}
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/health/default_health_check_service.h b/third_party/grpc/src/cpp/server/health/default_health_check_service.h
new file mode 100644
index 0000000..9551cd2
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/health/default_health_check_service.h
@@ -0,0 +1,284 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
+
+#include <atomic>
+#include <mutex>
+#include <set>
+
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/service_type.h>
+#include <grpcpp/support/byte_buffer.h>
+
+#include "src/core/lib/gprpp/thd.h"
+
+namespace grpc {
+
+// Default implementation of HealthCheckServiceInterface. Server will create and
+// own it.
+class DefaultHealthCheckService final : public HealthCheckServiceInterface {
+ public:
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
+ // The service impl to register with the server.
+ class HealthCheckServiceImpl : public Service {
+ public:
+ // Base class for call handlers.
+ class CallHandler {
+ public:
+ virtual ~CallHandler() = default;
+ virtual void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) = 0;
+ };
+
+ HealthCheckServiceImpl(DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq);
+
+ ~HealthCheckServiceImpl();
+
+ void StartServingThread();
+
+ private:
+ // A tag that can be called with a bool argument. It's tailored for
+ // CallHandler's use. Before being used, it should be constructed with a
+ // method of CallHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function
+ // can only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<CallHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no
+ // longer owned by this tag after this method is invoked.
+ void Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+ }
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<CallHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<CallHandler> handler_;
+ };
+
+ // Call handler for Check method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class CheckCallHandler : public CallHandler {
+ public:
+ // Instantiates a CheckCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // Not used for Check.
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override {}
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ GenericServerAsyncResponseWriter writer_;
+ ServerContext ctx_;
+
+ CallableTag next_;
+ };
+
+ // Call handler for Watch method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class WatchCallHandler : public CallHandler {
+ public:
+ // Instantiates a WatchCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Requires holding send_mu_.
+ void SendHealthLocked(std::shared_ptr<CallHandler> self,
+ ServingStatus status);
+
+ // When sending a health result finishes.
+ void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
+
+ // Requires holding service_->cq_shutdown_mu_.
+ void SendFinishLocked(std::shared_ptr<CallHandler> self,
+ const Status& status);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ grpc::string service_name_;
+ GenericServerAsyncWriter stream_;
+ ServerContext ctx_;
+
+ std::mutex send_mu_;
+ bool send_in_flight_ = false; // Guarded by mu_.
+ ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
+
+ bool finish_called_ = false;
+ CallableTag next_;
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Serve(void* arg);
+
+ // Returns true on success.
+ static bool DecodeRequest(const ByteBuffer& request,
+ grpc::string* service_name);
+ static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
+
+ // Needed to appease Windows compilers, which don't seem to allow
+ // nested classes to access protected members in the parent's
+ // superclass.
+ using Service::RequestAsyncServerStreaming;
+ using Service::RequestAsyncUnary;
+
+ DefaultHealthCheckService* database_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+
+ // To synchronize the operations related to shutdown state of cq_, so that
+ // we don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
+ };
+
+ DefaultHealthCheckService();
+
+ void SetServingStatus(const grpc::string& service_name,
+ bool serving) override;
+ void SetServingStatus(bool serving) override;
+
+ void Shutdown() override;
+
+ ServingStatus GetServingStatus(const grpc::string& service_name) const;
+
+ HealthCheckServiceImpl* GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq);
+
+ private:
+ // Stores the current serving status of a service and any call
+ // handlers registered for updates when the service's status changes.
+ class ServiceData {
+ public:
+ void SetServingStatus(ServingStatus status);
+ ServingStatus GetServingStatus() const { return status_; }
+ void AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ void RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+ bool Unused() const {
+ return call_handlers_.empty() && status_ == NOT_FOUND;
+ }
+
+ private:
+ ServingStatus status_ = NOT_FOUND;
+ std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
+ call_handlers_;
+ };
+
+ void RegisterCallHandler(
+ const grpc::string& service_name,
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
+ void UnregisterCallHandler(
+ const grpc::string& service_name,
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+
+ mutable std::mutex mu_;
+ bool shutdown_ = false; // Guarded by mu_.
+ std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
+ std::unique_ptr<HealthCheckServiceImpl> impl_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
diff --git a/third_party/grpc/src/cpp/server/health/health_check_service.cc b/third_party/grpc/src/cpp/server/health/health_check_service.cc
new file mode 100644
index 0000000..a0fa2d6
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/health/health_check_service.cc
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/health_check_service_interface.h>
+
+namespace grpc {
+namespace {
+bool g_grpc_default_health_check_service_enabled = false;
+} // namespace
+
+bool DefaultHealthCheckServiceEnabled() {
+ return g_grpc_default_health_check_service_enabled;
+}
+
+void EnableDefaultHealthCheckService(bool enable) {
+ g_grpc_default_health_check_service_enabled = enable;
+}
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc b/third_party/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc
new file mode 100644
index 0000000..7148265
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/ext/health_check_service_server_builder_option.h>
+
+namespace grpc {
+
+HealthCheckServiceServerBuilderOption::HealthCheckServiceServerBuilderOption(
+ std::unique_ptr<HealthCheckServiceInterface> hc)
+ : hc_(std::move(hc)) {}
+// Hand over hc_ to the server.
+void HealthCheckServiceServerBuilderOption::UpdateArguments(
+ ChannelArguments* args) {
+ args->SetPointer(kHealthCheckServiceInterfaceArg, hc_.release());
+}
+
+void HealthCheckServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) {}
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/insecure_server_credentials.cc b/third_party/grpc/src/cpp/server/insecure_server_credentials.cc
index ef3cae5..7d749dd 100644
--- a/third_party/grpc/src/cpp/server/insecure_server_credentials.cc
+++ b/third_party/grpc/src/cpp/server/insecure_server_credentials.cc
@@ -1,51 +1,35 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/security/server_credentials.h>
+#include <grpcpp/security/server_credentials.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
namespace grpc {
namespace {
-class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
+class InsecureServerCredentialsImpl final : public ServerCredentials {
public:
- int AddPortToServer(const grpc::string& addr,
- grpc_server* server) GRPC_OVERRIDE {
+ int AddPortToServer(const grpc::string& addr, grpc_server* server) override {
return grpc_server_add_insecure_http2_port(server, addr.c_str());
}
void SetAuthMetadataProcessor(
- const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE {
+ const std::shared_ptr<AuthMetadataProcessor>& processor) override {
(void)processor;
GPR_ASSERT(0); // Should not be called on InsecureServerCredentials.
}
diff --git a/third_party/grpc/src/cpp/server/load_reporter/constants.h b/third_party/grpc/src/cpp/server/load_reporter/constants.h
new file mode 100644
index 0000000..00ad794
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/constants.h
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+namespace grpc {
+namespace load_reporter {
+
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
+constexpr size_t kLbIdLength = 8;
+constexpr size_t kIpv4AddressLength = 8;
+constexpr size_t kIpv6AddressLength = 32;
+
+constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
+
+// Call statuses.
+
+constexpr char kCallStatusOk[] = "OK";
+constexpr char kCallStatusServerError[] = "5XX";
+constexpr char kCallStatusClientError[] = "4XX";
+
+// Tag keys.
+
+constexpr char kTagKeyToken[] = "token";
+constexpr char kTagKeyHost[] = "host";
+constexpr char kTagKeyUserId[] = "user_id";
+constexpr char kTagKeyStatus[] = "status";
+constexpr char kTagKeyMetricName[] = "metric_name";
+
+// Measure names.
+
+constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
+constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
+constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
+constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
+constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
+constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
+
+// View names.
+
+constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
+constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
+constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
+constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
+constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
+constexpr char kViewOtherCallMetricCount[] =
+ "grpc.io/lb_view/other_call_metric_count";
+constexpr char kViewOtherCallMetricValue[] =
+ "grpc.io/lb_view/other_call_metric_value";
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats.h b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
new file mode 100644
index 0000000..f514b07
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <utility>
+
+namespace grpc {
+namespace load_reporter {
+
+// Reads the CPU stats (in a pair of busy and total numbers) from the system.
+// The units of the stats should be the same.
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
new file mode 100644
index 0000000..9c1fd0c
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_LINUX
+
+#include <cstdio>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILE* fp;
+ fp = fopen("/proc/stat", "r");
+ uint64_t user, nice, system, idle;
+ fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle);
+ fclose(fp);
+ busy = user + nice + system;
+ total = busy + idle;
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_LINUX
diff --git a/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
new file mode 100644
index 0000000..dbdde30
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_APPLE
+
+#include <mach/mach.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ host_cpu_load_info_data_t cpuinfo;
+ mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
+ if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
+ (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
+ for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
+ busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_APPLE
diff --git a/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
new file mode 100644
index 0000000..80fb8b6
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+
+#include <grpc/support/log.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ gpr_log(GPR_ERROR,
+ "Platforms other than Linux, Windows, and MacOS are not supported.");
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
diff --git a/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
new file mode 100644
index 0000000..0a98e84
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include <windows.h>
+#include <cstdint>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+namespace {
+
+uint64_t FiletimeToInt(const FILETIME& ft) {
+ ULARGE_INTEGER i;
+ i.LowPart = ft.dwLowDateTime;
+ i.HighPart = ft.dwHighDateTime;
+ return i.QuadPart;
+}
+
+} // namespace
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILETIME idle, kernel, user;
+ if (GetSystemTimes(&idle, &kernel, &user) != 0) {
+ total = FiletimeToInt(kernel) + FiletimeToInt(user);
+ busy = total - FiletimeToInt(idle);
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_WINDOWS
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_data_store.cc b/third_party/grpc/src/cpp/server/load_reporter/load_data_store.cc
new file mode 100644
index 0000000..594473f
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -0,0 +1,338 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
+#include <cstdlib>
+#include <set>
+#include <unordered_map>
+#include <vector>
+
+#include "src/core/lib/iomgr/socket_utils.h"
+#include "src/cpp/server/load_reporter/load_data_store.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Some helper functions.
+namespace {
+
+// Given a map from type K to a set of value type V, finds the set associated
+// with the given key and erases the value from the set. If the set becomes
+// empty, also erases the key-set pair. Returns true if the value is erased
+// successfully.
+template <typename K, typename V>
+bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
+ const K& key, const V& value) {
+ auto it = map.find(key);
+ if (it != map.end()) {
+ size_t erased = it->second.erase(value);
+ if (it->second.size() == 0) {
+ map.erase(it);
+ }
+ return erased;
+ }
+ return false;
+};
+
+// Given a map from type K to a set of value type V, removes the given key and
+// the associated set, and returns the set. Returns an empty set if the key is
+// not found.
+template <typename K, typename V>
+std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
+ const K& key) {
+ auto it = map.find(key);
+ if (it != map.end()) {
+ auto set = std::move(it->second);
+ map.erase(it);
+ return set;
+ }
+ return {};
+};
+
+// From a non-empty container, returns a pointer to a random element.
+template <typename C>
+const typename C::value_type* RandomElement(const C& container) {
+ GPR_ASSERT(!container.empty());
+ auto it = container.begin();
+ std::advance(it, std::rand() % container.size());
+ return &(*it);
+}
+
+} // namespace
+
+LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
+ grpc::string user_id)
+ : user_id_(std::move(user_id)) {
+ GPR_ASSERT(client_ip_and_token.size() >= 2);
+ int ip_hex_size;
+ GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
+ &ip_hex_size) == 1);
+ GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
+ ip_hex_size == kIpv6AddressLength);
+ size_t cur_pos = 2;
+ client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
+ cur_pos += ip_hex_size;
+ if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
+ lb_id_ = kInvalidLbId;
+ lb_tag_ = "";
+ } else {
+ lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
+ lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
+ }
+}
+
+grpc::string LoadRecordKey::GetClientIpBytes() const {
+ if (client_ip_hex_.empty()) {
+ return "";
+ } else if (client_ip_hex_.size() == kIpv4AddressLength) {
+ uint32_t ip_bytes;
+ if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
+ gpr_log(GPR_ERROR,
+ "Can't parse client IP (%s) from a hex string to an integer.",
+ client_ip_hex_.c_str());
+ return "";
+ }
+ ip_bytes = grpc_htonl(ip_bytes);
+ return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
+ sizeof(ip_bytes));
+ } else if (client_ip_hex_.size() == kIpv6AddressLength) {
+ uint32_t ip_bytes[4];
+ for (size_t i = 0; i < 4; ++i) {
+ if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
+ ip_bytes + i) != 1) {
+ gpr_log(
+ GPR_ERROR,
+ "Can't parse client IP part (%s) from a hex string to an integer.",
+ client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
+ return "";
+ }
+ ip_bytes[i] = grpc_htonl(ip_bytes[i]);
+ }
+ return grpc::string(reinterpret_cast<const char*>(ip_bytes),
+ sizeof(ip_bytes));
+ } else {
+ GPR_UNREACHABLE_CODE(return "");
+ }
+}
+
+LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
+ double total_metric_value) {
+ call_metrics_.emplace(std::move(metric_name),
+ CallMetricValue(num_calls, total_metric_value));
+}
+
+void PerBalancerStore::MergeRow(const LoadRecordKey& key,
+ const LoadRecordValue& value) {
+ // During suspension, the load data received will be dropped.
+ if (!suspended_) {
+ load_record_map_[key].MergeFrom(value);
+ gpr_log(GPR_DEBUG,
+ "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
+ this, key.ToString().c_str(), value.ToString().c_str());
+ } else {
+ gpr_log(GPR_DEBUG,
+ "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
+ this, key.ToString().c_str(), value.ToString().c_str());
+ }
+ // We always keep track of num_calls_in_progress_, so that when this
+ // store is resumed, we still have a correct value of
+ // num_calls_in_progress_.
+ GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
+ value.GetNumCallsInProgressDelta() >=
+ 0);
+ num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
+}
+
+void PerBalancerStore::Suspend() {
+ suspended_ = true;
+ load_record_map_.clear();
+ gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
+}
+
+void PerBalancerStore::Resume() {
+ suspended_ = false;
+ gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
+}
+
+uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
+ GPR_ASSERT(!suspended_);
+ last_reported_num_calls_in_progress_ = num_calls_in_progress_;
+ return num_calls_in_progress_;
+}
+
+void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ GPR_ASSERT(lb_id != kInvalidLbId);
+ SetUpForNewLbId(lb_id, load_key);
+ // Prior to this one, there was no load balancer receiving report, so we may
+ // have unassigned orphaned stores to assign to this new balancer.
+ // TODO(juanlishen): If the load key of this new stream is the same with
+ // some previously adopted orphan store, we may want to take the orphan to
+ // this stream. Need to discuss with LB team.
+ if (assigned_stores_.size() == 1) {
+ for (const auto& p : per_balancer_stores_) {
+ const grpc::string& other_lb_id = p.first;
+ const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
+ if (other_lb_id != lb_id) {
+ orphaned_store->Resume();
+ AssignOrphanedStore(orphaned_store.get(), lb_id);
+ }
+ }
+ }
+ // The first connected balancer will adopt the kInvalidLbId.
+ if (per_balancer_stores_.size() == 1) {
+ SetUpForNewLbId(kInvalidLbId, "");
+ ReportStreamClosed(kInvalidLbId);
+ }
+}
+
+void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
+ auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
+ GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
+ // Remove this closed stream from our records.
+ GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
+ load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
+ lb_id));
+ std::set<PerBalancerStore*> orphaned_stores =
+ UnorderedMapOfSetExtract(assigned_stores_, lb_id);
+ // The stores that were assigned to this balancer are orphaned now. They
+ // should be re-assigned to other balancers which are still receiving reports.
+ for (PerBalancerStore* orphaned_store : orphaned_stores) {
+ const grpc::string* new_receiver = nullptr;
+ auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
+ if (it != load_key_to_receiving_lb_ids_.end()) {
+ // First, try to pick from the active balancers with the same load key.
+ new_receiver = RandomElement(it->second);
+ } else if (!assigned_stores_.empty()) {
+ // If failed, pick from all the remaining active balancers.
+ new_receiver = &(RandomElement(assigned_stores_)->first);
+ }
+ if (new_receiver != nullptr) {
+ AssignOrphanedStore(orphaned_store, *new_receiver);
+ } else {
+ // Load data for an LB ID that can't be assigned to any stream should
+ // be dropped.
+ orphaned_store->Suspend();
+ }
+ }
+}
+
+PerBalancerStore* PerHostStore::FindPerBalancerStore(
+ const grpc::string& lb_id) const {
+ return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
+ ? per_balancer_stores_.find(lb_id)->second.get()
+ : nullptr;
+}
+
+const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
+ const grpc::string& lb_id) const {
+ auto it = assigned_stores_.find(lb_id);
+ if (it == assigned_stores_.end()) return nullptr;
+ return &(it->second);
+}
+
+void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
+ const grpc::string& new_receiver) {
+ auto it = assigned_stores_.find(new_receiver);
+ GPR_ASSERT(it != assigned_stores_.end());
+ it->second.insert(orphaned_store);
+ gpr_log(GPR_INFO,
+ "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
+ " ID of %s to new receiver %s",
+ this, orphaned_store, orphaned_store->lb_id().c_str(),
+ new_receiver.c_str());
+}
+
+void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ // The top-level caller (i.e., LoadReportService) should guarantee the
+ // lb_id is unique for each reporting stream.
+ GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
+ GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
+ load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
+ std::unique_ptr<PerBalancerStore> per_balancer_store(
+ new PerBalancerStore(lb_id, load_key));
+ assigned_stores_[lb_id] = {per_balancer_store.get()};
+ per_balancer_stores_[lb_id] = std::move(per_balancer_store);
+}
+
+PerBalancerStore* LoadDataStore::FindPerBalancerStore(
+ const string& hostname, const string& lb_id) const {
+ auto it = per_host_stores_.find(hostname);
+ if (it != per_host_stores_.end()) {
+ const PerHostStore& per_host_store = it->second;
+ return per_host_store.FindPerBalancerStore(lb_id);
+ } else {
+ return nullptr;
+ }
+}
+
+void LoadDataStore::MergeRow(const grpc::string& hostname,
+ const LoadRecordKey& key,
+ const LoadRecordValue& value) {
+ PerBalancerStore* per_balancer_store =
+ FindPerBalancerStore(hostname, key.lb_id());
+ if (per_balancer_store != nullptr) {
+ per_balancer_store->MergeRow(key, value);
+ return;
+ }
+ // Unknown LB ID. Track it until its number of in-progress calls drops to
+ // zero.
+ int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
+ if (in_progress_delta != 0) {
+ auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
+ if (it_tracker == unknown_balancer_id_trackers_.end()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
+ this, key.lb_id().c_str());
+ unknown_balancer_id_trackers_.insert(
+ {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
+ } else if ((it_tracker->second += in_progress_delta) == 0) {
+ unknown_balancer_id_trackers_.erase(it_tracker);
+ gpr_log(GPR_DEBUG,
+ "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
+ this, key.lb_id().c_str());
+ }
+ }
+}
+
+const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
+ const grpc::string& hostname, const grpc::string& lb_id) {
+ auto it = per_host_stores_.find(hostname);
+ if (it == per_host_stores_.end()) return nullptr;
+ return it->second.GetAssignedStores(lb_id);
+}
+
+void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
+}
+
+void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ auto it_per_host_store = per_host_stores_.find(hostname);
+ GPR_ASSERT(it_per_host_store != per_host_stores_.end());
+ it_per_host_store->second.ReportStreamClosed(lb_id);
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_data_store.h b/third_party/grpc/src/cpp/server/load_reporter/load_data_store.h
new file mode 100644
index 0000000..dc08ecf
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_data_store.h
@@ -0,0 +1,348 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <memory>
+#include <set>
+#include <unordered_map>
+
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/config.h>
+
+#include "src/cpp/server/load_reporter/constants.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The load data storage is organized in hierarchy. The LoadDataStore is the
+// top-level data store. In LoadDataStore, for each host we keep a
+// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
+// PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
+// to LoadRecordValue. The LoadRecordValue contains a map of customized call
+// metrics, mapping from a call metric name to the CallMetricValue.
+
+// The value of a customized call metric.
+class CallMetricValue {
+ public:
+ explicit CallMetricValue(uint64_t num_calls = 0,
+ double total_metric_value = 0)
+ : num_calls_(num_calls), total_metric_value_(total_metric_value) {}
+
+ void MergeFrom(CallMetricValue other) {
+ num_calls_ += other.num_calls_;
+ total_metric_value_ += other.total_metric_value_;
+ }
+
+ // Getters.
+ uint64_t num_calls() const { return num_calls_; }
+ double total_metric_value() const { return total_metric_value_; }
+
+ private:
+ // The number of calls that finished with this metric.
+ uint64_t num_calls_ = 0;
+ // The sum of metric values across all the calls that finished with this
+ // metric.
+ double total_metric_value_ = 0;
+};
+
+// The key of a load record.
+class LoadRecordKey {
+ public:
+ LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id,
+ grpc::string client_ip_hex)
+ : lb_id_(std::move(lb_id)),
+ lb_tag_(std::move(lb_tag)),
+ user_id_(std::move(user_id)),
+ client_ip_hex_(std::move(client_ip_hex)) {}
+
+ // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
+ LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id);
+
+ grpc::string ToString() const {
+ return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
+ ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
+ "]";
+ }
+
+ bool operator==(const LoadRecordKey& other) const {
+ return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ &&
+ user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
+ }
+
+ // Gets the client IP bytes in network order (i.e., big-endian).
+ grpc::string GetClientIpBytes() const;
+
+ // Getters.
+ const grpc::string& lb_id() const { return lb_id_; }
+ const grpc::string& lb_tag() const { return lb_tag_; }
+ const grpc::string& user_id() const { return user_id_; }
+ const grpc::string& client_ip_hex() const { return client_ip_hex_; }
+
+ struct Hasher {
+ void hash_combine(size_t* seed, const grpc::string& k) const {
+ *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) +
+ (*seed >> 2);
+ }
+
+ size_t operator()(const LoadRecordKey& k) const {
+ size_t h = 0;
+ hash_combine(&h, k.lb_id_);
+ hash_combine(&h, k.lb_tag_);
+ hash_combine(&h, k.user_id_);
+ hash_combine(&h, k.client_ip_hex_);
+ return h;
+ }
+ };
+
+ private:
+ grpc::string lb_id_;
+ grpc::string lb_tag_;
+ grpc::string user_id_;
+ grpc::string client_ip_hex_;
+};
+
+// The value of a load record.
+class LoadRecordValue {
+ public:
+ explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
+ uint64_t error_count = 0, uint64_t bytes_sent = 0,
+ uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
+ : start_count_(start_count),
+ ok_count_(ok_count),
+ error_count_(error_count),
+ bytes_sent_(bytes_sent),
+ bytes_recv_(bytes_recv),
+ latency_ms_(latency_ms) {}
+
+ LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
+ double total_metric_value);
+
+ void MergeFrom(const LoadRecordValue& other) {
+ start_count_ += other.start_count_;
+ ok_count_ += other.ok_count_;
+ error_count_ += other.error_count_;
+ bytes_sent_ += other.bytes_sent_;
+ bytes_recv_ += other.bytes_recv_;
+ latency_ms_ += other.latency_ms_;
+ for (const auto& p : other.call_metrics_) {
+ const grpc::string& key = p.first;
+ const CallMetricValue& value = p.second;
+ call_metrics_[key].MergeFrom(value);
+ }
+ }
+
+ int64_t GetNumCallsInProgressDelta() const {
+ return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
+ }
+
+ grpc::string ToString() const {
+ return "[start_count_=" + grpc::to_string(start_count_) +
+ ", ok_count_=" + grpc::to_string(ok_count_) +
+ ", error_count_=" + grpc::to_string(error_count_) +
+ ", bytes_sent_=" + grpc::to_string(bytes_sent_) +
+ ", bytes_recv_=" + grpc::to_string(bytes_recv_) +
+ ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
+ grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
+ }
+
+ bool InsertCallMetric(const grpc::string& metric_name,
+ const CallMetricValue& metric_value) {
+ return call_metrics_.insert({metric_name, metric_value}).second;
+ }
+
+ // Getters.
+ uint64_t start_count() const { return start_count_; }
+ uint64_t ok_count() const { return ok_count_; }
+ uint64_t error_count() const { return error_count_; }
+ uint64_t bytes_sent() const { return bytes_sent_; }
+ uint64_t bytes_recv() const { return bytes_recv_; }
+ uint64_t latency_ms() const { return latency_ms_; }
+ const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
+ const {
+ return call_metrics_;
+ }
+
+ private:
+ uint64_t start_count_ = 0;
+ uint64_t ok_count_ = 0;
+ uint64_t error_count_ = 0;
+ uint64_t bytes_sent_ = 0;
+ uint64_t bytes_recv_ = 0;
+ uint64_t latency_ms_ = 0;
+ std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
+};
+
+// Stores the data associated with a particular LB ID.
+class PerBalancerStore {
+ public:
+ using LoadRecordMap =
+ std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
+
+ PerBalancerStore(grpc::string lb_id, grpc::string load_key)
+ : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {}
+
+ // Merge a load record with the given key and value if the store is not
+ // suspended.
+ void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value);
+
+ // Suspend this store, so that no detailed load data will be recorded.
+ void Suspend();
+ // Resume this store from suspension.
+ void Resume();
+ // Is this store suspended or not?
+ bool IsSuspended() const { return suspended_; }
+
+ bool IsNumCallsInProgressChangedSinceLastReport() const {
+ return num_calls_in_progress_ != last_reported_num_calls_in_progress_;
+ }
+
+ uint64_t GetNumCallsInProgressForReport();
+
+ grpc::string ToString() {
+ return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
+ "]";
+ }
+
+ void ClearLoadRecordMap() { load_record_map_.clear(); }
+
+ // Getters.
+ const grpc::string& lb_id() const { return lb_id_; }
+ const grpc::string& load_key() const { return load_key_; }
+ const LoadRecordMap& load_record_map() const { return load_record_map_; }
+
+ private:
+ grpc::string lb_id_;
+ // TODO(juanlishen): Use bytestring protobuf type?
+ grpc::string load_key_;
+ LoadRecordMap load_record_map_;
+ uint64_t num_calls_in_progress_ = 0;
+ uint64_t last_reported_num_calls_in_progress_ = 0;
+ bool suspended_ = false;
+};
+
+// Stores the data associated with a particular host.
+class PerHostStore {
+ public:
+ // When a report stream is created, a PerBalancerStore is created for the
+ // LB ID (guaranteed unique) associated with that stream. If it is the only
+ // active store, adopt all the orphaned stores. If it is the first created
+ // store, adopt the store of kInvalidLbId.
+ void ReportStreamCreated(const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // When a report stream is closed, the PerBalancerStores assigned to the
+ // associate LB ID need to be re-assigned to other active balancers,
+ // ideally with the same load key. If there is no active balancer, we have
+ // to suspend those stores and drop the incoming load data until they are
+ // resumed.
+ void ReportStreamClosed(const grpc::string& lb_id);
+
+ // Returns null if not found. Caller doesn't own the returned store.
+ PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const;
+
+ // Returns null if lb_id is not found. The returned pointer points to the
+ // underlying data structure, which is not owned by the caller.
+ const std::set<PerBalancerStore*>* GetAssignedStores(
+ const grpc::string& lb_id) const;
+
+ private:
+ // Creates a PerBalancerStore for the given LB ID, assigns the store to
+ // itself, and records the LB ID to the load key.
+ void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key);
+
+ void AssignOrphanedStore(PerBalancerStore* orphaned_store,
+ const grpc::string& new_receiver);
+
+ std::unordered_map<grpc::string, std::set<grpc::string>>
+ load_key_to_receiving_lb_ids_;
+
+ // Key: LB ID. The key set includes all the LB IDs that have been
+ // allocated for reporting streams so far.
+ // Value: the unique pointer to the PerBalancerStore of the LB ID.
+ std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>>
+ per_balancer_stores_;
+
+ // Key: LB ID. The key set includes the LB IDs of the balancers that are
+ // currently receiving report.
+ // Value: the set of raw pointers to the PerBalancerStores assigned to the LB
+ // ID. Note that the sets in assigned_stores_ form a division of the value set
+ // of per_balancer_stores_.
+ std::unordered_map<grpc::string, std::set<PerBalancerStore*>>
+ assigned_stores_;
+};
+
+// Thread-unsafe two-level bookkeeper of all the load data.
+// Note: We never remove any store objects from this class, as per the
+// current spec. That's because premature removal of the store objects
+// may lead to loss of critical information, e.g., mapping from lb_id to
+// load_key, and the number of in-progress calls. Such loss will cause
+// information inconsistency when the balancer is re-connected. Keeping
+// all the stores should be fine for PerHostStore, since we assume there
+// should only be a few hostnames. But it's a potential problem for
+// PerBalancerStore.
+class LoadDataStore {
+ public:
+ // Returns null if not found. Caller doesn't own the returned store.
+ PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname,
+ const grpc::string& lb_id) const;
+
+ // Returns null if hostname or lb_id is not found. The returned pointer points
+ // to the underlying data structure, which is not owned by the caller.
+ const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname,
+ const string& lb_id);
+
+ // If a PerBalancerStore can be found by the hostname and LB ID in
+ // LoadRecordKey, the load data will be merged to that store. Otherwise,
+ // only track the number of the in-progress calls for this unknown LB ID.
+ void MergeRow(const grpc::string& hostname, const LoadRecordKey& key,
+ const LoadRecordValue& value);
+
+ // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
+ // with some received load data but unknown to this load data store)?
+ bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const {
+ return unknown_balancer_id_trackers_.find(lb_id) !=
+ unknown_balancer_id_trackers_.end();
+ }
+
+ // Wrapper around PerHostStore::ReportStreamCreated.
+ void ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // Wrapper around PerHostStore::ReportStreamClosed.
+ void ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id);
+
+ private:
+ // Buffered data that was fetched from Census but hasn't been sent to
+ // balancer. We need to keep this data ourselves because Census will
+ // delete the data once it's returned.
+ std::unordered_map<grpc::string, PerHostStore> per_host_stores_;
+
+ // Tracks the number of in-progress calls for each unknown LB ID.
+ std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporter.cc b/third_party/grpc/src/cpp/server/load_reporter/load_reporter.cc
new file mode 100644
index 0000000..464063a
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporter.cc
@@ -0,0 +1,509 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdint.h>
+#include <stdio.h>
+#include <chrono>
+#include <ctime>
+#include <iterator>
+
+#include "src/cpp/server/load_reporter/constants.h"
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+#include "opencensus/stats/internal/set_aggregation_window.h"
+
+namespace grpc {
+namespace load_reporter {
+
+CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
+ return GetCpuStatsImpl();
+}
+
+CensusViewProvider::CensusViewProvider()
+ : tag_key_token_(::opencensus::stats::TagKey::Register(kTagKeyToken)),
+ tag_key_host_(::opencensus::stats::TagKey::Register(kTagKeyHost)),
+ tag_key_user_id_(::opencensus::stats::TagKey::Register(kTagKeyUserId)),
+ tag_key_status_(::opencensus::stats::TagKey::Register(kTagKeyStatus)),
+ tag_key_metric_name_(
+ ::opencensus::stats::TagKey::Register(kTagKeyMetricName)) {
+ // One view related to starting a call.
+ auto vd_start_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewStartCount)
+ .set_measure(kMeasureStartCount)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .set_description(
+ "Delta count of calls started broken down by <token, host, "
+ "user_id>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
+ view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
+ // Four views related to ending a call.
+ // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
+ // measure), it's infeasible to prepare fake data for testing. That's because
+ // the OpenCensus API to make up view data will add the input data as separate
+ // measurements instead of setting the data values directly.
+ auto vd_end_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewEndCount)
+ .set_measure(kMeasureEndCount)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta count of calls ended broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
+ view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
+ auto vd_end_bytes_sent =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewEndBytesSent)
+ .set_measure(kMeasureEndBytesSent)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of bytes sent broken down by <token, host, user_id, "
+ "status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
+ view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
+ auto vd_end_bytes_received =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewEndBytesReceived)
+ .set_measure(kMeasureEndBytesReceived)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of bytes received broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
+ view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
+ auto vd_end_latency_ms =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewEndLatencyMs)
+ .set_measure(kMeasureEndLatencyMs)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_status_)
+ .set_description(
+ "Delta sum of latency in ms broken down by <token, host, "
+ "user_id, status>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
+ view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
+ // Two views related to other call metrics.
+ auto vd_metric_call_count =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewOtherCallMetricCount)
+ .set_measure(kMeasureOtherCallMetric)
+ .set_aggregation(::opencensus::stats::Aggregation::Count())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_metric_name_)
+ .set_description(
+ "Delta count of calls broken down by <token, host, user_id, "
+ "metric_name>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
+ view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
+ auto vd_metric_value =
+ ::opencensus::stats::ViewDescriptor()
+ .set_name(kViewOtherCallMetricValue)
+ .set_measure(kMeasureOtherCallMetric)
+ .set_aggregation(::opencensus::stats::Aggregation::Sum())
+ .add_column(tag_key_token_)
+ .add_column(tag_key_host_)
+ .add_column(tag_key_user_id_)
+ .add_column(tag_key_metric_name_)
+ .set_description(
+ "Delta sum of call metric value broken down "
+ "by <token, host, user_id, metric_name>.");
+ ::opencensus::stats::SetAggregationWindow(
+ ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
+ view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
+}
+
+double CensusViewProvider::GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kDouble);
+ auto it_row = it_vd->second.double_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.double_data().end());
+ return it_row->second;
+}
+
+uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values) {
+ auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
+ GPR_ASSERT(it_vd != view_data_map.end());
+ GPR_ASSERT(it_vd->second.type() ==
+ ::opencensus::stats::ViewData::Type::kInt64);
+ auto it_row = it_vd->second.int_data().find(tag_values);
+ GPR_ASSERT(it_row != it_vd->second.int_data().end());
+ GPR_ASSERT(it_row->second >= 0);
+ return it_row->second;
+}
+
+CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
+ for (const auto& p : view_descriptor_map()) {
+ const grpc::string& view_name = p.first;
+ const ::opencensus::stats::ViewDescriptor& vd = p.second;
+ // We need to use pair's piecewise ctor here, otherwise the deleted copy
+ // ctor of View will be called.
+ view_map_.emplace(std::piecewise_construct,
+ std::forward_as_tuple(view_name),
+ std::forward_as_tuple(vd));
+ }
+}
+
+CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
+ gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
+ ViewDataMap view_data_map;
+ for (auto& p : view_map_) {
+ const grpc::string& view_name = p.first;
+ ::opencensus::stats::View& view = p.second;
+ if (view.IsValid()) {
+ view_data_map.emplace(view_name, view.GetData());
+ gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
+ view_name.c_str());
+ } else {
+ gpr_log(
+ GPR_DEBUG,
+ "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
+ this, view_name.c_str());
+ }
+ }
+ return view_data_map;
+}
+
+grpc::string LoadReporter::GenerateLbId() {
+ while (true) {
+ if (next_lb_id_ > UINT32_MAX) {
+ gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
+ this);
+ return "";
+ }
+ int64_t lb_id = next_lb_id_++;
+ // Overflow should never happen.
+ GPR_ASSERT(lb_id >= 0);
+ // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
+ char buf[kLbIdLength + 1];
+ snprintf(buf, sizeof(buf), "%08lx", lb_id);
+ grpc::string lb_id_str(buf, kLbIdLength);
+ // The client may send requests with LB ID that has never been allocated
+ // by this load reporter. Those IDs are tracked and will be skipped when
+ // we generate a new ID.
+ if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
+ return lb_id_str;
+ }
+ }
+}
+
+::grpc::lb::v1::LoadBalancingFeedback
+LoadReporter::GenerateLoadBalancingFeedback() {
+ std::unique_lock<std::mutex> lock(feedback_mu_);
+ auto now = std::chrono::system_clock::now();
+ // Discard records outside the window until there is only one record
+ // outside the window, which is used as the base for difference.
+ while (feedback_records_.size() > 1 &&
+ !IsRecordInWindow(feedback_records_[1], now)) {
+ feedback_records_.pop_front();
+ }
+ if (feedback_records_.size() < 2) {
+ return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
+ }
+ // Find the longest range with valid ends.
+ auto oldest = feedback_records_.begin();
+ auto newest = feedback_records_.end() - 1;
+ while (std::distance(oldest, newest) > 0 &&
+ (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
+ // A zero limit means that the system info reading was failed, so these
+ // records can't be used to calculate CPU utilization.
+ if (newest->cpu_limit == 0) --newest;
+ if (oldest->cpu_limit == 0) ++oldest;
+ }
+ if (std::distance(oldest, newest) < 1 ||
+ oldest->end_time == newest->end_time ||
+ newest->cpu_limit == oldest->cpu_limit) {
+ return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
+ }
+ uint64_t rpcs = 0;
+ uint64_t errors = 0;
+ for (auto p = newest; p != oldest; --p) {
+ // Because these two numbers are counters, the oldest record shouldn't be
+ // included.
+ rpcs += p->rpcs;
+ errors += p->errors;
+ }
+ double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
+ double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
+ std::chrono::duration<double> duration_seconds =
+ newest->end_time - oldest->end_time;
+ lock.unlock();
+ ::grpc::lb::v1::LoadBalancingFeedback feedback;
+ feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
+ feedback.set_calls_per_second(
+ static_cast<float>(rpcs / duration_seconds.count()));
+ feedback.set_errors_per_second(
+ static_cast<float>(errors / duration_seconds.count()));
+ return feedback;
+}
+
+::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
+LoadReporter::GenerateLoads(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
+ GPR_ASSERT(assigned_stores != nullptr);
+ GPR_ASSERT(!assigned_stores->empty());
+ ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
+ for (PerBalancerStore* per_balancer_store : *assigned_stores) {
+ GPR_ASSERT(!per_balancer_store->IsSuspended());
+ if (!per_balancer_store->load_record_map().empty()) {
+ for (const auto& p : per_balancer_store->load_record_map()) {
+ const auto& key = p.first;
+ const auto& value = p.second;
+ auto load = loads.Add();
+ load->set_load_balance_tag(key.lb_tag());
+ load->set_user_id(key.user_id());
+ load->set_client_ip_address(key.GetClientIpBytes());
+ load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
+ load->set_num_calls_finished_without_error(
+ static_cast<int64_t>(value.ok_count()));
+ load->set_num_calls_finished_with_error(
+ static_cast<int64_t>(value.error_count()));
+ load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
+ load->set_total_bytes_received(
+ static_cast<int64_t>(value.bytes_recv()));
+ load->mutable_total_latency()->set_seconds(
+ static_cast<int64_t>(value.latency_ms() / 1000));
+ load->mutable_total_latency()->set_nanos(
+ (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
+ for (const auto& p : value.call_metrics()) {
+ const grpc::string& metric_name = p.first;
+ const CallMetricValue& metric_value = p.second;
+ auto call_metric_data = load->add_metric_data();
+ call_metric_data->set_metric_name(metric_name);
+ call_metric_data->set_num_calls_finished_with_metric(
+ metric_value.num_calls());
+ call_metric_data->set_total_metric_value(
+ metric_value.total_metric_value());
+ }
+ if (per_balancer_store->lb_id() != lb_id) {
+ // This per-balancer store is an orphan assigned to this receiving
+ // balancer.
+ AttachOrphanLoadId(load, *per_balancer_store);
+ }
+ }
+ per_balancer_store->ClearLoadRecordMap();
+ }
+ if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
+ auto load = loads.Add();
+ load->set_num_calls_in_progress(
+ per_balancer_store->GetNumCallsInProgressForReport());
+ if (per_balancer_store->lb_id() != lb_id) {
+ // This per-balancer store is an orphan assigned to this receiving
+ // balancer.
+ AttachOrphanLoadId(load, *per_balancer_store);
+ }
+ }
+ }
+ return loads;
+}
+
+void LoadReporter::AttachOrphanLoadId(
+ ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
+ if (per_balancer_store.lb_id() == kInvalidLbId) {
+ load->set_load_key_unknown(true);
+ } else {
+ // We shouldn't set load_key_unknown to any value in this case because
+ // load_key_unknown and orphaned_load_identifier are under an oneof struct.
+ load->mutable_orphaned_load_identifier()->set_load_key(
+ per_balancer_store.load_key());
+ load->mutable_orphaned_load_identifier()->set_load_balancer_id(
+ per_balancer_store.lb_id());
+ }
+}
+
+void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
+ CpuStatsProvider::CpuStatsSample cpu_stats;
+ if (cpu_stats_provider_ != nullptr) {
+ cpu_stats = cpu_stats_provider_->GetCpuStats();
+ } else {
+ // This will make the load balancing feedback generation a no-op.
+ cpu_stats = {0, 0};
+ }
+ std::unique_lock<std::mutex> lock(feedback_mu_);
+ feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
+ cpu_stats.first, cpu_stats.second);
+}
+
+void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
+ gpr_log(GPR_INFO,
+ "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
+ this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
+}
+
+void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id) {
+ std::lock_guard<std::mutex> lock(store_mu_);
+ load_data_store_.ReportStreamClosed(hostname, lb_id);
+ gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
+ hostname.c_str(), lb_id.c_str());
+}
+
+void LoadReporter::ProcessViewDataCallStart(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ auto it = view_data_map.find(kViewStartCount);
+ if (it != view_data_map.end()) {
+ for (const auto& p : it->second.int_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const uint64_t start_count = static_cast<uint64_t>(p.second);
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ LoadRecordKey key(client_ip_and_token, user_id);
+ LoadRecordValue value = LoadRecordValue(start_count);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+}
+
+void LoadReporter::ProcessViewDataCallEnd(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ uint64_t total_end_count = 0;
+ uint64_t total_error_count = 0;
+ auto it = view_data_map.find(kViewEndCount);
+ if (it != view_data_map.end()) {
+ for (const auto& p : it->second.int_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const uint64_t end_count = static_cast<uint64_t>(p.second);
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ const grpc::string& status = tag_values[3];
+ // This is due to a bug reported internally of Java server load reporting
+ // implementation.
+ // TODO(juanlishen): Check whether this situation happens in OSS C++.
+ if (client_ip_and_token.size() == 0) {
+ gpr_log(GPR_DEBUG,
+ "Skipping processing Opencensus record with empty "
+ "client_ip_and_token tag.");
+ continue;
+ }
+ LoadRecordKey key(client_ip_and_token, user_id);
+ const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
+ tag_values);
+ const uint64_t bytes_received =
+ CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndBytesReceived,
+ sizeof(kViewEndBytesReceived) - 1, tag_values);
+ const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
+ view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
+ tag_values);
+ uint64_t ok_count = 0;
+ uint64_t error_count = 0;
+ total_end_count += end_count;
+ if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
+ ok_count = end_count;
+ } else {
+ error_count = end_count;
+ total_error_count += end_count;
+ }
+ LoadRecordValue value = LoadRecordValue(
+ 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+ AppendNewFeedbackRecord(total_end_count, total_error_count);
+}
+
+void LoadReporter::ProcessViewDataOtherCallMetrics(
+ const CensusViewProvider::ViewDataMap& view_data_map) {
+ auto it = view_data_map.find(kViewOtherCallMetricCount);
+ if (it != view_data_map.end()) {
+ for (const auto& p : it->second.int_data()) {
+ const std::vector<grpc::string>& tag_values = p.first;
+ const int64_t num_calls = p.second;
+ const grpc::string& client_ip_and_token = tag_values[0];
+ const grpc::string& host = tag_values[1];
+ const grpc::string& user_id = tag_values[2];
+ const grpc::string& metric_name = tag_values[3];
+ LoadRecordKey key(client_ip_and_token, user_id);
+ const double total_metric_value =
+ CensusViewProvider::GetRelatedViewDataRowDouble(
+ view_data_map, kViewOtherCallMetricValue,
+ sizeof(kViewOtherCallMetricValue) - 1, tag_values);
+ LoadRecordValue value = LoadRecordValue(
+ metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
+ {
+ std::unique_lock<std::mutex> lock(store_mu_);
+ load_data_store_.MergeRow(host, key, value);
+ }
+ }
+ }
+}
+
+void LoadReporter::FetchAndSample() {
+ gpr_log(GPR_DEBUG,
+ "[LR %p] Starts fetching Census view data and sampling LB feedback "
+ "record.",
+ this);
+ CensusViewProvider::ViewDataMap view_data_map =
+ census_view_provider_->FetchViewData();
+ ProcessViewDataCallStart(view_data_map);
+ ProcessViewDataCallEnd(view_data_map);
+ ProcessViewDataOtherCallMetrics(view_data_map);
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporter.h b/third_party/grpc/src/cpp/server/load_reporter/load_reporter.h
new file mode 100644
index 0000000..b2254d5
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporter.h
@@ -0,0 +1,228 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include <atomic>
+#include <chrono>
+#include <deque>
+#include <vector>
+
+#include <grpc/support/log.h>
+#include <grpcpp/impl/codegen/config.h>
+
+#include "src/cpp/server/load_reporter/load_data_store.h"
+#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
+
+#include "opencensus/stats/stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The interface to get the Census stats. Abstracted for mocking.
+class CensusViewProvider {
+ public:
+ // Maps from the view name to the view data.
+ using ViewDataMap =
+ std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
+ // Maps from the view name to the view descriptor.
+ using ViewDescriptorMap =
+ std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;
+
+ CensusViewProvider();
+ virtual ~CensusViewProvider() = default;
+
+ // Fetches the view data accumulated since last fetching, and returns it as a
+ // map from the view name to the view data.
+ virtual ViewDataMap FetchViewData() = 0;
+
+ // A helper function that gets a row with the input tag values from the view
+ // data. Only used when we know that row must exist because we have seen a row
+ // with the same tag values in a related view data. Several ViewData's are
+ // considered related if their views are based on the measures that are always
+ // recorded at the same time.
+ static double GetRelatedViewDataRowDouble(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+ static uint64_t GetRelatedViewDataRowInt(
+ const ViewDataMap& view_data_map, const char* view_name,
+ size_t view_name_len, const std::vector<grpc::string>& tag_values);
+
+ protected:
+ const ViewDescriptorMap& view_descriptor_map() const {
+ return view_descriptor_map_;
+ }
+
+ private:
+ ViewDescriptorMap view_descriptor_map_;
+ // Tag keys.
+ ::opencensus::stats::TagKey tag_key_token_;
+ ::opencensus::stats::TagKey tag_key_host_;
+ ::opencensus::stats::TagKey tag_key_user_id_;
+ ::opencensus::stats::TagKey tag_key_status_;
+ ::opencensus::stats::TagKey tag_key_metric_name_;
+};
+
+// The default implementation fetches the real stats from Census.
+class CensusViewProviderDefaultImpl : public CensusViewProvider {
+ public:
+ CensusViewProviderDefaultImpl();
+
+ ViewDataMap FetchViewData() override;
+
+ private:
+ std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
+};
+
+// The interface to get the CPU stats. Abstracted for mocking.
+class CpuStatsProvider {
+ public:
+ // The used and total amounts of CPU usage.
+ using CpuStatsSample = std::pair<uint64_t, uint64_t>;
+
+ virtual ~CpuStatsProvider() = default;
+
+ // Gets the cumulative used CPU and total CPU resource.
+ virtual CpuStatsSample GetCpuStats() = 0;
+};
+
+// The default implementation reads CPU jiffies from the system to calculate CPU
+// utilization.
+class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
+ public:
+ CpuStatsSample GetCpuStats() override;
+};
+
+// Maintains all the load data and load reporting streams.
+class LoadReporter {
+ public:
+ // TODO(juanlishen): Allow config for providers from users.
+ LoadReporter(uint32_t feedback_sample_window_seconds,
+ std::unique_ptr<CensusViewProvider> census_view_provider,
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
+ : feedback_sample_window_seconds_(feedback_sample_window_seconds),
+ census_view_provider_(std::move(census_view_provider)),
+ cpu_stats_provider_(std::move(cpu_stats_provider)) {
+ // Append the initial record so that the next real record can have a base.
+ AppendNewFeedbackRecord(0, 0);
+ }
+
+ // Fetches the latest data from Census and merge it into the data store.
+ // Also adds a new sample to the LB feedback sliding window.
+ // Thread-unsafe. (1). The access to the load data store and feedback records
+ // has locking. (2). The access to the Census view provider and CPU stats
+ // provider lacks locking, but we only access these two members in this method
+ // (in testing, we also access them when setting up expectation). So the
+ // invocations of this method must be serialized.
+ void FetchAndSample();
+
+ // Generates a report for that host and balancer. The report contains
+ // all the stats data accumulated between the last report (i.e., the last
+ // consumption) and the last fetch from Census (i.e., the last production).
+ // Thread-safe.
+ ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
+ const grpc::string& hostname, const grpc::string& lb_id);
+
+ // The feedback is calculated from the stats data recorded in the sliding
+ // window. Outdated records are discarded.
+ // Thread-safe.
+ ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
+
+ // Wrapper around LoadDataStore::ReportStreamCreated.
+ // Thread-safe.
+ void ReportStreamCreated(const grpc::string& hostname,
+ const grpc::string& lb_id,
+ const grpc::string& load_key);
+
+ // Wrapper around LoadDataStore::ReportStreamClosed.
+ // Thread-safe.
+ void ReportStreamClosed(const grpc::string& hostname,
+ const grpc::string& lb_id);
+
+ // Generates a unique LB ID of length kLbIdLength. Returns an empty string
+ // upon failure. Thread-safe.
+ grpc::string GenerateLbId();
+
+ // Accessors only for testing.
+ CensusViewProvider* census_view_provider() {
+ return census_view_provider_.get();
+ }
+ CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
+
+ private:
+ struct LoadBalancingFeedbackRecord {
+ std::chrono::system_clock::time_point end_time;
+ uint64_t rpcs;
+ uint64_t errors;
+ uint64_t cpu_usage;
+ uint64_t cpu_limit;
+
+ LoadBalancingFeedbackRecord(
+ const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
+ uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
+ : end_time(end_time),
+ rpcs(rpcs),
+ errors(errors),
+ cpu_usage(cpu_usage),
+ cpu_limit(cpu_limit) {}
+ };
+
+ // Finds the view data about starting call from the view_data_map and merges
+ // the data to the load data store.
+ void ProcessViewDataCallStart(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+ // Finds the view data about ending call from the view_data_map and merges the
+ // data to the load data store.
+ void ProcessViewDataCallEnd(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+ // Finds the view data about the customized call metrics from the
+ // view_data_map and merges the data to the load data store.
+ void ProcessViewDataOtherCallMetrics(
+ const CensusViewProvider::ViewDataMap& view_data_map);
+
+ bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
+ std::chrono::system_clock::time_point now) {
+ return record.end_time > now - feedback_sample_window_seconds_;
+ }
+
+ void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
+
+ // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
+ // it to the load.
+ void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
+ const PerBalancerStore& per_balancer_store);
+
+ std::atomic<int64_t> next_lb_id_{0};
+ const std::chrono::seconds feedback_sample_window_seconds_;
+ std::mutex feedback_mu_;
+ std::deque<LoadBalancingFeedbackRecord> feedback_records_;
+ // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
+ // too expensive.
+ std::mutex store_mu_;
+ LoadDataStore load_data_store_;
+ std::unique_ptr<CensusViewProvider> census_view_provider_;
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
new file mode 100644
index 0000000..d001199
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
@@ -0,0 +1,370 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+}
+
+LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : cq_(std::move(cq)) {
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("server_load_reporting", Work, this));
+ std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
+#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
+ cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
+#endif
+ load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
+ kFeedbackSampleWindowSeconds,
+ std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
+ std::move(cpu_stats_provider)));
+}
+
+LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ cq_->Shutdown();
+ }
+ if (next_fetch_and_sample_alarm_ != nullptr)
+ next_fetch_and_sample_alarm_->Cancel();
+ thread_->Join();
+}
+
+void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
+ auto next_fetch_and_sample_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
+ GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+ if (shutdown_) return;
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_fetch_and_sample_alarm_.reset(new Alarm);
+ next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
+ this);
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
+}
+
+void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
+ if (!ok) {
+ gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
+ return;
+ }
+ gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
+ load_reporter_->FetchAndSample();
+ ScheduleNextFetchAndSample();
+}
+
+void LoadReporterAsyncServiceImpl::Work(void* arg) {
+ LoadReporterAsyncServiceImpl* service =
+ reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
+ service->FetchAndSample(true /* ok */);
+ // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
+ // to figure out why cq is not ready after service starts.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
+ service->load_reporter_.get());
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ if (tag == service) {
+ service->FetchAndSample(ok);
+ } else {
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter) {
+ std::shared_ptr<ReportLoadHandler> handler =
+ std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
+ ReportLoadHandler* p = handler.get();
+ {
+ std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+ if (service->shutdown_) return;
+ p->on_done_notified_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ p->next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(handler));
+ p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
+ service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
+ &p->next_inbound_);
+ }
+}
+
+LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
+ ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter)
+ : cq_(cq),
+ service_(service),
+ load_reporter_(load_reporter),
+ stream_(&ctx_),
+ call_status_(WAITING_FOR_DELIVERY) {}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ call_status_ = DELIVERED;
+ } else {
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ }
+ if (!ok || shutdown_) {
+ // The value of ok being false means that the server is shutting down.
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, service_, load_reporter_);
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnRequestDelivered");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ // LB ID is unique for each load reporting stream.
+ lb_id_ = load_reporter_->GenerateLbId();
+ gpr_log(GPR_INFO,
+ "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
+ "Start reading the initial request...",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
+ // The client may have half-closed the stream or the stream is broken.
+ gpr_log(GPR_INFO,
+ "[LRS %p] Failed reading the initial request from the stream "
+ "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
+ service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
+ static_cast<int>(is_cancelled_));
+ }
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ // We only receive one request, which is the initial request.
+ if (call_status_ < INITIAL_REQUEST_RECEIVED) {
+ if (!request_.has_initial_request()) {
+ Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
+ } else {
+ call_status_ = INITIAL_REQUEST_RECEIVED;
+ const auto& initial_request = request_.initial_request();
+ load_balanced_hostname_ = initial_request.load_balanced_hostname();
+ load_key_ = initial_request.load_key();
+ load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
+ load_key_);
+ const auto& load_report_interval = initial_request.load_report_interval();
+ load_report_interval_ms_ =
+ static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
+ load_report_interval.nanos() / 1000);
+ gpr_log(
+ GPR_INFO,
+ "[LRS %p] Initial request received. Start load reporting (load "
+ "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
+ service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
+ lb_id_.c_str(), this);
+ SendReport(self, true /* ok */);
+ // Expect this read to fail.
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "OnReadDone");
+ return;
+ }
+ next_inbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Read(&request_, &next_inbound_);
+ }
+ }
+ } else {
+ // Another request received! This violates the spec.
+ gpr_log(GPR_ERROR,
+ "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ Shutdown(std::move(self), "OnReadDone+second_request");
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ auto next_report_time = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "ScheduleNextReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Improve the Alarm implementation to reuse a single
+ // instance for multiple events.
+ next_report_alarm_.reset(new Alarm);
+ next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
+ }
+ gpr_log(GPR_DEBUG,
+ "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (!ok || shutdown_) {
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ ::grpc::lb::v1::LoadReportResponse response;
+ auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
+ response.mutable_load()->Swap(&loads);
+ auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
+ response.mutable_load_balancing_feedback()->Swap(&feedback);
+ if (call_status_ < INITIAL_RESPONSE_SENT) {
+ auto initial_response = response.mutable_initial_response();
+ initial_response->set_load_balancer_id(lb_id_);
+ initial_response->set_implementation_id(
+ ::grpc::lb::v1::InitialLoadReportResponse::CPP);
+ initial_response->set_server_version(kVersion);
+ call_status_ = INITIAL_RESPONSE_SENT;
+ }
+ {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (service_->shutdown_) {
+ lock.release()->unlock();
+ Shutdown(std::move(self), "SendReport");
+ return;
+ }
+ next_outbound_ =
+ CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_outbound_);
+ gpr_log(GPR_INFO,
+ "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
+ "count: %d)...",
+ service_, lb_id_.c_str(), this, response.load().size());
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ done_notified_ = true;
+ if (ctx_.IsCancelled()) {
+ is_cancelled_ = true;
+ }
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(is_cancelled_));
+ Shutdown(std::move(self), "OnDoneNotified");
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
+ std::shared_ptr<ReportLoadHandler> self, const char* reason) {
+ if (!shutdown_) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
+ "reason: %s).",
+ service_, lb_id_.c_str(), this, reason);
+ shutdown_ = true;
+ if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
+ load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
+ next_report_alarm_->Cancel();
+ }
+ }
+ // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
+ // try to Finish() every time we are in Shutdown().
+ if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
+ std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+ if (!service_->shutdown_) {
+ on_finish_done_ =
+ CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ // TODO(juanlishen): Maybe add a message proto for the client to
+ // explicitly cancel the stream so that we can return OK status in such
+ // cases.
+ stream_.Finish(Status::CANCELLED, &on_finish_done_);
+ call_status_ = FINISH_CALLED;
+ }
+ }
+}
+
+void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
+ std::shared_ptr<ReportLoadHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_INFO,
+ "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
+ service_, lb_id_.c_str(), this);
+ }
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
new file mode 100644
index 0000000..6fc577f
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporter_async_service_impl.h
@@ -0,0 +1,194 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
+#include <grpcpp/grpcpp.h>
+
+#include "src/core/lib/gprpp/thd.h"
+#include "src/cpp/server/load_reporter/load_reporter.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// Async load reporting service. It's mainly responsible for controlling the
+// procedure of incoming requests. The real business logic is handed off to the
+// LoadReporter. There should be at most one instance of this service on a
+// server to avoid spreading the load data into multiple places.
+class LoadReporterAsyncServiceImpl
+ : public grpc::lb::v1::LoadReporter::AsyncService {
+ public:
+ explicit LoadReporterAsyncServiceImpl(
+ std::unique_ptr<ServerCompletionQueue> cq);
+ ~LoadReporterAsyncServiceImpl();
+
+ // Starts the working thread.
+ void StartThread();
+
+ // Not copyable nor movable.
+ LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
+ LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
+ delete;
+
+ private:
+ class ReportLoadHandler;
+
+ // A tag that can be called with a bool argument. It's tailored for
+ // ReportLoadHandler's use. Before being used, it should be constructed with a
+ // method of ReportLoadHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function can
+ // only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func,
+ std::shared_ptr<ReportLoadHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no longer
+ // owned by this tag after this method is invoked.
+ void Run(bool ok);
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<ReportLoadHandler> handler_;
+ };
+
+ // Each handler takes care of one load reporting stream. It contains
+ // per-stream data and it will access the members of the parent class (i.e.,
+ // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
+ class ReportLoadHandler {
+ public:
+ // Instantiates a ReportLoadHandler and requests the next load reporting
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ ReportLoadHandler(ServerCompletionQueue* cq,
+ LoadReporterAsyncServiceImpl* service,
+ LoadReporter* load_reporter);
+
+ private:
+ // After the handler has a call request delivered, it starts reading the
+ // initial request. Also, a new handler is spawned so that we can keep
+ // servicing future calls.
+ void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The first Read() is expected to succeed, after which the handler starts
+ // sending load reports back to the balancer. The second Read() is
+ // expected to fail, which happens when the balancer half-closes the
+ // stream to signal that it's no longer interested in the load reports. For
+ // the latter case, the handler will then close the stream.
+ void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // The report sending operations are sequential as: send report -> send
+ // done, schedule the next send -> waiting for the alarm to fire -> alarm
+ // fires, send report -> ...
+ void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+ void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
+
+ void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
+
+ // The key fields of the stream.
+ grpc::string lb_id_;
+ grpc::string load_balanced_hostname_;
+ grpc::string load_key_;
+ uint64_t load_report_interval_ms_;
+
+ // The data for RPC communication with the load reportee.
+ ServerContext ctx_;
+ ::grpc::lb::v1::LoadReportRequest request_;
+
+ // The members passed down from LoadReporterAsyncServiceImpl.
+ ServerCompletionQueue* cq_;
+ LoadReporterAsyncServiceImpl* service_;
+ LoadReporter* load_reporter_;
+ ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
+ ::grpc::lb::v1::LoadReportRequest>
+ stream_;
+
+ // The status of the RPC progress.
+ enum CallStatus {
+ WAITING_FOR_DELIVERY,
+ DELIVERED,
+ INITIAL_REQUEST_RECEIVED,
+ INITIAL_RESPONSE_SENT,
+ FINISH_CALLED
+ } call_status_;
+ bool shutdown_{false};
+ bool done_notified_{false};
+ bool is_cancelled_{false};
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ CallableTag next_inbound_;
+ CallableTag next_outbound_;
+ std::unique_ptr<Alarm> next_report_alarm_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Work(void* arg);
+
+ // Schedules the next data fetching from Census and LB feedback sampling.
+ void ScheduleNextFetchAndSample();
+
+ // Fetches data from Census and samples LB feedback.
+ void FetchAndSample(bool ok);
+
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ // To synchronize the operations related to shutdown state of cq_, so that we
+ // don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
+ std::unique_ptr<LoadReporter> load_reporter_;
+ std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
+};
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
new file mode 100644
index 0000000..81cf6ac
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_option.cc
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void LoadReportingServiceServerBuilderOption::UpdateArguments(
+ ::grpc::ChannelArguments* args) {
+ args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, true);
+}
+
+void LoadReportingServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) {
+ plugins->emplace_back(new LoadReportingServiceServerBuilderPlugin());
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
new file mode 100644
index 0000000..c2c5dd5
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.cc
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h"
+
+#include <grpcpp/impl/server_initializer.h>
+
+namespace grpc {
+namespace load_reporter {
+
+bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_synchronous_methods();
+ }
+ return false;
+}
+
+bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const {
+ if (service_ != nullptr) {
+ return service_->has_async_methods();
+ }
+ return false;
+}
+
+void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder(
+ grpc::ServerBuilder* builder) {
+ auto cq = builder->AddCompletionQueue();
+ service_ = std::make_shared<LoadReporterAsyncServiceImpl>(std::move(cq));
+}
+
+void LoadReportingServiceServerBuilderPlugin::InitServer(
+ grpc::ServerInitializer* si) {
+ si->RegisterService(service_);
+}
+
+void LoadReportingServiceServerBuilderPlugin::Finish(
+ grpc::ServerInitializer* si) {
+ service_->StartThread();
+ service_.reset();
+}
+
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
new file mode 100644
index 0000000..1f09859
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/impl/server_builder_plugin.h>
+
+#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
+
+namespace grpc {
+namespace load_reporter {
+
+// The plugin that registers and starts load reporting service when starting a
+// server.
+class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin {
+ public:
+ ~LoadReportingServiceServerBuilderPlugin() override = default;
+ grpc::string name() override { return "load_reporting_service"; }
+
+ // Creates a load reporting service.
+ void UpdateServerBuilder(grpc::ServerBuilder* builder) override;
+
+ // Registers the load reporter service.
+ void InitServer(grpc::ServerInitializer* si) override;
+
+ // Starts the load reporter service.
+ void Finish(grpc::ServerInitializer* si) override;
+
+ void ChangeArguments(const grpc::string& name, void* value) override {}
+ void UpdateChannelArguments(grpc::ChannelArguments* args) override {}
+ bool has_sync_methods() const override;
+ bool has_async_methods() const override;
+
+ private:
+ std::shared_ptr<LoadReporterAsyncServiceImpl> service_;
+};
+
+std::unique_ptr<grpc::ServerBuilderPlugin>
+CreateLoadReportingServiceServerBuilderPlugin();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
diff --git a/third_party/grpc/src/cpp/server/load_reporter/util.cc b/third_party/grpc/src/cpp/server/load_reporter/util.cc
new file mode 100644
index 0000000..89bdf57
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/load_reporter/util.cc
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <cmath>
+
+#include <grpc/support/log.h>
+
+namespace grpc {
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
+ const grpc::string& cost_name, double cost_value) {
+ if (std::isnormal(cost_value)) {
+ grpc::string buf;
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/secure_server_credentials.cc b/third_party/grpc/src/cpp/server/secure_server_credentials.cc
index d472667..453e76e 100644
--- a/third_party/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/third_party/grpc/src/cpp/server/secure_server_credentials.cc
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -35,31 +20,33 @@
#include <map>
#include <memory>
+#include <grpcpp/impl/codegen/slice.h>
+#include <grpcpp/security/auth_metadata_processor.h>
+
#include "src/cpp/common/secure_auth_context.h"
#include "src/cpp/server/secure_server_credentials.h"
-#include <grpc++/security/auth_metadata_processor.h>
-
namespace grpc {
void AuthMetadataProcessorAyncWrapper::Destroy(void* wrapper) {
- auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
+ auto* w = static_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
delete w;
}
void AuthMetadataProcessorAyncWrapper::Process(
void* wrapper, grpc_auth_context* context, const grpc_metadata* md,
size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) {
- auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
+ auto* w = static_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
if (!w->processor_) {
// Early exit.
cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr);
return;
}
if (w->processor_->IsBlocking()) {
- w->thread_pool_->Add(
- std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w,
- context, md, num_md, cb, user_data));
+ w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
+ w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md,
+ cb, user_data);
+ });
} else {
// invoke directly.
w->InvokeProcessor(context, md, num_md, cb, user_data);
@@ -71,10 +58,10 @@
grpc_process_auth_metadata_done_cb cb, void* user_data) {
AuthMetadataProcessor::InputMetadata metadata;
for (size_t i = 0; i < num_md; i++) {
- metadata.insert(std::make_pair(
- md[i].key, grpc::string_ref(md[i].value, md[i].value_length)));
+ metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key),
+ StringRefFromSlice(&md[i].value)));
}
- SecureAuthContext context(ctx, false);
+ SecureAuthContext context(ctx);
AuthMetadataProcessor::OutputMetadata consumed_metadata;
AuthMetadataProcessor::OutputMetadata response_metadata;
@@ -85,9 +72,8 @@
for (auto it = consumed_metadata.begin(); it != consumed_metadata.end();
++it) {
grpc_metadata md_entry;
- md_entry.key = it->first.c_str();
- md_entry.value = it->second.data();
- md_entry.value_length = it->second.size();
+ md_entry.key = SliceReferencingString(it->first);
+ md_entry.value = SliceReferencingString(it->second);
md_entry.flags = 0;
consumed_md.push_back(md_entry);
}
@@ -95,9 +81,8 @@
for (auto it = response_metadata.begin(); it != response_metadata.end();
++it) {
grpc_metadata md_entry;
- md_entry.key = it->first.c_str();
- md_entry.value = it->second.data();
- md_entry.value_length = it->second.size();
+ md_entry.key = SliceReferencingString(it->first);
+ md_entry.value = SliceReferencingString(it->second);
md_entry.flags = 0;
response_md.push_back(md_entry);
}
@@ -130,12 +115,36 @@
key_cert_pair->cert_chain.c_str()};
pem_key_cert_pairs.push_back(p);
}
- grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
+ grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create_ex(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
pem_key_cert_pairs.empty() ? nullptr : &pem_key_cert_pairs[0],
- pem_key_cert_pairs.size(), options.force_client_auth, nullptr);
+ pem_key_cert_pairs.size(),
+ options.force_client_auth
+ ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
+ : options.client_certificate_request,
+ nullptr);
return std::shared_ptr<ServerCredentials>(
new SecureServerCredentials(c_creds));
}
+namespace experimental {
+
+std::shared_ptr<ServerCredentials> AltsServerCredentials(
+ const AltsServerCredentialsOptions& options) {
+ grpc_alts_credentials_options* c_options =
+ grpc_alts_credentials_server_options_create();
+ grpc_server_credentials* c_creds =
+ grpc_alts_server_credentials_create(c_options);
+ grpc_alts_credentials_options_destroy(c_options);
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(c_creds));
+}
+
+std::shared_ptr<ServerCredentials> LocalServerCredentials(
+ grpc_local_connect_type type) {
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(grpc_local_server_credentials_create(type)));
+}
+
+} // namespace experimental
} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/secure_server_credentials.h b/third_party/grpc/src/cpp/server/secure_server_credentials.h
index 5460f4a..8a81af2 100644
--- a/third_party/grpc/src/cpp/server/secure_server_credentials.h
+++ b/third_party/grpc/src/cpp/server/secure_server_credentials.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -36,7 +21,7 @@
#include <memory>
-#include <grpc++/security/server_credentials.h>
+#include <grpcpp/security/server_credentials.h>
#include <grpc/grpc_security.h>
@@ -44,7 +29,7 @@
namespace grpc {
-class AuthMetadataProcessorAyncWrapper GRPC_FINAL {
+class AuthMetadataProcessorAyncWrapper final {
public:
static void Destroy(void* wrapper);
@@ -64,19 +49,18 @@
std::shared_ptr<AuthMetadataProcessor> processor_;
};
-class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
+class SecureServerCredentials final : public ServerCredentials {
public:
explicit SecureServerCredentials(grpc_server_credentials* creds)
: creds_(creds) {}
- ~SecureServerCredentials() GRPC_OVERRIDE {
+ ~SecureServerCredentials() override {
grpc_server_credentials_release(creds_);
}
- int AddPortToServer(const grpc::string& addr,
- grpc_server* server) GRPC_OVERRIDE;
+ int AddPortToServer(const grpc::string& addr, grpc_server* server) override;
void SetAuthMetadataProcessor(
- const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE;
+ const std::shared_ptr<AuthMetadataProcessor>& processor) override;
private:
grpc_server_credentials* creds_;
diff --git a/third_party/grpc/src/cpp/server/server.cc b/third_party/grpc/src/cpp/server/server.cc
deleted file mode 100644
index 0d31140..0000000
--- a/third_party/grpc/src/cpp/server/server.cc
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- *
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/server.h>
-
-#include <utility>
-
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/async_generic_service.h>
-#include <grpc++/impl/codegen/completion_queue_tag.h>
-#include <grpc++/impl/grpc_library.h>
-#include <grpc++/impl/method_handler_impl.h>
-#include <grpc++/impl/rpc_service_method.h>
-#include <grpc++/impl/service_type.h>
-#include <grpc++/security/server_credentials.h>
-#include <grpc++/server_context.h>
-#include <grpc++/support/time.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/profiling/timers.h"
-#include "src/cpp/server/thread_pool_interface.h"
-
-namespace grpc {
-
-class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
- public:
- ~DefaultGlobalCallbacks() GRPC_OVERRIDE {}
- void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
- void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
-};
-
-static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
-static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
-
-static void InitGlobalCallbacks() {
- if (g_callbacks == nullptr) {
- g_callbacks.reset(new DefaultGlobalCallbacks());
- }
-}
-
-class Server::UnimplementedAsyncRequestContext {
- protected:
- UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
-
- GenericServerContext server_context_;
- GenericServerAsyncReaderWriter generic_stream_;
-};
-
-class Server::UnimplementedAsyncRequest GRPC_FINAL
- : public UnimplementedAsyncRequestContext,
- public GenericAsyncRequest {
- public:
- UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
- : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
- NULL, false),
- server_(server),
- cq_(cq) {}
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
-
- ServerContext* context() { return &server_context_; }
- GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
-
- private:
- Server* const server_;
- ServerCompletionQueue* const cq_;
-};
-
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
- UnimplementedAsyncResponseOp;
-class Server::UnimplementedAsyncResponse GRPC_FINAL
- : public UnimplementedAsyncResponseOp {
- public:
- UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
- ~UnimplementedAsyncResponse() { delete request_; }
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
- delete this;
- return r;
- }
-
- private:
- UnimplementedAsyncRequest* const request_;
-};
-
-class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- bool FinalizeResult(void** tag, bool* status) {
- delete this;
- return false;
- }
-};
-
-class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- SyncRequest(RpcServiceMethod* method, void* tag)
- : method_(method),
- tag_(tag),
- in_flight_(false),
- has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::SERVER_STREAMING),
- call_details_(nullptr),
- cq_(nullptr) {
- grpc_metadata_array_init(&request_metadata_);
- }
-
- ~SyncRequest() {
- if (call_details_) {
- delete call_details_;
- }
- grpc_metadata_array_destroy(&request_metadata_);
- }
-
- static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
- void* tag = nullptr;
- *ok = false;
- if (!cq->Next(&tag, ok)) {
- return nullptr;
- }
- auto* mrd = static_cast<SyncRequest*>(tag);
- GPR_ASSERT(mrd->in_flight_);
- return mrd;
- }
-
- static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
- gpr_timespec deadline) {
- void* tag = nullptr;
- *ok = false;
- switch (cq->AsyncNext(&tag, ok, deadline)) {
- case CompletionQueue::TIMEOUT:
- *req = nullptr;
- return true;
- case CompletionQueue::SHUTDOWN:
- *req = nullptr;
- return false;
- case CompletionQueue::GOT_EVENT:
- *req = static_cast<SyncRequest*>(tag);
- GPR_ASSERT((*req)->in_flight_);
- return true;
- }
- GPR_UNREACHABLE_CODE(return false);
- }
-
- void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
-
- void TeardownRequest() {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
- }
-
- void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
- GPR_ASSERT(cq_ && !in_flight_);
- in_flight_ = true;
- if (tag_) {
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
- } else {
- if (!call_details_) {
- call_details_ = new grpc_call_details;
- grpc_call_details_init(call_details_);
- }
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
- server, &call_, call_details_,
- &request_metadata_, cq_, notify_cq, this));
- }
- }
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- if (!*status) {
- grpc_completion_queue_destroy(cq_);
- }
- if (call_details_) {
- deadline_ = call_details_->deadline;
- grpc_call_details_destroy(call_details_);
- grpc_call_details_init(call_details_);
- }
- return true;
- }
-
- class CallData GRPC_FINAL {
- public:
- explicit CallData(Server* server, SyncRequest* mrd)
- : cq_(mrd->cq_),
- call_(mrd->call_, server, &cq_, server->max_message_size_),
- ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
- mrd->request_metadata_.count),
- has_request_payload_(mrd->has_request_payload_),
- request_payload_(mrd->request_payload_),
- method_(mrd->method_) {
- ctx_.set_call(mrd->call_);
- ctx_.cq_ = &cq_;
- GPR_ASSERT(mrd->in_flight_);
- mrd->in_flight_ = false;
- mrd->request_metadata_.count = 0;
- }
-
- ~CallData() {
- if (has_request_payload_ && request_payload_) {
- grpc_byte_buffer_destroy(request_payload_);
- }
- }
-
- void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
- ctx_.BeginCompletionOp(&call_);
- global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(MethodHandler::HandlerParameter(
- &call_, &ctx_, request_payload_, call_.max_message_size()));
- global_callbacks->PostSynchronousRequest(&ctx_);
- request_payload_ = nullptr;
- void* ignored_tag;
- bool ignored_ok;
- cq_.Shutdown();
- GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
- }
-
- private:
- CompletionQueue cq_;
- Call call_;
- ServerContext ctx_;
- const bool has_request_payload_;
- grpc_byte_buffer* request_payload_;
- RpcServiceMethod* const method_;
- };
-
- private:
- RpcServiceMethod* const method_;
- void* const tag_;
- bool in_flight_;
- const bool has_request_payload_;
- grpc_call* call_;
- grpc_call_details* call_details_;
- gpr_timespec deadline_;
- grpc_metadata_array request_metadata_;
- grpc_byte_buffer* request_payload_;
- grpc_completion_queue* cq_;
-};
-
-static grpc_server* CreateServer(const ChannelArguments& args) {
- grpc_channel_args channel_args;
- args.SetChannelArgs(&channel_args);
- return grpc_server_create(&channel_args, nullptr);
-}
-
-static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_message_size, const ChannelArguments& args)
- : max_message_size_(max_message_size),
- started_(false),
- shutdown_(false),
- num_running_cb_(0),
- sync_methods_(new std::list<SyncRequest>),
- has_generic_service_(false),
- server_(CreateServer(args)),
- thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {
- g_gli_initializer.summon();
- gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
- global_callbacks_ = g_callbacks;
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
-}
-
-Server::~Server() {
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (started_ && !shutdown_) {
- lock.unlock();
- Shutdown();
- } else if (!started_) {
- cq_.Shutdown();
- }
- }
- void* got_tag;
- bool ok;
- GPR_ASSERT(!cq_.Next(&got_tag, &ok));
- grpc_server_destroy(server_);
- if (thread_pool_owned_) {
- delete thread_pool_;
- }
- delete sync_methods_;
-}
-
-void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
- GPR_ASSERT(g_callbacks == nullptr);
- GPR_ASSERT(callbacks != nullptr);
- g_callbacks.reset(callbacks);
-}
-
-bool Server::RegisterService(const grpc::string* host, Service* service) {
- bool has_async_methods = service->has_async_methods();
- if (has_async_methods) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an asynchronous service against one server.");
- service->server_ = this;
- }
- for (auto it = service->methods_.begin(); it != service->methods_.end();
- ++it) {
- if (it->get() == nullptr) { // Handled by generic service if any.
- continue;
- }
- RpcServiceMethod* method = it->get();
- void* tag = grpc_server_register_method(server_, method->name(),
- host ? host->c_str() : nullptr);
- if (tag == nullptr) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
- method->name());
- return false;
- }
- if (method->handler() == nullptr) {
- method->set_server_tag(tag);
- } else {
- sync_methods_->emplace_back(method, tag);
- }
- }
- return true;
-}
-
-void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an async generic service against one server.");
- service->server_ = this;
- has_generic_service_ = true;
-}
-
-int Server::AddListeningPort(const grpc::string& addr,
- ServerCredentials* creds) {
- GPR_ASSERT(!started_);
- return creds->AddPortToServer(addr, server_);
-}
-
-bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
- GPR_ASSERT(!started_);
- started_ = true;
- grpc_server_start(server_);
-
- if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
- }
- for (size_t i = 0; i < num_cqs; i++) {
- new UnimplementedAsyncRequest(this, cqs[i]);
- }
- }
- // Start processing rpcs.
- if (!sync_methods_->empty()) {
- for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
- m->SetupRequest();
- m->Request(server_, cq_.cq());
- }
-
- ScheduleCallback();
- }
-
- return true;
-}
-
-void Server::ShutdownInternal(gpr_timespec deadline) {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (started_ && !shutdown_) {
- shutdown_ = true;
- grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
- cq_.Shutdown();
- lock.unlock();
- // Spin, eating requests until the completion queue is completely shutdown.
- // If the deadline expires then cancel anything that's pending and keep
- // spinning forever until the work is actually drained.
- // Since nothing else needs to touch state guarded by mu_, holding it
- // through this loop is fine.
- SyncRequest* request;
- bool ok;
- while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
- if (request == NULL) { // deadline expired
- grpc_server_cancel_all_calls(server_);
- deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- } else if (ok) {
- SyncRequest::CallData call_data(this, request);
- }
- }
- lock.lock();
-
- // Wait for running callbacks to finish.
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
- }
-}
-
-void Server::Wait() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
-}
-
-void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
- static const size_t MAX_OPS = 8;
- size_t nops = 0;
- grpc_op cops[MAX_OPS];
- ops->FillOps(cops, &nops);
- auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
- GPR_ASSERT(GRPC_CALL_OK == result);
-}
-
-ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
- ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
- : server_(server),
- context_(context),
- stream_(stream),
- call_cq_(call_cq),
- tag_(tag),
- delete_on_finalize_(delete_on_finalize),
- call_(nullptr) {
- memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
-}
-
-bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
- bool* status) {
- if (*status) {
- for (size_t i = 0; i < initial_metadata_array_.count; i++) {
- context_->client_metadata_.insert(
- std::pair<grpc::string_ref, grpc::string_ref>(
- initial_metadata_array_.metadata[i].key,
- grpc::string_ref(
- initial_metadata_array_.metadata[i].value,
- initial_metadata_array_.metadata[i].value_length)));
- }
- }
- grpc_metadata_array_destroy(&initial_metadata_array_);
- context_->set_call(call_);
- context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size());
- if (*status && call_) {
- context_->BeginCompletionOp(&call);
- }
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
- *tag = tag_;
- if (delete_on_finalize_) {
- delete this;
- }
- return true;
-}
-
-ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
- ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
-
-void ServerInterface::RegisteredAsyncRequest::IssueRequest(
- void* registered_method, grpc_byte_buffer** payload,
- ServerCompletionQueue* notification_cq) {
- grpc_server_request_registered_call(
- server_->server(), registered_method, &call_, &context_->deadline_,
- &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
- this);
-}
-
-ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
- ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
- : BaseAsyncRequest(server, context, stream, call_cq, tag,
- delete_on_finalize) {
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server(), &call_, &call_details_,
- &initial_metadata_array_, call_cq->cq(),
- notification_cq->cq(), this);
-}
-
-bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
- bool* status) {
- // TODO(yangg) remove the copy here.
- if (*status) {
- static_cast<GenericServerContext*>(context_)->method_ =
- call_details_.method;
- static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
- }
- gpr_free(call_details_.method);
- gpr_free(call_details_.host);
- return BaseAsyncRequest::FinalizeResult(tag, status);
-}
-
-bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
- bool* status) {
- if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
- new UnimplementedAsyncRequest(server_, cq_);
- new UnimplementedAsyncResponse(this);
- } else {
- delete this;
- }
- return false;
-}
-
-Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
- UnimplementedAsyncRequest* request)
- : request_(request) {
- Status status(StatusCode::UNIMPLEMENTED, "");
- UnknownMethodHandler::FillOps(request_->context(), this);
- request_->stream()->call_.PerformOps(this);
-}
-
-void Server::ScheduleCallback() {
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_++;
- }
- thread_pool_->Add(std::bind(&Server::RunRpc, this));
-}
-
-void Server::RunRpc() {
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
- }
- }
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
- }
- }
-
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
- }
- }
-}
-
-} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/server_builder.cc b/third_party/grpc/src/cpp/server/server_builder.cc
index a8c188e..0dc03b6 100644
--- a/third_party/grpc/src/cpp/server/server_builder.cc
+++ b/third_party/grpc/src/cpp/server/server_builder.cc
@@ -1,120 +1,346 @@
/*
*
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
+ * Copyright 2015-2016 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/server_builder.h>
+#include <grpcpp/server_builder.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
-#include <grpc++/impl/service_type.h>
-#include <grpc++/server.h>
+#include <grpcpp/impl/service_type.h>
+#include <grpcpp/resource_quota.h>
+#include <grpcpp/server.h>
+
+#include <utility>
+
+#include "src/core/lib/gpr/useful.h"
#include "src/cpp/server/thread_pool_interface.h"
-#include "src/cpp/server/fixed_size_thread_pool.h"
namespace grpc {
-ServerBuilder::ServerBuilder()
- : max_message_size_(-1), generic_service_(nullptr) {
- grpc_compression_options_init(&compression_options_);
+static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
+ g_plugin_factory_list;
+static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
+
+static void do_plugin_list_init(void) {
+ g_plugin_factory_list =
+ new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
}
-std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
- ServerCompletionQueue* cq = new ServerCompletionQueue();
+ServerBuilder::ServerBuilder()
+ : max_receive_message_size_(INT_MIN),
+ max_send_message_size_(INT_MIN),
+ sync_server_settings_(SyncServerSettings()),
+ resource_quota_(nullptr),
+ generic_service_(nullptr) {
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ for (auto it = g_plugin_factory_list->begin();
+ it != g_plugin_factory_list->end(); it++) {
+ auto& factory = *it;
+ plugins_.emplace_back(factory());
+ }
+
+ // all compression algorithms enabled by default.
+ enabled_compression_algorithms_bitset_ =
+ (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ memset(&maybe_default_compression_level_, 0,
+ sizeof(maybe_default_compression_level_));
+ memset(&maybe_default_compression_algorithm_, 0,
+ sizeof(maybe_default_compression_algorithm_));
+}
+
+ServerBuilder::~ServerBuilder() {
+ if (resource_quota_ != nullptr) {
+ grpc_resource_quota_unref(resource_quota_);
+ }
+}
+
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+ bool is_frequently_polled) {
+ ServerCompletionQueue* cq = new ServerCompletionQueue(
+ GRPC_CQ_NEXT,
+ is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
+ nullptr);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
-void ServerBuilder::RegisterService(Service* service) {
+ServerBuilder& ServerBuilder::RegisterService(Service* service) {
services_.emplace_back(new NamedService(service));
+ return *this;
}
-void ServerBuilder::RegisterService(const grpc::string& addr,
- Service* service) {
+ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr,
+ Service* service) {
services_.emplace_back(new NamedService(addr, service));
+ return *this;
}
-void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
+ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
+ AsyncGenericService* service) {
if (generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
- return;
+ (void*)service);
+ } else {
+ generic_service_ = service;
}
- generic_service_ = service;
+ return *this;
}
-void ServerBuilder::SetOption(std::unique_ptr<ServerBuilderOption> option) {
+ServerBuilder& ServerBuilder::SetOption(
+ std::unique_ptr<ServerBuilderOption> option) {
options_.push_back(std::move(option));
+ return *this;
}
-void ServerBuilder::AddListeningPort(const grpc::string& addr,
- std::shared_ptr<ServerCredentials> creds,
- int* selected_port) {
- Port port = {addr, creds, selected_port};
+ServerBuilder& ServerBuilder::SetSyncServerOption(
+ ServerBuilder::SyncServerOption option, int val) {
+ switch (option) {
+ case NUM_CQS:
+ sync_server_settings_.num_cqs = val;
+ break;
+ case MIN_POLLERS:
+ sync_server_settings_.min_pollers = val;
+ break;
+ case MAX_POLLERS:
+ sync_server_settings_.max_pollers = val;
+ break;
+ case CQ_TIMEOUT_MSEC:
+ sync_server_settings_.cq_timeout_msec = val;
+ break;
+ }
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
+ grpc_compression_algorithm algorithm, bool enabled) {
+ if (enabled) {
+ GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
+ } else {
+ GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
+ }
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
+ grpc_compression_level level) {
+ maybe_default_compression_level_.level = level;
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
+ grpc_compression_algorithm algorithm) {
+ maybe_default_compression_algorithm_.is_set = true;
+ maybe_default_compression_algorithm_.algorithm = algorithm;
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::SetResourceQuota(
+ const grpc::ResourceQuota& resource_quota) {
+ if (resource_quota_ != nullptr) {
+ grpc_resource_quota_unref(resource_quota_);
+ }
+ resource_quota_ = resource_quota.c_resource_quota();
+ grpc_resource_quota_ref(resource_quota_);
+ return *this;
+}
+
+ServerBuilder& ServerBuilder::AddListeningPort(
+ const grpc::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
+ int* selected_port) {
+ const grpc::string uri_scheme = "dns:";
+ grpc::string addr = addr_uri;
+ if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
+ size_t pos = uri_scheme.size();
+ while (addr_uri[pos] == '/') ++pos; // Skip slashes.
+ addr = addr_uri.substr(pos);
+ }
+ Port port = {addr, std::move(creds), selected_port};
ports_.push_back(port);
+ return *this;
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<ThreadPoolInterface> thread_pool;
+ ChannelArguments args;
+ for (auto option = options_.begin(); option != options_.end(); ++option) {
+ (*option)->UpdateArguments(&args);
+ (*option)->UpdatePlugins(&plugins_);
+ }
+
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin)->UpdateServerBuilder(this);
+ (*plugin)->UpdateChannelArguments(&args);
+ }
+
+ if (max_receive_message_size_ >= -1) {
+ args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
+ }
+
+ // The default message size is -1 (max), so no need to explicitly set it for
+ // -1.
+ if (max_send_message_size_ >= 0) {
+ args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
+ }
+
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ enabled_compression_algorithms_bitset_);
+ if (maybe_default_compression_level_.is_set) {
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
+ maybe_default_compression_level_.level);
+ }
+ if (maybe_default_compression_algorithm_.is_set) {
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
+ maybe_default_compression_algorithm_.algorithm);
+ }
+
+ if (resource_quota_ != nullptr) {
+ args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
+ grpc_resource_quota_arg_vtable());
+ }
+
+ // == Determine if the server has any syncrhonous methods ==
+ bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
- if (thread_pool == nullptr) {
- thread_pool.reset(CreateDefaultThreadPool());
+ has_sync_methods = true;
+ break;
+ }
+ }
+
+ if (!has_sync_methods) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin)->has_sync_methods()) {
+ has_sync_methods = true;
break;
}
}
}
- ChannelArguments args;
- for (auto option = options_.begin(); option != options_.end(); ++option) {
- (*option)->UpdateArguments(&args);
+
+ // If this is a Sync server, i.e a server expositing sync API, then the server
+ // needs to create some completion queues to listen for incoming requests.
+ // 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(std::make_shared<
+ std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+
+ int num_frequently_polled_cqs = 0;
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ num_frequently_polled_cqs++;
+ }
}
- if (max_message_size_ > 0) {
- args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
+
+ const bool is_hybrid_server =
+ has_sync_methods && num_frequently_polled_cqs > 0;
+
+ if (has_sync_methods) {
+ grpc_cq_polling_type polling_type =
+ is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
+
+ // Create completion queues to listen to incoming rpc requests
+ for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
+ sync_server_cqs->emplace_back(
+ new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
+ }
}
- args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
- compression_options_.enabled_algorithms_bitset);
- std::unique_ptr<Server> server(
- new Server(thread_pool.release(), true, max_message_size_, args));
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+
+ // == Determine if the server has any callback methods ==
+ bool has_callback_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_callback_methods()) {
+ has_callback_methods = true;
+ break;
+ }
+ }
+
+ // TODO(vjpai): Add a section here for plugins once they can support callback
+ // methods
+
+ if (has_sync_methods) {
+ // This is a Sync server
+ gpr_log(GPR_INFO,
+ "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+ "%d, CQ timeout (msec): %d",
+ sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec);
+ }
+
+ if (has_callback_methods) {
+ gpr_log(GPR_INFO, "Callback server.");
+ }
+
+ std::unique_ptr<Server> server(new Server(
+ max_receive_message_size_, &args, sync_server_cqs,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec, resource_quota_,
+ std::move(interceptor_creators_)));
+
+ ServerInitializer* initializer = server->initializer();
+
+ // Register all the completion queues with the server. i.e
+ // 1. sync_server_cqs: internal completion queues created IF this is a sync
+ // server
+ // 2. cqs_: Completion queues added via AddCompletionQueue() call
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ num_frequently_polled_cqs++;
+ }
+
+ if (has_callback_methods) {
+ auto* cq = server->CallbackCQ();
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ num_frequently_polled_cqs++;
+ }
+
+ // cqs_ contains the completion queue added by calling the ServerBuilder's
+ // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
+ // calling Next() or AsyncNext()) and hence are not safe to be used for
+ // listening to incoming channels. Such completion queues must be registered
+ // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
}
+
+ if (num_frequently_polled_cqs == 0) {
+ gpr_log(GPR_ERROR,
+ "At least one of the completion queues must be frequently polled");
+ return nullptr;
+ }
+
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService((*service)->host.get(), (*service)->service)) {
return nullptr;
}
}
+
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin)->InitServer(initializer);
+ }
+
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -127,18 +353,44 @@
}
}
}
+
+ bool added_port = false;
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
- if (!r) return nullptr;
+ if (!r) {
+ if (added_port) server->Shutdown();
+ return nullptr;
+ }
+ added_port = true;
if (port->selected_port != nullptr) {
*port->selected_port = r;
}
}
+
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
- if (!server->Start(cqs_data, cqs_.size())) {
- return nullptr;
+ server->Start(cqs_data, cqs_.size());
+
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin)->Finish(initializer);
}
+
return server;
}
+void ServerBuilder::InternalAddPluginFactory(
+ std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ (*g_plugin_factory_list).push_back(CreatePlugin);
+}
+
+ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
+ switch (id) {
+ case GRPC_WORKAROUND_ID_CRONET_COMPRESSION:
+ return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1);
+ default:
+ gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id);
+ return *this;
+ }
+}
+
} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/server_cc.cc b/third_party/grpc/src/cpp/server/server_cc.cc
new file mode 100644
index 0000000..1e3c574
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/server_cc.cc
@@ -0,0 +1,1166 @@
+/*
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/server.h>
+
+#include <cstdlib>
+#include <sstream>
+#include <utility>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpcpp/completion_queue.h>
+#include <grpcpp/generic/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/completion_queue_tag.h>
+#include <grpcpp/impl/codegen/server_interceptor.h>
+#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/impl/method_handler_impl.h>
+#include <grpcpp/impl/rpc_service_method.h>
+#include <grpcpp/impl/server_initializer.h>
+#include <grpcpp/impl/service_type.h>
+#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/server_context.h>
+#include <grpcpp/support/time.h>
+
+#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/cpp/server/health/default_health_check_service.h"
+#include "src/cpp/thread_manager/thread_manager.h"
+
+namespace grpc {
+namespace {
+
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
+// How many callback requests of each method should we pre-register at start
+#define DEFAULT_CALLBACK_REQS_PER_METHOD 32
+
+class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
+ public:
+ ~DefaultGlobalCallbacks() override {}
+ void PreSynchronousRequest(ServerContext* context) override {}
+ void PostSynchronousRequest(ServerContext* context) override {}
+};
+
+std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
+gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
+
+void InitGlobalCallbacks() {
+ if (!g_callbacks) {
+ g_callbacks.reset(new DefaultGlobalCallbacks());
+ }
+}
+
+class ShutdownTag : public internal::CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) { return false; }
+};
+
+class DummyTag : public internal::CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) { return true; }
+};
+
+class UnimplementedAsyncRequestContext {
+ protected:
+ UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+ GenericServerContext server_context_;
+ GenericServerAsyncReaderWriter generic_stream_;
+};
+
+} // namespace
+
+/// Use private inheritance rather than composition only to establish order
+/// of construction, since the public base class should be constructed after the
+/// elements belonging to the private base class are constructed. This is not
+/// possible using true composition.
+class Server::UnimplementedAsyncRequest final
+ : private UnimplementedAsyncRequestContext,
+ public GenericAsyncRequest {
+ public:
+ UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+ : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+ nullptr, false),
+ server_(server),
+ cq_(cq) {}
+
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ ServerContext* context() { return &server_context_; }
+ GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+ Server* const server_;
+ ServerCompletionQueue* const cq_;
+};
+
+/// UnimplementedAsyncResponse should not post user-visible completions to the
+/// C++ completion queue, but is generated as a CQ event by the core
+class Server::UnimplementedAsyncResponse final
+ : public internal::CallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus> {
+ public:
+ UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+ ~UnimplementedAsyncResponse() { delete request_; }
+
+ bool FinalizeResult(void** tag, bool* status) override {
+ if (internal::CallOpSet<
+ internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) {
+ delete this;
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
+ return false;
+ }
+
+ private:
+ UnimplementedAsyncRequest* const request_;
+};
+
+class Server::SyncRequest final : public internal::CompletionQueueTag {
+ public:
+ SyncRequest(internal::RpcServiceMethod* method, void* method_tag)
+ : method_(method),
+ method_tag_(method_tag),
+ in_flight_(false),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
+ call_details_(nullptr),
+ cq_(nullptr) {
+ grpc_metadata_array_init(&request_metadata_);
+ }
+
+ ~SyncRequest() {
+ if (call_details_) {
+ delete call_details_;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ }
+
+ void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
+
+ void TeardownRequest() {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+
+ void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
+ GPR_ASSERT(cq_ && !in_flight_);
+ in_flight_ = true;
+ if (method_tag_) {
+ if (GRPC_CALL_OK !=
+ grpc_server_request_registered_call(
+ server, method_tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this)) {
+ TeardownRequest();
+ return;
+ }
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ if (grpc_server_request_call(server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq,
+ this) != GRPC_CALL_OK) {
+ TeardownRequest();
+ return;
+ }
+ }
+ }
+
+ void PostShutdownCleanup() {
+ if (call_) {
+ grpc_call_unref(call_);
+ call_ = nullptr;
+ }
+ if (cq_) {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+ }
+
+ bool FinalizeResult(void** tag, bool* status) override {
+ if (!*status) {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+ if (call_details_) {
+ deadline_ = call_details_->deadline;
+ grpc_call_details_destroy(call_details_);
+ grpc_call_details_init(call_details_);
+ }
+ return true;
+ }
+
+ // The CallData class represents a call that is "active" as opposed
+ // to just being requested. It wraps and takes ownership of the cq from
+ // the call request
+ class CallData final {
+ public:
+ explicit CallData(Server* server, SyncRequest* mrd)
+ : cq_(mrd->cq_),
+ ctx_(mrd->deadline_, &mrd->request_metadata_),
+ has_request_payload_(mrd->has_request_payload_),
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ request_(nullptr),
+ method_(mrd->method_),
+ call_(
+ mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
+ server->interceptor_creators_)),
+ server_(server),
+ global_callbacks_(nullptr),
+ resources_(false) {
+ ctx_.set_call(mrd->call_);
+ ctx_.cq_ = &cq_;
+ GPR_ASSERT(mrd->in_flight_);
+ mrd->in_flight_ = false;
+ mrd->request_metadata_.count = 0;
+ }
+
+ ~CallData() {
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
+ }
+ }
+
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
+
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ request_ = handler->Deserialize(call_.call(), request_payload_,
+ &request_status_);
+
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ interceptor_methods_.SetRecvMessage(request_);
+ }
+
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
+ }
+
+ void ContinueRunAfterInterception() {
+ {
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_, request_status_, nullptr));
+ request_ = nullptr;
+ global_callbacks_->PostSynchronousRequest(&ctx_);
+
+ cq_.Shutdown();
+
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
+ DummyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ }
+ delete this;
+ }
+
+ private:
+ CompletionQueue cq_;
+ ServerContext ctx_;
+ const bool has_request_payload_;
+ grpc_byte_buffer* request_payload_;
+ void* request_;
+ Status request_status_;
+ internal::RpcServiceMethod* const method_;
+ internal::Call call_;
+ Server* server_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
+ internal::InterceptorBatchMethodsImpl interceptor_methods_;
+ };
+
+ private:
+ internal::RpcServiceMethod* const method_;
+ void* const method_tag_;
+ bool in_flight_;
+ const bool has_request_payload_;
+ grpc_call* call_;
+ grpc_call_details* call_details_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
+ grpc_byte_buffer* request_payload_;
+ grpc_completion_queue* cq_;
+};
+
+class Server::CallbackRequest final : public internal::CompletionQueueTag {
+ public:
+ CallbackRequest(Server* server, internal::RpcServiceMethod* method,
+ void* method_tag)
+ : server_(server),
+ method_(method),
+ method_tag_(method_tag),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
+ cq_(server->CallbackCQ()),
+ tag_(this) {
+ Setup();
+ }
+
+ ~CallbackRequest() { Clear(); }
+
+ void Request() {
+ if (method_tag_) {
+ if (GRPC_CALL_OK !=
+ grpc_server_request_registered_call(
+ server_->c_server(), method_tag_, &call_, &deadline_,
+ &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
+ cq_->cq(), static_cast<void*>(&tag_))) {
+ return;
+ }
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
+ &request_metadata_, cq_->cq(), cq_->cq(),
+ static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
+ return;
+ }
+ }
+ }
+
+ bool FinalizeResult(void** tag, bool* status) override { return false; }
+
+ private:
+ class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+ public:
+ CallbackCallTag(Server::CallbackRequest* req) : req_(req) {
+ functor_run = &CallbackCallTag::StaticRun;
+ }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(bool ok) { Run(ok); }
+
+ private:
+ Server::CallbackRequest* req_;
+ internal::Call* call_;
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = req_;
+ bool new_ok = ok;
+ GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == req_);
+
+ if (!ok) {
+ // The call has been shutdown
+ req_->Clear();
+ return;
+ }
+
+ // Bind the call, deadline, and metadata from what we got
+ req_->ctx_.set_call(req_->call_);
+ req_->ctx_.cq_ = req_->cq_;
+ req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
+ req_->request_metadata_.count = 0;
+
+ // Create a C++ Call to control the underlying core call
+ call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call)))
+ internal::Call(
+ req_->call_, req_->server_, req_->cq_,
+ req_->server_->max_receive_message_size(),
+ req_->ctx_.set_server_rpc_info(
+ req_->method_->name(), req_->method_->method_type(),
+ req_->server_->interceptor_creators_));
+
+ req_->interceptor_methods_.SetCall(call_);
+ req_->interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ req_->interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ req_->interceptor_methods_.SetRecvInitialMetadata(
+ &req_->ctx_.client_metadata_);
+
+ if (req_->has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ req_->request_ = req_->method_->handler()->Deserialize(
+ req_->call_, req_->request_payload_, &req_->request_status_);
+ req_->request_payload_ = nullptr;
+ req_->interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ req_->interceptor_methods_.SetRecvMessage(req_->request_);
+ }
+
+ if (req_->interceptor_methods_.RunInterceptors(
+ [this] { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
+ }
+ void ContinueRunAfterInterception() {
+ req_->method_->handler()->RunHandler(
+ internal::MethodHandler::HandlerParameter(
+ call_, &req_->ctx_, req_->request_, req_->request_status_,
+ [this] {
+ req_->Reset();
+ req_->Request();
+ }));
+ }
+ };
+
+ void Reset() {
+ Clear();
+ Setup();
+ }
+
+ void Clear() {
+ if (call_details_) {
+ delete call_details_;
+ call_details_ = nullptr;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
+ }
+ ctx_.Clear();
+ interceptor_methods_.ClearState();
+ }
+
+ void Setup() {
+ grpc_metadata_array_init(&request_metadata_);
+ ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
+ request_payload_ = nullptr;
+ request_ = nullptr;
+ request_status_ = Status();
+ }
+
+ Server* const server_;
+ internal::RpcServiceMethod* const method_;
+ void* const method_tag_;
+ const bool has_request_payload_;
+ grpc_byte_buffer* request_payload_;
+ void* request_;
+ Status request_status_;
+ grpc_call_details* call_details_ = nullptr;
+ grpc_call* call_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
+ CompletionQueue* cq_;
+ CallbackCallTag tag_;
+ ServerContext ctx_;
+ internal::InterceptorBatchMethodsImpl interceptor_methods_;
+};
+
+// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
+// manages a pool of threads that poll for incoming Sync RPCs and call the
+// appropriate RPC handlers
+class Server::SyncRequestThreadManager : public ThreadManager {
+ public:
+ SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ cq_timeout_msec_(cq_timeout_msec),
+ global_callbacks_(std::move(global_callbacks)) {}
+
+ WorkStatus PollForWork(void** tag, bool* ok) override {
+ *tag = nullptr;
+ // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
+ // right now
+ gpr_timespec deadline =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok, bool resources) override {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+
+ if (!sync_req) {
+ // No tag. Nothing to work on. This is an unlikley scenario and possibly a
+ // bug in RPC Manager implementation.
+ gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
+ return;
+ }
+
+ if (ok) {
+ // Calldata takes ownership of the completion queue and interceptors
+ // inside sync_req
+ auto* cd = new SyncRequest::CallData(server_, sync_req);
+ // Prepare for the next request
+ if (!IsShutdown()) {
+ sync_req->SetupRequest(); // Create new completion queue for sync_req
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd->Run(global_callbacks_, resources);
+ }
+ // TODO (sreek) If ok is false here (which it isn't in case of
+ // grpc_request_registered_call), we should still re-queue the request
+ // object
+ }
+
+ void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
+ sync_requests_.emplace_back(new SyncRequest(method, tag));
+ }
+
+ void AddUnknownSyncMethod() {
+ if (!sync_requests_.empty()) {
+ unknown_method_.reset(new internal::RpcServiceMethod(
+ "unknown", internal::RpcMethod::BIDI_STREAMING,
+ new internal::UnknownMethodHandler));
+ sync_requests_.emplace_back(
+ new SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void Shutdown() override {
+ ThreadManager::Shutdown();
+ server_cq_->Shutdown();
+ }
+
+ void Wait() override {
+ ThreadManager::Wait();
+ // Drain any pending items from the queue
+ void* tag;
+ bool ok;
+ while (server_cq_->Next(&tag, &ok)) {
+ if (ok) {
+ // If a request was pulled off the queue, it means that the thread
+ // handling the request added it to the completion queue after shutdown
+ // was called - because the thread had already started and checked the
+ // shutdown flag before shutdown was called. In this case, we simply
+ // clean it up here, *after* calling wait on all the worker threads, at
+ // which point we are certain no in-flight requests will add more to the
+ // queue. This fixes an intermittent memory leak on shutdown.
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ sync_req->PostShutdownCleanup();
+ }
+ }
+ }
+
+ void Start() {
+ if (!sync_requests_.empty()) {
+ for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
+ (*m)->SetupRequest();
+ (*m)->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ Initialize(); // ThreadManager's Initialize()
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ int cq_timeout_msec_;
+ std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
+ std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
+static internal::GrpcLibraryInitializer g_gli_initializer;
+Server::Server(
+ int max_receive_message_size, ChannelArguments* args,
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ grpc_resource_quota* server_rq,
+ std::vector<
+ std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+ interceptor_creators)
+ : interceptor_creators_(std::move(interceptor_creators)),
+ max_receive_message_size_(max_receive_message_size),
+ sync_server_cqs_(std::move(sync_server_cqs)),
+ started_(false),
+ shutdown_(false),
+ shutdown_notified_(false),
+ has_generic_service_(false),
+ server_(nullptr),
+ server_initializer_(new ServerInitializer(this)),
+ health_check_service_disabled_(false) {
+ g_gli_initializer.summon();
+ gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
+ global_callbacks_ = g_callbacks;
+ global_callbacks_->UpdateArguments(args);
+
+ if (sync_server_cqs_ != nullptr) {
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
+ for (const auto& it : *sync_server_cqs_) {
+ sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
+ }
+ }
+
+ grpc_channel_args channel_args;
+ args->SetChannelArgs(&channel_args);
+
+ for (size_t i = 0; i < channel_args.num_args; i++) {
+ if (0 ==
+ strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
+ if (channel_args.args[i].value.pointer.p == nullptr) {
+ health_check_service_disabled_ = true;
+ } else {
+ health_check_service_.reset(static_cast<HealthCheckServiceInterface*>(
+ channel_args.args[i].value.pointer.p));
+ }
+ break;
+ }
+ }
+
+ server_ = grpc_server_create(&channel_args, nullptr);
+}
+
+Server::~Server() {
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+ if (started_ && !shutdown_) {
+ lock.unlock();
+ Shutdown();
+ } else if (!started_) {
+ // Shutdown the completion queues
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Shutdown();
+ }
+ }
+ }
+
+ grpc_server_destroy(server_);
+}
+
+void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
+ GPR_ASSERT(!g_callbacks);
+ GPR_ASSERT(callbacks);
+ g_callbacks.reset(callbacks);
+}
+
+grpc_server* Server::c_server() { return server_; }
+
+std::shared_ptr<Channel> Server::InProcessChannel(
+ const ChannelArguments& args) {
+ grpc_channel_args channel_args = args.c_channel_args();
+ return CreateChannelInternal(
+ "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
+}
+
+std::shared_ptr<Channel>
+Server::experimental_type::InProcessChannelWithInterceptors(
+ const ChannelArguments& args,
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
+ interceptor_creators) {
+ grpc_channel_args channel_args = args.c_channel_args();
+ return CreateChannelInternal(
+ "inproc",
+ grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
+ std::move(interceptor_creators));
+}
+
+static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
+ internal::RpcServiceMethod* method) {
+ switch (method->method_type()) {
+ case internal::RpcMethod::NORMAL_RPC:
+ case internal::RpcMethod::SERVER_STREAMING:
+ return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
+ case internal::RpcMethod::CLIENT_STREAMING:
+ case internal::RpcMethod::BIDI_STREAMING:
+ return GRPC_SRM_PAYLOAD_NONE;
+ }
+ GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
+}
+
+bool Server::RegisterService(const grpc::string* host, Service* service) {
+ bool has_async_methods = service->has_async_methods();
+ if (has_async_methods) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an asynchronous service against one server.");
+ service->server_ = this;
+ }
+
+ const char* method_name = nullptr;
+ for (auto it = service->methods_.begin(); it != service->methods_.end();
+ ++it) {
+ if (it->get() == nullptr) { // Handled by generic service if any.
+ continue;
+ }
+
+ internal::RpcServiceMethod* method = it->get();
+ void* method_registration_tag = grpc_server_register_method(
+ server_, method->name(), host ? host->c_str() : nullptr,
+ PayloadHandlingForMethod(method), 0);
+ if (method_registration_tag == nullptr) {
+ gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
+ method->name());
+ return false;
+ }
+
+ if (method->handler() == nullptr) { // Async method without handler
+ method->set_server_tag(method_registration_tag);
+ } else if (method->api_type() ==
+ internal::RpcServiceMethod::ApiType::SYNC) {
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, method_registration_tag);
+ }
+ } else {
+ // a callback method. Register at least some callback requests
+ // TODO(vjpai): Register these dynamically based on need
+ for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
+ auto* req = new CallbackRequest(this, method, method_registration_tag);
+ callback_reqs_.emplace_back(req);
+ }
+ // Enqueue it so that it will be Request'ed later once
+ // all request matchers are created at core server startup
+ }
+
+ method_name = method->name();
+ }
+
+ // Parse service name.
+ if (method_name != nullptr) {
+ std::stringstream ss(method_name);
+ grpc::string service_name;
+ if (std::getline(ss, service_name, '/') &&
+ std::getline(ss, service_name, '/')) {
+ services_.push_back(service_name);
+ }
+ }
+ return true;
+}
+
+void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an async generic service against one server.");
+ service->server_ = this;
+ has_generic_service_ = true;
+}
+
+int Server::AddListeningPort(const grpc::string& addr,
+ ServerCredentials* creds) {
+ GPR_ASSERT(!started_);
+ int port = creds->AddPortToServer(addr, server_);
+ global_callbacks_->AddPort(this, addr, creds, port);
+ return port;
+}
+
+void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
+ GPR_ASSERT(!started_);
+ global_callbacks_->PreServerStart(this);
+ started_ = true;
+
+ // Only create default health check service when user did not provide an
+ // explicit one.
+ ServerCompletionQueue* health_check_cq = nullptr;
+ DefaultHealthCheckService::HealthCheckServiceImpl*
+ default_health_check_service_impl = nullptr;
+ if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
+ DefaultHealthCheckServiceEnabled()) {
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ // We create a non-polling CQ to avoid impacting application
+ // performance. This ensures that we don't introduce thread hops
+ // for application requests that wind up on this CQ, which is polled
+ // in its own thread.
+ health_check_cq =
+ new ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue>(health_check_cq));
+ RegisterService(nullptr, default_health_check_service_impl);
+ }
+
+ grpc_server_start(server_);
+
+ if (!has_generic_service_) {
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
+ }
+
+ for (size_t i = 0; i < num_cqs; i++) {
+ if (cqs[i]->IsFrequentlyPolled()) {
+ new UnimplementedAsyncRequest(this, cqs[i]);
+ }
+ }
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
+ }
+
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
+ resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
+ }
+
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+ for (auto& cbreq : callback_reqs_) {
+ cbreq->Request();
+ }
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
+}
+
+void Server::ShutdownInternal(gpr_timespec deadline) {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (!shutdown_) {
+ shutdown_ = true;
+
+ /// The completion queue to use for server shutdown completion notification
+ CompletionQueue shutdown_cq;
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+
+ shutdown_cq.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Shutdown(); // ThreadManager's Shutdown()
+ }
+
+ // Wait for threads in all ThreadManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ }
+
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here. Just ignore ok and tag values
+ }
+
+ shutdown_notified_ = true;
+ shutdown_cv_.notify_all();
+ }
+}
+
+void Server::Wait() {
+ std::unique_lock<std::mutex> lock(mu_);
+ while (started_ && !shutdown_notified_) {
+ shutdown_cv_.wait(lock);
+ }
+}
+
+void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
+ ops->FillOps(call);
+}
+
+ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
+ ServerInterface* server, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+ : server_(server),
+ context_(context),
+ stream_(stream),
+ call_cq_(call_cq),
+ notification_cq_(notification_cq),
+ tag_(tag),
+ delete_on_finalize_(delete_on_finalize),
+ call_(nullptr),
+ done_intercepting_(false) {
+ /* Set up interception state partially for the receive ops. call_wrapper_ is
+ * not filled at this point, but it will be filled before the interceptors are
+ * run. */
+ interceptor_methods_.SetCall(&call_wrapper_);
+ interceptor_methods_.SetReverse();
+ call_cq_->RegisterAvalanching(); // This op will trigger more ops
+}
+
+ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
+ call_cq_->CompleteAvalanching();
+}
+
+bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
+ if (done_intercepting_) {
+ *tag = tag_;
+ if (delete_on_finalize_) {
+ delete this;
+ }
+ return true;
+ }
+ context_->set_call(call_);
+ context_->cq_ = call_cq_;
+ if (call_wrapper_.call() == nullptr) {
+ // Fill it since it is empty.
+ call_wrapper_ = internal::Call(
+ call_, server_, call_cq_, server_->max_receive_message_size(), nullptr);
+ }
+
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call_wrapper_);
+
+ if (*status && call_ && call_wrapper_.server_rpc_info()) {
+ done_intercepting_ = true;
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_);
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueFinalizeResultAfterInterception(); })) {
+ // There are no interceptors to run. Continue
+ } else {
+ // There were interceptors to be run, so
+ // ContinueFinalizeResultAfterInterception will be run when interceptors
+ // are done.
+ return false;
+ }
+ }
+ if (*status && call_) {
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
+ }
+ *tag = tag_;
+ if (delete_on_finalize_) {
+ delete this;
+ }
+ return true;
+}
+
+void ServerInterface::BaseAsyncRequest::
+ ContinueFinalizeResultAfterInterception() {
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
+ // Queue a tag which will be returned immediately
+ grpc_core::ExecCtx exec_ctx;
+ grpc_cq_begin_op(notification_cq_->cq(), this);
+ grpc_cq_end_op(
+ notification_cq_->cq(), this, GRPC_ERROR_NONE,
+ [](void* arg, grpc_cq_completion* completion) { delete completion; },
+ nullptr, new grpc_cq_completion());
+}
+
+ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ ServerInterface* server, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag, const char* name,
+ internal::RpcMethod::RpcType type)
+ : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
+ true),
+ name_(name),
+ type_(type) {}
+
+void ServerInterface::RegisteredAsyncRequest::IssueRequest(
+ void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq) {
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
+ server_->server(), registered_method, &call_,
+ &context_->deadline_,
+ context_->client_metadata_.arr(), payload,
+ call_cq_->cq(), notification_cq->cq(), this));
+}
+
+ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
+ ServerInterface* server, GenericServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+ : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
+ delete_on_finalize) {
+ grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server->server(), &call_, &call_details_,
+ context->client_metadata_.arr(), call_cq->cq(),
+ notification_cq->cq(), this));
+}
+
+bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
+ // If we are done intercepting, there is nothing more for us to do
+ if (done_intercepting_) {
+ return BaseAsyncRequest::FinalizeResult(tag, status);
+ }
+ // TODO(yangg) remove the copy here.
+ if (*status) {
+ static_cast<GenericServerContext*>(context_)->method_ =
+ StringFromCopiedSlice(call_details_.method);
+ static_cast<GenericServerContext*>(context_)->host_ =
+ StringFromCopiedSlice(call_details_.host);
+ context_->deadline_ = call_details_.deadline;
+ }
+ grpc_slice_unref(call_details_.method);
+ grpc_slice_unref(call_details_.host);
+ call_wrapper_ = internal::Call(
+ call_, server_, call_cq_, server_->max_receive_message_size(),
+ context_->set_server_rpc_info(
+ static_cast<GenericServerContext*>(context_)->method_.c_str(),
+ internal::RpcMethod::BIDI_STREAMING,
+ *server_->interceptor_creators()));
+ return BaseAsyncRequest::FinalizeResult(tag, status);
+}
+
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status)) {
+ // We either had no interceptors run or we are done intercepting
+ if (*status) {
+ new UnimplementedAsyncRequest(server_, cq_);
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
+ return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+ UnimplementedAsyncRequest* request)
+ : request_(request) {
+ Status status(StatusCode::UNIMPLEMENTED, "");
+ internal::UnknownMethodHandler::FillOps(request_->context(), this);
+ request_->stream()->call_.PerformOps(this);
+}
+
+ServerInitializer* Server::initializer() { return server_initializer_.get(); }
+
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
+ ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
+ // TakeCQ takes ownership of the cq into the shutdown callback
+ // so that the shutdown callback will be responsible for destroying it
+ void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
+
+ // The Run function will get invoked by the completion queue library
+ // when the shutdown is actually complete
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ delete callback;
+ }
+
+ private:
+ CompletionQueue* cq_ = nullptr;
+};
+} // namespace
+
+CompletionQueue* Server::CallbackCQ() {
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-server CQ registered
+ std::lock_guard<std::mutex> l(mu_);
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = new ShutdownCallback;
+ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+};
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/server_context.cc b/third_party/grpc/src/cpp/server/server_context.cc
index e205a19..1b524bc 100644
--- a/third_party/grpc/src/cpp/server/server_context.cc
+++ b/third_party/grpc/src/cpp/server/server_context.cc
@@ -1,168 +1,309 @@
/*
*
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/server_context.h>
+#include <grpcpp/server_context.h>
+#include <grpcpp/support/server_callback.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/impl/sync.h>
-#include <grpc++/support/time.h>
+#include <algorithm>
+#include <mutex>
+#include <utility>
+
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/load_reporting.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpcpp/completion_queue.h>
+#include <grpcpp/impl/call.h>
+#include <grpcpp/support/time.h>
-#include "src/core/channel/compress_filter.h"
-#include "src/cpp/common/create_auth_context.h"
+#include "src/core/lib/surface/call.h"
namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
+class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp()
- : has_tag_(false),
+ // must ref the call before calling constructor and after deleting this
+ CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
+ : call_(*call),
+ reactor_(reactor),
+ has_tag_(false),
tag_(nullptr),
+ core_cq_tag_(this),
refs_(2),
finalized_(false),
- cancelled_(0) {}
+ cancelled_(0),
+ done_intercepting_(false) {}
- void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+ // CompletionOp isn't copyable or movable
+ CompletionOp(const CompletionOp&) = delete;
+ CompletionOp& operator=(const CompletionOp&) = delete;
+ CompletionOp(CompletionOp&&) = delete;
+ CompletionOp& operator=(CompletionOp&&) = delete;
- bool CheckCancelled(CompletionQueue* cq);
+ ~CompletionOp() {
+ if (call_.server_rpc_info()) {
+ call_.server_rpc_info()->Unref();
+ }
+ }
+
+ void FillOps(internal::Call* call) override;
+
+ // This should always be arena allocated in the call, so override delete.
+ // But this class is not trivially destructible, so must actually call delete
+ // before allowing the arena to be freed
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CompletionOp));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ bool CheckCancelled(CompletionQueue* cq) {
+ cq->TryPluck(this);
+ return CheckCancelledNoPluck();
+ }
+ bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
void set_tag(void* tag) {
has_tag_ = true;
tag_ = tag;
}
+ void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
+
+ void* core_cq_tag() override { return core_cq_tag_; }
+
void Unref();
+ // This will be called while interceptors are run if the RPC is a hijacked
+ // RPC. This should set hijacking state for each of the ops.
+ void SetHijackingState() override {
+ /* Servers don't allow hijacking */
+ GPR_CODEGEN_ASSERT(false);
+ }
+
+ /* Should be called after interceptors are done running */
+ void ContinueFillOpsAfterInterception() override {}
+
+ /* Should be called after interceptors are done running on the finalize result
+ * path */
+ void ContinueFinalizeResultAfterInterception() override {
+ done_intercepting_ = true;
+ if (!has_tag_) {
+ /* We don't have a tag to return. */
+ std::unique_lock<std::mutex> lock(mu_);
+ if (--refs_ == 0) {
+ lock.unlock();
+ grpc_call* call = call_.call();
+ delete this;
+ grpc_call_unref(call);
+ }
+ return;
+ }
+ /* Start a dummy op so that we can return the tag */
+ GPR_CODEGEN_ASSERT(
+ GRPC_CALL_OK ==
+ grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
+ }
+
private:
+ bool CheckCancelledNoPluck() {
+ std::lock_guard<std::mutex> g(mu_);
+ return finalized_ ? (cancelled_ != 0) : false;
+ }
+
+ internal::Call call_;
+ internal::ServerReactor* reactor_;
bool has_tag_;
void* tag_;
- grpc::mutex mu_;
+ void* core_cq_tag_;
+ std::mutex mu_;
int refs_;
bool finalized_;
- int cancelled_;
+ int cancelled_; // This is an int (not bool) because it is passed to core
+ bool done_intercepting_;
+ internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
void ServerContext::CompletionOp::Unref() {
- grpc::unique_lock<grpc::mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
+ grpc_call* call = call_.call();
delete this;
+ grpc_call_unref(call);
}
}
-bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
- cq->TryPluck(this);
- grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ != 0 : false;
-}
-
-void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
- ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops->data.recv_close_on_server.cancelled = &cancelled_;
- ops->flags = 0;
- ops->reserved = NULL;
- *nops = 1;
+void ServerContext::CompletionOp::FillOps(internal::Call* call) {
+ grpc_op ops;
+ ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops.data.recv_close_on_server.cancelled = &cancelled_;
+ ops.flags = 0;
+ ops.reserved = nullptr;
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ interceptor_methods_.SetCallOpSetInterface(this);
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
+ core_cq_tag_, nullptr));
+ /* No interceptors to run here */
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- finalized_ = true;
bool ret = false;
- if (has_tag_) {
- *tag = tag_;
- ret = true;
+ std::unique_lock<std::mutex> lock(mu_);
+ if (done_intercepting_) {
+ /* We are done intercepting. */
+ if (has_tag_) {
+ *tag = tag_;
+ ret = true;
+ }
+ if (--refs_ == 0) {
+ lock.unlock();
+ grpc_call* call = call_.call();
+ delete this;
+ grpc_call_unref(call);
+ }
+ return ret;
}
- if (!*status) cancelled_ = 1;
- if (--refs_ == 0) {
- lock.unlock();
- delete this;
+ finalized_ = true;
+
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
}
- return ret;
+
+ if (cancelled_ && (reactor_ != nullptr)) {
+ reactor_->OnCancel();
+ }
+ /* Release the lock since we are going to be running through interceptors now
+ */
+ lock.unlock();
+ /* Add interception point and run through interceptors */
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_CLOSE);
+ if (interceptor_methods_.RunInterceptors()) {
+ /* No interceptors were run */
+ if (has_tag_) {
+ *tag = tag_;
+ ret = true;
+ }
+ lock.lock();
+ if (--refs_ == 0) {
+ lock.unlock();
+ grpc_call* call = call_.call();
+ delete this;
+ grpc_call_unref(call);
+ }
+ return ret;
+ }
+ /* There are interceptors to be run. Return false for now */
+ return false;
}
// ServerContext body
-ServerContext::ServerContext()
- : completion_op_(nullptr),
- has_notify_when_done_tag_(false),
- async_notify_when_done_tag_(nullptr),
- deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
- call_(nullptr),
- cq_(nullptr),
- sent_initial_metadata_(false) {}
+ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
-ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
- size_t metadata_count)
- : completion_op_(nullptr),
- has_notify_when_done_tag_(false),
- async_notify_when_done_tag_(nullptr),
- deadline_(deadline),
- call_(nullptr),
- cq_(nullptr),
- sent_initial_metadata_(false) {
- for (size_t i = 0; i < metadata_count; i++) {
- client_metadata_.insert(std::pair<grpc::string_ref, grpc::string_ref>(
- metadata[i].key,
- grpc::string_ref(metadata[i].value, metadata[i].value_length)));
- }
+ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
+ Setup(deadline);
+ std::swap(*client_metadata_.arr(), *arr);
}
-ServerContext::~ServerContext() {
- if (call_) {
- grpc_call_destroy(call_);
- }
+void ServerContext::Setup(gpr_timespec deadline) {
+ completion_op_ = nullptr;
+ has_notify_when_done_tag_ = false;
+ async_notify_when_done_tag_ = nullptr;
+ deadline_ = deadline;
+ call_ = nullptr;
+ cq_ = nullptr;
+ sent_initial_metadata_ = false;
+ compression_level_set_ = false;
+ has_pending_ops_ = false;
+ rpc_info_ = nullptr;
+}
+
+void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
+ grpc_metadata_array* arr) {
+ deadline_ = deadline;
+ std::swap(*client_metadata_.arr(), *arr);
+}
+
+ServerContext::~ServerContext() { Clear(); }
+
+void ServerContext::Clear() {
+ auth_context_.reset();
+ initial_metadata_.clear();
+ trailing_metadata_.clear();
+ client_metadata_.Reset();
if (completion_op_) {
completion_op_->Unref();
+ completion_op_ = nullptr;
+ completion_tag_.Clear();
+ }
+ if (rpc_info_) {
+ rpc_info_->Unref();
+ rpc_info_ = nullptr;
+ }
+ if (call_) {
+ auto* call = call_;
+ call_ = nullptr;
+ grpc_call_unref(call);
}
}
-void ServerContext::BeginCompletionOp(Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call,
+ std::function<void(bool)> callback,
+ internal::ServerReactor* reactor) {
GPR_ASSERT(!completion_op_);
- completion_op_ = new CompletionOp();
- if (has_notify_when_done_tag_) {
+ if (rpc_info_) {
+ rpc_info_->Ref();
+ }
+ grpc_call_ref(call->call());
+ completion_op_ =
+ new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
+ CompletionOp(call, reactor);
+ if (callback != nullptr) {
+ completion_tag_.Set(call->call(), std::move(callback), completion_op_);
+ completion_op_->set_core_cq_tag(&completion_tag_);
+ completion_op_->set_tag(completion_op_);
+ } else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
}
+internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
+}
+
void ServerContext::AddInitialMetadata(const grpc::string& key,
const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
@@ -174,45 +315,44 @@
}
void ServerContext::TryCancel() const {
+ internal::CancelInterceptorBatchMethods cancel_methods;
+ if (rpc_info_) {
+ for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
+ rpc_info_->RunInterceptor(&cancel_methods, i);
+ }
+ }
grpc_call_error err = grpc_call_cancel_with_status(
- call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL);
+ call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
if (err != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
}
}
bool ServerContext::IsCancelled() const {
- return completion_op_ && completion_op_->CheckCancelled(cq_);
-}
-
-void ServerContext::set_compression_level(grpc_compression_level level) {
- const grpc_compression_algorithm algorithm_for_level =
- grpc_compression_algorithm_for_level(level);
- set_compression_algorithm(algorithm_for_level);
+ if (completion_tag_) {
+ // When using callback API, this result is always valid.
+ return completion_op_->CheckCancelledAsync();
+ } else if (has_notify_when_done_tag_) {
+ // When using async API, the result is only valid
+ // if the tag has already been delivered at the completion queue
+ return completion_op_ && completion_op_->CheckCancelledAsync();
+ } else {
+ // when using sync API, the result is always valid
+ return completion_op_ && completion_op_->CheckCancelled(cq_);
+ }
}
void ServerContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
- char* algorithm_name = NULL;
+ compression_algorithm_ = algorithm;
+ const char* algorithm_name = nullptr;
if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
algorithm);
abort();
}
- GPR_ASSERT(algorithm_name != NULL);
- AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
-}
-
-void ServerContext::set_call(grpc_call* call) {
- call_ = call;
- auth_context_ = CreateAuthContext(call);
-}
-
-std::shared_ptr<const AuthContext> ServerContext::auth_context() const {
- if (auth_context_.get() == nullptr) {
- auth_context_ = CreateAuthContext(call_);
- }
- return auth_context_;
+ GPR_ASSERT(algorithm_name != nullptr);
+ AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
}
grpc::string ServerContext::peer() const {
@@ -229,4 +369,12 @@
return grpc_census_call_get_context(call_);
}
+void ServerContext::SetLoadReportingCosts(
+ const std::vector<grpc::string>& cost_data) {
+ if (call_ == nullptr) return;
+ for (const auto& cost_datum : cost_data) {
+ AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
+ }
+}
+
} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/server_credentials.cc b/third_party/grpc/src/cpp/server/server_credentials.cc
index 8495916..c3b3a8b 100644
--- a/third_party/grpc/src/cpp/server/server_credentials.cc
+++ b/third_party/grpc/src/cpp/server/server_credentials.cc
@@ -1,37 +1,22 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-#include <grpc++/security/server_credentials.h>
+#include <grpcpp/security/server_credentials.h>
namespace grpc {
diff --git a/third_party/grpc/src/cpp/server/server_posix.cc b/third_party/grpc/src/cpp/server/server_posix.cc
new file mode 100644
index 0000000..7c221ed
--- /dev/null
+++ b/third_party/grpc/src/cpp/server/server_posix.cc
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/server_posix.h>
+
+#include <grpc/grpc_posix.h>
+
+namespace grpc {
+
+#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
+
+void AddInsecureChannelFromFd(Server* server, int fd) {
+ grpc_server_add_insecure_channel_from_fd(server->c_server(), nullptr, fd);
+}
+
+#endif // GPR_SUPPORT_CHANNELS_FROM_FD
+
+} // namespace grpc
diff --git a/third_party/grpc/src/cpp/server/thread_pool_interface.h b/third_party/grpc/src/cpp/server/thread_pool_interface.h
index 1ebe30f..028842a 100644
--- a/third_party/grpc/src/cpp/server/thread_pool_interface.h
+++ b/third_party/grpc/src/cpp/server/thread_pool_interface.h
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -47,6 +32,10 @@
virtual void Add(const std::function<void()>& callback) = 0;
};
+// Allows different codebases to use their own thread pool impls
+typedef ThreadPoolInterface* (*CreateThreadPoolFunc)(void);
+void SetCreateThreadPool(CreateThreadPoolFunc func);
+
ThreadPoolInterface* CreateDefaultThreadPool();
} // namespace grpc