1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of grpc server
19 */
20
21#include "grpc_server.h"
22#include <chrono>
23#include "common/config.h"
24#include "common/error_code.h"
25#include "common/logger.h"
26
27extern std::string GetVersion();
28
29namespace proxima {
30namespace be {
31namespace server {
32
33
34GrpcServerUPtr GrpcServer::Create() {
35 return std::unique_ptr<GrpcServer>(new GrpcServer());
36}
37
38GrpcServer::~GrpcServer() {
39 if (thread_ != nullptr) {
40 this->stop();
41 }
42}
43
44int GrpcServer::bind_and_start(const agent::IndexAgentPtr &index_agent,
45 const query::QueryAgentPtr &query_agent,
46 const admin::AdminAgentPtr &admin_agent,
47 const std::string &version) {
48 auto *request_handler =
49 new ProximaRequestHandler(index_agent, query_agent, admin_agent);
50 if (!request_handler) {
51 LOG_ERROR("Create proxima request handler failed.");
52 return ErrorCode_RuntimeError;
53 }
54
55 // Set server version
56 request_handler->set_version(version);
57 server_.set_version(version);
58
59 // Register grpc service
60 int ret = server_.AddService((proto::ProximaService *)request_handler,
61 brpc::SERVER_OWNS_SERVICE);
62 if (ret != 0) {
63 LOG_ERROR("Grpc server add service failed.");
64 return ret;
65 }
66
67 // async start grpc server in single thread
68 thread_ = std::unique_ptr<std::thread>(
69 new std::thread(&GrpcServer::start_server, this));
70
71 // sleep 1s
72 std::this_thread::sleep_for(std::chrono::seconds(1));
73
74 return 0;
75}
76
77int GrpcServer::stop() {
78 this->stop_server();
79 if (thread_ != nullptr) {
80 thread_->join();
81 thread_ = nullptr;
82 }
83 return 0;
84}
85
86bool GrpcServer::is_running() {
87 return server_.IsRunning();
88}
89
90int GrpcServer::start_server() {
91 brpc::ServerOptions options;
92
93 // Do not set auto concurrency limiter, it's unstable
94 // max_concurrency | idle_timeout_sec options not open now
95
96 // Configured by query thread count,
97 // the pthread pool is shared in global.
98 // We config query thread count + 1, just for preserving
99 // one thread for scheduler.
100 options.num_threads = Config::Instance().get_query_thread_count() + 1;
101 uint32_t listen_port = Config::Instance().get_grpc_listen_port();
102 int ret = server_.Start(listen_port, &options);
103 if (ret != 0) {
104 LOG_ERROR("Grpc server start failed.");
105 return ret;
106 }
107
108 LOG_INFO("Grpc server start success. port[%u]", listen_port);
109
110 while (server_.IsRunning()) {
111 std::this_thread::sleep_for(std::chrono::seconds(1));
112 }
113
114 LOG_INFO("Grpc server thread exit.");
115 return 0;
116}
117
118int GrpcServer::stop_server() {
119 server_.Stop(0);
120 server_.Join();
121 return 0;
122}
123
124
125} // end namespace server
126} // namespace be
127} // end namespace proxima
128