1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of proxima search engine
19 */
20
21#include "proxima_search_engine.h"
22#include <chrono>
23#include <thread>
24#include <ailego/utility/process_helper.h>
25#include <ailego/utility/string_helper.h>
26#include "common/config.h"
27#include "common/error_code.h"
28#include "common/logger.h"
29#include "metrics/metrics_collector.h"
30
31namespace proxima {
32namespace be {
33namespace server {
34
35int ProximaSearchEngine::init(bool daemonized, const std::string &pid_file) {
36 if (!pid_file.empty() && !pid_file_.open(pid_file)) {
37 LOG_ERROR("ProximaSE open the pid file failed, pid_file=[%s].",
38 pid_file.c_str());
39 return ErrorCode_OpenFile;
40 }
41 daemonized_ = daemonized;
42
43 // init logger
44 int ret = init_logger();
45 if (ret != 0) {
46 LOG_ERROR("ProximaSE init logger error.");
47 return ret;
48 }
49
50 // get config
51 Config &config = Config::Instance();
52
53 // init metrics
54 ret =
55 metrics::MetricsCollector::CreateAndInitMetrics(config.metrics_config());
56 if (ret != 0) {
57 LOG_ERROR("ProximaSE init metrics error");
58 return ret;
59 }
60
61 // init meta agent
62 meta_agent_ = meta::MetaAgent::Create(config.get_meta_uri());
63 if (!meta_agent_) {
64 LOG_ERROR("Create meta agent failed.");
65 return ErrorCode_RuntimeError;
66 }
67
68 ret = meta_agent_->init();
69 if (ret != 0) {
70 LOG_ERROR("Init meta agent failed.");
71 return ret;
72 }
73
74 // init index agent
75 index_agent_ = agent::IndexAgent::Create(meta_agent_->get_service());
76 if (!index_agent_) {
77 LOG_ERROR("Create index agent failed.");
78 return ErrorCode_RuntimeError;
79 }
80
81 ret = index_agent_->init();
82 if (ret != 0) {
83 LOG_ERROR("Init index agent failed.");
84 return ret;
85 }
86
87 // init query agent
88 uint32_t concurrency = config.get_query_thread_count();
89 query_agent_ = query::QueryAgent::Create(
90 index_agent_->get_service(), meta_agent_->get_service(), concurrency);
91 if (!query_agent_) {
92 LOG_ERROR("Create query agent failed.");
93 return ErrorCode_RuntimeError;
94 }
95
96 ret = query_agent_->init();
97 if (ret != 0) {
98 LOG_ERROR("Init query agent failed.");
99 return ret;
100 }
101
102 // init admin agent
103 admin_agent_ =
104 admin::AdminAgent::Create(meta_agent_, index_agent_, query_agent_);
105 if (!admin_agent_) {
106 LOG_ERROR("Create admin agent failed.");
107 return ErrorCode_RuntimeError;
108 }
109
110 ret = admin_agent_->init();
111 if (ret != 0) {
112 LOG_ERROR("Init admin agent failed.");
113 return ret;
114 }
115
116 grpc_server_ = GrpcServer::Create();
117 if (!grpc_server_) {
118 LOG_ERROR("GrpcServer create failed.");
119 return ErrorCode_RuntimeError;
120 }
121
122 http_server_ = HttpServer::Create();
123 if (!http_server_) {
124 LOG_ERROR("HttpServer create failed.");
125 return ErrorCode_RuntimeError;
126 }
127
128 return 0;
129}
130
131int ProximaSearchEngine::cleanup() {
132 if (admin_agent_ != nullptr) {
133 admin_agent_->cleanup();
134 }
135
136 if (query_agent_ != nullptr) {
137 query_agent_->cleanup();
138 }
139
140 if (index_agent_ != nullptr) {
141 index_agent_->cleanup();
142 }
143
144 if (meta_agent_ != nullptr) {
145 meta_agent_->cleanup();
146 }
147
148 LOG_INFO("ProximaSE cleanup complete.");
149 LogUtil::Shutdown();
150 Config::Instance().cleanup();
151
152 daemonized_ = false;
153 return 0;
154}
155
156int ProximaSearchEngine::start() {
157 if (daemonized_) {
158 daemonize();
159 }
160
161 // start meta agent
162 int ret = meta_agent_->start();
163 if (ret != 0) {
164 LOG_ERROR("Start meta agent failed.");
165 return ret;
166 }
167
168 // start index agent
169 ret = index_agent_->start();
170 if (ret != 0) {
171 LOG_ERROR("Start index agent failed.");
172 return ret;
173 }
174
175 // start query agent
176 ret = query_agent_->start();
177 if (ret != 0) {
178 LOG_ERROR("Start query agent failed.");
179 return ret;
180 }
181
182 // start admin agent
183 ret = admin_agent_->start();
184 if (ret != 0) {
185 LOG_ERROR("Start admin agent failed.");
186 return ret;
187 }
188
189 // start grpc server
190 if (support_brpc_protocol()) {
191 ret = grpc_server_->bind_and_start(index_agent_, query_agent_, admin_agent_,
192 version_);
193 if (ret != 0) {
194 LOG_ERROR("GrpcServer bind and start failed.");
195 return ret;
196 }
197
198 std::this_thread::sleep_for(std::chrono::seconds(1));
199 if (!grpc_server_->is_running()) {
200 return ErrorCode_StartServer;
201 }
202 }
203
204 // start http server
205 if (support_http_protocol()) {
206 ret = http_server_->bind_and_start(index_agent_, query_agent_, admin_agent_,
207 version_);
208 if (ret != 0) {
209 LOG_ERROR("HttpServer bind and start failed.");
210 return ret;
211 }
212
213 std::this_thread::sleep_for(std::chrono::seconds(1));
214
215 if (!http_server_->is_running()) {
216 return ErrorCode_StartServer;
217 }
218 }
219
220 LOG_INFO("ProximaSE start successfully.");
221 return 0;
222}
223
224int ProximaSearchEngine::stop() {
225 if (is_stopping_.exchange(true)) {
226 return 0;
227 }
228
229 if (grpc_server_ != nullptr && grpc_server_->is_running()) {
230 grpc_server_->stop();
231 }
232
233 if (http_server_ != nullptr && http_server_->is_running()) {
234 http_server_->stop();
235 }
236
237 if (admin_agent_ != nullptr) {
238 admin_agent_->stop();
239 }
240
241 if (query_agent_ != nullptr) {
242 query_agent_->stop();
243 }
244
245 if (index_agent_ != nullptr) {
246 index_agent_->stop();
247 }
248
249 if (meta_agent_ != nullptr) {
250 meta_agent_->stop();
251 }
252
253 pid_file_.close();
254
255 LOG_INFO("ProximaSE stopped.");
256 return 0;
257}
258
259int ProximaSearchEngine::init_logger() {
260 // get logger config
261 std::string log_dir = Config::Instance().get_log_dir();
262 std::string log_file = Config::Instance().get_log_file();
263 uint32_t log_level = Config::Instance().get_log_level();
264 std::string logger_type = Config::Instance().get_logger_type();
265
266 // int logger
267 return LogUtil::Init(log_dir, log_file, log_level, logger_type);
268}
269
270void ProximaSearchEngine::daemonize() {
271 std::string log_dir = Config::Instance().get_log_dir();
272 std::string stdout_path = log_dir + "./stdout.log";
273 std::string stderr_path = log_dir + "./stderr.log";
274 ailego::ProcessHelper::Daemon(stdout_path.c_str(), stderr_path.c_str());
275}
276
277bool ProximaSearchEngine::support_brpc_protocol() {
278 std::string protocol = Config::Instance().get_protocol();
279 std::vector<std::string> prots;
280 ailego::StringHelper::Split<std::string>(protocol, '|', &prots);
281 auto it = std::find_if(prots.begin(), prots.end(),
282 [](const std::string &str) { return str == "grpc"; });
283 return (it != prots.end());
284}
285
286bool ProximaSearchEngine::support_http_protocol() {
287 std::string protocol = Config::Instance().get_protocol();
288 std::vector<std::string> prots;
289 ailego::StringHelper::Split<std::string>(protocol, '|', &prots);
290 auto it = std::find_if(prots.begin(), prots.end(),
291 [](const std::string &str) { return str == "http"; });
292 return (it != prots.end());
293}
294
295} // end namespace server
296} // namespace be
297} // end namespace proxima
298