1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of vector column reader.
19 */
20
21#include "vector_column_reader.h"
22#include "common/defer.h"
23#include "common/error_code.h"
24#include "constants.h"
25#include "typedef.h"
26
27namespace proxima {
28namespace be {
29namespace index {
30
31VectorColumnReader::~VectorColumnReader() {
32 // TODO can't call virtual function
33 if (opened_) {
34 close();
35 }
36}
37
38int VectorColumnReader::open(const meta::ColumnMeta &column_meta,
39 const ReadOptions &read_options) {
40 CHECK_STATUS(opened_, false);
41
42 if (!check_column_meta(column_meta)) {
43 LLOG_ERROR("Check column meta failed.");
44 return ErrorCode_ConfigError;
45 }
46
47 int ret = open_proxima_container(read_options);
48 CHECK_RETURN_WITH_LOG(ret, 0, "Open proxima container failed.");
49
50 ret = open_proxima_searcher();
51 CHECK_RETURN_WITH_LOG(ret, 0, "Open proxima searcher failed.");
52
53 opened_ = true;
54 LLOG_DEBUG("Opened column searcher.");
55 return 0;
56}
57
58int VectorColumnReader::close() {
59 context_pool_.clear();
60 proxima_searcher_->unload();
61 proxima_searcher_->cleanup();
62
63 opened_ = false;
64 LLOG_DEBUG("Unloaded column searcher");
65
66 return 0;
67}
68
69int VectorColumnReader::search(const std::string &query,
70 const QueryParams &query_params,
71 FilterFunction filter,
72 IndexDocumentList *results) {
73 CHECK_STATUS(opened_, true);
74
75 std::vector<IndexDocumentList> batch_results;
76 int ret = this->search(query, query_params, 1, filter, &batch_results);
77 (*results) = batch_results[0];
78 return ret;
79}
80
81
82int VectorColumnReader::search(
83 const std::string &query, const QueryParams &query_params,
84 uint32_t batch_count, FilterFunction filter,
85 std::vector<IndexDocumentList> *batch_result_list) {
86 CHECK_STATUS(opened_, true);
87
88 // Check if query legal
89 IndexQueryMeta query_meta;
90 auto feature_type =
91 IndexHelper::GetProximaFeatureType(query_params.data_type);
92 auto dimension = query_params.dimension;
93 if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) {
94 query_meta.set_meta(feature_type, dimension);
95 } else {
96 query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension());
97 }
98
99 if (query_meta.type() != proxima_meta_.type() ||
100 query_meta.dimension() != proxima_meta_.dimension()) {
101 LLOG_ERROR(
102 "Invalid query, input query feature type or dimension not matched. "
103 "query_feature_type[%d] query_dimension[%u] feature_type[%d] "
104 "dimension[%u]",
105 query_meta.type(), query_meta.type(), proxima_meta_.type(),
106 proxima_meta_.dimension());
107 return ErrorCode_InvalidQuery;
108 }
109
110 uint32_t expect_size = query_meta.element_size() * batch_count;
111 if (query.size() != expect_size) {
112 LLOG_ERROR(
113 "Invalid query, query size mismatch. expect_size[%u] "
114 "actual_size[%zu]",
115 expect_size, query.size());
116 return ErrorCode_InvalidQuery;
117 }
118
119 // Get context and set properties.
120 // Notice that, we must reset the context
121 // when return back into pool.
122 auto ctx = context_pool_.acquire();
123 ctx->set_topk(query_params.topk);
124 if (filter != nullptr) {
125 ctx->set_filter(filter);
126 }
127 if (query_params.radius > 0.0f) {
128 ctx->set_threshold(query_params.radius);
129 }
130 Defer defer([&ctx, this] {
131 ctx->set_filter(nullptr);
132 ctx->set_threshold(std::numeric_limits<float>::max());
133 context_pool_.release(std::move(ctx));
134 });
135
136 int ret = 0;
137 // Check if need to use quantizer
138 if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) {
139 std::string new_query;
140 IndexQueryMeta new_meta;
141 ret = reformer_->transform(query.data(), query_meta, &new_query, &new_meta);
142 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]",
143 ret);
144
145 if (query_params.is_linear) {
146 ret = proxima_searcher_->search_bf_impl(new_query.data(), new_meta,
147 batch_count, ctx);
148 } else {
149 ret = proxima_searcher_->search_impl(new_query.data(), new_meta,
150 batch_count, ctx);
151 }
152 } else {
153 if (query_params.is_linear) {
154 ret = proxima_searcher_->search_bf_impl(query.data(), query_meta,
155 batch_count, ctx);
156 } else {
157 ret = proxima_searcher_->search_impl(query.data(), query_meta,
158 batch_count, ctx);
159 }
160 }
161
162 CHECK_RETURN_WITH_LLOG(ret, 0,
163 "Search proxima searcher failed. ret[%d] reason[%s]",
164 ret, aitheta2::IndexError::What(ret));
165
166 for (uint32_t i = 0; i < batch_count; i++) {
167 auto &result_list = ctx->result(i);
168 if (measure_->support_normalize()) {
169 for (auto &it : const_cast<IndexDocumentList &>(result_list)) {
170 measure_->normalize(it.mutable_score());
171 }
172 }
173 if (reformer_) {
174 reformer_->normalize(query.data(), query_meta,
175 const_cast<IndexDocumentList &>(result_list));
176 }
177 batch_result_list->emplace_back(result_list);
178 }
179
180 return 0;
181}
182
183bool VectorColumnReader::check_column_meta(
184 const meta::ColumnMeta &column_meta) {
185 auto index_type = column_meta.index_type();
186 if (index_type != IndexTypes::PROXIMA_GRAPH_INDEX) {
187 LOG_ERROR("Column meta config error, only support PROXIMA_GRAPH_INDEX now");
188 return false;
189 }
190
191 auto data_type = column_meta.data_type();
192 auto feature_type = IndexHelper::GetProximaFeatureType(data_type);
193 if (feature_type == FeatureTypes::FT_UNDEFINED) {
194 LLOG_ERROR("Column meta config error, unknown data type.");
195 return false;
196 }
197
198 auto dimension = column_meta.dimension();
199 if (dimension == 0U) {
200 LLOG_ERROR("Column meta config error, dimension can't be 0.");
201 return false;
202 }
203
204 auto metric_type = column_meta.parameters().get_as_string("metric_type");
205 if (metric_type.empty()) {
206 metric_type = "SquaredEuclidean";
207 }
208
209 auto ef_search = column_meta.parameters().get_as_uint32("ef_search");
210 if (ef_search > 0U) {
211 proxima_params_.set("proxima.hnsw.searcher.ef", ef_search);
212 } else {
213 proxima_params_.set("proxima.hnsw.searcher.ef", 200U);
214 }
215
216 auto max_scan_ratio = column_meta.parameters().get_as_float("max_scan_ratio");
217 if (max_scan_ratio > 0.0f) {
218 proxima_params_.set("proxima.hnsw.searcher.max_scan_ratio", max_scan_ratio);
219 }
220
221 auto visit_bf =
222 column_meta.parameters().get_as_bool("visit_bloomfilter_enable");
223 if (visit_bf) {
224 proxima_params_.set("proxima.hnsw.searcher.visit_bloomfilter_enable",
225 visit_bf);
226 }
227
228 // Check quantize type
229 auto quantize_type = column_meta.parameters().get_as_string("quantize_type");
230 if (!quantize_type.empty()) {
231 if (IndexHelper::GetQuantizeType(quantize_type) ==
232 QuantizeTypes::UNDEFINED) {
233 LLOG_ERROR(
234 "Column meta config error, unknown quantize type. quantize_type[%s]",
235 quantize_type.c_str());
236 return false;
237 }
238
239 if (data_type != DataTypes::VECTOR_FP32) {
240 LLOG_ERROR(
241 "Column meta config error, only FP32 data type can open quantizer");
242 return false;
243 }
244
245 quantize_type_ = IndexHelper::GetQuantizeType(quantize_type);
246 }
247
248 // Set proxima index meta
249 proxima_meta_.set_meta(feature_type, dimension);
250 proxima_meta_.set_measure(metric_type, 0, IndexParams());
251
252 LLOG_INFO(
253 "Show vector column searcher options. index_type[%u] data_type[%u] "
254 "dimension[%u] measure[%s] context_count[%u] ef_search[%u] "
255 "max_scan_ratio[%f] visit_bf[%d] quantize_type[%s] ",
256 index_type, data_type, dimension, metric_type.c_str(),
257 this->concurrency(), ef_search, max_scan_ratio, visit_bf,
258 quantize_type.c_str());
259
260 return true;
261}
262
263int VectorColumnReader::open_proxima_container(
264 const ReadOptions &read_options) {
265 index_file_path_ = FileHelper::MakeFilePath(
266 this->collection_path(), FileID::SEGMENT_FILE, this->segment_id());
267
268 if (read_options.use_mmap) {
269 container_ = aitheta2::IndexFactory::CreateContainer("MMapFileContainer");
270 } else {
271 container_ = aitheta2::IndexFactory::CreateContainer("MemoryContainer");
272 }
273
274 // Default set warmup flag
275 IndexParams container_params;
276 container_params.set("proxima.mmap_file.container.memory_warmup", true);
277
278 int ret = container_->init(container_params);
279 CHECK_RETURN_WITH_LLOG(ret, 0, "Container init failed. ret[%d]", ret);
280
281 ret = container_->load(index_file_path_);
282 CHECK_RETURN_WITH_LLOG(ret, 0, "Container load failed. ret[%d] file[%s]", ret,
283 index_file_path_.c_str());
284
285 return 0;
286}
287
288int VectorColumnReader::open_proxima_searcher() {
289 int ret = 0;
290 auto index_meta = proxima_meta_;
291 // Check if need to open quantizer
292 if (quantize_type_ != QuantizeTypes::UNDEFINED) {
293 IndexConverterPtr converter;
294 switch (quantize_type_) {
295 case QuantizeTypes::VECTOR_INT4:
296 converter =
297 aitheta2::IndexFactory::CreateConverter("Int4StreamingConverter");
298 break;
299 case QuantizeTypes::VECTOR_INT8:
300 converter =
301 aitheta2::IndexFactory::CreateConverter("Int8StreamingConverter");
302 break;
303 case QuantizeTypes::VECTOR_FP16:
304 converter =
305 aitheta2::IndexFactory::CreateConverter("HalfFloatConverter");
306 break;
307 default:
308 return ErrorCode_RuntimeError;
309 }
310
311 if (!converter) {
312 LLOG_ERROR("Create converter failed.");
313 return ErrorCode_RuntimeError;
314 }
315
316 ret = converter->init(proxima_meta_, IndexParams());
317 CHECK_RETURN_WITH_LLOG(ret, 0, "Converter init failed. ret[%d]", ret);
318 index_meta = converter->meta();
319
320 reformer_ =
321 aitheta2::IndexFactory::CreateReformer(index_meta.reformer_name());
322 ret = reformer_->init(IndexParams());
323 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]", ret);
324 }
325
326 // Init measure
327 measure_ =
328 aitheta2::IndexFactory::CreateMeasure(proxima_meta_.measure_name());
329 if (!measure_) {
330 LLOG_ERROR("Create measure %s failed",
331 proxima_meta_.measure_name().c_str());
332 return aitheta2::IndexError_Runtime;
333 }
334 ret = measure_->init(proxima_meta_, IndexParams());
335 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]", ret);
336 auto query_measure = measure_->query_measure();
337 if (query_measure) {
338 measure_ = query_measure;
339 }
340
341 // Init proxima searcher
342 proxima_searcher_ = aitheta2::IndexFactory::CreateSearcher("HnswSearcher");
343 if (!proxima_searcher_) {
344 LLOG_ERROR("Create proxima searcher failed. name[HnswSearcher]");
345 return ErrorCode_RuntimeError;
346 }
347
348 ret = proxima_searcher_->init(proxima_params_);
349 CHECK_RETURN_WITH_LOG(ret, 0, "Init proxima searcher failed.");
350
351 auto column_block = container_->get(COLUMN_DUMP_BLOCK + this->column_name());
352 if (!column_block) {
353 LLOG_INFO("Can't find column block in index file.");
354 return ErrorCode_InvalidSegment;
355 }
356 auto block_container =
357 std::make_shared<aitheta2::IndexSegmentContainer>(column_block);
358 ret = block_container->load();
359 CHECK_RETURN_WITH_LLOG(ret, 0, "Column block load failed.");
360
361 ret = proxima_searcher_->load(block_container, nullptr);
362 CHECK_RETURN_WITH_LLOG(ret, 0, "Load container failed.");
363
364 // Init context pool
365 for (uint32_t i = 0; i < this->concurrency(); i++) {
366 auto ctx = proxima_searcher_->create_context();
367 if (!ctx) {
368 LLOG_ERROR("Create context for proxima searcher failed.");
369 return ErrorCode_RuntimeError;
370 }
371 context_pool_.emplace(std::move(ctx));
372 }
373
374 return 0;
375}
376
377
378} // end namespace index
379} // namespace be
380} // end namespace proxima
381