1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Nov 2020
 * \brief QueryService implementation and the QueryServiceBuilder factory
19 */
20
21#include "query_service.h"
22#include <ailego/utility/time_helper.h>
23#include "common/error_code.h"
24#include "common/logger.h"
25#include "executor/parallel_executor.h"
26#include "meta_wrapper.h"
27#include "query_factory.h"
28#include "query_service_builder.h"
29
30namespace proxima {
31namespace be {
32namespace query {
33
34/*!
35 * QueryService Implementation
36 */
37class QueryServiceImpl : public QueryService {
38 public:
39 //! Destructor
40 QueryServiceImpl(index::IndexServicePtr index_service,
41 MetaWrapperPtr meta_service, ExecutorPtr executor)
42 : index_service_(std::move(index_service)),
43 meta_service_(std::move(meta_service)),
44 executor_(std::move(executor)) {}
45
46 //! Destructor
47 ~QueryServiceImpl() override = default;
48
49 public:
50 //! Initialized flag
51 bool initialized() const override {
52 return index_service_ && meta_service_ && executor_;
53 }
54
55 //! Query Service
56 int search(const proto::QueryRequest *request, proto::QueryResponse *response,
57 ProfilerPtr profiler) override {
58 if (!initialized() || !request || !response || !profiler) {
59 return PROXIMA_BE_ERROR_CODE(RuntimeError);
60 }
61
62 // Do not change the sequence of following statements, we need more
63 // specific profiling data with debug mode enabled.
64 // Step1: Create Query
65 profiler->open_stage("before_process_query");
66 ailego::ElapsedTime timer;
67 auto query = QueryFactory::Create(request, index_service_, meta_service_,
68 executor_, profiler, response);
69 profiler->close_stage();
70
71 // Step2: Process Query
72 int code = process_query(query, profiler);
73 if (code != 0) {
74 LOG_ERROR("Process query failed. code[%d] what[%s]", code,
75 ErrorCode::What(code));
76 return code;
77 }
78
79 // Stage3: After Process Query
80 profiler->open_stage("after_process_query");
81 uint32_t result_counts = 0;
82 for (int i = 0; i < response->results_size(); i++) {
83 result_counts += response->results(i).documents_size();
84 }
85 LOG_INFO(
86 "Knn search success. query_id[%zu] batch_count[%u] topk[%u] "
87 "is_linear[%d] "
88 "resnum[%u] rt[%zuus] collection[%s]",
89 (size_t)query->id(), request->knn_param().batch_count(),
90 request->knn_param().topk(), request->knn_param().is_linear(),
91 result_counts, (size_t)timer.micro_seconds(),
92 request->collection_name().c_str());
93 profiler->close_stage();
94 return code;
95 }
96
97 //! Query Service
98 int search_by_key(const proto::GetDocumentRequest *request,
99 proto::GetDocumentResponse *response,
100 ProfilerPtr profiler) override {
101 if (!initialized() || !request || !response || !profiler) {
102 return PROXIMA_BE_ERROR_CODE(RuntimeError);
103 }
104
105 ailego::ElapsedTime timer;
106 auto query = QueryFactory::Create(request, index_service_, meta_service_,
107 executor_, profiler, response);
108
109 int code = process_query(query, profiler);
110 if (code != 0) {
111 LOG_ERROR("Process query failed. code[%d] what[%s]", code,
112 ErrorCode::What(code));
113 return code;
114 }
115
116 uint32_t result_counts = response->has_document() ? 1 : 0;
117 LOG_INFO(
118 "Kv search success. query_id[%zu] pk[%zu] resnum[%u] rt[%zuus] "
119 "collection[%s]",
120 (size_t)query->id(), (size_t)request->primary_key(), result_counts,
121 (size_t)timer.micro_seconds(), request->collection_name().c_str());
122 return code;
123 }
124
125 //! Cleanup QueryService
126 int cleanup() override {
127 index_service_.reset();
128 meta_service_.reset();
129 executor_.reset();
130 return 0;
131 }
132
133 private:
134 // Process query
135 int process_query(QueryPtr query, ProfilerPtr profiler) {
136 profiler->add("query_id", query->id());
137 profiler->open_stage("query");
138
139 int code = query->validate();
140 if (code == 0) {
141 code = query->prepare();
142 if (code == 0) {
143 code = query->evaluate();
144 } else {
145 LOG_ERROR(
146 "Failed to prepare resource for query. trace_id[%zu], code[%d]",
147 (size_t)query->id(), code);
148 }
149 } else {
150 LOG_ERROR("Can't validate query, skip it and continue");
151 }
152
153 query->finalize();
154 LOG_DEBUG("Query [%zu] have been finished", (size_t)query->id());
155 profiler->close_stage();
156
157 return code;
158 }
159
160 private:
161 //! IndexService Handler
162 index::IndexServicePtr index_service_{nullptr};
163
164 //! MetaService Handler
165 MetaWrapperPtr meta_service_{nullptr};
166
167 //! Executor
168 ExecutorPtr executor_{nullptr};
169};
170
171QueryServicePtr QueryServiceBuilder::Create(
172 index::IndexServicePtr index_service, meta::MetaServicePtr meta_service,
173 uint32_t concurrency) {
174 if (!index_service || !meta_service) {
175 LOG_ERROR(
176 "Create QueryService failed, invalid arguments index_service "
177 "or meta_service");
178 return nullptr;
179 }
180
181 if (concurrency == 0) { // Default host concurrency
182 concurrency = Scheduler::HostConcurrency();
183 }
184
185 // Share one scheduler between multiple instance of QueryService
186 auto scheduler = Scheduler::Default();
187 if (scheduler->concurrency() == 0) {
188 LOG_INFO("Set concurrency of query service [%u]", concurrency);
189 scheduler->concurrency(concurrency);
190 }
191
192 auto meta_wrapper = std::make_shared<MetaWrapper>(meta_service);
193 auto executor = std::make_shared<ParallelExecutor>(scheduler);
194
195 LOG_INFO("QueryService created with parallel executor");
196 return std::make_shared<QueryServiceImpl>(index_service, meta_wrapper,
197 executor);
198}
199
200} // namespace query
201} // namespace be
202} // namespace proxima
203