1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of column indexer
19 */
20
21#include "vector_column_indexer.h"
22#include "common/defer.h"
23#include "common/logger.h"
24#include "../file_helper.h"
25
26namespace proxima {
27namespace be {
28namespace index {
29
30VectorColumnIndexer::~VectorColumnIndexer() {
31 // TODO can't call virtual function
32 if (opened_) {
33 this->close();
34 }
35}
36
37int VectorColumnIndexer::open(const meta::ColumnMeta &column_meta,
38 const ReadOptions &read_options) {
39 CHECK_STATUS(opened_, false);
40
41 if (!check_column_meta(column_meta)) {
42 LLOG_ERROR("Check column meta failed.");
43 return ErrorCode_ConfigError;
44 }
45
46 // open snapshot
47 int ret = Snapshot::CreateAndOpen(
48 this->collection_path(), FileID::PROXIMA_FILE, this->segment_id(),
49 this->column_name(), read_options, &snapshot_);
50 CHECK_RETURN_WITH_LLOG(ret, 0, "Create and open snapshot failed. ret[%d]",
51 ret);
52
53 // open proxima streamer
54 ret = open_proxima_streamer();
55 CHECK_RETURN_WITH_LLOG(ret, 0, "Open proxima streamer failed.");
56
57 opened_ = true;
58 return 0;
59}
60
61int VectorColumnIndexer::close() {
62 CHECK_STATUS(opened_, true);
63
64 context_pool_.clear();
65 proxima_streamer_->cleanup();
66
67 int ret = snapshot_->close();
68 if (ret != 0) {
69 LLOG_WARN("Close snapshot failed.");
70 }
71
72 opened_ = false;
73 return ret;
74}
75
76int VectorColumnIndexer::flush() {
77 CHECK_STATUS(opened_, true);
78
79 return proxima_streamer_->flush(0);
80}
81
82int VectorColumnIndexer::dump(IndexDumperPtr dumper) {
83 CHECK_STATUS(opened_, true);
84
85 return proxima_streamer_->dump(dumper);
86}
87
88int VectorColumnIndexer::insert(idx_t doc_id, const ColumnData &column_data) {
89 CHECK_STATUS(opened_, true);
90
91 // Check column data if legal
92 IndexQueryMeta query_meta;
93 auto feature_type = IndexHelper::GetProximaFeatureType(column_data.data_type);
94 auto dimension = column_data.dimension;
95 if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) {
96 query_meta.set_meta(feature_type, dimension);
97 } else {
98 query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension());
99 }
100
101 if (query_meta.type() != proxima_meta_.type() ||
102 query_meta.dimension() != proxima_meta_.dimension()) {
103 LLOG_ERROR(
104 "Invalid record, input query feature type or dimension not matched. "
105 "query_feature_type[%d] query_dimension[%u] feature_type[%d] "
106 "dimension[%u]",
107 query_meta.type(), query_meta.dimension(), proxima_meta_.type(),
108 proxima_meta_.dimension());
109 return ErrorCode_InvalidRecord;
110 }
111
112 const std::string &vector = column_data.data;
113 uint32_t expect_size = proxima_meta_.element_size();
114 if (vector.size() != expect_size) {
115 LLOG_ERROR(
116 "Invalid record, vector size mismatch. expect_size[%u] "
117 "actual_size[%zu]",
118 expect_size, vector.size());
119 return ErrorCode_InvalidRecord;
120 }
121
122 // Get context and set properties.
123 // Notice we must return back to pool in the end.
124 auto ctx = context_pool_.acquire();
125 Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); });
126
127 int ret = 0;
128 // Check if need to use quantizer
129 if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) {
130 std::string new_vector;
131 IndexQueryMeta new_meta;
132 ret = reformer_->convert(vector.data(), query_meta, &new_vector, &new_meta);
133 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]",
134 ret);
135 ret = proxima_streamer_->add_impl(doc_id, new_vector.data(), new_meta, ctx);
136 } else {
137 ret = proxima_streamer_->add_impl(doc_id, vector.data(), query_meta, ctx);
138 }
139 CHECK_RETURN_WITH_LLOG(ret, 0,
140 "Insert proxima streamer failed. ret[%d] reason[%s]",
141 ret, aitheta2::IndexError::What(ret));
142
143 return 0;
144}
145
// NOTE(review): update() below is compiled out by `#if 0`. Its body repeats
// insert()'s validation and context handling, but calls update_impl()
// instead of add_impl(). Why it is disabled is not visible in this file. If
// it is re-enabled, a matching declaration in vector_column_indexer.h is
// presumably needed as well (TODO confirm).
#if 0
int VectorColumnIndexer::update(idx_t doc_id, const ColumnData &column_data) {
  CHECK_STATUS(opened_, true);

  // Check column data if legal
  IndexQueryMeta query_meta;
  auto feature_type = IndexHelper::GetProximaFeatureType(column_data.data_type);
  auto dimension = column_data.dimension;
  if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) {
    query_meta.set_meta(feature_type, dimension);
  } else {
    query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension());
  }

  if (query_meta.type() != proxima_meta_.type() ||
      query_meta.dimension() != proxima_meta_.dimension()) {
    LLOG_ERROR(
        "Invalid record, input query feature type or dimension not matched. "
        "query_feature_type[%d] query_dimension[%u] feature_type[%d] "
        "dimension[%u]",
        query_meta.type(), query_meta.dimension(), proxima_meta_.type(),
        proxima_meta_.dimension());
    return ErrorCode_InvalidRecord;
  }

  const std::string &vector = column_data.data;
  uint32_t expect_size = proxima_meta_.element_size();
  if (vector.size() != expect_size) {
    LLOG_ERROR(
        "Invalid record, vector size mismatch. expect_size[%u] "
        "actual_size[%zu]",
        expect_size, vector.size());
    return ErrorCode_InvalidRecord;
  }

  // Get context and set properties.
  // Notice we must return back to pool in the end.
  auto ctx = context_pool_.acquire();
  Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); });

  int ret = 0;
  // Check if need to use quantizer
  if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) {
    std::string new_vector;
    IndexQueryMeta new_meta;
    ret = reformer_->convert(vector.data(), query_meta, &new_vector, &new_meta);
    CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]",
                           ret);
    ret = proxima_streamer_->update_impl(doc_id, new_vector.data(), new_meta, ctx);
  } else {
    ret = proxima_streamer_->update_impl(doc_id, vector.data(), query_meta, ctx);
  }
  CHECK_RETURN_WITH_LLOG(ret, 0,
                         "Update proxima streamer failed. ret[%d] reason[%s]",
                         ret, aitheta2::IndexError::What(ret));

  return 0;

}
#endif
206
207int VectorColumnIndexer::remove(idx_t doc_id) {
208 CHECK_STATUS(opened_, true);
209
210 // HNSW do not need to do remove
211 if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) {
212 auto ctx = context_pool_.acquire();
213 Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); });
214
215 int ret = proxima_streamer_->remove_impl(doc_id, ctx);
216 CHECK_RETURN_WITH_LLOG(
217 ret, 0, "Remove from proxima streamer failed. doc_id[%zu] ret[%d]",
218 (size_t)doc_id, ret)
219 }
220 return 0;
221}
222
223int VectorColumnIndexer::optimize(ThreadPoolPtr pool) {
224 CHECK_STATUS(opened_, true);
225
226 // HNSW do not need to do optimize
227 if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) {
228 ailego::ElapsedTime timer;
229 int ret = proxima_streamer_->optimize_impl(ThreadPoolPtr(pool));
230 CHECK_RETURN_WITH_LLOG(ret, 0, "Optimize column indexer failed. ret[%d]",
231 ret);
232
233 LLOG_DEBUG("Optmize column indexer complete. cost[%zuus]",
234 (size_t)timer.micro_seconds());
235 }
236 return 0;
237}
238
239int VectorColumnIndexer::search(const std::string &query,
240 const QueryParams &query_params,
241 FilterFunction filter,
242 IndexDocumentList *result_list) {
243 CHECK_STATUS(opened_, true);
244
245 std::vector<IndexDocumentList> batch_result_list;
246 int ret = this->search(query, query_params, 1, filter, &batch_result_list);
247 (*result_list) = batch_result_list[0];
248 return ret;
249}
250
251int VectorColumnIndexer::search(
252 const std::string &query, const QueryParams &query_params,
253 uint32_t batch_count, FilterFunction filter,
254 std::vector<IndexDocumentList> *batch_result_list) {
255 CHECK_STATUS(opened_, true);
256
257 // Check if query legal
258 IndexQueryMeta query_meta;
259 auto feature_type =
260 IndexHelper::GetProximaFeatureType(query_params.data_type);
261 auto dimension = query_params.dimension;
262 if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) {
263 query_meta.set_meta(feature_type, dimension);
264 } else {
265 query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension());
266 }
267
268 if (query_meta.type() != proxima_meta_.type() ||
269 query_meta.dimension() != proxima_meta_.dimension()) {
270 LLOG_ERROR(
271 "Invalid query, input query feature type or dimension not matched. "
272 "query_feature_type[%d] query_dimension[%u] feature_type[%d] "
273 "dimension[%u]",
274 query_meta.type(), query_meta.type(), proxima_meta_.type(),
275 proxima_meta_.dimension());
276 return ErrorCode_InvalidQuery;
277 }
278
279 uint32_t expect_size = query_meta.element_size() * batch_count;
280 if (query.size() != expect_size) {
281 LLOG_ERROR(
282 "Invalid query, query size mismatch. expect_size[%u] "
283 "actual_size[%zu]",
284 expect_size, query.size());
285 return ErrorCode_InvalidQuery;
286 }
287
288 // Get context and set properties.
289 // Notice that, we must reset the context
290 // when return back into pool.
291 auto ctx = context_pool_.acquire();
292 ctx->set_topk(query_params.topk);
293 if (filter != nullptr) {
294 ctx->set_filter(filter);
295 }
296
297 // Notice oswg graph do not need to pass filter
298 if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) {
299 ctx->set_filter(nullptr);
300 }
301
302 if (query_params.radius > 0.0f) {
303 ctx->set_threshold(query_params.radius);
304 }
305 Defer defer([&ctx, this] {
306 ctx->set_filter(nullptr);
307 ctx->set_threshold(std::numeric_limits<float>::max());
308 context_pool_.release(std::move(ctx));
309 });
310
311 int ret = 0;
312 // Check if need to use quantizer
313 if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) {
314 std::string new_query;
315 IndexQueryMeta new_meta;
316 ret = reformer_->transform(query.data(), query_meta, &new_query, &new_meta);
317 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]",
318 ret);
319
320 if (query_params.is_linear) {
321 ret = proxima_streamer_->search_bf_impl(new_query.data(), new_meta,
322 batch_count, ctx);
323 } else {
324 ret = proxima_streamer_->search_impl(new_query.data(), new_meta,
325 batch_count, ctx);
326 }
327 } else {
328 if (query_params.is_linear) {
329 ret = proxima_streamer_->search_bf_impl(query.data(), query_meta,
330 batch_count, ctx);
331 } else {
332 ret = proxima_streamer_->search_impl(query.data(), query_meta,
333 batch_count, ctx);
334 }
335 }
336 CHECK_RETURN_WITH_LLOG(ret, 0,
337 "Search proxima streamer failed. ret[%d] reason[%s]",
338 ret, aitheta2::IndexError::What(ret));
339
340 for (uint32_t i = 0; i < batch_count; i++) {
341 auto &result_list = ctx->result(i);
342 if (measure_->support_normalize()) {
343 for (auto &it : const_cast<IndexDocumentList &>(result_list)) {
344 measure_->normalize(it.mutable_score());
345 }
346 }
347 if (reformer_) {
348 reformer_->normalize(query.data(), query_meta,
349 const_cast<IndexDocumentList &>(result_list));
350 }
351 batch_result_list->emplace_back(result_list);
352 }
353
354 return 0;
355}
356
357bool VectorColumnIndexer::check_column_meta(
358 const meta::ColumnMeta &column_meta) {
359 auto index_type = column_meta.index_type();
360 if (index_type != IndexTypes::PROXIMA_GRAPH_INDEX) {
361 LOG_ERROR("Column meta config error, only support PROXIMA_GRAPH_INDEX now");
362 return false;
363 }
364
365 auto data_type = column_meta.data_type();
366 auto feature_type = IndexHelper::GetProximaFeatureType(data_type);
367 if (feature_type == FeatureTypes::FT_UNDEFINED) {
368 LLOG_ERROR("Column meta config error, unknown data type.");
369 return false;
370 }
371
372 auto dimension = column_meta.dimension();
373 if (dimension == 0U) {
374 LLOG_ERROR("Column meta config error, dimension can't be 0.");
375 return false;
376 }
377
378 auto metric_type = column_meta.parameters().get_as_string("metric_type");
379 if (metric_type.empty()) {
380 metric_type = "SquaredEuclidean";
381 } else if (metric_type == "InnerProduct") {
382 metric_type = "MipsSquaredEuclidean";
383 }
384
385 auto max_neighbor_count =
386 column_meta.parameters().get_as_uint32("max_neighbor_count");
387 if (max_neighbor_count > 0U) {
388 proxima_params_.set("proxima.hnsw.streamer.max_neighbor_count",
389 max_neighbor_count);
390 proxima_params_.set("proxima.oswg.streamer.max_neighbor_count",
391 max_neighbor_count);
392 }
393
394 auto ef_construction =
395 column_meta.parameters().get_as_uint32("ef_construction");
396 if (ef_construction > 0U) {
397 proxima_params_.set("proxima.hnsw.streamer.ef_construction",
398 ef_construction);
399 proxima_params_.set("proxima.oswg.streamer.ef_construction",
400 ef_construction);
401 }
402
403 auto ef_search = column_meta.parameters().get_as_uint32("ef_search");
404 if (ef_search > 0U) {
405 proxima_params_.set("proxima.hnsw.streamer.ef", ef_search);
406 proxima_params_.set("proxima.oswg.streamer.ef", ef_search);
407 } else {
408 proxima_params_.set("proxima.hnsw.streamer.ef", 200U);
409 proxima_params_.set("proxima.oswg.streamer.ef", 200U);
410 }
411
412 auto chunk_size = column_meta.parameters().get_as_uint32("chunk_size");
413 if (chunk_size > 0U) {
414 proxima_params_.set("proxima.hnsw.streamer.chunk_size", chunk_size);
415 proxima_params_.set("proxima.oswg.streamer.segment_size", chunk_size);
416 } else {
417 proxima_params_.set("proxima.hnsw.streamer.chunk_size",
418 64UL * 1024UL * 1024UL);
419 proxima_params_.set("proxima.oswg.streamer.segment_size",
420 64UL * 1024UL * 1024UL);
421 }
422
423 auto max_scan_ratio = column_meta.parameters().get_as_float("max_scan_ratio");
424 if (max_scan_ratio > 0.0f) {
425 proxima_params_.set("proxima.hnsw.streamer.max_scan_ratio", max_scan_ratio);
426 proxima_params_.set("proxima.oswg.streamer.max_scan_ratio", max_scan_ratio);
427 }
428
429 auto visit_bf =
430 column_meta.parameters().get_as_bool("visit_bloomfilter_enable");
431 if (visit_bf) {
432 proxima_params_.set("proxima.hnsw.streamer.visit_bloomfilter_enable",
433 visit_bf);
434 proxima_params_.set("proxima.oswg.streamer.visit_bloomfilter_enable",
435 visit_bf);
436 }
437
438 // Check quantize type
439 auto quantize_type = column_meta.parameters().get_as_string("quantize_type");
440 if (!quantize_type.empty()) {
441 if (IndexHelper::GetQuantizeType(quantize_type) ==
442 QuantizeTypes::UNDEFINED) {
443 LLOG_ERROR(
444 "Column meta config error, unknown quantize type. quantize_type[%s]",
445 quantize_type.c_str());
446 return false;
447 }
448
449 if (data_type != DataTypes::VECTOR_FP32) {
450 LLOG_ERROR(
451 "Column meta config error, only FP32 data type can open quantizer");
452 return false;
453 }
454
455 quantize_type_ = IndexHelper::GetQuantizeType(quantize_type);
456 }
457
458 // Default filter duplicate records
459 proxima_params_.set("proxima.hnsw.streamer.filter_same_key", true);
460
461 // Set proxima index meta
462 proxima_meta_.set_meta(feature_type, dimension);
463 proxima_meta_.set_measure(metric_type, 0, IndexParams());
464
465 // Decide which engine to use
466 std::string engine = column_meta.parameters().get_as_string("engine");
467 if (engine == "OSWG") {
468 engine_type_ = EngineTypes::PROXIMA_OSWG_STREAMER;
469 } else if (engine == "HNSW") {
470 engine_type_ = EngineTypes::PROXIMA_HNSW_STREAMER;
471 }
472
473 LLOG_INFO(
474 "Show vector column indexer options. index_type[%u] data_type[%u] "
475 "dimension[%u] "
476 "measure[%s] context_count[%u] max_neighbor_count[%u] "
477 "ef_construction[%u] chunk_size[%u] ef_search[%u] max_scan_ratio[%f] "
478 "visit_bf[%d] quantize_type[%s] engine_type[%d]",
479 index_type, data_type, dimension, metric_type.c_str(),
480 this->concurrency(), max_neighbor_count, ef_construction, chunk_size,
481 ef_search, max_scan_ratio, visit_bf, quantize_type.c_str(), engine_type_);
482
483 return true;
484}
485
486int VectorColumnIndexer::open_proxima_streamer() {
487 int ret = 0;
488 auto index_meta = proxima_meta_;
489 // Check if need to open quantizer
490 if (quantize_type_ != QuantizeTypes::UNDEFINED) {
491 IndexConverterPtr converter;
492 switch (quantize_type_) {
493 case QuantizeTypes::VECTOR_INT4:
494 converter =
495 aitheta2::IndexFactory::CreateConverter("Int4StreamingConverter");
496 break;
497 case QuantizeTypes::VECTOR_INT8:
498 converter =
499 aitheta2::IndexFactory::CreateConverter("Int8StreamingConverter");
500 break;
501 case QuantizeTypes::VECTOR_FP16:
502 converter =
503 aitheta2::IndexFactory::CreateConverter("HalfFloatConverter");
504 break;
505 default:
506 return ErrorCode_RuntimeError;
507 }
508
509 if (!converter) {
510 LLOG_ERROR("Create converter failed.");
511 return ErrorCode_RuntimeError;
512 }
513
514 ret = converter->init(proxima_meta_, IndexParams());
515 CHECK_RETURN_WITH_LLOG(ret, 0, "Converter init failed. ret[%d]", ret);
516 index_meta = converter->meta();
517
518 reformer_ =
519 aitheta2::IndexFactory::CreateReformer(index_meta.reformer_name());
520 ret = reformer_->init(IndexParams());
521 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]", ret);
522 }
523
524 // Init measure
525 measure_ =
526 aitheta2::IndexFactory::CreateMeasure(proxima_meta_.measure_name());
527 if (!measure_) {
528 LLOG_ERROR("Create measure %s failed",
529 proxima_meta_.measure_name().c_str());
530 return aitheta2::IndexError_Runtime;
531 }
532 ret = measure_->init(proxima_meta_, IndexParams());
533 CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]", ret);
534 auto query_measure = measure_->query_measure();
535 if (query_measure) {
536 measure_ = query_measure;
537 }
538
539 // Get actual engine name and initialize it with factory
540 std::string engine_name = this->get_engine_name();
541 proxima_streamer_ = aitheta2::IndexFactory::CreateStreamer(engine_name);
542 if (!proxima_streamer_) {
543 LLOG_ERROR("Create proxima streamer failed. name[%s]", engine_name.c_str());
544 return ErrorCode_RuntimeError;
545 }
546
547 //! Notice use new index meta as initialize params
548 //! When user config quantize type, it may change its value.
549 ret = proxima_streamer_->init(index_meta, proxima_params_);
550 CHECK_RETURN_WITH_LLOG(ret, 0, "Init proxima streamer failed. ret[%d]", ret);
551
552 ret = proxima_streamer_->open(snapshot_->data());
553 CHECK_RETURN_WITH_LLOG(ret, 0, "Open proxima streamer failed. ret[%d]", ret);
554
555 // Initialize context pool
556 for (uint32_t i = 0; i < this->concurrency(); i++) {
557 auto ctx = proxima_streamer_->create_context();
558 if (!ctx) {
559 LLOG_ERROR("Create proxima streamer context failed.");
560 return ErrorCode_RuntimeError;
561 }
562 context_pool_.emplace(std::move(ctx));
563 }
564
565 return 0;
566}
567
568
569} // end namespace index
570} // namespace be
571} // end namespace proxima
572