1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of collection class
19 */
20
#include "collection.h"
#include <algorithm>
#include <chrono>
#include <thread>
#include <ailego/container/heap.h>
#include <ailego/utility/time_helper.h>
#include "common/defer.h"
#include "common/error_code.h"
#include "common/logger.h"
#include "constants.h"
#include "file_helper.h"
#include "typedef.h"
31
32namespace proxima {
33namespace be {
34namespace index {
35
36CollectionPtr Collection::Create(const std::string &collection_name,
37 const std::string &prefix_path,
38 meta::CollectionMetaPtr schema,
39 uint32_t concurrency,
40 ThreadPool *thread_pool) {
41 return std::make_shared<Collection>(collection_name, prefix_path, schema,
42 concurrency, thread_pool);
43}
44
45int Collection::CreateAndOpen(const std::string &collection_name,
46 const std::string &prefix_path,
47 meta::CollectionMetaPtr schema,
48 uint32_t concurrency, ThreadPool *thread_pool,
49 const ReadOptions &read_options,
50 CollectionPtr *collection) {
51 *collection = std::make_shared<Collection>(collection_name, prefix_path,
52 schema, concurrency, thread_pool);
53
54 return (*collection)->open(read_options);
55}
56
57Collection::Collection(const std::string &coll_name, const std::string &path,
58 meta::CollectionMetaPtr coll_meta, uint32_t concur,
59 ThreadPool *pool)
60 : collection_name_(coll_name),
61 prefix_path_(path),
62 schema_(std::move(coll_meta)),
63 concurrency_(concur),
64 thread_pool_(pool) {}
65
66Collection::~Collection() {
67 if (opened_) {
68 close();
69 }
70}
71
72int Collection::open(const ReadOptions &read_options) {
73 CHECK_STATUS(opened_, false);
74
75 dir_path_ = prefix_path_ + "/" + collection_name_;
76 std::string manifest_file_path =
77 FileHelper::MakeFilePath(dir_path_, FileID::MANIFEST_FILE);
78
79 // check index data
80 if (read_options.create_new) {
81 if (FileHelper::DirectoryExists(dir_path_)) {
82 CLOG_ERROR("Index directory already exist, create failed. dir_path[%s]",
83 dir_path_.c_str());
84 return ErrorCode_DuplicateCollection;
85 }
86 } else {
87 if (!FileHelper::DirectoryExists(dir_path_) ||
88 !FileHelper::FileExists(manifest_file_path)) {
89 CLOG_ERROR(
90 "Index directory or manifest not exist, open failed. dir_path[%s]",
91 dir_path_.c_str());
92 return ErrorCode_InvalidIndexDataFormat;
93 }
94 }
95
96 int ret = recover_from_snapshot(read_options);
97 if (ret != 0) {
98 CLOG_ERROR("Recover from snapshot failed.");
99
100 // if create new collection failed
101 // need to cleanup history files
102 if (read_options.create_new) {
103 this->remove_files();
104 }
105 return ret;
106 }
107
108 opened_ = true;
109
110 CollectionStats stats;
111 this->get_stats(&stats);
112 CLOG_INFO(
113 "Open colletion success. doc_count[%zu] segment_count[%zu] "
114 "max_docs_per_segment[%zu]",
115 (size_t)stats.total_doc_count, (size_t)stats.total_segment_count,
116 (size_t)schema_->max_docs_per_segment());
117
118 return 0;
119}
120
121int Collection::close() {
122 CHECK_STATUS(opened_, true);
123
124 // Wait until dump ended
125 while (is_dumping_) {
126 LOG_INFO("Collection is dumping segment, wait until dumped...");
127 std::this_thread::sleep_for(std::chrono::seconds(1));
128 }
129
130 // Wait until flush ended
131 while (is_flushing_) {
132 LOG_INFO("Collection is flushing, wait until flushed...");
133 std::this_thread::sleep_for(std::chrono::seconds(1));
134 }
135
136 // Wait until optimize ended
137 while (is_optimizing_) {
138 LOG_INFO("Collection is optimizing, wait until optimized...");
139 std::this_thread::sleep_for(std::chrono::seconds(1));
140 }
141
142 // Close writing segment
143 writing_segment_->close();
144
145 // maybe dumping segment process exist error
146 // so we still close dumping segment safely
147 if (dumping_segment_ != nullptr) {
148 dumping_segment_->close();
149 }
150
151 persist_segment_mgr_->unload_segments();
152
153 id_map_->close();
154 delete_store_->close();
155 lsn_store_->close();
156 version_manager_->close();
157
158 opened_ = false;
159 CLOG_INFO("Close collection success.");
160
161 return 0;
162}
163
164int Collection::close_and_cleanup() {
165 CHECK_STATUS(opened_, true);
166
167 this->close();
168 this->remove_files();
169
170 return 0;
171}
172
173int Collection::flush() {
174 CHECK_STATUS(opened_, true);
175
176 CLOG_INFO("Start flushing collection.");
177 ailego::ElapsedTime timer;
178 is_flushing_ = true;
179
180 Defer defer([this] { is_flushing_ = false; });
181
182 int ret = 0;
183 ret = writing_segment_->flush();
184 CHECK_RETURN_WITH_CLOG(ret, 0, "Flush writing segment failed.");
185
186 ret = id_map_->flush();
187 CHECK_RETURN_WITH_CLOG(ret, 0, "Flush id map failed.");
188
189 ret = delete_store_->flush();
190 CHECK_RETURN_WITH_CLOG(ret, 0, "Flush delete store failed.");
191
192 ret = lsn_store_->flush();
193 CHECK_RETURN_WITH_CLOG(ret, 0, "Flush lsn store failed.");
194
195 version_manager_->update_segment_meta(writing_segment_->segment_meta());
196 ret = version_manager_->flush();
197 CHECK_RETURN_WITH_CLOG(ret, 0, "Flush version manager failed.");
198
199 CLOG_INFO("Ended flushing collection. cost[%zums]",
200 (size_t)timer.milli_seconds());
201 return 0;
202}
203
204int Collection::dump() {
205 CHECK_STATUS(opened_, true);
206
207 return this->drive_dump_segment();
208}
209
210int Collection::optimize(ThreadPoolPtr pool) {
211 CHECK_STATUS(opened_, true);
212
213 CLOG_INFO("Start optimizing collection.");
214 ailego::ElapsedTime timer;
215 is_optimizing_ = true;
216
217 Defer defer([this] { is_optimizing_ = false; });
218
219 int ret = writing_segment_->optimize(pool);
220 CHECK_RETURN_WITH_CLOG(ret, 0, "Optimize writing segment failed.");
221
222 CLOG_INFO("Ended optimizing collection. cost[%zums]",
223 (size_t)timer.milli_seconds());
224 return 0;
225}
226
227int Collection::remove_files() {
228 return FileHelper::RemoveDirectory(dir_path_);
229}
230
231int Collection::write_records(const CollectionDataset &records) {
232 CHECK_STATUS(opened_, true);
233
234 ailego::ElapsedTime timer;
235 int ret = 0;
236 int error_code = 0;
237 for (size_t i = 0; i < records.size(); i++) {
238 auto &record = records.get(i);
239 uint64_t primary_key = record.primary_key;
240 uint64_t lsn = record.lsn;
241
242 switch (record.operation_type) {
243 case OperationTypes::INSERT:
244 ret = insert_record(record);
245 if (ret != 0) {
246 error_code = ret;
247 CLOG_ERROR("Insert record failed. key[%zu] lsn[%zu] rt[%zuus]",
248 (size_t)primary_key, (size_t)lsn,
249 (size_t)timer.micro_seconds());
250 } else {
251 CLOG_INFO("Insert record success. key[%zu] lsn[%zu] rt[%zuus]",
252 (size_t)primary_key, (size_t)lsn,
253 (size_t)timer.micro_seconds());
254 }
255 break;
256 case OperationTypes::UPDATE:
257 ret = update_record(record);
258 if (ret != 0) {
259 error_code = ret;
260 CLOG_ERROR("Update record failed. key[%zu] lsn[%zu] rt[%zuus]",
261 (size_t)primary_key, (size_t)lsn,
262 (size_t)timer.micro_seconds());
263 } else {
264 CLOG_INFO("Update record success. key[%zu] lsn[%zu] rt[%zuus]",
265 (size_t)primary_key, (size_t)lsn,
266 (size_t)timer.micro_seconds());
267 }
268 break;
269 case OperationTypes::DELETE:
270 ret = delete_record(primary_key);
271 if (ret != 0) {
272 error_code = ret;
273 CLOG_ERROR("Delete record failed. key[%zu] lsn[%zu] rt[%zuus]",
274 (size_t)primary_key, (size_t)lsn,
275 (size_t)timer.micro_seconds());
276 } else {
277 CLOG_INFO("Delete record success. key[%zu] lsn[%zu] rt[%zuus]",
278 (size_t)primary_key, (size_t)lsn,
279 (size_t)timer.micro_seconds());
280 }
281 break;
282 default:
283 CLOG_ERROR("Unknown operation type. type[%d]", record.operation_type);
284 }
285 }
286
287 return error_code;
288}
289
290int Collection::insert_record(const Record &record) {
291 // 1. check if record already exists
292 if (this->has_record(record.primary_key)) {
293 CLOG_ERROR("Insert duplicate record. key[%zu]", (size_t)record.primary_key);
294 return ErrorCode_DuplicateKey;
295 }
296
297 // 2. insert into memory segment
298 idx_t doc_id = INVALID_DOC_ID;
299 int ret = writing_segment_->insert(record, &doc_id);
300 CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into memory segment failed. key[%zu]",
301 (size_t)record.primary_key);
302
303 // 3. record key/doc_id mapping in id map
304 ret = id_map_->insert(record.primary_key, doc_id);
305 CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into id map failed. key[%zu]",
306 (size_t)record.primary_key);
307
308 // 4. record in lsn store
309 ret = lsn_store_->append(record.lsn, record.lsn_context);
310 if (ret != 0) {
311 // do not need to terminate insert process
312 CLOG_WARN("Lsn store append failed. key[%zu]", (size_t)record.primary_key);
313 }
314
315 // try to drive dump writing segment
316 uint64_t max_docs_per_segment = schema_->max_docs_per_segment();
317 if (max_docs_per_segment > 0 &&
318 writing_segment_->doc_count() >= max_docs_per_segment) {
319 drive_dump_segment();
320 }
321
322 return 0;
323}
324
325int Collection::delete_record(uint64_t primary_key) {
326 // 1. check if record exist
327 if (!this->has_record(primary_key)) {
328 CLOG_ERROR("Record not exist in colletion. key[%zu]", (size_t)primary_key);
329 return ErrorCode_InexistentKey;
330 }
331
332 // 2. get key/doc_id mapping
333 idx_t doc_id = id_map_->get_mapping_id(primary_key);
334 if (doc_id == INVALID_DOC_ID) {
335 CLOG_ERROR("Get mapping doc-id failed. key[%zu]", (size_t)primary_key);
336 return ErrorCode_RuntimeError;
337 }
338
339 // 3. insert into delete map
340 int ret = delete_store_->insert(doc_id);
341 CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into delete map failed.");
342
343 // 4. remove mapping in id_map
344 id_map_->remove(primary_key);
345
346 // 5. try to inplace remove in writing segment
347 if (writing_segment_->is_in_range(doc_id)) {
348 ret = writing_segment_->remove(doc_id);
349 CHECK_RETURN_WITH_CLOG(ret, 0, "Remove from writing segment failed.");
350 }
351 return 0;
352}
353
354int Collection::update_record(const Record &record) {
355 // 1. check if record exist
356 if (!this->has_record(record.primary_key)) {
357 CLOG_ERROR("Record not exist in collection. key[%zu]",
358 (size_t)record.primary_key);
359 return ErrorCode_InexistentKey;
360 }
361
362 // 2. check record lsn
363 int ret = 0;
364 if (record.lsn_check) {
365 Record old_record;
366 ret = this->search_record(record.primary_key, &old_record);
367 if (ret != 0 || old_record.primary_key == INVALID_KEY) {
368 CLOG_ERROR("Search record failed. key[%zu]", (size_t)record.primary_key);
369 return ret;
370 }
371
372 if (record.lsn <= old_record.lsn) {
373 CLOG_ERROR("Invalid record lsn. key[%zu] lsn[%zu] last_lsn[%zu]",
374 (size_t)record.primary_key, (size_t)record.lsn,
375 (size_t)old_record.lsn);
376 return ErrorCode_InvalidRecord;
377 }
378 }
379
380 // 3. delete old record
381 ret = this->delete_record(record.primary_key);
382 CHECK_RETURN(ret, 0);
383
384 // 4. insert new record
385 ret = this->insert_record(record);
386
387 return ret;
388}
389
390bool Collection::has_record(uint64_t primary_key) {
391 return id_map_->has(primary_key);
392}
393
394int Collection::search_record(uint64_t primary_key, Record *record) {
395 if (!this->has_record(primary_key)) {
396 return 0;
397 }
398
399 idx_t doc_id = id_map_->get_mapping_id(primary_key);
400 SegmentPtr found_segment = SegmentPtr();
401
402 auto &segment_metas = version_manager_->current_version();
403 // reverse search for newer segment match
404 for (auto it = segment_metas.crbegin(); it != segment_metas.crend(); ++it) {
405 if (doc_id >= it->min_doc_id && doc_id <= it->max_doc_id) {
406 SegmentID segment_id = it->segment_id;
407 found_segment = persist_segment_mgr_->get_segment(segment_id);
408 break;
409 }
410 }
411
412 if (!found_segment && dumping_segment_ != nullptr) {
413 if (doc_id >= dumping_segment_->segment_meta().min_doc_id &&
414 doc_id <= dumping_segment_->segment_meta().max_doc_id) {
415 found_segment = dumping_segment_;
416 }
417 }
418
419 if (!found_segment && writing_segment_ != nullptr) {
420 found_segment = writing_segment_;
421 }
422
423 QueryResult result;
424 int ret = found_segment->kv_search(primary_key, &result);
425 if (ret == 0 && result.primary_key != INVALID_KEY) {
426 record->primary_key = result.primary_key;
427 record->revision = result.revision;
428 record->forward_data = std::move(result.forward_data);
429 record->lsn = result.lsn;
430 }
431
432 return 0;
433}
434
435int Collection::get_latest_lsn(uint64_t *lsn, std::string *lsn_context) {
436 CHECK_STATUS(opened_, true);
437 return lsn_store_->get_latest_lsn(lsn, lsn_context);
438}
439
440int Collection::get_segments(std::vector<SegmentPtr> *segments) {
441 CHECK_STATUS(opened_, true);
442
443 auto &segment_metas = version_manager_->current_version();
444 for (size_t i = 0; i < segment_metas.size(); i++) {
445 SegmentID segment_id = segment_metas[i].segment_id;
446 if (persist_segment_mgr_->has_segment(segment_id)) {
447 segments->emplace_back(persist_segment_mgr_->get_segment(segment_id));
448 } else {
449 // Maybe it's pre-loaded fail, and it will be loaded again.
450 PersistSegmentPtr persist_segment;
451 ReadOptions read_options;
452 read_options.use_mmap = true;
453 read_options.create_new = false;
454 int ret = this->load_persist_segment(segment_metas[i], read_options,
455 &persist_segment);
456 CHECK_RETURN(ret, 0);
457 persist_segment_mgr_->add_segment(persist_segment);
458 segments->emplace_back(persist_segment);
459 }
460 }
461
462 if (writing_segment_ != nullptr) {
463 segments->emplace_back(writing_segment_);
464 }
465
466 if (dumping_segment_ != nullptr &&
467 !persist_segment_mgr_->has_segment(dumping_segment_->segment_id())) {
468 segments->emplace_back(dumping_segment_);
469 }
470
471 return 0;
472}
473
474int Collection::get_stats(CollectionStats *stats) {
475 stats->collection_name = collection_name_;
476 stats->collection_path = dir_path_;
477 stats->delete_doc_count = delete_store_->count();
478
479 // collect stats of persist segment
480 auto &segment_metas = version_manager_->current_version();
481 for (size_t i = 0; i < segment_metas.size(); i++) {
482 stats->total_doc_count += segment_metas[i].doc_count;
483 stats->total_index_file_count += segment_metas[i].index_file_count;
484 stats->total_index_file_size += segment_metas[i].index_file_size;
485 stats->total_segment_count++;
486 stats->segment_stats.emplace_back(segment_metas[i]);
487 }
488
489 // collect stats of memory segment
490 if (dumping_segment_ != nullptr &&
491 !persist_segment_mgr_->has_segment(dumping_segment_->segment_id())) {
492 auto &segment_meta = dumping_segment_->segment_meta();
493 stats->total_doc_count += segment_meta.doc_count;
494 stats->total_index_file_count += segment_meta.index_file_count;
495 stats->total_index_file_size += segment_meta.index_file_size;
496 stats->total_segment_count++;
497 stats->segment_stats.emplace_back(segment_meta);
498 }
499
500 if (writing_segment_ != nullptr) {
501 auto &segment_meta = writing_segment_->segment_meta();
502 stats->total_doc_count += segment_meta.doc_count;
503 stats->total_index_file_count += segment_meta.index_file_count;
504 stats->total_index_file_size += segment_meta.index_file_size;
505 stats->total_segment_count++;
506 stats->segment_stats.emplace_back(segment_meta);
507 }
508
509 stats->total_index_file_count += 4;
510 stats->total_index_file_size += FileHelper::FileSize(id_map_->file_path());
511 stats->total_index_file_size +=
512 FileHelper::FileSize(delete_store_->file_path());
513 stats->total_index_file_size +=
514 FileHelper::FileSize(version_manager_->file_path());
515 stats->total_index_file_size += FileHelper::FileSize(lsn_store_->file_path());
516
517 return 0;
518}
519
520int Collection::update_schema(meta::CollectionMetaPtr new_schema) {
521 CHECK_STATUS(opened_, true);
522
523 std::lock_guard<std::mutex> lock(schema_mutex_);
524 if (is_dumping_) {
525 CLOG_ERROR("Can't update schema while dumping segment.");
526 return ErrorCode_StatusError;
527 }
528
529 uint32_t new_revision = new_schema->revision();
530 uint32_t current_revision = schema_->revision();
531 if (new_revision <= current_revision) {
532 CLOG_ERROR(
533 "New schema revision less than current schema, update failed. "
534 "current_schema[%u] new_schema[%u]",
535 current_revision, new_revision);
536 return ErrorCode_MismatchedSchema;
537 }
538
539 std::vector<meta::ColumnMetaPtr> add_columns;
540 std::vector<meta::ColumnMetaPtr> delete_columns;
541 this->diff_schema(*new_schema, *schema_, &add_columns, &delete_columns);
542
543 int ret = 0;
544 std::vector<SegmentPtr> all_segments;
545 ret = this->get_segments(&all_segments);
546 CHECK_RETURN_WITH_CLOG(ret, 0, "Get segments failed.");
547
548 for (size_t i = 0; i < add_columns.size(); i++) {
549 for (size_t j = 0; j < all_segments.size(); j++) {
550 ret = all_segments[j]->add_column(add_columns[i]);
551 CHECK_RETURN_WITH_CLOG(
552 ret, 0, "Add new column failed. column[%s] segment_id[%zu]",
553 add_columns[i]->name().c_str(),
554 (size_t)all_segments[j]->segment_id());
555 }
556 }
557
558 for (size_t i = 0; i < delete_columns.size(); i++) {
559 for (size_t j = 0; j < all_segments.size(); j++) {
560 ret = all_segments[j]->remove_column(delete_columns[i]->name());
561 CHECK_RETURN_WITH_CLOG(ret, 0,
562 "Remove column failed. column[%s] segment_id[%zu]",
563 delete_columns[i]->name().c_str(),
564 (size_t)all_segments[j]->segment_id());
565 }
566 }
567
568 schema_ = new_schema;
569 CLOG_INFO("Update schema success. current_schema[%u] new_schema[%u]",
570 current_revision, new_revision);
571
572 return 0;
573}
574
// Rotate the writing segment and schedule an asynchronous dump of the old one.
//
// Only one dump may be in flight: if is_dumping_ is already set, this returns
// 0 without doing anything. Otherwise a new segment meta is allocated and a
// fresh memory segment is opened as the new writing segment. The previous
// writing segment becomes dumping_segment_: it is flushed, marked DUMPING and
// recorded in the version manager. do_dump_segment() is then submitted to
// thread_pool_, and it clears is_dumping_ when it finishes. If allocation or
// opening fails before the swap, is_dumping_ is cleared and the error is
// returned.
int Collection::drive_dump_segment() {
  // Claim the single dump slot atomically; an ongoing dump makes this a no-op.
  if (is_dumping_.exchange(true)) {
    return 0;
  }

  // 1. create a new segment for writing
  SegmentMeta new_segment_meta;
  int ret = version_manager_->alloc_segment_meta(&new_segment_meta);
  if (ret != 0) {
    CLOG_ERROR("Alloc segment meta failed.");
    is_dumping_ = false;
    return ret;
  }
  // min_doc_id of the new segment is derived from the current writing
  // segment's max_doc_id plus DOC_ID_INCREASE_COUNT.
  new_segment_meta.min_doc_id =
      writing_segment_->segment_meta().max_doc_id + DOC_ID_INCREASE_COUNT;

  MemorySegmentPtr new_segment;
  ReadOptions read_options;
  read_options.use_mmap = true;
  read_options.create_new = true;
  ret = open_memory_segment(new_segment_meta, read_options, &new_segment);
  if (ret != 0) {
    // NOTE(review): the segment meta allocated above is not released here;
    // confirm whether VersionManager reclaims unused allocations.
    is_dumping_ = false;
    return ret;
  }

  // 2. swap writing segment -> dumping segment
  MemorySegmentPtr tmp_segment = writing_segment_;
  writing_segment_ = new_segment;
  dumping_segment_ = std::move(tmp_segment);

  // 3. record segment state change
  writing_segment_->update_state(SegmentState::WRITING);
  version_manager_->update_segment_meta(writing_segment_->segment_meta());

  // NOTE(review): the result of flush() is not checked; the dump proceeds
  // even if flushing the old writing segment failed.
  dumping_segment_->flush();
  dumping_segment_->update_state(SegmentState::DUMPING);
  version_manager_->update_segment_meta(dumping_segment_->segment_meta());

  // 4. dump memory segment
  // Runs on thread_pool_; is_dumping_ stays set until do_dump_segment() ends.
  thread_pool_->submit(
      ailego::Closure::New(this, &Collection::do_dump_segment));

  return 0;
}
620
621int Collection::open_memory_segment(const SegmentMeta &segment_meta,
622 const ReadOptions &read_options,
623 MemorySegmentPtr *new_segment) {
624 int ret = MemorySegment::CreateAndOpen(
625 collection_name_, dir_path_, segment_meta, schema_.get(),
626 delete_store_.get(), id_map_.get(), concurrency_, read_options,
627 new_segment);
628
629 CHECK_RETURN_WITH_CLOG(
630 ret, 0, "Create and open memory segment failed. segment_id[%zu]",
631 (size_t)segment_meta.segment_id);
632
633 return 0;
634}
635
636int Collection::load_persist_segment(const SegmentMeta &segment_meta,
637 const ReadOptions &read_options,
638 PersistSegmentPtr *new_segment) {
639 int ret = PersistSegment::CreateAndLoad(
640 collection_name_, dir_path_, segment_meta, schema_.get(),
641 delete_store_.get(), id_map_.get(), concurrency_, read_options,
642 new_segment);
643
644 CHECK_RETURN_WITH_CLOG(
645 ret, 0, "Create and load persist segment failed. segment_id[%zu]",
646 (size_t)segment_meta.segment_id);
647
648 return 0;
649}
650
// Task run on thread_pool_ that turns dumping_segment_ into a persisted
// segment. It is submitted by drive_dump_segment(), and by
// recover_from_snapshot() when a DUMPING segment is found.
//
// Steps:
//   1. dump the memory segment (at most 3 attempts);
//   2. mark it PERSIST and apply a version edit adding it (at most 3
//      attempts);
//   3. try to pre-load it into persist_segment_mgr_ (failure is tolerated;
//      get_segments() loads missing segments on demand);
//   4. drop this reference to dumping_segment_ and shift the lsn store
//      (lsn shift failure only logs a warning).
// is_dumping_ is cleared on every exit path. If step 1 or 2 fails, the
// segment stays in dumping_segment_ and the error is returned.
// NOTE(review): dumping_segment_ is read by search_record(), get_segments()
// and get_stats(), and is reset here, without a lock visible in this file;
// confirm how concurrent access is synchronized.
int Collection::do_dump_segment() {
  SegmentID segment_id = dumping_segment_->segment_id();
  CLOG_INFO("Start dumping segment. segment_id[%zu]", (size_t)segment_id);

  // dump persist segment with retry
  // (first attempt plus up to 2 retries while dump() keeps failing)
  int ret = 0;
  int retry = 0;
  do {
    ret = dumping_segment_->dump();
    if (ret != 0) {
      CLOG_ERROR("Dumping segment failed. retry[%d] segment_id[%zu]", retry,
                 (size_t)segment_id);
    }
  } while (ret != 0 && retry++ < 2);

  if (ret != 0) {
    CLOG_ERROR("Dumping segment failed. segment_id[%zu]", (size_t)segment_id);
    is_dumping_ = false;
    return ret;
  }

  // The segment state is updated before the version edit is applied.
  dumping_segment_->update_state(SegmentState::PERSIST);
  version_manager_->update_segment_meta(dumping_segment_->segment_meta());

  // record in version manager with retry
  // (first attempt plus up to 2 retries while apply() keeps failing)
  VersionEdit edit;
  edit.add_segments.emplace_back(segment_id);
  retry = 0;
  do {
    ret = version_manager_->apply(edit);
    if (ret != 0) {
      CLOG_ERROR("Apply new version edit failed. retry[%d]", retry);
    }
  } while (ret != 0 && retry++ < 2);

  if (ret != 0) {
    CLOG_ERROR("Apply new version edit failed.");
    is_dumping_ = false;
    return ret;
  }

  // try to pre load new persist segment into memory
  // A failure here is not fatal: get_segments() retries loading segments
  // missing from persist_segment_mgr_.
  PersistSegmentPtr persist_segment;
  ReadOptions read_options;
  read_options.use_mmap = true;
  read_options.create_new = false;
  ret = this->load_persist_segment(dumping_segment_->segment_meta(),
                                   read_options, &persist_segment);
  if (ret == 0) {
    persist_segment_mgr_->add_segment(persist_segment);
  }

  // reduce dumping segment ref
  // if search thread release all the refs
  // it will trigger dumping segment auto destruct
  dumping_segment_.reset();

  // shift lsn store
  ret = lsn_store_->shift();
  if (ret != 0) {
    CLOG_WARN("Shift lsn store failed.");
  }

  is_dumping_ = false;
  CLOG_INFO("Ended dumping segment. segment_id[%zu]", (size_t)segment_id);
  return 0;
}
718
719int Collection::recover_from_snapshot(const ReadOptions &read_options) {
720 // init version manager
721 int ret = VersionManager::CreateAndOpen(collection_name_, dir_path_,
722 read_options, &version_manager_);
723 CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open version manager failed.");
724
725 // init id map
726 ret =
727 IDMap::CreateAndOpen(collection_name_, dir_path_, read_options, &id_map_);
728 CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open id map failed.");
729
730 // init delete store
731 ret = DeleteStore::CreateAndOpen(collection_name_, dir_path_, read_options,
732 &delete_store_);
733 CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open delete store failed.");
734
735 // init lsn store
736 ret = LsnStore::CreateAndOpen(collection_name_, dir_path_, read_options,
737 &lsn_store_);
738 CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open lsn store failed.");
739
740 // init writing segment
741 std::vector<SegmentMeta> writing_segment_metas;
742 ret = version_manager_->get_segment_metas(SegmentState::WRITING,
743 &writing_segment_metas);
744 CHECK_RETURN_WITH_CLOG(ret, 0, "Get writing segment meta failed.");
745
746 ret = this->open_memory_segment(writing_segment_metas[0], read_options,
747 &writing_segment_);
748 CHECK_RETURN(ret, 0);
749
750 // init dumping segment
751 std::vector<SegmentMeta> dumping_segment_metas;
752 ret = version_manager_->get_segment_metas(SegmentState::DUMPING,
753 &dumping_segment_metas);
754 CHECK_RETURN_WITH_CLOG(ret, 0, "Get dumping segment meta failed.");
755
756 if (dumping_segment_metas.size() > 0) {
757 ret = this->open_memory_segment(dumping_segment_metas[0], read_options,
758 &dumping_segment_);
759 CHECK_RETURN(ret, 0);
760 // continue to drive dumping segment
761 thread_pool_->submit(
762 ailego::Closure::New(this, &Collection::do_dump_segment));
763 }
764
765 // init persist segment manager
766 persist_segment_mgr_ =
767 PersistSegmentManager::Create(collection_name_, dir_path_);
768 if (!persist_segment_mgr_) {
769 CLOG_ERROR("Create persist segment manager failed.");
770 return ErrorCode_RuntimeError;
771 }
772
773 // load persist segment & add into psm
774 auto &segment_metas = version_manager_->current_version();
775 for (size_t i = 0; i < segment_metas.size(); i++) {
776 PersistSegmentPtr persist_segment;
777 ReadOptions load_options;
778 load_options.use_mmap = true;
779 load_options.create_new = false;
780 ret = this->load_persist_segment(segment_metas[i], load_options,
781 &persist_segment);
782 CHECK_RETURN(ret, 0);
783 persist_segment_mgr_->add_segment(persist_segment);
784 }
785
786 return 0;
787}
788
789void Collection::diff_schema(const meta::CollectionMeta &new_schema,
790 const meta::CollectionMeta &current_schema,
791 std::vector<meta::ColumnMetaPtr> *add_columns,
792 std::vector<meta::ColumnMetaPtr> *delete_columns) {
793 auto &new_columns = new_schema.index_columns();
794 auto &current_columns = current_schema.index_columns();
795
796 // search in new schema
797 // if column not found in current schema
798 // just add into new columns
799 for (auto &new_column : new_columns) {
800 auto it = std::find_if(current_columns.begin(), current_columns.end(),
801 [new_column](const meta::ColumnMetaPtr &cur_column) {
802 return new_column->name() == cur_column->name();
803 });
804 if (it == current_columns.end()) {
805 add_columns->emplace_back(new_column);
806 }
807 }
808
809 // search in current schema
810 // if column not found in new schema
811 // just mark it into delete columns
812 for (auto &cur_column : current_columns) {
813 auto it = std::find_if(new_columns.begin(), new_columns.end(),
814 [cur_column](const meta::ColumnMetaPtr &new_column) {
815 return cur_column->name() == new_column->name();
816 });
817
818 if (it == current_columns.end()) {
819 delete_columns->emplace_back(cur_column);
820 }
821 }
822}
823
824
825} // end namespace index
826} // namespace be
827} // end namespace proxima
828