1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of collection class |
19 | */ |
20 | |
21 | #include "collection.h" |
22 | #include <chrono> |
23 | #include <ailego/container/heap.h> |
24 | #include <ailego/utility/time_helper.h> |
25 | #include "common/defer.h" |
26 | #include "common/error_code.h" |
27 | #include "common/logger.h" |
28 | #include "constants.h" |
29 | #include "file_helper.h" |
30 | #include "typedef.h" |
31 | |
32 | namespace proxima { |
33 | namespace be { |
34 | namespace index { |
35 | |
36 | CollectionPtr Collection::Create(const std::string &collection_name, |
37 | const std::string &prefix_path, |
38 | meta::CollectionMetaPtr schema, |
39 | uint32_t concurrency, |
40 | ThreadPool *thread_pool) { |
41 | return std::make_shared<Collection>(collection_name, prefix_path, schema, |
42 | concurrency, thread_pool); |
43 | } |
44 | |
45 | int Collection::CreateAndOpen(const std::string &collection_name, |
46 | const std::string &prefix_path, |
47 | meta::CollectionMetaPtr schema, |
48 | uint32_t concurrency, ThreadPool *thread_pool, |
49 | const ReadOptions &read_options, |
50 | CollectionPtr *collection) { |
51 | *collection = std::make_shared<Collection>(collection_name, prefix_path, |
52 | schema, concurrency, thread_pool); |
53 | |
54 | return (*collection)->open(read_options); |
55 | } |
56 | |
// Constructor: only records its arguments in the corresponding members.
// No file or directory is touched here; the on-disk state is handled by
// open().
//   coll_name  name of the collection (also used as its directory name)
//   path       prefix path under which the collection directory lives
//   coll_meta  schema of the collection, moved into schema_
//   concur     concurrency value handed to segments when they are opened
//   pool       thread pool used for background dump tasks (stored as a raw
//              pointer)
Collection::Collection(const std::string &coll_name, const std::string &path,
                       meta::CollectionMetaPtr coll_meta, uint32_t concur,
                       ThreadPool *pool)
    : collection_name_(coll_name),
      prefix_path_(path),
      schema_(std::move(coll_meta)),
      concurrency_(concur),
      thread_pool_(pool) {}
65 | |
66 | Collection::~Collection() { |
67 | if (opened_) { |
68 | close(); |
69 | } |
70 | } |
71 | |
72 | int Collection::open(const ReadOptions &read_options) { |
73 | CHECK_STATUS(opened_, false); |
74 | |
75 | dir_path_ = prefix_path_ + "/" + collection_name_; |
76 | std::string manifest_file_path = |
77 | FileHelper::MakeFilePath(dir_path_, FileID::MANIFEST_FILE); |
78 | |
79 | // check index data |
80 | if (read_options.create_new) { |
81 | if (FileHelper::DirectoryExists(dir_path_)) { |
82 | CLOG_ERROR("Index directory already exist, create failed. dir_path[%s]" , |
83 | dir_path_.c_str()); |
84 | return ErrorCode_DuplicateCollection; |
85 | } |
86 | } else { |
87 | if (!FileHelper::DirectoryExists(dir_path_) || |
88 | !FileHelper::FileExists(manifest_file_path)) { |
89 | CLOG_ERROR( |
90 | "Index directory or manifest not exist, open failed. dir_path[%s]" , |
91 | dir_path_.c_str()); |
92 | return ErrorCode_InvalidIndexDataFormat; |
93 | } |
94 | } |
95 | |
96 | int ret = recover_from_snapshot(read_options); |
97 | if (ret != 0) { |
98 | CLOG_ERROR("Recover from snapshot failed." ); |
99 | |
100 | // if create new collection failed |
101 | // need to cleanup history files |
102 | if (read_options.create_new) { |
103 | this->remove_files(); |
104 | } |
105 | return ret; |
106 | } |
107 | |
108 | opened_ = true; |
109 | |
110 | CollectionStats stats; |
111 | this->get_stats(&stats); |
112 | CLOG_INFO( |
113 | "Open colletion success. doc_count[%zu] segment_count[%zu] " |
114 | "max_docs_per_segment[%zu]" , |
115 | (size_t)stats.total_doc_count, (size_t)stats.total_segment_count, |
116 | (size_t)schema_->max_docs_per_segment()); |
117 | |
118 | return 0; |
119 | } |
120 | |
// Close the collection.
// Blocks (polling once per second) until no dump, flush or optimize is in
// progress, then closes the segments and the auxiliary stores and marks the
// collection as not opened. The return values of the individual close()
// calls are not inspected.
// Returns 0 once everything has been closed; CHECK_STATUS rejects the call
// when the collection is not opened.
int Collection::close() {
  CHECK_STATUS(opened_, true);

  // Wait until dump ended
  while (is_dumping_) {
    LOG_INFO("Collection is dumping segment, wait until dumped...");
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  // Wait until flush ended
  while (is_flushing_) {
    LOG_INFO("Collection is flushing, wait until flushed...");
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  // Wait until optimize ended
  while (is_optimizing_) {
    LOG_INFO("Collection is optimizing, wait until optimized...");
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  // Close writing segment
  writing_segment_->close();

  // A dumping segment may still be present if a previous dump ended with
  // an error, so it is closed here as well.
  if (dumping_segment_ != nullptr) {
    dumping_segment_->close();
  }

  // Release all loaded persist segments
  persist_segment_mgr_->unload_segments();

  // Close the auxiliary stores and the version manager
  id_map_->close();
  delete_store_->close();
  lsn_store_->close();
  version_manager_->close();

  opened_ = false;
  CLOG_INFO("Close collection success.");

  return 0;
}
163 | |
164 | int Collection::close_and_cleanup() { |
165 | CHECK_STATUS(opened_, true); |
166 | |
167 | this->close(); |
168 | this->remove_files(); |
169 | |
170 | return 0; |
171 | } |
172 | |
173 | int Collection::flush() { |
174 | CHECK_STATUS(opened_, true); |
175 | |
176 | CLOG_INFO("Start flushing collection." ); |
177 | ailego::ElapsedTime timer; |
178 | is_flushing_ = true; |
179 | |
180 | Defer defer([this] { is_flushing_ = false; }); |
181 | |
182 | int ret = 0; |
183 | ret = writing_segment_->flush(); |
184 | CHECK_RETURN_WITH_CLOG(ret, 0, "Flush writing segment failed." ); |
185 | |
186 | ret = id_map_->flush(); |
187 | CHECK_RETURN_WITH_CLOG(ret, 0, "Flush id map failed." ); |
188 | |
189 | ret = delete_store_->flush(); |
190 | CHECK_RETURN_WITH_CLOG(ret, 0, "Flush delete store failed." ); |
191 | |
192 | ret = lsn_store_->flush(); |
193 | CHECK_RETURN_WITH_CLOG(ret, 0, "Flush lsn store failed." ); |
194 | |
195 | version_manager_->update_segment_meta(writing_segment_->segment_meta()); |
196 | ret = version_manager_->flush(); |
197 | CHECK_RETURN_WITH_CLOG(ret, 0, "Flush version manager failed." ); |
198 | |
199 | CLOG_INFO("Ended flushing collection. cost[%zums]" , |
200 | (size_t)timer.milli_seconds()); |
201 | return 0; |
202 | } |
203 | |
204 | int Collection::dump() { |
205 | CHECK_STATUS(opened_, true); |
206 | |
207 | return this->drive_dump_segment(); |
208 | } |
209 | |
210 | int Collection::optimize(ThreadPoolPtr pool) { |
211 | CHECK_STATUS(opened_, true); |
212 | |
213 | CLOG_INFO("Start optimizing collection." ); |
214 | ailego::ElapsedTime timer; |
215 | is_optimizing_ = true; |
216 | |
217 | Defer defer([this] { is_optimizing_ = false; }); |
218 | |
219 | int ret = writing_segment_->optimize(pool); |
220 | CHECK_RETURN_WITH_CLOG(ret, 0, "Optimize writing segment failed." ); |
221 | |
222 | CLOG_INFO("Ended optimizing collection. cost[%zums]" , |
223 | (size_t)timer.milli_seconds()); |
224 | return 0; |
225 | } |
226 | |
227 | int Collection::remove_files() { |
228 | return FileHelper::RemoveDirectory(dir_path_); |
229 | } |
230 | |
231 | int Collection::write_records(const CollectionDataset &records) { |
232 | CHECK_STATUS(opened_, true); |
233 | |
234 | ailego::ElapsedTime timer; |
235 | int ret = 0; |
236 | int error_code = 0; |
237 | for (size_t i = 0; i < records.size(); i++) { |
238 | auto &record = records.get(i); |
239 | uint64_t primary_key = record.primary_key; |
240 | uint64_t lsn = record.lsn; |
241 | |
242 | switch (record.operation_type) { |
243 | case OperationTypes::INSERT: |
244 | ret = insert_record(record); |
245 | if (ret != 0) { |
246 | error_code = ret; |
247 | CLOG_ERROR("Insert record failed. key[%zu] lsn[%zu] rt[%zuus]" , |
248 | (size_t)primary_key, (size_t)lsn, |
249 | (size_t)timer.micro_seconds()); |
250 | } else { |
251 | CLOG_INFO("Insert record success. key[%zu] lsn[%zu] rt[%zuus]" , |
252 | (size_t)primary_key, (size_t)lsn, |
253 | (size_t)timer.micro_seconds()); |
254 | } |
255 | break; |
256 | case OperationTypes::UPDATE: |
257 | ret = update_record(record); |
258 | if (ret != 0) { |
259 | error_code = ret; |
260 | CLOG_ERROR("Update record failed. key[%zu] lsn[%zu] rt[%zuus]" , |
261 | (size_t)primary_key, (size_t)lsn, |
262 | (size_t)timer.micro_seconds()); |
263 | } else { |
264 | CLOG_INFO("Update record success. key[%zu] lsn[%zu] rt[%zuus]" , |
265 | (size_t)primary_key, (size_t)lsn, |
266 | (size_t)timer.micro_seconds()); |
267 | } |
268 | break; |
269 | case OperationTypes::DELETE: |
270 | ret = delete_record(primary_key); |
271 | if (ret != 0) { |
272 | error_code = ret; |
273 | CLOG_ERROR("Delete record failed. key[%zu] lsn[%zu] rt[%zuus]" , |
274 | (size_t)primary_key, (size_t)lsn, |
275 | (size_t)timer.micro_seconds()); |
276 | } else { |
277 | CLOG_INFO("Delete record success. key[%zu] lsn[%zu] rt[%zuus]" , |
278 | (size_t)primary_key, (size_t)lsn, |
279 | (size_t)timer.micro_seconds()); |
280 | } |
281 | break; |
282 | default: |
283 | CLOG_ERROR("Unknown operation type. type[%d]" , record.operation_type); |
284 | } |
285 | } |
286 | |
287 | return error_code; |
288 | } |
289 | |
290 | int Collection::insert_record(const Record &record) { |
291 | // 1. check if record already exists |
292 | if (this->has_record(record.primary_key)) { |
293 | CLOG_ERROR("Insert duplicate record. key[%zu]" , (size_t)record.primary_key); |
294 | return ErrorCode_DuplicateKey; |
295 | } |
296 | |
297 | // 2. insert into memory segment |
298 | idx_t doc_id = INVALID_DOC_ID; |
299 | int ret = writing_segment_->insert(record, &doc_id); |
300 | CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into memory segment failed. key[%zu]" , |
301 | (size_t)record.primary_key); |
302 | |
303 | // 3. record key/doc_id mapping in id map |
304 | ret = id_map_->insert(record.primary_key, doc_id); |
305 | CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into id map failed. key[%zu]" , |
306 | (size_t)record.primary_key); |
307 | |
308 | // 4. record in lsn store |
309 | ret = lsn_store_->append(record.lsn, record.lsn_context); |
310 | if (ret != 0) { |
311 | // do not need to terminate insert process |
312 | CLOG_WARN("Lsn store append failed. key[%zu]" , (size_t)record.primary_key); |
313 | } |
314 | |
315 | // try to drive dump writing segment |
316 | uint64_t max_docs_per_segment = schema_->max_docs_per_segment(); |
317 | if (max_docs_per_segment > 0 && |
318 | writing_segment_->doc_count() >= max_docs_per_segment) { |
319 | drive_dump_segment(); |
320 | } |
321 | |
322 | return 0; |
323 | } |
324 | |
325 | int Collection::delete_record(uint64_t primary_key) { |
326 | // 1. check if record exist |
327 | if (!this->has_record(primary_key)) { |
328 | CLOG_ERROR("Record not exist in colletion. key[%zu]" , (size_t)primary_key); |
329 | return ErrorCode_InexistentKey; |
330 | } |
331 | |
332 | // 2. get key/doc_id mapping |
333 | idx_t doc_id = id_map_->get_mapping_id(primary_key); |
334 | if (doc_id == INVALID_DOC_ID) { |
335 | CLOG_ERROR("Get mapping doc-id failed. key[%zu]" , (size_t)primary_key); |
336 | return ErrorCode_RuntimeError; |
337 | } |
338 | |
339 | // 3. insert into delete map |
340 | int ret = delete_store_->insert(doc_id); |
341 | CHECK_RETURN_WITH_CLOG(ret, 0, "Insert into delete map failed." ); |
342 | |
343 | // 4. remove mapping in id_map |
344 | id_map_->remove(primary_key); |
345 | |
346 | // 5. try to inplace remove in writing segment |
347 | if (writing_segment_->is_in_range(doc_id)) { |
348 | ret = writing_segment_->remove(doc_id); |
349 | CHECK_RETURN_WITH_CLOG(ret, 0, "Remove from writing segment failed." ); |
350 | } |
351 | return 0; |
352 | } |
353 | |
354 | int Collection::update_record(const Record &record) { |
355 | // 1. check if record exist |
356 | if (!this->has_record(record.primary_key)) { |
357 | CLOG_ERROR("Record not exist in collection. key[%zu]" , |
358 | (size_t)record.primary_key); |
359 | return ErrorCode_InexistentKey; |
360 | } |
361 | |
362 | // 2. check record lsn |
363 | int ret = 0; |
364 | if (record.lsn_check) { |
365 | Record old_record; |
366 | ret = this->search_record(record.primary_key, &old_record); |
367 | if (ret != 0 || old_record.primary_key == INVALID_KEY) { |
368 | CLOG_ERROR("Search record failed. key[%zu]" , (size_t)record.primary_key); |
369 | return ret; |
370 | } |
371 | |
372 | if (record.lsn <= old_record.lsn) { |
373 | CLOG_ERROR("Invalid record lsn. key[%zu] lsn[%zu] last_lsn[%zu]" , |
374 | (size_t)record.primary_key, (size_t)record.lsn, |
375 | (size_t)old_record.lsn); |
376 | return ErrorCode_InvalidRecord; |
377 | } |
378 | } |
379 | |
380 | // 3. delete old record |
381 | ret = this->delete_record(record.primary_key); |
382 | CHECK_RETURN(ret, 0); |
383 | |
384 | // 4. insert new record |
385 | ret = this->insert_record(record); |
386 | |
387 | return ret; |
388 | } |
389 | |
390 | bool Collection::has_record(uint64_t primary_key) { |
391 | return id_map_->has(primary_key); |
392 | } |
393 | |
394 | int Collection::search_record(uint64_t primary_key, Record *record) { |
395 | if (!this->has_record(primary_key)) { |
396 | return 0; |
397 | } |
398 | |
399 | idx_t doc_id = id_map_->get_mapping_id(primary_key); |
400 | SegmentPtr found_segment = SegmentPtr(); |
401 | |
402 | auto &segment_metas = version_manager_->current_version(); |
403 | // reverse search for newer segment match |
404 | for (auto it = segment_metas.crbegin(); it != segment_metas.crend(); ++it) { |
405 | if (doc_id >= it->min_doc_id && doc_id <= it->max_doc_id) { |
406 | SegmentID segment_id = it->segment_id; |
407 | found_segment = persist_segment_mgr_->get_segment(segment_id); |
408 | break; |
409 | } |
410 | } |
411 | |
412 | if (!found_segment && dumping_segment_ != nullptr) { |
413 | if (doc_id >= dumping_segment_->segment_meta().min_doc_id && |
414 | doc_id <= dumping_segment_->segment_meta().max_doc_id) { |
415 | found_segment = dumping_segment_; |
416 | } |
417 | } |
418 | |
419 | if (!found_segment && writing_segment_ != nullptr) { |
420 | found_segment = writing_segment_; |
421 | } |
422 | |
423 | QueryResult result; |
424 | int ret = found_segment->kv_search(primary_key, &result); |
425 | if (ret == 0 && result.primary_key != INVALID_KEY) { |
426 | record->primary_key = result.primary_key; |
427 | record->revision = result.revision; |
428 | record->forward_data = std::move(result.forward_data); |
429 | record->lsn = result.lsn; |
430 | } |
431 | |
432 | return 0; |
433 | } |
434 | |
435 | int Collection::get_latest_lsn(uint64_t *lsn, std::string *lsn_context) { |
436 | CHECK_STATUS(opened_, true); |
437 | return lsn_store_->get_latest_lsn(lsn, lsn_context); |
438 | } |
439 | |
440 | int Collection::get_segments(std::vector<SegmentPtr> *segments) { |
441 | CHECK_STATUS(opened_, true); |
442 | |
443 | auto &segment_metas = version_manager_->current_version(); |
444 | for (size_t i = 0; i < segment_metas.size(); i++) { |
445 | SegmentID segment_id = segment_metas[i].segment_id; |
446 | if (persist_segment_mgr_->has_segment(segment_id)) { |
447 | segments->emplace_back(persist_segment_mgr_->get_segment(segment_id)); |
448 | } else { |
449 | // Maybe it's pre-loaded fail, and it will be loaded again. |
450 | PersistSegmentPtr persist_segment; |
451 | ReadOptions read_options; |
452 | read_options.use_mmap = true; |
453 | read_options.create_new = false; |
454 | int ret = this->load_persist_segment(segment_metas[i], read_options, |
455 | &persist_segment); |
456 | CHECK_RETURN(ret, 0); |
457 | persist_segment_mgr_->add_segment(persist_segment); |
458 | segments->emplace_back(persist_segment); |
459 | } |
460 | } |
461 | |
462 | if (writing_segment_ != nullptr) { |
463 | segments->emplace_back(writing_segment_); |
464 | } |
465 | |
466 | if (dumping_segment_ != nullptr && |
467 | !persist_segment_mgr_->has_segment(dumping_segment_->segment_id())) { |
468 | segments->emplace_back(dumping_segment_); |
469 | } |
470 | |
471 | return 0; |
472 | } |
473 | |
474 | int Collection::get_stats(CollectionStats *stats) { |
475 | stats->collection_name = collection_name_; |
476 | stats->collection_path = dir_path_; |
477 | stats->delete_doc_count = delete_store_->count(); |
478 | |
479 | // collect stats of persist segment |
480 | auto &segment_metas = version_manager_->current_version(); |
481 | for (size_t i = 0; i < segment_metas.size(); i++) { |
482 | stats->total_doc_count += segment_metas[i].doc_count; |
483 | stats->total_index_file_count += segment_metas[i].index_file_count; |
484 | stats->total_index_file_size += segment_metas[i].index_file_size; |
485 | stats->total_segment_count++; |
486 | stats->segment_stats.emplace_back(segment_metas[i]); |
487 | } |
488 | |
489 | // collect stats of memory segment |
490 | if (dumping_segment_ != nullptr && |
491 | !persist_segment_mgr_->has_segment(dumping_segment_->segment_id())) { |
492 | auto &segment_meta = dumping_segment_->segment_meta(); |
493 | stats->total_doc_count += segment_meta.doc_count; |
494 | stats->total_index_file_count += segment_meta.index_file_count; |
495 | stats->total_index_file_size += segment_meta.index_file_size; |
496 | stats->total_segment_count++; |
497 | stats->segment_stats.emplace_back(segment_meta); |
498 | } |
499 | |
500 | if (writing_segment_ != nullptr) { |
501 | auto &segment_meta = writing_segment_->segment_meta(); |
502 | stats->total_doc_count += segment_meta.doc_count; |
503 | stats->total_index_file_count += segment_meta.index_file_count; |
504 | stats->total_index_file_size += segment_meta.index_file_size; |
505 | stats->total_segment_count++; |
506 | stats->segment_stats.emplace_back(segment_meta); |
507 | } |
508 | |
509 | stats->total_index_file_count += 4; |
510 | stats->total_index_file_size += FileHelper::FileSize(id_map_->file_path()); |
511 | stats->total_index_file_size += |
512 | FileHelper::FileSize(delete_store_->file_path()); |
513 | stats->total_index_file_size += |
514 | FileHelper::FileSize(version_manager_->file_path()); |
515 | stats->total_index_file_size += FileHelper::FileSize(lsn_store_->file_path()); |
516 | |
517 | return 0; |
518 | } |
519 | |
520 | int Collection::update_schema(meta::CollectionMetaPtr new_schema) { |
521 | CHECK_STATUS(opened_, true); |
522 | |
523 | std::lock_guard<std::mutex> lock(schema_mutex_); |
524 | if (is_dumping_) { |
525 | CLOG_ERROR("Can't update schema while dumping segment." ); |
526 | return ErrorCode_StatusError; |
527 | } |
528 | |
529 | uint32_t new_revision = new_schema->revision(); |
530 | uint32_t current_revision = schema_->revision(); |
531 | if (new_revision <= current_revision) { |
532 | CLOG_ERROR( |
533 | "New schema revision less than current schema, update failed. " |
534 | "current_schema[%u] new_schema[%u]" , |
535 | current_revision, new_revision); |
536 | return ErrorCode_MismatchedSchema; |
537 | } |
538 | |
539 | std::vector<meta::ColumnMetaPtr> add_columns; |
540 | std::vector<meta::ColumnMetaPtr> delete_columns; |
541 | this->diff_schema(*new_schema, *schema_, &add_columns, &delete_columns); |
542 | |
543 | int ret = 0; |
544 | std::vector<SegmentPtr> all_segments; |
545 | ret = this->get_segments(&all_segments); |
546 | CHECK_RETURN_WITH_CLOG(ret, 0, "Get segments failed." ); |
547 | |
548 | for (size_t i = 0; i < add_columns.size(); i++) { |
549 | for (size_t j = 0; j < all_segments.size(); j++) { |
550 | ret = all_segments[j]->add_column(add_columns[i]); |
551 | CHECK_RETURN_WITH_CLOG( |
552 | ret, 0, "Add new column failed. column[%s] segment_id[%zu]" , |
553 | add_columns[i]->name().c_str(), |
554 | (size_t)all_segments[j]->segment_id()); |
555 | } |
556 | } |
557 | |
558 | for (size_t i = 0; i < delete_columns.size(); i++) { |
559 | for (size_t j = 0; j < all_segments.size(); j++) { |
560 | ret = all_segments[j]->remove_column(delete_columns[i]->name()); |
561 | CHECK_RETURN_WITH_CLOG(ret, 0, |
562 | "Remove column failed. column[%s] segment_id[%zu]" , |
563 | delete_columns[i]->name().c_str(), |
564 | (size_t)all_segments[j]->segment_id()); |
565 | } |
566 | } |
567 | |
568 | schema_ = new_schema; |
569 | CLOG_INFO("Update schema success. current_schema[%u] new_schema[%u]" , |
570 | current_revision, new_revision); |
571 | |
572 | return 0; |
573 | } |
574 | |
// Start dumping the current writing segment in the background.
// A fresh memory segment takes over as writing segment, the previous
// writing segment becomes the dumping segment, and do_dump_segment() is
// submitted to the thread pool.
// Returns 0 when the dump was scheduled or when another dump is already in
// progress (in which case nothing is done); otherwise the error code of
// the failing preparation step.
int Collection::drive_dump_segment() {
  // exchange() both tests and sets the flag, so only one caller proceeds
  if (is_dumping_.exchange(true)) {
    return 0;
  }

  // 1. create a new segment for writing
  SegmentMeta new_segment_meta;
  int ret = version_manager_->alloc_segment_meta(&new_segment_meta);
  if (ret != 0) {
    CLOG_ERROR("Alloc segment meta failed.");
    // nothing was scheduled, so the flag must be released again
    is_dumping_ = false;
    return ret;
  }
  // doc ids of the new segment continue after those of the current one
  new_segment_meta.min_doc_id =
      writing_segment_->segment_meta().max_doc_id + DOC_ID_INCREASE_COUNT;

  MemorySegmentPtr new_segment;
  ReadOptions read_options;
  read_options.use_mmap = true;
  read_options.create_new = true;
  ret = open_memory_segment(new_segment_meta, read_options, &new_segment);
  if (ret != 0) {
    is_dumping_ = false;
    return ret;
  }

  // 2. swap writing segment -> flushing segment
  MemorySegmentPtr tmp_segment = writing_segment_;
  writing_segment_ = new_segment;
  dumping_segment_ = std::move(tmp_segment);

  // 3. record segment state change
  writing_segment_->update_state(SegmentState::WRITING);
  version_manager_->update_segment_meta(writing_segment_->segment_meta());

  // the result of this flush is not checked
  dumping_segment_->flush();
  dumping_segment_->update_state(SegmentState::DUMPING);
  version_manager_->update_segment_meta(dumping_segment_->segment_meta());

  // 4. dump memory segment
  // is_dumping_ stays set here; do_dump_segment() clears it when it ends
  thread_pool_->submit(
      ailego::Closure::New(this, &Collection::do_dump_segment));

  return 0;
}
620 | |
621 | int Collection::open_memory_segment(const SegmentMeta &segment_meta, |
622 | const ReadOptions &read_options, |
623 | MemorySegmentPtr *new_segment) { |
624 | int ret = MemorySegment::CreateAndOpen( |
625 | collection_name_, dir_path_, segment_meta, schema_.get(), |
626 | delete_store_.get(), id_map_.get(), concurrency_, read_options, |
627 | new_segment); |
628 | |
629 | CHECK_RETURN_WITH_CLOG( |
630 | ret, 0, "Create and open memory segment failed. segment_id[%zu]" , |
631 | (size_t)segment_meta.segment_id); |
632 | |
633 | return 0; |
634 | } |
635 | |
636 | int Collection::load_persist_segment(const SegmentMeta &segment_meta, |
637 | const ReadOptions &read_options, |
638 | PersistSegmentPtr *new_segment) { |
639 | int ret = PersistSegment::CreateAndLoad( |
640 | collection_name_, dir_path_, segment_meta, schema_.get(), |
641 | delete_store_.get(), id_map_.get(), concurrency_, read_options, |
642 | new_segment); |
643 | |
644 | CHECK_RETURN_WITH_CLOG( |
645 | ret, 0, "Create and load persist segment failed. segment_id[%zu]" , |
646 | (size_t)segment_meta.segment_id); |
647 | |
648 | return 0; |
649 | } |
650 | |
651 | int Collection::do_dump_segment() { |
652 | SegmentID segment_id = dumping_segment_->segment_id(); |
653 | CLOG_INFO("Start dumping segment. segment_id[%zu]" , (size_t)segment_id); |
654 | |
655 | // dump persist segment with retry |
656 | int ret = 0; |
657 | int retry = 0; |
658 | do { |
659 | ret = dumping_segment_->dump(); |
660 | if (ret != 0) { |
661 | CLOG_ERROR("Dumping segment failed. retry[%d] segment_id[%zu]" , retry, |
662 | (size_t)segment_id); |
663 | } |
664 | } while (ret != 0 && retry++ < 2); |
665 | |
666 | if (ret != 0) { |
667 | CLOG_ERROR("Dumping segment failed. segment_id[%zu]" , (size_t)segment_id); |
668 | is_dumping_ = false; |
669 | return ret; |
670 | } |
671 | |
672 | dumping_segment_->update_state(SegmentState::PERSIST); |
673 | version_manager_->update_segment_meta(dumping_segment_->segment_meta()); |
674 | |
675 | // record in version manager with retry |
676 | VersionEdit edit; |
677 | edit.add_segments.emplace_back(segment_id); |
678 | retry = 0; |
679 | do { |
680 | ret = version_manager_->apply(edit); |
681 | if (ret != 0) { |
682 | CLOG_ERROR("Apply new version edit failed. retry[%d]" , retry); |
683 | } |
684 | } while (ret != 0 && retry++ < 2); |
685 | |
686 | if (ret != 0) { |
687 | CLOG_ERROR("Apply new version edit failed." ); |
688 | is_dumping_ = false; |
689 | return ret; |
690 | } |
691 | |
692 | // try to pre load new persist segment into memory |
693 | PersistSegmentPtr persist_segment; |
694 | ReadOptions read_options; |
695 | read_options.use_mmap = true; |
696 | read_options.create_new = false; |
697 | ret = this->load_persist_segment(dumping_segment_->segment_meta(), |
698 | read_options, &persist_segment); |
699 | if (ret == 0) { |
700 | persist_segment_mgr_->add_segment(persist_segment); |
701 | } |
702 | |
703 | // reduce dumping segment ref |
704 | // if search thread release all the refs |
705 | // it will trigger dumping segment auto destruct |
706 | dumping_segment_.reset(); |
707 | |
708 | // shift lsn store |
709 | ret = lsn_store_->shift(); |
710 | if (ret != 0) { |
711 | CLOG_WARN("Shift lsn store failed." ); |
712 | } |
713 | |
714 | is_dumping_ = false; |
715 | CLOG_INFO("Ended dumping segment. segment_id[%zu]" , (size_t)segment_id); |
716 | return 0; |
717 | } |
718 | |
719 | int Collection::recover_from_snapshot(const ReadOptions &read_options) { |
720 | // init version manager |
721 | int ret = VersionManager::CreateAndOpen(collection_name_, dir_path_, |
722 | read_options, &version_manager_); |
723 | CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open version manager failed." ); |
724 | |
725 | // init id map |
726 | ret = |
727 | IDMap::CreateAndOpen(collection_name_, dir_path_, read_options, &id_map_); |
728 | CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open id map failed." ); |
729 | |
730 | // init delete store |
731 | ret = DeleteStore::CreateAndOpen(collection_name_, dir_path_, read_options, |
732 | &delete_store_); |
733 | CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open delete store failed." ); |
734 | |
735 | // init lsn store |
736 | ret = LsnStore::CreateAndOpen(collection_name_, dir_path_, read_options, |
737 | &lsn_store_); |
738 | CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open lsn store failed." ); |
739 | |
740 | // init writing segment |
741 | std::vector<SegmentMeta> writing_segment_metas; |
742 | ret = version_manager_->get_segment_metas(SegmentState::WRITING, |
743 | &writing_segment_metas); |
744 | CHECK_RETURN_WITH_CLOG(ret, 0, "Get writing segment meta failed." ); |
745 | |
746 | ret = this->open_memory_segment(writing_segment_metas[0], read_options, |
747 | &writing_segment_); |
748 | CHECK_RETURN(ret, 0); |
749 | |
750 | // init dumping segment |
751 | std::vector<SegmentMeta> dumping_segment_metas; |
752 | ret = version_manager_->get_segment_metas(SegmentState::DUMPING, |
753 | &dumping_segment_metas); |
754 | CHECK_RETURN_WITH_CLOG(ret, 0, "Get dumping segment meta failed." ); |
755 | |
756 | if (dumping_segment_metas.size() > 0) { |
757 | ret = this->open_memory_segment(dumping_segment_metas[0], read_options, |
758 | &dumping_segment_); |
759 | CHECK_RETURN(ret, 0); |
760 | // continue to drive dumping segment |
761 | thread_pool_->submit( |
762 | ailego::Closure::New(this, &Collection::do_dump_segment)); |
763 | } |
764 | |
765 | // init persist segment manager |
766 | persist_segment_mgr_ = |
767 | PersistSegmentManager::Create(collection_name_, dir_path_); |
768 | if (!persist_segment_mgr_) { |
769 | CLOG_ERROR("Create persist segment manager failed." ); |
770 | return ErrorCode_RuntimeError; |
771 | } |
772 | |
773 | // load persist segment & add into psm |
774 | auto &segment_metas = version_manager_->current_version(); |
775 | for (size_t i = 0; i < segment_metas.size(); i++) { |
776 | PersistSegmentPtr persist_segment; |
777 | ReadOptions load_options; |
778 | load_options.use_mmap = true; |
779 | load_options.create_new = false; |
780 | ret = this->load_persist_segment(segment_metas[i], load_options, |
781 | &persist_segment); |
782 | CHECK_RETURN(ret, 0); |
783 | persist_segment_mgr_->add_segment(persist_segment); |
784 | } |
785 | |
786 | return 0; |
787 | } |
788 | |
789 | void Collection::diff_schema(const meta::CollectionMeta &new_schema, |
790 | const meta::CollectionMeta ¤t_schema, |
791 | std::vector<meta::ColumnMetaPtr> *add_columns, |
792 | std::vector<meta::ColumnMetaPtr> *delete_columns) { |
793 | auto &new_columns = new_schema.index_columns(); |
794 | auto ¤t_columns = current_schema.index_columns(); |
795 | |
796 | // search in new schema |
797 | // if column not found in current schema |
798 | // just add into new columns |
799 | for (auto &new_column : new_columns) { |
800 | auto it = std::find_if(current_columns.begin(), current_columns.end(), |
801 | [new_column](const meta::ColumnMetaPtr &cur_column) { |
802 | return new_column->name() == cur_column->name(); |
803 | }); |
804 | if (it == current_columns.end()) { |
805 | add_columns->emplace_back(new_column); |
806 | } |
807 | } |
808 | |
809 | // search in current schema |
810 | // if column not found in new schema |
811 | // just mark it into delete columns |
812 | for (auto &cur_column : current_columns) { |
813 | auto it = std::find_if(new_columns.begin(), new_columns.end(), |
814 | [cur_column](const meta::ColumnMetaPtr &new_column) { |
815 | return cur_column->name() == new_column->name(); |
816 | }); |
817 | |
818 | if (it == current_columns.end()) { |
819 | delete_columns->emplace_back(cur_column); |
820 | } |
821 | } |
822 | } |
823 | |
824 | |
825 | } // end namespace index |
826 | } // namespace be |
827 | } // end namespace proxima |
828 | |