1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of lsn store
19 */
20
21#include "lsn_store.h"
22#include "common/error_code.h"
23#include "constants.h"
24#include "file_helper.h"
25
26namespace proxima {
27namespace be {
28namespace index {
29
30LsnStorePtr LsnStore::Create(const std::string &collection_name,
31 const std::string &collection_path) {
32 return std::make_shared<LsnStore>(collection_name, collection_path);
33}
34
35int LsnStore::CreateAndOpen(const std::string &collection_name,
36 const std::string &collection_path,
37 const ReadOptions &read_options,
38 LsnStorePtr *lsn_store) {
39 (*lsn_store) = Create(collection_name, collection_path);
40
41 return (*lsn_store)->open(read_options);
42}
43
44LsnStore::~LsnStore() {
45 if (opened_) {
46 this->close();
47 }
48}
49
50int LsnStore::open(const ReadOptions &read_options) {
51 CHECK_STATUS(opened_, false);
52
53 int ret = Snapshot::CreateAndOpen(collection_path_, FileID::LSN_FILE,
54 read_options, &snapshot_);
55 CHECK_RETURN_WITH_CLOG(ret, 0, "Create snapshot failed.");
56
57 ret = this->mount();
58 CHECK_RETURN_WITH_CLOG(ret, 0, "Mount storage failed.");
59
60 opened_ = true;
61 CLOG_DEBUG("Opened lsn store.");
62 return 0;
63}
64
65int LsnStore::flush() {
66 CHECK_STATUS(opened_, true);
67
68 return snapshot_->flush();
69}
70
71int LsnStore::close() {
72 CHECK_STATUS(opened_, true);
73 this->unmount();
74
75 int ret = snapshot_->close();
76 if (ret != 0) {
77 LOG_WARN("LsnStore close snapshot failed.");
78 }
79
80 opened_ = false;
81 CLOG_DEBUG("Closed lsn store");
82 return 0;
83}
84
85int LsnStore::append(uint64_t lsn, const std::string &lsn_context) {
86 CHECK_STATUS(opened_, true);
87
88 ailego::WriteLock wlock(mutex_);
89 std::lock_guard<ailego::WriteLock> lock(wlock);
90
91 uint64_t expect_write_len =
92 sizeof(uint64_t) + sizeof(uint64_t) + lsn_context.size();
93 if (expect_write_len > kDataBlockSize) {
94 return ErrorCode_ExceedLimit;
95 }
96
97 uint32_t block_index = header_.tail_block_index;
98 auto data_block = data_blocks_[block_index];
99
100 if (data_block->padding_size() < expect_write_len) {
101 block_index = (block_index + 1) % 2;
102 data_block = data_blocks_[block_index];
103 data_block->resize(0);
104 header_.tail_block_index = block_index;
105 update_header();
106 }
107
108 size_t write_len =
109 data_block->write(data_block->data_size(), &lsn, sizeof(uint64_t));
110 if (write_len != sizeof(uint64_t)) {
111 return ErrorCode_WriteData;
112 }
113
114 uint64_t lsn_context_len = lsn_context.size();
115 write_len = data_block->write(data_block->data_size(), &lsn_context_len,
116 sizeof(uint64_t));
117 if (write_len != sizeof(uint64_t)) {
118 return ErrorCode_WriteData;
119 }
120
121 write_len = data_block->write(data_block->data_size(), lsn_context.data(),
122 lsn_context.size());
123 if (write_len != lsn_context.size()) {
124 return ErrorCode_WriteData;
125 }
126
127 header_.lsn_count++;
128 update_header();
129 return 0;
130}
131
132int LsnStore::shift() {
133 CHECK_STATUS(opened_, true);
134
135 ailego::ReadLock rlock(mutex_);
136 std::lock_guard<ailego::ReadLock> lock(rlock);
137
138 // find the writing data block
139 uint32_t block_index = header_.tail_block_index;
140 if (data_blocks_[block_index]->data_size() == 0U) {
141 block_index = (block_index + 1) % 2;
142 }
143
144 uint64_t expect_shift_len = data_blocks_[block_index]->data_size();
145 if (expect_shift_len == 0U) {
146 return ErrorCode_ReadData;
147 }
148
149 auto src_data_block = data_blocks_[block_index];
150
151 // copy and shift data block
152 const void *lsn_data;
153 size_t read_len = data_blocks_[block_index]->read(
154 0, &lsn_data, data_blocks_[block_index]->data_size());
155 if (read_len != data_blocks_[block_index]->data_size()) {
156 return ErrorCode_ReadData;
157 }
158
159 data_blocks_[2]->resize(0);
160 size_t write_len = data_blocks_[2]->write(0, lsn_data, read_len);
161 if (write_len != read_len) {
162 return ErrorCode_WriteData;
163 }
164
165 return 0;
166}
167
168int LsnStore::get_latest_lsn(uint64_t *lsn, std::string *lsn_context) {
169 CHECK_STATUS(opened_, true);
170
171 ailego::ReadLock rlock(mutex_);
172 std::lock_guard<ailego::ReadLock> lock(rlock);
173
174 ailego::Heap<LSN, std::greater<LSN>> lsn_heap(kWindowSize);
175 // scan lsn data blocks
176 for (size_t i = 0; i < data_blocks_.size(); i++) {
177 auto data_block = data_blocks_[i];
178 uint64_t offset = 0;
179 while (offset < data_block->data_size()) {
180 uint64_t lsn_val = 0U;
181 size_t read_len = data_block->fetch(offset, &lsn_val, sizeof(uint64_t));
182 if (read_len != sizeof(uint64_t)) {
183 return ErrorCode_ReadData;
184 }
185 offset += sizeof(uint64_t);
186
187 uint64_t lsn_context_len;
188 read_len = data_block->fetch(offset, &lsn_context_len, sizeof(uint64_t));
189 if (read_len != sizeof(uint64_t)) {
190 return ErrorCode_ReadData;
191 }
192 offset += sizeof(uint64_t);
193
194 std::string lsn_context_val;
195 lsn_context_val.resize(lsn_context_len);
196 read_len = data_block->fetch(offset, (void *)lsn_context_val.data(),
197 lsn_context_len);
198 offset += lsn_context_len;
199
200 lsn_heap.emplace(LSN(lsn_val, lsn_context_val));
201 }
202 }
203
204 // from small to large
205 std::sort(lsn_heap.begin(), lsn_heap.end());
206
207 // find last not continues lsn
208 LSN max_lsn;
209 bool found = false;
210 for (size_t i = 0; i + 1 < lsn_heap.size(); i++) {
211 if (lsn_heap[i + 1].lsn > lsn_heap[i].lsn + 1) {
212 max_lsn = lsn_heap[i];
213 found = true;
214 break;
215 }
216 }
217
218 // if not find, just pick up last lsn
219 if (!found && lsn_heap.size() > 0) {
220 max_lsn = *(lsn_heap.rbegin());
221 }
222
223 *lsn = max_lsn.lsn;
224 *lsn_context = max_lsn.lsn_context;
225 return 0;
226}
227
228int LsnStore::mount() {
229 auto &storage = snapshot_->data();
230
231 header_block_ = storage->get(HEADER_BLOCK);
232 int ret = 0;
233 if (!header_block_) {
234 ret = storage->append(HEADER_BLOCK, sizeof(Header));
235 CHECK_RETURN(ret, 0);
236 header_block_ = storage->get(HEADER_BLOCK);
237 ret = update_header();
238 CHECK_RETURN(ret, 0);
239
240 for (uint32_t i = 0; i < kDataBlockCount; i++) {
241 auto block_id = ailego::StringHelper::Concat(DATA_BLOCK, i);
242 ret = storage->append(block_id, kDataBlockSize);
243 CHECK_RETURN(ret, 0);
244 auto data_block = storage->get(block_id);
245 data_blocks_.emplace_back(data_block);
246 }
247 } else {
248 size_t read_len = header_block_->fetch(0, &header_, sizeof(Header));
249 if (read_len != sizeof(Header)) {
250 return ErrorCode_ReadData;
251 }
252
253 for (uint32_t i = 0; i < kDataBlockCount; i++) {
254 auto data_block =
255 storage->get(ailego::StringHelper::Concat(DATA_BLOCK, i));
256 if (!data_block) {
257 return ErrorCode_InvalidIndexDataFormat;
258 }
259 data_blocks_.emplace_back(data_block);
260 }
261 }
262
263 return 0;
264}
265
266void LsnStore::unmount() {
267 header_block_.reset();
268 data_blocks_.clear();
269 memset(&header_, 0, sizeof(header_));
270}
271
272int LsnStore::update_header() {
273 size_t write_len = header_block_->write(0, &header_, sizeof(Header));
274 if (write_len != sizeof(Header)) {
275 return ErrorCode_WriteData;
276 }
277
278 return 0;
279}
280
281
282} // end namespace index
283} // namespace be
284} // end namespace proxima
285