1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of proxima search engine |
19 | */ |
20 | |
#include "proxima_search_engine.h"
#include <algorithm>
#include <chrono>
#include <string>
#include <thread>
#include <vector>
#include <ailego/utility/process_helper.h>
#include <ailego/utility/string_helper.h>
#include "common/config.h"
#include "common/error_code.h"
#include "common/logger.h"
#include "metrics/metrics_collector.h"
30 | |
31 | namespace proxima { |
32 | namespace be { |
33 | namespace server { |
34 | |
35 | int ProximaSearchEngine::init(bool daemonized, const std::string &pid_file) { |
36 | if (!pid_file.empty() && !pid_file_.open(pid_file)) { |
37 | LOG_ERROR("ProximaSE open the pid file failed, pid_file=[%s]." , |
38 | pid_file.c_str()); |
39 | return ErrorCode_OpenFile; |
40 | } |
41 | daemonized_ = daemonized; |
42 | |
43 | // init logger |
44 | int ret = init_logger(); |
45 | if (ret != 0) { |
46 | LOG_ERROR("ProximaSE init logger error." ); |
47 | return ret; |
48 | } |
49 | |
50 | // get config |
51 | Config &config = Config::Instance(); |
52 | |
53 | // init metrics |
54 | ret = |
55 | metrics::MetricsCollector::CreateAndInitMetrics(config.metrics_config()); |
56 | if (ret != 0) { |
57 | LOG_ERROR("ProximaSE init metrics error" ); |
58 | return ret; |
59 | } |
60 | |
61 | // init meta agent |
62 | meta_agent_ = meta::MetaAgent::Create(config.get_meta_uri()); |
63 | if (!meta_agent_) { |
64 | LOG_ERROR("Create meta agent failed." ); |
65 | return ErrorCode_RuntimeError; |
66 | } |
67 | |
68 | ret = meta_agent_->init(); |
69 | if (ret != 0) { |
70 | LOG_ERROR("Init meta agent failed." ); |
71 | return ret; |
72 | } |
73 | |
74 | // init index agent |
75 | index_agent_ = agent::IndexAgent::Create(meta_agent_->get_service()); |
76 | if (!index_agent_) { |
77 | LOG_ERROR("Create index agent failed." ); |
78 | return ErrorCode_RuntimeError; |
79 | } |
80 | |
81 | ret = index_agent_->init(); |
82 | if (ret != 0) { |
83 | LOG_ERROR("Init index agent failed." ); |
84 | return ret; |
85 | } |
86 | |
87 | // init query agent |
88 | uint32_t concurrency = config.get_query_thread_count(); |
89 | query_agent_ = query::QueryAgent::Create( |
90 | index_agent_->get_service(), meta_agent_->get_service(), concurrency); |
91 | if (!query_agent_) { |
92 | LOG_ERROR("Create query agent failed." ); |
93 | return ErrorCode_RuntimeError; |
94 | } |
95 | |
96 | ret = query_agent_->init(); |
97 | if (ret != 0) { |
98 | LOG_ERROR("Init query agent failed." ); |
99 | return ret; |
100 | } |
101 | |
102 | // init admin agent |
103 | admin_agent_ = |
104 | admin::AdminAgent::Create(meta_agent_, index_agent_, query_agent_); |
105 | if (!admin_agent_) { |
106 | LOG_ERROR("Create admin agent failed." ); |
107 | return ErrorCode_RuntimeError; |
108 | } |
109 | |
110 | ret = admin_agent_->init(); |
111 | if (ret != 0) { |
112 | LOG_ERROR("Init admin agent failed." ); |
113 | return ret; |
114 | } |
115 | |
116 | grpc_server_ = GrpcServer::Create(); |
117 | if (!grpc_server_) { |
118 | LOG_ERROR("GrpcServer create failed." ); |
119 | return ErrorCode_RuntimeError; |
120 | } |
121 | |
122 | http_server_ = HttpServer::Create(); |
123 | if (!http_server_) { |
124 | LOG_ERROR("HttpServer create failed." ); |
125 | return ErrorCode_RuntimeError; |
126 | } |
127 | |
128 | return 0; |
129 | } |
130 | |
131 | int ProximaSearchEngine::cleanup() { |
132 | if (admin_agent_ != nullptr) { |
133 | admin_agent_->cleanup(); |
134 | } |
135 | |
136 | if (query_agent_ != nullptr) { |
137 | query_agent_->cleanup(); |
138 | } |
139 | |
140 | if (index_agent_ != nullptr) { |
141 | index_agent_->cleanup(); |
142 | } |
143 | |
144 | if (meta_agent_ != nullptr) { |
145 | meta_agent_->cleanup(); |
146 | } |
147 | |
148 | LOG_INFO("ProximaSE cleanup complete." ); |
149 | LogUtil::Shutdown(); |
150 | Config::Instance().cleanup(); |
151 | |
152 | daemonized_ = false; |
153 | return 0; |
154 | } |
155 | |
156 | int ProximaSearchEngine::start() { |
157 | if (daemonized_) { |
158 | daemonize(); |
159 | } |
160 | |
161 | // start meta agent |
162 | int ret = meta_agent_->start(); |
163 | if (ret != 0) { |
164 | LOG_ERROR("Start meta agent failed." ); |
165 | return ret; |
166 | } |
167 | |
168 | // start index agent |
169 | ret = index_agent_->start(); |
170 | if (ret != 0) { |
171 | LOG_ERROR("Start index agent failed." ); |
172 | return ret; |
173 | } |
174 | |
175 | // start query agent |
176 | ret = query_agent_->start(); |
177 | if (ret != 0) { |
178 | LOG_ERROR("Start query agent failed." ); |
179 | return ret; |
180 | } |
181 | |
182 | // start admin agent |
183 | ret = admin_agent_->start(); |
184 | if (ret != 0) { |
185 | LOG_ERROR("Start admin agent failed." ); |
186 | return ret; |
187 | } |
188 | |
189 | // start grpc server |
190 | if (support_brpc_protocol()) { |
191 | ret = grpc_server_->bind_and_start(index_agent_, query_agent_, admin_agent_, |
192 | version_); |
193 | if (ret != 0) { |
194 | LOG_ERROR("GrpcServer bind and start failed." ); |
195 | return ret; |
196 | } |
197 | |
198 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
199 | if (!grpc_server_->is_running()) { |
200 | return ErrorCode_StartServer; |
201 | } |
202 | } |
203 | |
204 | // start http server |
205 | if (support_http_protocol()) { |
206 | ret = http_server_->bind_and_start(index_agent_, query_agent_, admin_agent_, |
207 | version_); |
208 | if (ret != 0) { |
209 | LOG_ERROR("HttpServer bind and start failed." ); |
210 | return ret; |
211 | } |
212 | |
213 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
214 | |
215 | if (!http_server_->is_running()) { |
216 | return ErrorCode_StartServer; |
217 | } |
218 | } |
219 | |
220 | LOG_INFO("ProximaSE start successfully." ); |
221 | return 0; |
222 | } |
223 | |
224 | int ProximaSearchEngine::stop() { |
225 | if (is_stopping_.exchange(true)) { |
226 | return 0; |
227 | } |
228 | |
229 | if (grpc_server_ != nullptr && grpc_server_->is_running()) { |
230 | grpc_server_->stop(); |
231 | } |
232 | |
233 | if (http_server_ != nullptr && http_server_->is_running()) { |
234 | http_server_->stop(); |
235 | } |
236 | |
237 | if (admin_agent_ != nullptr) { |
238 | admin_agent_->stop(); |
239 | } |
240 | |
241 | if (query_agent_ != nullptr) { |
242 | query_agent_->stop(); |
243 | } |
244 | |
245 | if (index_agent_ != nullptr) { |
246 | index_agent_->stop(); |
247 | } |
248 | |
249 | if (meta_agent_ != nullptr) { |
250 | meta_agent_->stop(); |
251 | } |
252 | |
253 | pid_file_.close(); |
254 | |
255 | LOG_INFO("ProximaSE stopped." ); |
256 | return 0; |
257 | } |
258 | |
259 | int ProximaSearchEngine::init_logger() { |
260 | // get logger config |
261 | std::string log_dir = Config::Instance().get_log_dir(); |
262 | std::string log_file = Config::Instance().get_log_file(); |
263 | uint32_t log_level = Config::Instance().get_log_level(); |
264 | std::string logger_type = Config::Instance().get_logger_type(); |
265 | |
266 | // int logger |
267 | return LogUtil::Init(log_dir, log_file, log_level, logger_type); |
268 | } |
269 | |
270 | void ProximaSearchEngine::daemonize() { |
271 | std::string log_dir = Config::Instance().get_log_dir(); |
272 | std::string stdout_path = log_dir + "./stdout.log" ; |
273 | std::string stderr_path = log_dir + "./stderr.log" ; |
274 | ailego::ProcessHelper::Daemon(stdout_path.c_str(), stderr_path.c_str()); |
275 | } |
276 | |
277 | bool ProximaSearchEngine::support_brpc_protocol() { |
278 | std::string protocol = Config::Instance().get_protocol(); |
279 | std::vector<std::string> prots; |
280 | ailego::StringHelper::Split<std::string>(protocol, '|', &prots); |
281 | auto it = std::find_if(prots.begin(), prots.end(), |
282 | [](const std::string &str) { return str == "grpc" ; }); |
283 | return (it != prots.end()); |
284 | } |
285 | |
286 | bool ProximaSearchEngine::support_http_protocol() { |
287 | std::string protocol = Config::Instance().get_protocol(); |
288 | std::vector<std::string> prots; |
289 | ailego::StringHelper::Split<std::string>(protocol, '|', &prots); |
290 | auto it = std::find_if(prots.begin(), prots.end(), |
291 | [](const std::string &str) { return str == "http" ; }); |
292 | return (it != prots.end()); |
293 | } |
294 | |
295 | } // end namespace server |
296 | } // namespace be |
297 | } // end namespace proxima |
298 | |