1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of version store
19 */
20
21#include "version_store.h"
22#include "common/error_code.h"
23#include "constants.h"
24
25namespace proxima {
26namespace be {
27namespace index {
28
29int VersionStore::mount(const IndexStoragePtr &stg) {
30 if (!stg) {
31 LOG_ERROR("Mount null storage");
32 return ErrorCode_RuntimeError;
33 }
34
35 storage_ = stg;
36 IndexBlockPtr summary_block = storage_->get(SUMMARY_BLOCK);
37 int ret = 0;
38
39 if (!summary_block) {
40 ret = init_storage();
41 } else {
42 ret = load_storage();
43 }
44
45 return ret;
46}
47
48void VersionStore::unmount() {
49 summary_block_.reset();
50 version_block_.reset();
51 segment_block_.reset();
52 memset(&header_, 0, sizeof(header_));
53}
54
55int VersionStore::alloc_segment_meta(SegmentMeta *segment_meta) {
56 std::lock_guard<std::mutex> lock(mutex_);
57
58 if (header_.total_segment_count >= kMaxSegmentCount) {
59 LOG_ERROR("Exceed max segment meta count limit. total_segment_count[%zu]",
60 (size_t)header_.total_segment_count);
61 return ErrorCode_ExceedLimit;
62 }
63
64 // check if exists unused segment meta
65 // just return it to reuse
66 if (header_.total_segment_count > 0) {
67 SegmentID last_segment_id = header_.total_segment_count - 1;
68 SegmentMeta last_segment_meta;
69 int ret = this->get_segment_meta_impl(last_segment_id, &last_segment_meta);
70 CHECK_RETURN(ret, 0);
71 if (last_segment_meta.state == SegmentState::CREATED) {
72 *segment_meta = last_segment_meta;
73 return 0;
74 }
75 }
76
77 SegmentMeta new_segment_meta;
78 new_segment_meta.segment_id = header_.total_segment_count;
79
80 // update segment meta
81 int ret = this->update_segment_meta_impl(new_segment_meta.segment_id,
82 new_segment_meta);
83 CHECK_RETURN_WITH_LOG(ret, 0, "Write segment meta failed.");
84
85 // update version header
86 header_.total_segment_count++;
87 ret = this->update_header_impl(header_);
88 CHECK_RETURN_WITH_LOG(ret, 0, "Write header failed.");
89
90 // copy out
91 *segment_meta = new_segment_meta;
92 return 0;
93}
94
95int VersionStore::get_segment_meta(SegmentID segment_id,
96 SegmentMeta *segment_meta) const {
97 if (segment_id >= header_.total_segment_count) {
98 LOG_ERROR("Illegal segment id. segment_id[%zu]", (size_t)segment_id);
99 return ErrorCode_ExceedLimit;
100 }
101
102 return this->get_segment_meta_impl(segment_id, segment_meta);
103}
104
105int VersionStore::update_segment_meta(const SegmentMeta &segment_meta) {
106 std::lock_guard<std::mutex> lock(mutex_);
107
108 SegmentID segment_id = segment_meta.segment_id;
109 if (segment_id >= header_.total_segment_count) {
110 LOG_ERROR("Illegal segment id. segment_id[%zu]", (size_t)segment_id);
111 return ErrorCode_ExceedLimit;
112 }
113
114 return this->update_segment_meta_impl(segment_id, segment_meta);
115}
116
117int VersionStore::get_version_set(VersionSet *version_set) const {
118 size_t offset = header_.current_version_offset;
119 size_t read_len =
120 version_block_->fetch(offset, (void *)version_set, sizeof(VersionSet));
121 if (read_len != sizeof(VersionSet)) {
122 LOG_ERROR("Read version block failed.");
123 return ErrorCode_WriteData;
124 }
125
126 return 0;
127}
128
129int VersionStore::update_version_set(const VersionSet &version_set) {
130 std::lock_guard<std::mutex> lock(mutex_);
131
132 size_t offset = sizeof(VersionHeader);
133 size_t write_len = 0U;
134
135 // write version set
136 write_len = version_block_->write(offset, &version_set, sizeof(VersionSet));
137 if (write_len != sizeof(VersionSet)) {
138 LOG_ERROR("Write version block failed. ");
139 return ErrorCode_WriteData;
140 }
141
142 // update version header
143 header_.total_version_count++;
144 header_.current_version_offset = offset;
145
146 return this->update_header_impl(header_);
147}
148
149int VersionStore::get_collection_summary(CollectionSummary *summary) const {
150 size_t read_len =
151 summary_block_->fetch(0U, (void *)summary, sizeof(CollectionSummary));
152 if (read_len != sizeof(CollectionSummary)) {
153 LOG_ERROR("Read summary block failed.");
154 return ErrorCode_ReadData;
155 }
156 return 0;
157}
158
159int VersionStore::update_collection_summary(const CollectionSummary &summary) {
160 std::lock_guard<std::mutex> lock(mutex_);
161
162 size_t write_len =
163 summary_block_->write(0U, &summary, sizeof(CollectionSummary));
164 if (write_len != sizeof(CollectionSummary)) {
165 LOG_ERROR("Write summary block failed.");
166 return ErrorCode_WriteData;
167 }
168 return 0;
169}
170
171int VersionStore::init_storage() {
172 int ret = 0;
173 size_t alloc_len = 0U;
174
175 alloc_len = sizeof(CollectionSummary);
176 ret = storage_->append(SUMMARY_BLOCK, alloc_len);
177 CHECK_RETURN_WITH_LOG(ret, 0, "Append summary block failed.");
178 summary_block_ = storage_->get(SUMMARY_BLOCK);
179
180 alloc_len = sizeof(VersionHeader) + sizeof(VersionSet);
181 ret = storage_->append(VERSION_BLOCK, alloc_len);
182 CHECK_RETURN_WITH_LOG(ret, 0, "Append version block failed.");
183 version_block_ = storage_->get(VERSION_BLOCK);
184
185 alloc_len = sizeof(SegmentMeta) * kMaxSegmentCount;
186 ret = storage_->append(SEGMENT_BLOCK, alloc_len);
187 CHECK_RETURN_WITH_LOG(ret, 0, "Append segment block failed.");
188 segment_block_ = storage_->get(SEGMENT_BLOCK);
189
190 // init summary block
191 CollectionSummary summary;
192 ret = update_collection_summary(summary);
193 CHECK_RETURN(ret, 0);
194
195 // init writing segment
196 SegmentMeta segment_meta;
197 ret = this->alloc_segment_meta(&segment_meta);
198 CHECK_RETURN(ret, 0);
199
200 segment_meta.state = SegmentState::WRITING;
201 segment_meta.min_doc_id = 0U;
202 ret = this->update_segment_meta(segment_meta);
203 CHECK_RETURN(ret, 0);
204
205 return 0;
206}
207
208int VersionStore::load_storage() {
209 summary_block_ = storage_->get(SUMMARY_BLOCK);
210 if (!summary_block_) {
211 LOG_ERROR("Get summary block failed.");
212 return ErrorCode_InvalidIndexDataFormat;
213 }
214
215 version_block_ = storage_->get(VERSION_BLOCK);
216 if (!version_block_) {
217 LOG_ERROR("Get version block failed.");
218 return ErrorCode_InvalidIndexDataFormat;
219 }
220
221 segment_block_ = storage_->get(SEGMENT_BLOCK);
222 if (!segment_block_) {
223 LOG_ERROR("Get segment block failed.");
224 return ErrorCode_InvalidIndexDataFormat;
225 }
226
227 size_t read_len =
228 version_block_->fetch(0, (void *)&header_, sizeof(VersionHeader));
229 if (read_len != sizeof(VersionHeader)) {
230 LOG_ERROR("Read header block failed.");
231 return ErrorCode_ReadData;
232 }
233
234 return 0;
235}
236
237
238} // end namespace index
239} // namespace be
240} // end namespace proxima
241