1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of column indexer |
19 | */ |
20 | |
21 | #include "vector_column_indexer.h" |
22 | #include "common/defer.h" |
23 | #include "common/logger.h" |
24 | #include "../file_helper.h" |
25 | |
26 | namespace proxima { |
27 | namespace be { |
28 | namespace index { |
29 | |
30 | VectorColumnIndexer::~VectorColumnIndexer() { |
31 | // TODO can't call virtual function |
32 | if (opened_) { |
33 | this->close(); |
34 | } |
35 | } |
36 | |
37 | int VectorColumnIndexer::open(const meta::ColumnMeta &column_meta, |
38 | const ReadOptions &read_options) { |
39 | CHECK_STATUS(opened_, false); |
40 | |
41 | if (!check_column_meta(column_meta)) { |
42 | LLOG_ERROR("Check column meta failed." ); |
43 | return ErrorCode_ConfigError; |
44 | } |
45 | |
46 | // open snapshot |
47 | int ret = Snapshot::CreateAndOpen( |
48 | this->collection_path(), FileID::PROXIMA_FILE, this->segment_id(), |
49 | this->column_name(), read_options, &snapshot_); |
50 | CHECK_RETURN_WITH_LLOG(ret, 0, "Create and open snapshot failed. ret[%d]" , |
51 | ret); |
52 | |
53 | // open proxima streamer |
54 | ret = open_proxima_streamer(); |
55 | CHECK_RETURN_WITH_LLOG(ret, 0, "Open proxima streamer failed." ); |
56 | |
57 | opened_ = true; |
58 | return 0; |
59 | } |
60 | |
61 | int VectorColumnIndexer::close() { |
62 | CHECK_STATUS(opened_, true); |
63 | |
64 | context_pool_.clear(); |
65 | proxima_streamer_->cleanup(); |
66 | |
67 | int ret = snapshot_->close(); |
68 | if (ret != 0) { |
69 | LLOG_WARN("Close snapshot failed." ); |
70 | } |
71 | |
72 | opened_ = false; |
73 | return ret; |
74 | } |
75 | |
76 | int VectorColumnIndexer::flush() { |
77 | CHECK_STATUS(opened_, true); |
78 | |
79 | return proxima_streamer_->flush(0); |
80 | } |
81 | |
82 | int VectorColumnIndexer::dump(IndexDumperPtr dumper) { |
83 | CHECK_STATUS(opened_, true); |
84 | |
85 | return proxima_streamer_->dump(dumper); |
86 | } |
87 | |
88 | int VectorColumnIndexer::insert(idx_t doc_id, const ColumnData &column_data) { |
89 | CHECK_STATUS(opened_, true); |
90 | |
91 | // Check column data if legal |
92 | IndexQueryMeta query_meta; |
93 | auto feature_type = IndexHelper::GetProximaFeatureType(column_data.data_type); |
94 | auto dimension = column_data.dimension; |
95 | if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) { |
96 | query_meta.set_meta(feature_type, dimension); |
97 | } else { |
98 | query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension()); |
99 | } |
100 | |
101 | if (query_meta.type() != proxima_meta_.type() || |
102 | query_meta.dimension() != proxima_meta_.dimension()) { |
103 | LLOG_ERROR( |
104 | "Invalid record, input query feature type or dimension not matched. " |
105 | "query_feature_type[%d] query_dimension[%u] feature_type[%d] " |
106 | "dimension[%u]" , |
107 | query_meta.type(), query_meta.dimension(), proxima_meta_.type(), |
108 | proxima_meta_.dimension()); |
109 | return ErrorCode_InvalidRecord; |
110 | } |
111 | |
112 | const std::string &vector = column_data.data; |
113 | uint32_t expect_size = proxima_meta_.element_size(); |
114 | if (vector.size() != expect_size) { |
115 | LLOG_ERROR( |
116 | "Invalid record, vector size mismatch. expect_size[%u] " |
117 | "actual_size[%zu]" , |
118 | expect_size, vector.size()); |
119 | return ErrorCode_InvalidRecord; |
120 | } |
121 | |
122 | // Get context and set properties. |
123 | // Notice we must return back to pool in the end. |
124 | auto ctx = context_pool_.acquire(); |
125 | Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); }); |
126 | |
127 | int ret = 0; |
128 | // Check if need to use quantizer |
129 | if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) { |
130 | std::string new_vector; |
131 | IndexQueryMeta new_meta; |
132 | ret = reformer_->convert(vector.data(), query_meta, &new_vector, &new_meta); |
133 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]" , |
134 | ret); |
135 | ret = proxima_streamer_->add_impl(doc_id, new_vector.data(), new_meta, ctx); |
136 | } else { |
137 | ret = proxima_streamer_->add_impl(doc_id, vector.data(), query_meta, ctx); |
138 | } |
139 | CHECK_RETURN_WITH_LLOG(ret, 0, |
140 | "Insert proxima streamer failed. ret[%d] reason[%s]" , |
141 | ret, aitheta2::IndexError::What(ret)); |
142 | |
143 | return 0; |
144 | } |
145 | |
#if 0
//! Disabled (compiled out by the surrounding #if 0): update of an existing
//! record. Mirrors insert(), but calls update_impl on the streamer.
int VectorColumnIndexer::update(idx_t doc_id, const ColumnData &column_data) {
  CHECK_STATUS(opened_, true);

  // Describe the incoming record. Fall back to the column's own meta when the
  // record does not carry a usable type/dimension.
  const auto record_type =
      IndexHelper::GetProximaFeatureType(column_data.data_type);
  const auto record_dimension = column_data.dimension;
  const bool record_has_meta =
      record_type != FeatureTypes::FT_UNDEFINED && record_dimension != 0;

  IndexQueryMeta query_meta;
  if (record_has_meta) {
    query_meta.set_meta(record_type, record_dimension);
  } else {
    query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension());
  }

  const bool meta_matches = query_meta.type() == proxima_meta_.type() &&
                            query_meta.dimension() == proxima_meta_.dimension();
  if (!meta_matches) {
    LLOG_ERROR(
        "Invalid record, input query feature type or dimension not matched. "
        "query_feature_type[%d] query_dimension[%u] feature_type[%d] "
        "dimension[%u]",
        query_meta.type(), query_meta.dimension(), proxima_meta_.type(),
        proxima_meta_.dimension());
    return ErrorCode_InvalidRecord;
  }

  // The payload must be exactly one element of the column's meta.
  const std::string &vector = column_data.data;
  uint32_t expect_size = proxima_meta_.element_size();
  if (vector.size() != expect_size) {
    LLOG_ERROR(
        "Invalid record, vector size mismatch. expect_size[%u] "
        "actual_size[%zu]",
        expect_size, vector.size());
    return ErrorCode_InvalidRecord;
  }

  // Borrow a context from the pool; the Defer hands it back on every exit
  // path of this function.
  auto ctx = context_pool_.acquire();
  Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); });

  int ret = 0;
  const bool use_quantizer =
      quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr;
  if (use_quantizer) {
    // Convert the record first, then update with the converted bytes.
    std::string converted_vector;
    IndexQueryMeta converted_meta;
    ret = reformer_->convert(vector.data(), query_meta, &converted_vector,
                             &converted_meta);
    CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]",
                           ret);
    ret = proxima_streamer_->update_impl(doc_id, converted_vector.data(),
                                         converted_meta, ctx);
  } else {
    ret = proxima_streamer_->update_impl(doc_id, vector.data(), query_meta,
                                         ctx);
  }
  CHECK_RETURN_WITH_LLOG(ret, 0,
                         "Update proxima streamer failed. ret[%d] reason[%s]",
                         ret, aitheta2::IndexError::What(ret));

  return 0;
}
#endif
206 | |
207 | int VectorColumnIndexer::remove(idx_t doc_id) { |
208 | CHECK_STATUS(opened_, true); |
209 | |
210 | // HNSW do not need to do remove |
211 | if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) { |
212 | auto ctx = context_pool_.acquire(); |
213 | Defer defer([&ctx, this] { context_pool_.release(std::move(ctx)); }); |
214 | |
215 | int ret = proxima_streamer_->remove_impl(doc_id, ctx); |
216 | CHECK_RETURN_WITH_LLOG( |
217 | ret, 0, "Remove from proxima streamer failed. doc_id[%zu] ret[%d]" , |
218 | (size_t)doc_id, ret) |
219 | } |
220 | return 0; |
221 | } |
222 | |
223 | int VectorColumnIndexer::optimize(ThreadPoolPtr pool) { |
224 | CHECK_STATUS(opened_, true); |
225 | |
226 | // HNSW do not need to do optimize |
227 | if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) { |
228 | ailego::ElapsedTime timer; |
229 | int ret = proxima_streamer_->optimize_impl(ThreadPoolPtr(pool)); |
230 | CHECK_RETURN_WITH_LLOG(ret, 0, "Optimize column indexer failed. ret[%d]" , |
231 | ret); |
232 | |
233 | LLOG_DEBUG("Optmize column indexer complete. cost[%zuus]" , |
234 | (size_t)timer.micro_seconds()); |
235 | } |
236 | return 0; |
237 | } |
238 | |
239 | int VectorColumnIndexer::search(const std::string &query, |
240 | const QueryParams &query_params, |
241 | FilterFunction filter, |
242 | IndexDocumentList *result_list) { |
243 | CHECK_STATUS(opened_, true); |
244 | |
245 | std::vector<IndexDocumentList> batch_result_list; |
246 | int ret = this->search(query, query_params, 1, filter, &batch_result_list); |
247 | (*result_list) = batch_result_list[0]; |
248 | return ret; |
249 | } |
250 | |
251 | int VectorColumnIndexer::search( |
252 | const std::string &query, const QueryParams &query_params, |
253 | uint32_t batch_count, FilterFunction filter, |
254 | std::vector<IndexDocumentList> *batch_result_list) { |
255 | CHECK_STATUS(opened_, true); |
256 | |
257 | // Check if query legal |
258 | IndexQueryMeta query_meta; |
259 | auto feature_type = |
260 | IndexHelper::GetProximaFeatureType(query_params.data_type); |
261 | auto dimension = query_params.dimension; |
262 | if (feature_type != FeatureTypes::FT_UNDEFINED && dimension != 0) { |
263 | query_meta.set_meta(feature_type, dimension); |
264 | } else { |
265 | query_meta.set_meta(proxima_meta_.type(), proxima_meta_.dimension()); |
266 | } |
267 | |
268 | if (query_meta.type() != proxima_meta_.type() || |
269 | query_meta.dimension() != proxima_meta_.dimension()) { |
270 | LLOG_ERROR( |
271 | "Invalid query, input query feature type or dimension not matched. " |
272 | "query_feature_type[%d] query_dimension[%u] feature_type[%d] " |
273 | "dimension[%u]" , |
274 | query_meta.type(), query_meta.type(), proxima_meta_.type(), |
275 | proxima_meta_.dimension()); |
276 | return ErrorCode_InvalidQuery; |
277 | } |
278 | |
279 | uint32_t expect_size = query_meta.element_size() * batch_count; |
280 | if (query.size() != expect_size) { |
281 | LLOG_ERROR( |
282 | "Invalid query, query size mismatch. expect_size[%u] " |
283 | "actual_size[%zu]" , |
284 | expect_size, query.size()); |
285 | return ErrorCode_InvalidQuery; |
286 | } |
287 | |
288 | // Get context and set properties. |
289 | // Notice that, we must reset the context |
290 | // when return back into pool. |
291 | auto ctx = context_pool_.acquire(); |
292 | ctx->set_topk(query_params.topk); |
293 | if (filter != nullptr) { |
294 | ctx->set_filter(filter); |
295 | } |
296 | |
297 | // Notice oswg graph do not need to pass filter |
298 | if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) { |
299 | ctx->set_filter(nullptr); |
300 | } |
301 | |
302 | if (query_params.radius > 0.0f) { |
303 | ctx->set_threshold(query_params.radius); |
304 | } |
305 | Defer defer([&ctx, this] { |
306 | ctx->set_filter(nullptr); |
307 | ctx->set_threshold(std::numeric_limits<float>::max()); |
308 | context_pool_.release(std::move(ctx)); |
309 | }); |
310 | |
311 | int ret = 0; |
312 | // Check if need to use quantizer |
313 | if (quantize_type_ != QuantizeTypes::UNDEFINED && reformer_ != nullptr) { |
314 | std::string new_query; |
315 | IndexQueryMeta new_meta; |
316 | ret = reformer_->transform(query.data(), query_meta, &new_query, &new_meta); |
317 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer transform data failed. ret[%d]" , |
318 | ret); |
319 | |
320 | if (query_params.is_linear) { |
321 | ret = proxima_streamer_->search_bf_impl(new_query.data(), new_meta, |
322 | batch_count, ctx); |
323 | } else { |
324 | ret = proxima_streamer_->search_impl(new_query.data(), new_meta, |
325 | batch_count, ctx); |
326 | } |
327 | } else { |
328 | if (query_params.is_linear) { |
329 | ret = proxima_streamer_->search_bf_impl(query.data(), query_meta, |
330 | batch_count, ctx); |
331 | } else { |
332 | ret = proxima_streamer_->search_impl(query.data(), query_meta, |
333 | batch_count, ctx); |
334 | } |
335 | } |
336 | CHECK_RETURN_WITH_LLOG(ret, 0, |
337 | "Search proxima streamer failed. ret[%d] reason[%s]" , |
338 | ret, aitheta2::IndexError::What(ret)); |
339 | |
340 | for (uint32_t i = 0; i < batch_count; i++) { |
341 | auto &result_list = ctx->result(i); |
342 | if (measure_->support_normalize()) { |
343 | for (auto &it : const_cast<IndexDocumentList &>(result_list)) { |
344 | measure_->normalize(it.mutable_score()); |
345 | } |
346 | } |
347 | if (reformer_) { |
348 | reformer_->normalize(query.data(), query_meta, |
349 | const_cast<IndexDocumentList &>(result_list)); |
350 | } |
351 | batch_result_list->emplace_back(result_list); |
352 | } |
353 | |
354 | return 0; |
355 | } |
356 | |
357 | bool VectorColumnIndexer::check_column_meta( |
358 | const meta::ColumnMeta &column_meta) { |
359 | auto index_type = column_meta.index_type(); |
360 | if (index_type != IndexTypes::PROXIMA_GRAPH_INDEX) { |
361 | LOG_ERROR("Column meta config error, only support PROXIMA_GRAPH_INDEX now" ); |
362 | return false; |
363 | } |
364 | |
365 | auto data_type = column_meta.data_type(); |
366 | auto feature_type = IndexHelper::GetProximaFeatureType(data_type); |
367 | if (feature_type == FeatureTypes::FT_UNDEFINED) { |
368 | LLOG_ERROR("Column meta config error, unknown data type." ); |
369 | return false; |
370 | } |
371 | |
372 | auto dimension = column_meta.dimension(); |
373 | if (dimension == 0U) { |
374 | LLOG_ERROR("Column meta config error, dimension can't be 0." ); |
375 | return false; |
376 | } |
377 | |
378 | auto metric_type = column_meta.parameters().get_as_string("metric_type" ); |
379 | if (metric_type.empty()) { |
380 | metric_type = "SquaredEuclidean" ; |
381 | } else if (metric_type == "InnerProduct" ) { |
382 | metric_type = "MipsSquaredEuclidean" ; |
383 | } |
384 | |
385 | auto max_neighbor_count = |
386 | column_meta.parameters().get_as_uint32("max_neighbor_count" ); |
387 | if (max_neighbor_count > 0U) { |
388 | proxima_params_.set("proxima.hnsw.streamer.max_neighbor_count" , |
389 | max_neighbor_count); |
390 | proxima_params_.set("proxima.oswg.streamer.max_neighbor_count" , |
391 | max_neighbor_count); |
392 | } |
393 | |
394 | auto ef_construction = |
395 | column_meta.parameters().get_as_uint32("ef_construction" ); |
396 | if (ef_construction > 0U) { |
397 | proxima_params_.set("proxima.hnsw.streamer.ef_construction" , |
398 | ef_construction); |
399 | proxima_params_.set("proxima.oswg.streamer.ef_construction" , |
400 | ef_construction); |
401 | } |
402 | |
403 | auto ef_search = column_meta.parameters().get_as_uint32("ef_search" ); |
404 | if (ef_search > 0U) { |
405 | proxima_params_.set("proxima.hnsw.streamer.ef" , ef_search); |
406 | proxima_params_.set("proxima.oswg.streamer.ef" , ef_search); |
407 | } else { |
408 | proxima_params_.set("proxima.hnsw.streamer.ef" , 200U); |
409 | proxima_params_.set("proxima.oswg.streamer.ef" , 200U); |
410 | } |
411 | |
412 | auto chunk_size = column_meta.parameters().get_as_uint32("chunk_size" ); |
413 | if (chunk_size > 0U) { |
414 | proxima_params_.set("proxima.hnsw.streamer.chunk_size" , chunk_size); |
415 | proxima_params_.set("proxima.oswg.streamer.segment_size" , chunk_size); |
416 | } else { |
417 | proxima_params_.set("proxima.hnsw.streamer.chunk_size" , |
418 | 64UL * 1024UL * 1024UL); |
419 | proxima_params_.set("proxima.oswg.streamer.segment_size" , |
420 | 64UL * 1024UL * 1024UL); |
421 | } |
422 | |
423 | auto max_scan_ratio = column_meta.parameters().get_as_float("max_scan_ratio" ); |
424 | if (max_scan_ratio > 0.0f) { |
425 | proxima_params_.set("proxima.hnsw.streamer.max_scan_ratio" , max_scan_ratio); |
426 | proxima_params_.set("proxima.oswg.streamer.max_scan_ratio" , max_scan_ratio); |
427 | } |
428 | |
429 | auto visit_bf = |
430 | column_meta.parameters().get_as_bool("visit_bloomfilter_enable" ); |
431 | if (visit_bf) { |
432 | proxima_params_.set("proxima.hnsw.streamer.visit_bloomfilter_enable" , |
433 | visit_bf); |
434 | proxima_params_.set("proxima.oswg.streamer.visit_bloomfilter_enable" , |
435 | visit_bf); |
436 | } |
437 | |
438 | // Check quantize type |
439 | auto quantize_type = column_meta.parameters().get_as_string("quantize_type" ); |
440 | if (!quantize_type.empty()) { |
441 | if (IndexHelper::GetQuantizeType(quantize_type) == |
442 | QuantizeTypes::UNDEFINED) { |
443 | LLOG_ERROR( |
444 | "Column meta config error, unknown quantize type. quantize_type[%s]" , |
445 | quantize_type.c_str()); |
446 | return false; |
447 | } |
448 | |
449 | if (data_type != DataTypes::VECTOR_FP32) { |
450 | LLOG_ERROR( |
451 | "Column meta config error, only FP32 data type can open quantizer" ); |
452 | return false; |
453 | } |
454 | |
455 | quantize_type_ = IndexHelper::GetQuantizeType(quantize_type); |
456 | } |
457 | |
458 | // Default filter duplicate records |
459 | proxima_params_.set("proxima.hnsw.streamer.filter_same_key" , true); |
460 | |
461 | // Set proxima index meta |
462 | proxima_meta_.set_meta(feature_type, dimension); |
463 | proxima_meta_.set_measure(metric_type, 0, IndexParams()); |
464 | |
465 | // Decide which engine to use |
466 | std::string engine = column_meta.parameters().get_as_string("engine" ); |
467 | if (engine == "OSWG" ) { |
468 | engine_type_ = EngineTypes::PROXIMA_OSWG_STREAMER; |
469 | } else if (engine == "HNSW" ) { |
470 | engine_type_ = EngineTypes::PROXIMA_HNSW_STREAMER; |
471 | } |
472 | |
473 | LLOG_INFO( |
474 | "Show vector column indexer options. index_type[%u] data_type[%u] " |
475 | "dimension[%u] " |
476 | "measure[%s] context_count[%u] max_neighbor_count[%u] " |
477 | "ef_construction[%u] chunk_size[%u] ef_search[%u] max_scan_ratio[%f] " |
478 | "visit_bf[%d] quantize_type[%s] engine_type[%d]" , |
479 | index_type, data_type, dimension, metric_type.c_str(), |
480 | this->concurrency(), max_neighbor_count, ef_construction, chunk_size, |
481 | ef_search, max_scan_ratio, visit_bf, quantize_type.c_str(), engine_type_); |
482 | |
483 | return true; |
484 | } |
485 | |
486 | int VectorColumnIndexer::open_proxima_streamer() { |
487 | int ret = 0; |
488 | auto index_meta = proxima_meta_; |
489 | // Check if need to open quantizer |
490 | if (quantize_type_ != QuantizeTypes::UNDEFINED) { |
491 | IndexConverterPtr converter; |
492 | switch (quantize_type_) { |
493 | case QuantizeTypes::VECTOR_INT4: |
494 | converter = |
495 | aitheta2::IndexFactory::CreateConverter("Int4StreamingConverter" ); |
496 | break; |
497 | case QuantizeTypes::VECTOR_INT8: |
498 | converter = |
499 | aitheta2::IndexFactory::CreateConverter("Int8StreamingConverter" ); |
500 | break; |
501 | case QuantizeTypes::VECTOR_FP16: |
502 | converter = |
503 | aitheta2::IndexFactory::CreateConverter("HalfFloatConverter" ); |
504 | break; |
505 | default: |
506 | return ErrorCode_RuntimeError; |
507 | } |
508 | |
509 | if (!converter) { |
510 | LLOG_ERROR("Create converter failed." ); |
511 | return ErrorCode_RuntimeError; |
512 | } |
513 | |
514 | ret = converter->init(proxima_meta_, IndexParams()); |
515 | CHECK_RETURN_WITH_LLOG(ret, 0, "Converter init failed. ret[%d]" , ret); |
516 | index_meta = converter->meta(); |
517 | |
518 | reformer_ = |
519 | aitheta2::IndexFactory::CreateReformer(index_meta.reformer_name()); |
520 | ret = reformer_->init(IndexParams()); |
521 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]" , ret); |
522 | } |
523 | |
524 | // Init measure |
525 | measure_ = |
526 | aitheta2::IndexFactory::CreateMeasure(proxima_meta_.measure_name()); |
527 | if (!measure_) { |
528 | LLOG_ERROR("Create measure %s failed" , |
529 | proxima_meta_.measure_name().c_str()); |
530 | return aitheta2::IndexError_Runtime; |
531 | } |
532 | ret = measure_->init(proxima_meta_, IndexParams()); |
533 | CHECK_RETURN_WITH_LLOG(ret, 0, "Reformer init failed. ret[%d]" , ret); |
534 | auto query_measure = measure_->query_measure(); |
535 | if (query_measure) { |
536 | measure_ = query_measure; |
537 | } |
538 | |
539 | // Get actual engine name and initialize it with factory |
540 | std::string engine_name = this->get_engine_name(); |
541 | proxima_streamer_ = aitheta2::IndexFactory::CreateStreamer(engine_name); |
542 | if (!proxima_streamer_) { |
543 | LLOG_ERROR("Create proxima streamer failed. name[%s]" , engine_name.c_str()); |
544 | return ErrorCode_RuntimeError; |
545 | } |
546 | |
547 | //! Notice use new index meta as initialize params |
548 | //! When user config quantize type, it may change its value. |
549 | ret = proxima_streamer_->init(index_meta, proxima_params_); |
550 | CHECK_RETURN_WITH_LLOG(ret, 0, "Init proxima streamer failed. ret[%d]" , ret); |
551 | |
552 | ret = proxima_streamer_->open(snapshot_->data()); |
553 | CHECK_RETURN_WITH_LLOG(ret, 0, "Open proxima streamer failed. ret[%d]" , ret); |
554 | |
555 | // Initialize context pool |
556 | for (uint32_t i = 0; i < this->concurrency(); i++) { |
557 | auto ctx = proxima_streamer_->create_context(); |
558 | if (!ctx) { |
559 | LLOG_ERROR("Create proxima streamer context failed." ); |
560 | return ErrorCode_RuntimeError; |
561 | } |
562 | context_pool_.emplace(std::move(ctx)); |
563 | } |
564 | |
565 | return 0; |
566 | } |
567 | |
568 | |
569 | } // end namespace index |
570 | } // namespace be |
571 | } // end namespace proxima |
572 | |