1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of version manager
19 */
20
21#include "version_manager.h"
22#include "common/error_code.h"
23#include "file_helper.h"
24
25namespace proxima {
26namespace be {
27namespace index {
28
29VersionManagerPtr VersionManager::Create(const std::string &collection_name,
30 const std::string &collection_path) {
31 return std::make_shared<VersionManager>(collection_name, collection_path);
32}
33
34int VersionManager::CreateAndOpen(const std::string &collection_name,
35 const std::string &collection_path,
36 const ReadOptions &read_options,
37 VersionManagerPtr *version_manager) {
38 (*version_manager) = Create(collection_name, collection_path);
39
40 return (*version_manager)->open(read_options);
41}
42
43VersionManager::~VersionManager() {
44 if (opened_) {
45 this->close();
46 }
47}
48
49int VersionManager::open(const ReadOptions &read_options) {
50 CHECK_STATUS(opened_, false);
51
52 int ret = Snapshot::CreateAndOpen(collection_path_, FileID::MANIFEST_FILE,
53 read_options, &snapshot_);
54 CHECK_RETURN_WITH_CLOG(ret, 0, "Create and open snapshot failed.");
55
56 ret = version_store_.mount(snapshot_->data());
57 CHECK_RETURN_WITH_CLOG(ret, 0, "Mount snapshot failed.");
58
59 // Load current version, segments
60 if (version_store_.total_version_count() > 0) {
61 VersionSet version_set;
62 ret = version_store_.get_version_set(&version_set);
63 CHECK_RETURN(ret, 0);
64
65 for (uint32_t i = 0; i < version_set.segment_count; i++) {
66 SegmentID segment_id = version_set.segment_ids[i];
67 SegmentMeta segment_meta;
68 ret = version_store_.get_segment_meta(segment_id, &segment_meta);
69 CHECK_RETURN(ret, 0);
70 current_version_.emplace_back(segment_meta);
71 }
72 }
73
74 opened_ = true;
75 CLOG_DEBUG("Opened version manager.");
76 return 0;
77}
78
79int VersionManager::flush() {
80 CHECK_STATUS(opened_, true);
81
82 return snapshot_->flush();
83}
84
85int VersionManager::close() {
86 CHECK_STATUS(opened_, true);
87
88 current_version_.clear();
89 version_store_.unmount();
90
91 int ret = snapshot_->close();
92 if (ret != 0) {
93 CLOG_WARN("Close snapshot failed.");
94 }
95
96 opened_ = false;
97 CLOG_DEBUG("Closed version manager.");
98 return ret;
99}
100
101int VersionManager::apply(const VersionEdit &edit) {
102 CHECK_STATUS(opened_, true);
103
104 std::lock_guard<std::mutex> lock(mutex_);
105 int ret = 0;
106 for (size_t i = 0; i < edit.add_segments.size(); i++) {
107 SegmentID segment_id = edit.add_segments[i];
108 SegmentMeta segment_meta;
109 ret = version_store_.get_segment_meta(segment_id, &segment_meta);
110 CHECK_RETURN(ret, 0);
111
112 current_version_.emplace_back(segment_meta);
113 }
114
115 for (size_t i = 0; i < edit.delete_segments.size(); i++) {
116 for (size_t j = 0; j < current_version_.size(); j++) {
117 if (edit.delete_segments[i] == current_version_[j].segment_id) {
118 current_version_.erase(current_version_.begin() + j);
119 }
120 }
121 }
122
123 VersionSet version_set;
124 version_set.segment_count = current_version_.size();
125 for (size_t i = 0; i < current_version_.size(); i++) {
126 version_set.segment_ids[i] = current_version_[i].segment_id;
127 }
128
129 return version_store_.update_version_set(version_set);
130}
131
132
133} // end namespace index
134} // namespace be
135} // namespace proxima
136