1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of memory segment
19 */
20
21#include "memory_segment.h"
22#include <chrono>
23#include <ailego/utility/time_helper.h>
24#include "common/error_code.h"
25#include "../file_helper.h"
26#include "../typedef.h"
27
28namespace proxima {
29namespace be {
30namespace index {
31
32
33MemorySegmentPtr MemorySegment::Create(const std::string &collection_name,
34 const std::string &collection_path,
35 const SegmentMeta &segment_meta,
36 const meta::CollectionMeta *schema,
37 const DeleteStore *delete_store,
38 const IDMap *id_map,
39 uint32_t concurrency) {
40 return std::make_shared<MemorySegment>(collection_name, collection_path,
41 segment_meta, schema, delete_store,
42 id_map, concurrency);
43}
44
45int MemorySegment::CreateAndOpen(
46 const std::string &collection_name, const std::string &collection_path,
47 const SegmentMeta &segment_meta, const meta::CollectionMeta *schema,
48 const DeleteStore *delete_store, const IDMap *id_map, uint32_t concurrency,
49 const ReadOptions &read_options, MemorySegmentPtr *memory_segment) {
50 *memory_segment = Create(collection_name, collection_path, segment_meta,
51 schema, delete_store, id_map, concurrency);
52
53 return (*memory_segment)->open(read_options);
54}
55
56
57MemorySegment::~MemorySegment() {
58 if (opened_) {
59 if (segment_meta_.state == SegmentState::PERSIST) {
60 close_and_remove_files();
61 } else {
62 close();
63 }
64 }
65}
66
67int MemorySegment::open(const ReadOptions &read_options) {
68 CHECK_STATUS(opened_, false);
69
70 int ret = open_forward_indexer(read_options);
71 CHECK_RETURN(ret, 0);
72
73 ret = open_column_indexers(read_options);
74 CHECK_RETURN(ret, 0);
75
76 segment_meta_.index_file_count = this->get_index_file_count();
77 segment_meta_.index_file_size = this->get_index_file_size();
78
79 opened_ = true;
80 SLOG_INFO("Opened memory segment.");
81 return 0;
82}
83
84int MemorySegment::close() {
85 CHECK_STATUS(opened_, true);
86
87 // try to ensure active insert requests finished
88 uint32_t retry = 0;
89 while (retry < MAX_WAIT_RETRY_COUNT &&
90 (active_insert_count_ > 0 || active_search_count_ > 0)) {
91 LOG_INFO(
92 "Try to wait active request finished. active_insert_count[%zu] "
93 "active_search_count[%zu] retry[%d]",
94 (size_t)active_insert_count_.load(),
95 (size_t)active_search_count_.load(), retry);
96 std::this_thread::sleep_for(std::chrono::seconds(1));
97 retry++;
98 }
99
100 forward_indexer_->close();
101 for (auto &it : column_indexers_) {
102 it.second->close();
103 }
104 column_indexers_.clear();
105
106 opened_ = false;
107 SLOG_DEBUG("Closed memory segment.");
108 return 0;
109}
110
111
112int MemorySegment::flush() {
113 CHECK_STATUS(opened_, true);
114
115 forward_indexer_->flush();
116 for (auto &it : column_indexers_) {
117 it.second->flush();
118 }
119
120 segment_meta_.index_file_count = this->get_index_file_count();
121 segment_meta_.index_file_size = this->get_index_file_size();
122 return 0;
123}
124
125int MemorySegment::dump() {
126 CHECK_STATUS(opened_, true);
127
128 // try to ensure active insert requests finished
129 uint32_t retry = 0;
130 while (retry < MAX_WAIT_RETRY_COUNT && active_insert_count_ > 0) {
131 LOG_INFO(
132 "Try to wait active request finished. active_insert_count[%zu] "
133 "retry[%d]",
134 (size_t)active_insert_count_.load(), retry);
135 std::this_thread::sleep_for(std::chrono::seconds(1));
136 retry++;
137 }
138
139 auto dumper = aitheta2::IndexFactory::CreateDumper("FileDumper");
140 if (!dumper) {
141 SLOG_ERROR("Create dumper failed.");
142 return ErrorCode_RuntimeError;
143 }
144
145 std::string segment_file_path = FileHelper::MakeFilePath(
146 collection_path_, FileID::SEGMENT_FILE, segment_meta_.segment_id);
147
148 int ret = dumper->create(segment_file_path);
149 CHECK_RETURN_WITH_CLOG(ret, 0, "Create dumper file failed.");
150
151 ret = dump_forward_indexer(dumper);
152 CHECK_RETURN(ret, 0);
153
154 ret = dump_column_indexers(dumper);
155 CHECK_RETURN(ret, 0);
156
157 dumper->close();
158
159 segment_meta_.index_file_count = 1U;
160 segment_meta_.index_file_size = FileHelper::FileSize(segment_file_path);
161 return 0;
162}
163
164int MemorySegment::close_and_remove_files() {
165 CHECK_STATUS(opened_, true);
166
167 forward_indexer_->close();
168 FileHelper::RemoveFile(forward_indexer_->index_file_path());
169
170 for (auto &it : column_indexers_) {
171 it.second->close();
172 FileHelper::RemoveFile(it.second->index_file_path());
173 }
174 column_indexers_.clear();
175
176 opened_ = false;
177 SLOG_DEBUG("Closed memory segment and remove index files.");
178 return 0;
179}
180
181int MemorySegment::insert(const Record &record, idx_t *doc_id) {
182 CHECK_STATUS(opened_, true);
183
184 AutoCounter ac(active_insert_count_);
185
186 // 1. insert into forward indexer first
187 ForwardData fwd_data;
188 fwd_data.header.primary_key = record.primary_key;
189 fwd_data.header.timestamp = record.timestamp;
190 fwd_data.header.lsn = record.lsn;
191 fwd_data.header.revision = record.revision;
192 fwd_data.data = std::move(record.forward_data);
193
194 int ret = forward_indexer_->insert(fwd_data, doc_id);
195 CHECK_RETURN_WITH_SLOG(ret, 0, "Insert into forward indexer failed. key[%zu]",
196 (size_t)record.primary_key);
197
198 // 2. insert into column indexers
199 for (size_t i = 0; i < record.column_datas.size(); i++) {
200 auto &column_data = record.column_datas[i];
201 std::string column_name = column_data.column_name;
202
203 // Skip not-exist column
204 if (!column_indexers_.has(column_name)) {
205 SLOG_ERROR("Not find column indexer. column[%s]", column_name.c_str());
206 continue;
207 }
208
209 auto &column_indexer = column_indexers_.get(column_name);
210 ret = column_indexer->insert(*doc_id, column_data);
211 CHECK_RETURN_WITH_SLOG(
212 ret, 0, "Insert into column indexer failed. key[%zu] column[%s]",
213 (size_t)record.primary_key, column_name.c_str());
214 }
215
216 // 3. update segment stats
217 update_stats(record, *doc_id);
218 return 0;
219}
220
221int MemorySegment::remove(idx_t doc_id) {
222 CHECK_STATUS(opened_, true);
223
224 ailego::ElapsedTime timer;
225 int ret = 0;
226
227 // No need to remove forward data.
228#if 0
229 ret = forward_indexer_->remove(doc_id);
230 CHECK_RETURN_WITH_SLOG(ret, 0,
231 "Remove from forward indexer failed. doc_id[%zu]",
232 (size_t)doc_id);
233#endif
234
235 for (auto &column_meta : schema_->index_columns()) {
236 std::string column_name = column_meta->name();
237 if (column_indexers_.has(column_name)) {
238 ret = column_indexers_.get(column_name)->remove(doc_id);
239 if (ret != 0) {
240 SLOG_WARN(
241 "Remove from column indexer failed. column_name[%s] doc_id[%zu]",
242 column_name.c_str(), (size_t)doc_id);
243 continue;
244 }
245 }
246 }
247
248 SLOG_DEBUG("Remove from memory segment success. doc_id[%lu] cost[%zuus]",
249 (size_t)doc_id, timer.micro_seconds());
250 return 0;
251}
252
253int MemorySegment::optimize(ThreadPoolPtr pool) {
254 CHECK_STATUS(opened_, true);
255
256 int ret = 0;
257 for (auto &column_meta : schema_->index_columns()) {
258 std::string column_name = column_meta->name();
259 if (column_indexers_.has(column_name)) {
260 ret = column_indexers_.get(column_name)->optimize(pool);
261 if (ret != 0) {
262 SLOG_WARN("Optimize column indexer failed. column_name[%s]",
263 column_name.c_str());
264 continue;
265 }
266 }
267 }
268
269 return 0;
270}
271
#if 0
// Compiled out: in-place update of an existing document.
// Rewrites the forward data for doc_id, then updates every column of the
// record in its column indexer; columns without an indexer are skipped.
int MemorySegment::update(idx_t doc_id, const Record &record) {
  CHECK_STATUS(opened_, true);

  AutoCounter ac(active_insert_count_);

  // Step 1: forward indexer
  ForwardData fwd_data;
  fwd_data.data = std::move(record.forward_data);
  fwd_data.header.revision = record.revision;
  fwd_data.header.lsn = record.lsn;
  fwd_data.header.timestamp = record.timestamp;
  fwd_data.header.primary_key = record.primary_key;

  int ret = forward_indexer_->update(doc_id, fwd_data);
  CHECK_RETURN_WITH_SLOG(ret, 0, "Update forward indexer failed. key[%zu]",
                         (size_t)record.primary_key);

  // Step 2: column indexers
  for (size_t idx = 0; idx < record.column_datas.size(); idx++) {
    auto &column_data = record.column_datas[idx];
    auto &column_name = column_data.column_name;

    if (column_indexers_.has(column_name)) {
      auto &column_indexer = column_indexers_.get(column_name);
      ret = column_indexer->update(doc_id, column_data);
      CHECK_RETURN_WITH_SLOG(
          ret, 0, "Update column indexer failed. key[%zu] column[%s]",
          (size_t)record.primary_key, column_name.c_str());
    } else {
      // Skip not-exist column
      SLOG_ERROR("Not find column indexer. column[%s]", column_name.c_str());
    }
  }

  return 0;
}
#endif
311
312int MemorySegment::knn_search(const std::string &column_name,
313 const std::string &query,
314 const QueryParams &query_params,
315 QueryResultList *result) {
316 CHECK_STATUS(opened_, true);
317
318 std::vector<QueryResultList> batch_results;
319 int ret =
320 this->knn_search(column_name, query, query_params, 1, &batch_results);
321 CHECK_RETURN(ret, 0);
322
323 (*result) = batch_results[0];
324 return 0;
325}
326
327int MemorySegment::knn_search(const std::string &column_name,
328 const std::string &query,
329 const QueryParams &query_params,
330 uint32_t batch_count,
331 std::vector<QueryResultList> *batch_results) {
332 CHECK_STATUS(opened_, true);
333
334 AutoCounter as(active_search_count_);
335
336 ailego::ElapsedTime timer;
337 uint64_t query_id = query_params.query_id;
338 if (!column_indexers_.has(column_name)) {
339 SLOG_ERROR("Column not exist. query_id[%zu] column[%s]", (size_t)query_id,
340 column_name.c_str());
341 return ErrorCode_InexistentColumn;
342 }
343
344 // check query format
345 auto &column_indexer = column_indexers_.get(column_name);
346
347 // search columns
348 std::vector<IndexDocumentList> batch_search_results;
349 FilterFunction filter = nullptr;
350 // If user choose to use deep delete, then we don't need to pass filter
351 // to column indexer.
352 if (delete_store_ && delete_store_->count() > 0) {
353 filter = [this](idx_t doc_id) { return delete_store_->has(doc_id); };
354 }
355
356 int ret = column_indexer->search(query, query_params, batch_count, filter,
357 &batch_search_results);
358 CHECK_RETURN_WITH_SLOG(
359 ret, 0, "Column indexer search failed. query_id[%zu] column[%s]",
360 (size_t)query_id, column_name.c_str());
361
362 // fill results
363 uint32_t res_num = 0U;
364 for (size_t i = 0; i < batch_search_results.size(); i++) {
365 auto &search_results = batch_search_results[i];
366 QueryResultList output_result_list;
367 for (size_t j = 0; j < search_results.size(); j++) {
368 idx_t doc_id = search_results[j].key();
369 ForwardData fwd_data;
370 ret = forward_indexer_->seek(doc_id, &fwd_data);
371 if (ret != 0) {
372 SLOG_WARN(
373 "Forward data not exist. query_id[%zu] doc_id[%zu] column[%s]",
374 (size_t)query_id, (size_t)doc_id, column_name.c_str());
375 continue;
376 }
377 QueryResult res;
378 res.primary_key = fwd_data.header.primary_key;
379 res.score = search_results[j].score();
380 res.revision = fwd_data.header.revision;
381 res.forward_data = std::move(fwd_data.data);
382 res.lsn = fwd_data.header.lsn;
383 output_result_list.emplace_back(res);
384 }
385 res_num += search_results.size();
386 batch_results->emplace_back(output_result_list);
387 }
388
389 SLOG_DEBUG(
390 "Knn search query success. query_id[%zu] "
391 "batch_count[%u] topk[%u] res_num[%u] cost[%zuus] column[%s]",
392 (size_t)query_id, batch_count, query_params.topk, res_num,
393 (size_t)timer.micro_seconds(), column_name.c_str());
394
395 return 0;
396}
397
398int MemorySegment::kv_search(uint64_t primary_key, QueryResult *result) {
399 CHECK_STATUS(opened_, true);
400
401 idx_t doc_id = id_map_->get_mapping_id(primary_key);
402 bool found = false;
403 result->primary_key = INVALID_KEY;
404
405 if (!delete_store_->has(doc_id)) {
406 if (doc_id >= segment_meta_.min_doc_id &&
407 doc_id <= segment_meta_.max_doc_id) {
408 ForwardData fwd_data;
409 int ret = forward_indexer_->seek(doc_id, &fwd_data);
410 if (ret == 0 && fwd_data.header.primary_key != INVALID_KEY) {
411 result->primary_key = fwd_data.header.primary_key;
412 result->revision = fwd_data.header.revision;
413 result->forward_data = std::move(fwd_data.data);
414 result->lsn = fwd_data.header.lsn;
415 found = true;
416 }
417 }
418 }
419
420 SLOG_DEBUG("Kv search query success. key[%zu] found[%d]", (size_t)primary_key,
421 found);
422 return 0;
423}
424
425int MemorySegment::remove_column(const std::string &column_name) {
426 CHECK_STATUS(opened_, true);
427
428 if (!column_indexers_.has(column_name)) {
429 SLOG_WARN("Column not exist, remove failed. column[%s]",
430 column_name.c_str());
431 return 0;
432 }
433
434 column_indexers_.get(column_name)->close();
435 column_indexers_.erase(column_name);
436
437 SLOG_INFO("Remove column done. column[%s]", column_name.c_str());
438 return 0;
439}
440
441int MemorySegment::add_column(const meta::ColumnMetaPtr &column_meta) {
442 CHECK_STATUS(opened_, true);
443
444 std::string column_name = column_meta->name();
445 if (column_indexers_.has(column_name)) {
446 SLOG_WARN("Column already exist, add failed. column[%s]",
447 column_name.c_str());
448 return 0;
449 }
450
451 ReadOptions read_options;
452 read_options.use_mmap = true;
453 read_options.create_new = true;
454
455 ColumnIndexerPtr column_indexer = ColumnIndexer::Create(
456 collection_name_, collection_path_, segment_meta_.segment_id, column_name,
457 column_meta->index_type());
458
459 if (!column_indexer) {
460 SLOG_ERROR("Create column indexer failed. index_type[%d] column[%s]",
461 column_meta->index_type(), column_name.c_str());
462 return ErrorCode_RuntimeError;
463 }
464
465 column_indexer->set_concurrency(concurrency_);
466 int ret = column_indexer->open(*column_meta.get(), read_options);
467 CHECK_RETURN_WITH_SLOG(
468 ret, 0, "Create and open column indexer failed. ret[%d] column[%s]", ret,
469 column_name.c_str());
470
471 column_indexers_.emplace(column_name, column_indexer);
472 SLOG_INFO("Add column success.column[%s]", column_name.c_str());
473
474 return 0;
475}
476
477int MemorySegment::open_forward_indexer(const ReadOptions &read_options) {
478 forward_indexer_ = ForwardIndexer::Create(collection_name_, collection_path_,
479 segment_meta_.segment_id);
480 if (!forward_indexer_) {
481 SLOG_ERROR("Create forward indexer failed.");
482 return ErrorCode_RuntimeError;
483 }
484
485 forward_indexer_->set_start_doc_id(segment_meta_.min_doc_id);
486 int ret = forward_indexer_->open(read_options);
487 CHECK_RETURN_WITH_SLOG(ret, 0, "Open forward indexer failed.");
488
489 SLOG_DEBUG("Opened forward indexer. min_doc_id[%zu] forward_count[%zu]",
490 (size_t)segment_meta_.min_doc_id,
491 (size_t)forward_indexer_->doc_count());
492 return 0;
493}
494
495int MemorySegment::open_column_indexers(const ReadOptions &read_options) {
496 for (auto &column_meta : schema_->index_columns()) {
497 std::string column_name = column_meta->name();
498
499 ColumnIndexerPtr column_indexer = ColumnIndexer::Create(
500 collection_name_, collection_path_, segment_meta_.segment_id,
501 column_name, column_meta->index_type());
502 if (!column_indexer) {
503 SLOG_ERROR("Create column indexer failed. index_type[%d] column[%s]",
504 column_meta->index_type(), column_name.c_str());
505 return ErrorCode_RuntimeError;
506 }
507
508 int ret = column_indexer->open(*column_meta.get(), read_options);
509 CHECK_RETURN_WITH_SLOG(
510 ret, 0, "Create and open column indexer failed. ret[%d] column[%s]",
511 ret, column_name.c_str());
512
513 column_indexers_.emplace(column_name, column_indexer);
514 SLOG_DEBUG("Opened column indexer. column[%s]", column_name.c_str());
515 }
516
517 return 0;
518}
519
520int MemorySegment::dump_forward_indexer(const IndexDumperPtr &dumper) {
521 IndexDumperPtr fwd_dumper =
522 std::make_shared<IndexSegmentDumper>(dumper, FORWARD_DUMP_BLOCK);
523 int ret = forward_indexer_->dump(fwd_dumper);
524 CHECK_RETURN_WITH_SLOG(ret, 0, "Dump forward indexer failed.");
525
526 fwd_dumper->close();
527 return 0;
528}
529
530int MemorySegment::dump_column_indexers(const IndexDumperPtr &dumper) {
531 for (auto &it : column_indexers_) {
532 std::string column_name = it.first;
533 auto &column_indexer = it.second;
534 IndexDumperPtr index_dumper = std::make_shared<IndexSegmentDumper>(
535 dumper, COLUMN_DUMP_BLOCK + column_name);
536 int ret = column_indexer->dump(index_dumper);
537 CHECK_RETURN_WITH_SLOG(ret, 0, "Dump column indexer failed. column[%s]",
538 column_name.c_str());
539 index_dumper->close();
540 }
541 return 0;
542}
543
544void MemorySegment::update_stats(const Record &record, idx_t doc_id) {
545 std::lock_guard<std::mutex> lock(mutex_);
546 segment_meta_.doc_count++;
547
548 if (doc_id > segment_meta_.max_doc_id) {
549 segment_meta_.max_doc_id = doc_id;
550 }
551
552 if (record.primary_key < segment_meta_.min_primary_key) {
553 segment_meta_.min_primary_key = record.primary_key;
554 }
555
556 if (record.primary_key > segment_meta_.max_primary_key) {
557 segment_meta_.max_primary_key = record.primary_key;
558 }
559
560 if (record.timestamp < segment_meta_.min_timestamp) {
561 segment_meta_.min_timestamp = record.timestamp;
562 }
563
564 if (record.timestamp > segment_meta_.max_timestamp) {
565 segment_meta_.max_timestamp = record.timestamp;
566 }
567
568 if (record.lsn > segment_meta_.max_lsn) {
569 segment_meta_.max_lsn = record.lsn;
570 }
571
572 if (record.lsn < segment_meta_.min_lsn) {
573 segment_meta_.min_lsn = record.lsn;
574 }
575}
576
577size_t MemorySegment::get_index_file_count() {
578 return column_indexers_.size() + 1;
579}
580
581size_t MemorySegment::get_index_file_size() {
582 size_t file_size = 0U;
583 for (auto &it : column_indexers_) {
584 file_size += FileHelper::FileSize(it.second->index_file_path());
585 }
586
587 file_size += FileHelper::FileSize(forward_indexer_->index_file_path());
588 return file_size;
589}
590
591
592} // end namespace index
593} // namespace be
594} // end namespace proxima
595