1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author guonix |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include "query_agent.h" |
22 | #include "common/config.h" |
23 | #include "common/profiler.h" |
24 | #include "query_service_builder.h" |
25 | |
26 | namespace proxima { |
27 | namespace be { |
28 | namespace query { |
29 | |
30 | /*! |
31 | * QueryAgent implementation |
32 | */ |
33 | class QueryAgentImpl : public QueryAgent { |
34 | public: |
35 | //! Constructor |
36 | explicit QueryAgentImpl(QueryServicePtr query_service) |
37 | : query_service_(std::move(query_service)), stopped_(false) {} |
38 | |
39 | //! Destructor |
40 | ~QueryAgentImpl() override = default; |
41 | |
42 | public: |
43 | //! Get Query Service Instance |
44 | QueryServicePtr get_service() const override { |
45 | return query_service_; |
46 | } |
47 | |
48 | public: |
49 | //! Query Service |
50 | int search(const proto::QueryRequest *query, |
51 | proto::QueryResponse *response) override { |
52 | int error_code = PROXIMA_BE_ERROR_CODE(StoppedService); |
53 | if (!is_running()) { |
54 | LOG_WARN("QueryAgent stopped, invoke start and try again." ); |
55 | return error_code; |
56 | } |
57 | ProfilerPtr profiler = std::make_shared<Profiler>(query->debug_mode()); |
58 | profiler->start(); |
59 | error_code = query_service_->search(query, response, profiler); |
60 | profiler->stop(); |
61 | if (profiler->enabled()) { |
62 | response->set_debug_info(profiler->as_json_string()); |
63 | } |
64 | return error_code; |
65 | } |
66 | |
67 | //! Query Service |
68 | int search_by_key(const proto::GetDocumentRequest *query, |
69 | proto::GetDocumentResponse *response) override { |
70 | if (!is_running()) { |
71 | LOG_WARN("QueryAgent stopped, invoke start and try again." ); |
72 | return PROXIMA_BE_ERROR_CODE(StoppedService); |
73 | } |
74 | ProfilerPtr profiler = std::make_shared<Profiler>(query->debug_mode()); |
75 | profiler->start(); |
76 | int error_code = query_service_->search_by_key(query, response, profiler); |
77 | profiler->stop(); |
78 | if (profiler->enabled()) { |
79 | response->set_debug_info(profiler->as_json_string()); |
80 | } |
81 | return error_code; |
82 | } |
83 | |
84 | public: |
85 | //! Init Query Agent |
86 | int init() override { |
87 | LOG_INFO("QueryAgent initialize complete." ); |
88 | return 0; |
89 | } |
90 | |
91 | //! Cleanup Query Agent |
92 | int cleanup() override { |
93 | int error_code = query_service_->cleanup(); |
94 | if (error_code == 0) { |
95 | LOG_INFO("QueryAgent cleanup complete." ); |
96 | } else { |
97 | LOG_ERROR("QueryAgent cleanup failed. code[%d], what[%s]" , error_code, |
98 | ErrorCode::What(error_code)); |
99 | } |
100 | return error_code; |
101 | } |
102 | |
103 | //! Start background service |
104 | int start() override { |
105 | stopped_.store(false); |
106 | LOG_INFO("QueryAgent start complete." ); |
107 | return 0; |
108 | } |
109 | |
110 | //! Stop background service |
111 | int stop() override { |
112 | LOG_INFO("QueryAgent stopped." ); |
113 | stopped_.store(true); |
114 | return 0; |
115 | } |
116 | |
117 | //! Check QueryAgent is running |
118 | int is_running() override { |
119 | return !stopped_; |
120 | } |
121 | |
122 | private: |
123 | //! Handler for query service |
124 | QueryServicePtr query_service_{nullptr}; |
125 | |
126 | //! Stopped flag |
127 | std::atomic_bool stopped_{false}; |
128 | }; |
129 | |
130 | //! Create one QueryAgent instance |
131 | //! @param: concurrency: the buckets of execution queue, equal 0, means |
132 | // using hardware concurrency |
133 | QueryAgentPtr QueryAgent::Create(index::IndexServicePtr index_service, |
134 | meta::MetaServicePtr meta_service, |
135 | uint32_t concurrency) { |
136 | return std::make_shared<QueryAgentImpl>(QueryServiceBuilder::Create( |
137 | std::move(index_service), std::move(meta_service), concurrency)); |
138 | } |
139 | |
140 | } // namespace query |
141 | } // namespace be |
142 | } // namespace proxima |
143 | |