1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of lsn store |
19 | */ |
20 | |
21 | #include "lsn_store.h" |
22 | #include "common/error_code.h" |
23 | #include "constants.h" |
24 | #include "file_helper.h" |
25 | |
26 | namespace proxima { |
27 | namespace be { |
28 | namespace index { |
29 | |
30 | LsnStorePtr LsnStore::Create(const std::string &collection_name, |
31 | const std::string &collection_path) { |
32 | return std::make_shared<LsnStore>(collection_name, collection_path); |
33 | } |
34 | |
35 | int LsnStore::CreateAndOpen(const std::string &collection_name, |
36 | const std::string &collection_path, |
37 | const ReadOptions &read_options, |
38 | LsnStorePtr *lsn_store) { |
39 | (*lsn_store) = Create(collection_name, collection_path); |
40 | |
41 | return (*lsn_store)->open(read_options); |
42 | } |
43 | |
44 | LsnStore::~LsnStore() { |
45 | if (opened_) { |
46 | this->close(); |
47 | } |
48 | } |
49 | |
50 | int LsnStore::open(const ReadOptions &read_options) { |
51 | CHECK_STATUS(opened_, false); |
52 | |
53 | int ret = Snapshot::CreateAndOpen(collection_path_, FileID::LSN_FILE, |
54 | read_options, &snapshot_); |
55 | CHECK_RETURN_WITH_CLOG(ret, 0, "Create snapshot failed." ); |
56 | |
57 | ret = this->mount(); |
58 | CHECK_RETURN_WITH_CLOG(ret, 0, "Mount storage failed." ); |
59 | |
60 | opened_ = true; |
61 | CLOG_DEBUG("Opened lsn store." ); |
62 | return 0; |
63 | } |
64 | |
65 | int LsnStore::flush() { |
66 | CHECK_STATUS(opened_, true); |
67 | |
68 | return snapshot_->flush(); |
69 | } |
70 | |
71 | int LsnStore::close() { |
72 | CHECK_STATUS(opened_, true); |
73 | this->unmount(); |
74 | |
75 | int ret = snapshot_->close(); |
76 | if (ret != 0) { |
77 | LOG_WARN("LsnStore close snapshot failed." ); |
78 | } |
79 | |
80 | opened_ = false; |
81 | CLOG_DEBUG("Closed lsn store" ); |
82 | return 0; |
83 | } |
84 | |
85 | int LsnStore::append(uint64_t lsn, const std::string &lsn_context) { |
86 | CHECK_STATUS(opened_, true); |
87 | |
88 | ailego::WriteLock wlock(mutex_); |
89 | std::lock_guard<ailego::WriteLock> lock(wlock); |
90 | |
91 | uint64_t expect_write_len = |
92 | sizeof(uint64_t) + sizeof(uint64_t) + lsn_context.size(); |
93 | if (expect_write_len > kDataBlockSize) { |
94 | return ErrorCode_ExceedLimit; |
95 | } |
96 | |
97 | uint32_t block_index = header_.tail_block_index; |
98 | auto data_block = data_blocks_[block_index]; |
99 | |
100 | if (data_block->padding_size() < expect_write_len) { |
101 | block_index = (block_index + 1) % 2; |
102 | data_block = data_blocks_[block_index]; |
103 | data_block->resize(0); |
104 | header_.tail_block_index = block_index; |
105 | update_header(); |
106 | } |
107 | |
108 | size_t write_len = |
109 | data_block->write(data_block->data_size(), &lsn, sizeof(uint64_t)); |
110 | if (write_len != sizeof(uint64_t)) { |
111 | return ErrorCode_WriteData; |
112 | } |
113 | |
114 | uint64_t lsn_context_len = lsn_context.size(); |
115 | write_len = data_block->write(data_block->data_size(), &lsn_context_len, |
116 | sizeof(uint64_t)); |
117 | if (write_len != sizeof(uint64_t)) { |
118 | return ErrorCode_WriteData; |
119 | } |
120 | |
121 | write_len = data_block->write(data_block->data_size(), lsn_context.data(), |
122 | lsn_context.size()); |
123 | if (write_len != lsn_context.size()) { |
124 | return ErrorCode_WriteData; |
125 | } |
126 | |
127 | header_.lsn_count++; |
128 | update_header(); |
129 | return 0; |
130 | } |
131 | |
132 | int LsnStore::shift() { |
133 | CHECK_STATUS(opened_, true); |
134 | |
135 | ailego::ReadLock rlock(mutex_); |
136 | std::lock_guard<ailego::ReadLock> lock(rlock); |
137 | |
138 | // find the writing data block |
139 | uint32_t block_index = header_.tail_block_index; |
140 | if (data_blocks_[block_index]->data_size() == 0U) { |
141 | block_index = (block_index + 1) % 2; |
142 | } |
143 | |
144 | uint64_t expect_shift_len = data_blocks_[block_index]->data_size(); |
145 | if (expect_shift_len == 0U) { |
146 | return ErrorCode_ReadData; |
147 | } |
148 | |
149 | auto src_data_block = data_blocks_[block_index]; |
150 | |
151 | // copy and shift data block |
152 | const void *lsn_data; |
153 | size_t read_len = data_blocks_[block_index]->read( |
154 | 0, &lsn_data, data_blocks_[block_index]->data_size()); |
155 | if (read_len != data_blocks_[block_index]->data_size()) { |
156 | return ErrorCode_ReadData; |
157 | } |
158 | |
159 | data_blocks_[2]->resize(0); |
160 | size_t write_len = data_blocks_[2]->write(0, lsn_data, read_len); |
161 | if (write_len != read_len) { |
162 | return ErrorCode_WriteData; |
163 | } |
164 | |
165 | return 0; |
166 | } |
167 | |
168 | int LsnStore::get_latest_lsn(uint64_t *lsn, std::string *lsn_context) { |
169 | CHECK_STATUS(opened_, true); |
170 | |
171 | ailego::ReadLock rlock(mutex_); |
172 | std::lock_guard<ailego::ReadLock> lock(rlock); |
173 | |
174 | ailego::Heap<LSN, std::greater<LSN>> lsn_heap(kWindowSize); |
175 | // scan lsn data blocks |
176 | for (size_t i = 0; i < data_blocks_.size(); i++) { |
177 | auto data_block = data_blocks_[i]; |
178 | uint64_t offset = 0; |
179 | while (offset < data_block->data_size()) { |
180 | uint64_t lsn_val = 0U; |
181 | size_t read_len = data_block->fetch(offset, &lsn_val, sizeof(uint64_t)); |
182 | if (read_len != sizeof(uint64_t)) { |
183 | return ErrorCode_ReadData; |
184 | } |
185 | offset += sizeof(uint64_t); |
186 | |
187 | uint64_t lsn_context_len; |
188 | read_len = data_block->fetch(offset, &lsn_context_len, sizeof(uint64_t)); |
189 | if (read_len != sizeof(uint64_t)) { |
190 | return ErrorCode_ReadData; |
191 | } |
192 | offset += sizeof(uint64_t); |
193 | |
194 | std::string lsn_context_val; |
195 | lsn_context_val.resize(lsn_context_len); |
196 | read_len = data_block->fetch(offset, (void *)lsn_context_val.data(), |
197 | lsn_context_len); |
198 | offset += lsn_context_len; |
199 | |
200 | lsn_heap.emplace(LSN(lsn_val, lsn_context_val)); |
201 | } |
202 | } |
203 | |
204 | // from small to large |
205 | std::sort(lsn_heap.begin(), lsn_heap.end()); |
206 | |
207 | // find last not continues lsn |
208 | LSN max_lsn; |
209 | bool found = false; |
210 | for (size_t i = 0; i + 1 < lsn_heap.size(); i++) { |
211 | if (lsn_heap[i + 1].lsn > lsn_heap[i].lsn + 1) { |
212 | max_lsn = lsn_heap[i]; |
213 | found = true; |
214 | break; |
215 | } |
216 | } |
217 | |
218 | // if not find, just pick up last lsn |
219 | if (!found && lsn_heap.size() > 0) { |
220 | max_lsn = *(lsn_heap.rbegin()); |
221 | } |
222 | |
223 | *lsn = max_lsn.lsn; |
224 | *lsn_context = max_lsn.lsn_context; |
225 | return 0; |
226 | } |
227 | |
228 | int LsnStore::mount() { |
229 | auto &storage = snapshot_->data(); |
230 | |
231 | header_block_ = storage->get(HEADER_BLOCK); |
232 | int ret = 0; |
233 | if (!header_block_) { |
234 | ret = storage->append(HEADER_BLOCK, sizeof(Header)); |
235 | CHECK_RETURN(ret, 0); |
236 | header_block_ = storage->get(HEADER_BLOCK); |
237 | ret = update_header(); |
238 | CHECK_RETURN(ret, 0); |
239 | |
240 | for (uint32_t i = 0; i < kDataBlockCount; i++) { |
241 | auto block_id = ailego::StringHelper::Concat(DATA_BLOCK, i); |
242 | ret = storage->append(block_id, kDataBlockSize); |
243 | CHECK_RETURN(ret, 0); |
244 | auto data_block = storage->get(block_id); |
245 | data_blocks_.emplace_back(data_block); |
246 | } |
247 | } else { |
248 | size_t read_len = header_block_->fetch(0, &header_, sizeof(Header)); |
249 | if (read_len != sizeof(Header)) { |
250 | return ErrorCode_ReadData; |
251 | } |
252 | |
253 | for (uint32_t i = 0; i < kDataBlockCount; i++) { |
254 | auto data_block = |
255 | storage->get(ailego::StringHelper::Concat(DATA_BLOCK, i)); |
256 | if (!data_block) { |
257 | return ErrorCode_InvalidIndexDataFormat; |
258 | } |
259 | data_blocks_.emplace_back(data_block); |
260 | } |
261 | } |
262 | |
263 | return 0; |
264 | } |
265 | |
266 | void LsnStore::unmount() { |
267 | header_block_.reset(); |
268 | data_blocks_.clear(); |
269 | memset(&header_, 0, sizeof(header_)); |
270 | } |
271 | |
272 | int LsnStore::() { |
273 | size_t write_len = header_block_->write(0, &header_, sizeof(Header)); |
274 | if (write_len != sizeof(Header)) { |
275 | return ErrorCode_WriteData; |
276 | } |
277 | |
278 | return 0; |
279 | } |
280 | |
281 | |
282 | } // end namespace index |
283 | } // namespace be |
284 | } // end namespace proxima |
285 | |