1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of grpc server |
19 | */ |
20 | |
21 | #include "grpc_server.h" |
22 | #include <chrono> |
23 | #include "common/config.h" |
24 | #include "common/error_code.h" |
25 | #include "common/logger.h" |
26 | |
27 | extern std::string GetVersion(); |
28 | |
29 | namespace proxima { |
30 | namespace be { |
31 | namespace server { |
32 | |
33 | |
34 | GrpcServerUPtr GrpcServer::Create() { |
35 | return std::unique_ptr<GrpcServer>(new GrpcServer()); |
36 | } |
37 | |
38 | GrpcServer::~GrpcServer() { |
39 | if (thread_ != nullptr) { |
40 | this->stop(); |
41 | } |
42 | } |
43 | |
44 | int GrpcServer::bind_and_start(const agent::IndexAgentPtr &index_agent, |
45 | const query::QueryAgentPtr &query_agent, |
46 | const admin::AdminAgentPtr &admin_agent, |
47 | const std::string &version) { |
48 | auto *request_handler = |
49 | new ProximaRequestHandler(index_agent, query_agent, admin_agent); |
50 | if (!request_handler) { |
51 | LOG_ERROR("Create proxima request handler failed." ); |
52 | return ErrorCode_RuntimeError; |
53 | } |
54 | |
55 | // Set server version |
56 | request_handler->set_version(version); |
57 | server_.set_version(version); |
58 | |
59 | // Register grpc service |
60 | int ret = server_.AddService((proto::ProximaService *)request_handler, |
61 | brpc::SERVER_OWNS_SERVICE); |
62 | if (ret != 0) { |
63 | LOG_ERROR("Grpc server add service failed." ); |
64 | return ret; |
65 | } |
66 | |
67 | // async start grpc server in single thread |
68 | thread_ = std::unique_ptr<std::thread>( |
69 | new std::thread(&GrpcServer::start_server, this)); |
70 | |
71 | // sleep 1s |
72 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
73 | |
74 | return 0; |
75 | } |
76 | |
77 | int GrpcServer::stop() { |
78 | this->stop_server(); |
79 | if (thread_ != nullptr) { |
80 | thread_->join(); |
81 | thread_ = nullptr; |
82 | } |
83 | return 0; |
84 | } |
85 | |
86 | bool GrpcServer::is_running() { |
87 | return server_.IsRunning(); |
88 | } |
89 | |
90 | int GrpcServer::start_server() { |
91 | brpc::ServerOptions options; |
92 | |
93 | // Do not set auto concurrency limiter, it's unstable |
94 | // max_concurrency | idle_timeout_sec options not open now |
95 | |
96 | // Configured by query thread count, |
97 | // the pthread pool is shared in global. |
98 | // We config query thread count + 1, just for preserving |
99 | // one thread for scheduler. |
100 | options.num_threads = Config::Instance().get_query_thread_count() + 1; |
101 | uint32_t listen_port = Config::Instance().get_grpc_listen_port(); |
102 | int ret = server_.Start(listen_port, &options); |
103 | if (ret != 0) { |
104 | LOG_ERROR("Grpc server start failed." ); |
105 | return ret; |
106 | } |
107 | |
108 | LOG_INFO("Grpc server start success. port[%u]" , listen_port); |
109 | |
110 | while (server_.IsRunning()) { |
111 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
112 | } |
113 | |
114 | LOG_INFO("Grpc server thread exit." ); |
115 | return 0; |
116 | } |
117 | |
118 | int GrpcServer::stop_server() { |
119 | server_.Stop(0); |
120 | server_.Join(); |
121 | return 0; |
122 | } |
123 | |
124 | |
125 | } // end namespace server |
126 | } // namespace be |
127 | } // end namespace proxima |
128 | |