1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of vector column reader. |
19 | */ |
20 | |
21 | #include "vector_column_reader.h" |
22 | #include "common/defer.h" |
23 | #include "common/error_code.h" |
24 | #include "constants.h" |
25 | #include "typedef.h" |
26 | |
27 | namespace proxima { |
28 | namespace be { |
29 | namespace index { |
30 | |
31 | VectorColumnReader::~VectorColumnReader() { |
32 | // TODO can't call virtual function |
33 | if (opened_) { |
34 | close(); |
35 | } |
36 | } |
37 | |
38 | int VectorColumnReader::open(const meta::ColumnMeta &column_meta, |
39 | const ReadOptions &read_options) { |
40 | CHECK_STATUS(opened_, false); |
41 | |
42 | if (!check_column_meta(column_meta)) { |
43 | LLOG_ERROR("Check column meta failed." ); |
44 | return ErrorCode_ConfigError; |
45 | } |
46 | |
47 | int ret = open_proxima_container(read_options); |
48 | CHECK_RETURN_WITH_LOG(ret, 0, "Open proxima container failed." ); |
49 | |
50 | ret = open_proxima_searcher(); |
51 | CHECK_RETURN_WITH_LOG(ret, 0, "Open proxima searcher failed." ); |
52 | |
53 | opened_ = true; |
54 | LLOG_DEBUG("Opened column searcher." ); |
55 | return 0; |
56 | } |
57 | |
58 | int VectorColumnReader::close() { |
59 | context_pool_.clear(); |
60 | proxima_searcher_->unload(); |
61 | proxima_searcher_->cleanup(); |
62 | |
63 | opened_ = false; |
64 | LLOG_DEBUG("Unloaded column searcher" ); |
65 | |
66 | return 0; |
67 | } |
68 | |
69 | int VectorColumnReader::search(const std::string &query, |
70 | const QueryParams &query_params, |
71 | FilterFunction filter, |
72 | IndexDocumentList *results) { |
73 | CHECK_STATUS(opened_, true); |
74 | |
75 | std::vector<IndexDocumentList> batch_results; |
76 | int ret = this->search(query, query_params, 1, filter, &batch_results); |
77 | (*results) = batch_results[0]; |
78 | return ret; |
79 | } |
80 | |
81 | |
82 | int VectorColumnReader::search( |
83 | const std::string &query, const QueryParams &query_params, |
84 | uint32_t batch_count, FilterFunction filter, |
85 | std::vector<IndexDocumentList> *batch_result_list) { |
86 | CHECK_STATUS(opened_, true); |
87 | |
88 | // Check if query legal |
89 | IndexQueryMeta query_meta; |
90 | auto feature_type = |
91 | IndexHelper::GetProximaFeatureType(query_params.data_type); |
92 | auto dimension = query_params.dimension; |
93 | if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) { |
94 | query_meta.set_meta(feature_type, dimension); |
95 | } else { |
96 | query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension()); |
97 | } |
98 | |
99 | if (query_meta.type() != proxima_meta_.type() || |
100 | query_meta.dimension() != proxima_meta_.dimension()) { |
101 | LLOG_ERROR( |
102 | "Invalid query, input query feature type or dimension not matched. " |
103 | "query_feature_type[%d] query_dimension[%u] feature_type[%d] " |
104 | "dimension[%u]" , |
105 | query_meta.type(), query_meta.type(), proxima_meta_.type(), |
106 | proxima_meta_.dimension()); |
107 | return ErrorCode_InvalidQuery; |
108 | } |
109 | |
110 | uint32_t expect_size = query_meta.element_size() * batch_count; |
111 | if (query.size() != expect_size) { |
112 | LLOG_ERROR( |
113 | "Invalid query, query size mismatch. expect_size[%u] " |
114 | "actual_size[%zu]" , |
115 | expect_size, query.size()); |
116 | return ErrorCode_InvalidQuery; |
117 | } |
118 | |
119 | // Get context and set properties. |
120 | // Notice that, we must reset the context |
121 | // when return back into pool. |
122 | auto ctx = context_pool_.acquire(); |
123 | ctx->set_topk(query_params.topk); |
124 | if (filter != nullptr) { |
125 | ctx->set_filter(filter); |
126 | } |
127 | if (query_params.radius > 0.0f) { |
128 | ctx->set_threshold(query_params.radius); |
129 | } |
130 | Defer defer([&ctx, this] { |
131 | ctx->set_filter(nullptr); |
132 | ctx->set_threshold(std::numeric_limits<float>::max()); |
133 | context_pool_.release(std::move(ctx)); |
134 | }); |
135 | |
136 | int ret = 0; |
137 | // Check if need to use quantizer |
138 | if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) { |
139 | std::string new_query; |
140 | IndexQueryMeta new_meta; |
141 | ret = reformer_->transform(query.data(), query_meta, &new_query, &new_meta); |
142 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]" , |
143 | ret); |
144 | |
145 | if (query_params.is_linear) { |
146 | ret = proxima_searcher_->search_bf_impl(new_query.data(), new_meta, |
147 | batch_count, ctx); |
148 | } else { |
149 | ret = proxima_searcher_->search_impl(new_query.data(), new_meta, |
150 | batch_count, ctx); |
151 | } |
152 | } else { |
153 | if (query_params.is_linear) { |
154 | ret = proxima_searcher_->search_bf_impl(query.data(), query_meta, |
155 | batch_count, ctx); |
156 | } else { |
157 | ret = proxima_searcher_->search_impl(query.data(), query_meta, |
158 | batch_count, ctx); |
159 | } |
160 | } |
161 | |
162 | CHECK_RETURN_WITH_LLOG(ret, 0, |
163 | "Search proxima searcher failed. ret[%d] reason[%s]" , |
164 | ret, aitheta2::IndexError::What(ret)); |
165 | |
166 | for (uint32_t i = 0; i < batch_count; i++) { |
167 | auto &result_list = ctx->result(i); |
168 | if (measure_->support_normalize()) { |
169 | for (auto &it : const_cast<IndexDocumentList &>(result_list)) { |
170 | measure_->normalize(it.mutable_score()); |
171 | } |
172 | } |
173 | if (reformer_) { |
174 | reformer_->normalize(query.data(), query_meta, |
175 | const_cast<IndexDocumentList &>(result_list)); |
176 | } |
177 | batch_result_list->emplace_back(result_list); |
178 | } |
179 | |
180 | return 0; |
181 | } |
182 | |
183 | bool VectorColumnReader::check_column_meta( |
184 | const meta::ColumnMeta &column_meta) { |
185 | auto index_type = column_meta.index_type(); |
186 | if (index_type != IndexTypes::PROXIMA_GRAPH_INDEX) { |
187 | LOG_ERROR("Column meta config error, only support PROXIMA_GRAPH_INDEX now" ); |
188 | return false; |
189 | } |
190 | |
191 | auto data_type = column_meta.data_type(); |
192 | auto feature_type = IndexHelper::GetProximaFeatureType(data_type); |
193 | if (feature_type == FeatureTypes::FT_UNDEFINED) { |
194 | LLOG_ERROR("Column meta config error, unknown data type." ); |
195 | return false; |
196 | } |
197 | |
198 | auto dimension = column_meta.dimension(); |
199 | if (dimension == 0U) { |
200 | LLOG_ERROR("Column meta config error, dimension can't be 0." ); |
201 | return false; |
202 | } |
203 | |
204 | auto metric_type = column_meta.parameters().get_as_string("metric_type" ); |
205 | if (metric_type.empty()) { |
206 | metric_type = "SquaredEuclidean" ; |
207 | } |
208 | |
209 | auto ef_search = column_meta.parameters().get_as_uint32("ef_search" ); |
210 | if (ef_search > 0U) { |
211 | proxima_params_.set("proxima.hnsw.searcher.ef" , ef_search); |
212 | } else { |
213 | proxima_params_.set("proxima.hnsw.searcher.ef" , 200U); |
214 | } |
215 | |
216 | auto max_scan_ratio = column_meta.parameters().get_as_float("max_scan_ratio" ); |
217 | if (max_scan_ratio > 0.0f) { |
218 | proxima_params_.set("proxima.hnsw.searcher.max_scan_ratio" , max_scan_ratio); |
219 | } |
220 | |
221 | auto visit_bf = |
222 | column_meta.parameters().get_as_bool("visit_bloomfilter_enable" ); |
223 | if (visit_bf) { |
224 | proxima_params_.set("proxima.hnsw.searcher.visit_bloomfilter_enable" , |
225 | visit_bf); |
226 | } |
227 | |
228 | // Check quantize type |
229 | auto quantize_type = column_meta.parameters().get_as_string("quantize_type" ); |
230 | if (!quantize_type.empty()) { |
231 | if (IndexHelper::GetQuantizeType(quantize_type) == |
232 | QuantizeTypes::UNDEFINED) { |
233 | LLOG_ERROR( |
234 | "Column meta config error, unknown quantize type. quantize_type[%s]" , |
235 | quantize_type.c_str()); |
236 | return false; |
237 | } |
238 | |
239 | if (data_type != DataTypes::VECTOR_FP32) { |
240 | LLOG_ERROR( |
241 | "Column meta config error, only FP32 data type can open quantizer" ); |
242 | return false; |
243 | } |
244 | |
245 | quantize_type_ = IndexHelper::GetQuantizeType(quantize_type); |
246 | } |
247 | |
248 | // Set proxima index meta |
249 | proxima_meta_.set_meta(feature_type, dimension); |
250 | proxima_meta_.set_measure(metric_type, 0, IndexParams()); |
251 | |
252 | LLOG_INFO( |
253 | "Show vector column searcher options. index_type[%u] data_type[%u] " |
254 | "dimension[%u] measure[%s] context_count[%u] ef_search[%u] " |
255 | "max_scan_ratio[%f] visit_bf[%d] quantize_type[%s] " , |
256 | index_type, data_type, dimension, metric_type.c_str(), |
257 | this->concurrency(), ef_search, max_scan_ratio, visit_bf, |
258 | quantize_type.c_str()); |
259 | |
260 | return true; |
261 | } |
262 | |
263 | int VectorColumnReader::open_proxima_container( |
264 | const ReadOptions &read_options) { |
265 | index_file_path_ = FileHelper::MakeFilePath( |
266 | this->collection_path(), FileID::SEGMENT_FILE, this->segment_id()); |
267 | |
268 | if (read_options.use_mmap) { |
269 | container_ = aitheta2::IndexFactory::CreateContainer("MMapFileContainer" ); |
270 | } else { |
271 | container_ = aitheta2::IndexFactory::CreateContainer("MemoryContainer" ); |
272 | } |
273 | |
274 | // Default set warmup flag |
275 | IndexParams container_params; |
276 | container_params.set("proxima.mmap_file.container.memory_warmup" , true); |
277 | |
278 | int ret = container_->init(container_params); |
279 | CHECK_RETURN_WITH_LLOG(ret, 0, "Container init failed. ret[%d]" , ret); |
280 | |
281 | ret = container_->load(index_file_path_); |
282 | CHECK_RETURN_WITH_LLOG(ret, 0, "Container load failed. ret[%d] file[%s]" , ret, |
283 | index_file_path_.c_str()); |
284 | |
285 | return 0; |
286 | } |
287 | |
288 | int VectorColumnReader::open_proxima_searcher() { |
289 | int ret = 0; |
290 | auto index_meta = proxima_meta_; |
291 | // Check if need to open quantizer |
292 | if (quantize_type_ != QuantizeTypes::UNDEFINED) { |
293 | IndexConverterPtr converter; |
294 | switch (quantize_type_) { |
295 | case QuantizeTypes::VECTOR_INT4: |
296 | converter = |
297 | aitheta2::IndexFactory::CreateConverter("Int4StreamingConverter" ); |
298 | break; |
299 | case QuantizeTypes::VECTOR_INT8: |
300 | converter = |
301 | aitheta2::IndexFactory::CreateConverter("Int8StreamingConverter" ); |
302 | break; |
303 | case QuantizeTypes::VECTOR_FP16: |
304 | converter = |
305 | aitheta2::IndexFactory::CreateConverter("HalfFloatConverter" ); |
306 | break; |
307 | default: |
308 | return ErrorCode_RuntimeError; |
309 | } |
310 | |
311 | if (!converter) { |
312 | LLOG_ERROR("Create converter failed." ); |
313 | return ErrorCode_RuntimeError; |
314 | } |
315 | |
316 | ret = converter->init(proxima_meta_, IndexParams()); |
317 | CHECK_RETURN_WITH_LLOG(ret, 0, "Converter init failed. ret[%d]" , ret); |
318 | index_meta = converter->meta(); |
319 | |
320 | reformer_ = |
321 | aitheta2::IndexFactory::CreateReformer(index_meta.reformer_name()); |
322 | ret = reformer_->init(IndexParams()); |
323 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]" , ret); |
324 | } |
325 | |
326 | // Init measure |
327 | measure_ = |
328 | aitheta2::IndexFactory::CreateMeasure(proxima_meta_.measure_name()); |
329 | if (!measure_) { |
330 | LLOG_ERROR("Create measure %s failed" , |
331 | proxima_meta_.measure_name().c_str()); |
332 | return aitheta2::IndexError_Runtime; |
333 | } |
334 | ret = measure_->init(proxima_meta_, IndexParams()); |
335 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]" , ret); |
336 | auto query_measure = measure_->query_measure(); |
337 | if (query_measure) { |
338 | measure_ = query_measure; |
339 | } |
340 | |
341 | // Init proxima searcher |
342 | proxima_searcher_ = aitheta2::IndexFactory::CreateSearcher("HnswSearcher" ); |
343 | if (!proxima_searcher_) { |
344 | LLOG_ERROR("Create proxima searcher failed. name[HnswSearcher]" ); |
345 | return ErrorCode_RuntimeError; |
346 | } |
347 | |
348 | ret = proxima_searcher_->init(proxima_params_); |
349 | CHECK_RETURN_WITH_LOG(ret, 0, "Init proxima searcher failed." ); |
350 | |
351 | auto column_block = container_->get(COLUMN_DUMP_BLOCK + this->column_name()); |
352 | if (!column_block) { |
353 | LLOG_INFO("Can't find column block in index file." ); |
354 | return ErrorCode_InvalidSegment; |
355 | } |
356 | auto block_container = |
357 | std::make_shared<aitheta2::IndexSegmentContainer>(column_block); |
358 | ret = block_container->load(); |
359 | CHECK_RETURN_WITH_LLOG(ret, 0, "Column block load failed." ); |
360 | |
361 | ret = proxima_searcher_->load(block_container, nullptr); |
362 | CHECK_RETURN_WITH_LLOG(ret, 0, "Load container failed." ); |
363 | |
364 | // Init context pool |
365 | for (uint32_t i = 0; i < this->concurrency(); i++) { |
366 | auto ctx = proxima_searcher_->create_context(); |
367 | if (!ctx) { |
368 | LLOG_ERROR("Create context for proxima searcher failed." ); |
369 | return ErrorCode_RuntimeError; |
370 | } |
371 | context_pool_.emplace(std::move(ctx)); |
372 | } |
373 | |
374 | return 0; |
375 | } |
376 | |
377 | |
378 | } // end namespace index |
379 | } // namespace be |
380 | } // end namespace proxima |
381 | |