1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of version store |
19 | */ |
20 | |
21 | #include "version_store.h" |
22 | #include "common/error_code.h" |
23 | #include "constants.h" |
24 | |
25 | namespace proxima { |
26 | namespace be { |
27 | namespace index { |
28 | |
29 | int VersionStore::mount(const IndexStoragePtr &stg) { |
30 | if (!stg) { |
31 | LOG_ERROR("Mount null storage" ); |
32 | return ErrorCode_RuntimeError; |
33 | } |
34 | |
35 | storage_ = stg; |
36 | IndexBlockPtr summary_block = storage_->get(SUMMARY_BLOCK); |
37 | int ret = 0; |
38 | |
39 | if (!summary_block) { |
40 | ret = init_storage(); |
41 | } else { |
42 | ret = load_storage(); |
43 | } |
44 | |
45 | return ret; |
46 | } |
47 | |
48 | void VersionStore::unmount() { |
49 | summary_block_.reset(); |
50 | version_block_.reset(); |
51 | segment_block_.reset(); |
52 | memset(&header_, 0, sizeof(header_)); |
53 | } |
54 | |
55 | int VersionStore::alloc_segment_meta(SegmentMeta *segment_meta) { |
56 | std::lock_guard<std::mutex> lock(mutex_); |
57 | |
58 | if (header_.total_segment_count >= kMaxSegmentCount) { |
59 | LOG_ERROR("Exceed max segment meta count limit. total_segment_count[%zu]" , |
60 | (size_t)header_.total_segment_count); |
61 | return ErrorCode_ExceedLimit; |
62 | } |
63 | |
64 | // check if exists unused segment meta |
65 | // just return it to reuse |
66 | if (header_.total_segment_count > 0) { |
67 | SegmentID last_segment_id = header_.total_segment_count - 1; |
68 | SegmentMeta last_segment_meta; |
69 | int ret = this->get_segment_meta_impl(last_segment_id, &last_segment_meta); |
70 | CHECK_RETURN(ret, 0); |
71 | if (last_segment_meta.state == SegmentState::CREATED) { |
72 | *segment_meta = last_segment_meta; |
73 | return 0; |
74 | } |
75 | } |
76 | |
77 | SegmentMeta new_segment_meta; |
78 | new_segment_meta.segment_id = header_.total_segment_count; |
79 | |
80 | // update segment meta |
81 | int ret = this->update_segment_meta_impl(new_segment_meta.segment_id, |
82 | new_segment_meta); |
83 | CHECK_RETURN_WITH_LOG(ret, 0, "Write segment meta failed." ); |
84 | |
85 | // update version header |
86 | header_.total_segment_count++; |
87 | ret = this->update_header_impl(header_); |
88 | CHECK_RETURN_WITH_LOG(ret, 0, "Write header failed." ); |
89 | |
90 | // copy out |
91 | *segment_meta = new_segment_meta; |
92 | return 0; |
93 | } |
94 | |
95 | int VersionStore::get_segment_meta(SegmentID segment_id, |
96 | SegmentMeta *segment_meta) const { |
97 | if (segment_id >= header_.total_segment_count) { |
98 | LOG_ERROR("Illegal segment id. segment_id[%zu]" , (size_t)segment_id); |
99 | return ErrorCode_ExceedLimit; |
100 | } |
101 | |
102 | return this->get_segment_meta_impl(segment_id, segment_meta); |
103 | } |
104 | |
105 | int VersionStore::update_segment_meta(const SegmentMeta &segment_meta) { |
106 | std::lock_guard<std::mutex> lock(mutex_); |
107 | |
108 | SegmentID segment_id = segment_meta.segment_id; |
109 | if (segment_id >= header_.total_segment_count) { |
110 | LOG_ERROR("Illegal segment id. segment_id[%zu]" , (size_t)segment_id); |
111 | return ErrorCode_ExceedLimit; |
112 | } |
113 | |
114 | return this->update_segment_meta_impl(segment_id, segment_meta); |
115 | } |
116 | |
117 | int VersionStore::get_version_set(VersionSet *version_set) const { |
118 | size_t offset = header_.current_version_offset; |
119 | size_t read_len = |
120 | version_block_->fetch(offset, (void *)version_set, sizeof(VersionSet)); |
121 | if (read_len != sizeof(VersionSet)) { |
122 | LOG_ERROR("Read version block failed." ); |
123 | return ErrorCode_WriteData; |
124 | } |
125 | |
126 | return 0; |
127 | } |
128 | |
129 | int VersionStore::update_version_set(const VersionSet &version_set) { |
130 | std::lock_guard<std::mutex> lock(mutex_); |
131 | |
132 | size_t offset = sizeof(VersionHeader); |
133 | size_t write_len = 0U; |
134 | |
135 | // write version set |
136 | write_len = version_block_->write(offset, &version_set, sizeof(VersionSet)); |
137 | if (write_len != sizeof(VersionSet)) { |
138 | LOG_ERROR("Write version block failed. " ); |
139 | return ErrorCode_WriteData; |
140 | } |
141 | |
142 | // update version header |
143 | header_.total_version_count++; |
144 | header_.current_version_offset = offset; |
145 | |
146 | return this->update_header_impl(header_); |
147 | } |
148 | |
149 | int VersionStore::get_collection_summary(CollectionSummary *summary) const { |
150 | size_t read_len = |
151 | summary_block_->fetch(0U, (void *)summary, sizeof(CollectionSummary)); |
152 | if (read_len != sizeof(CollectionSummary)) { |
153 | LOG_ERROR("Read summary block failed." ); |
154 | return ErrorCode_ReadData; |
155 | } |
156 | return 0; |
157 | } |
158 | |
159 | int VersionStore::update_collection_summary(const CollectionSummary &summary) { |
160 | std::lock_guard<std::mutex> lock(mutex_); |
161 | |
162 | size_t write_len = |
163 | summary_block_->write(0U, &summary, sizeof(CollectionSummary)); |
164 | if (write_len != sizeof(CollectionSummary)) { |
165 | LOG_ERROR("Write summary block failed." ); |
166 | return ErrorCode_WriteData; |
167 | } |
168 | return 0; |
169 | } |
170 | |
171 | int VersionStore::init_storage() { |
172 | int ret = 0; |
173 | size_t alloc_len = 0U; |
174 | |
175 | alloc_len = sizeof(CollectionSummary); |
176 | ret = storage_->append(SUMMARY_BLOCK, alloc_len); |
177 | CHECK_RETURN_WITH_LOG(ret, 0, "Append summary block failed." ); |
178 | summary_block_ = storage_->get(SUMMARY_BLOCK); |
179 | |
180 | alloc_len = sizeof(VersionHeader) + sizeof(VersionSet); |
181 | ret = storage_->append(VERSION_BLOCK, alloc_len); |
182 | CHECK_RETURN_WITH_LOG(ret, 0, "Append version block failed." ); |
183 | version_block_ = storage_->get(VERSION_BLOCK); |
184 | |
185 | alloc_len = sizeof(SegmentMeta) * kMaxSegmentCount; |
186 | ret = storage_->append(SEGMENT_BLOCK, alloc_len); |
187 | CHECK_RETURN_WITH_LOG(ret, 0, "Append segment block failed." ); |
188 | segment_block_ = storage_->get(SEGMENT_BLOCK); |
189 | |
190 | // init summary block |
191 | CollectionSummary summary; |
192 | ret = update_collection_summary(summary); |
193 | CHECK_RETURN(ret, 0); |
194 | |
195 | // init writing segment |
196 | SegmentMeta segment_meta; |
197 | ret = this->alloc_segment_meta(&segment_meta); |
198 | CHECK_RETURN(ret, 0); |
199 | |
200 | segment_meta.state = SegmentState::WRITING; |
201 | segment_meta.min_doc_id = 0U; |
202 | ret = this->update_segment_meta(segment_meta); |
203 | CHECK_RETURN(ret, 0); |
204 | |
205 | return 0; |
206 | } |
207 | |
208 | int VersionStore::load_storage() { |
209 | summary_block_ = storage_->get(SUMMARY_BLOCK); |
210 | if (!summary_block_) { |
211 | LOG_ERROR("Get summary block failed." ); |
212 | return ErrorCode_InvalidIndexDataFormat; |
213 | } |
214 | |
215 | version_block_ = storage_->get(VERSION_BLOCK); |
216 | if (!version_block_) { |
217 | LOG_ERROR("Get version block failed." ); |
218 | return ErrorCode_InvalidIndexDataFormat; |
219 | } |
220 | |
221 | segment_block_ = storage_->get(SEGMENT_BLOCK); |
222 | if (!segment_block_) { |
223 | LOG_ERROR("Get segment block failed." ); |
224 | return ErrorCode_InvalidIndexDataFormat; |
225 | } |
226 | |
227 | size_t read_len = |
228 | version_block_->fetch(0, (void *)&header_, sizeof(VersionHeader)); |
229 | if (read_len != sizeof(VersionHeader)) { |
230 | LOG_ERROR("Read header block failed." ); |
231 | return ErrorCode_ReadData; |
232 | } |
233 | |
234 | return 0; |
235 | } |
236 | |
237 | |
238 | } // end namespace index |
239 | } // namespace be |
240 | } // end namespace proxima |
241 | |