1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Append-only element store persisted to (mmap-backed) storage blocks; writers are serialized by an internal mutex |
19 | */ |
20 | |
21 | #pragma once |
22 | |
#include <atomic>
#include <mutex>
#include <string>
#include <vector>
#include <ailego/utility/string_helper.h>
#include "common/error_code.h"
#include "common/macro_define.h"
#include "constants.h"
#include "typedef.h"
29 | |
30 | namespace proxima { |
31 | namespace be { |
32 | namespace index { |
33 | |
34 | /* |
35 | * DeltaStore is a kind of linear storage in memory, and |
36 | * it can snapshot to disk at the same time with mmap. |
37 | * It can only increase at the tail part. |
38 | */ |
39 | template <typename T> |
40 | class DeltaStore { |
41 | public: |
42 | /* |
43 | * Header segment struct |
44 | */ |
45 | struct { |
46 | uint64_t {0U}; |
47 | uint64_t {0U}; |
48 | uint64_t [6]; |
49 | }; |
50 | |
51 | static_assert(sizeof(Header) % 64 == 0, |
52 | "Header must be aligned with 64 bytes" ); |
53 | |
54 | public: |
55 | PROXIMA_DISALLOW_COPY_AND_ASSIGN(DeltaStore); |
56 | |
57 | //! Constructor |
58 | DeltaStore() = default; |
59 | |
60 | //! Destructor |
61 | ~DeltaStore() = default; |
62 | |
63 | //! Mount persist storage |
64 | int mount(const IndexStoragePtr &stg) { |
65 | if (!stg) { |
66 | LOG_ERROR("Mount null storage" ); |
67 | return ErrorCode_RuntimeError; |
68 | } |
69 | |
70 | storage_ = stg; |
71 | header_block_ = storage_->get(HEADER_BLOCK); |
72 | int ret = 0; |
73 | if (!header_block_) { |
74 | ret = init_storage(); |
75 | } else { |
76 | ret = load_storage(); |
77 | } |
78 | CHECK_RETURN(ret, 0); |
79 | |
80 | return 0; |
81 | } |
82 | |
83 | //! Unmount persist storage |
84 | void unmount() { |
85 | storage_ = nullptr; |
86 | header_block_.reset(); |
87 | data_blocks_.clear(); |
88 | memset(&header_, 0, sizeof(header_)); |
89 | node_count_ = 0U; |
90 | } |
91 | |
92 | //! Append an element |
93 | int append(const T &element) { |
94 | std::lock_guard<std::mutex> lock(mutex_); |
95 | |
96 | size_t block_offset = 0U; |
97 | IndexBlockPtr data_block; |
98 | |
99 | uint32_t block_index = data_blocks_.size() - 1; |
100 | size_t block_size = kNodeCountPerBlock * sizeof(T); |
101 | if (block_index == -1U || |
102 | data_blocks_[block_index]->data_size() >= block_size) { |
103 | block_index++; |
104 | std::string new_block_name = |
105 | ailego::StringHelper::Concat(DATA_BLOCK, block_index); |
106 | |
107 | int ret = storage_->append(new_block_name, block_size); |
108 | CHECK_RETURN(ret, 0); |
109 | |
110 | header_.block_count++; |
111 | header_.total_size += block_size; |
112 | ret = update_header(); |
113 | CHECK_RETURN(ret, 0); |
114 | |
115 | block_offset = 0U; |
116 | data_block = storage_->get(new_block_name); |
117 | data_blocks_.emplace_back(data_block); |
118 | } else { |
119 | data_block = data_blocks_[block_index]; |
120 | block_offset = data_block->data_size(); |
121 | } |
122 | |
123 | size_t write_len = data_block->write(block_offset, &element, sizeof(T)); |
124 | if (write_len != sizeof(T)) { |
125 | return ErrorCode_WriteData; |
126 | } |
127 | |
128 | node_count_++; |
129 | return 0; |
130 | } |
131 | |
132 | //! Update an element by position |
133 | int update(size_t pos, const T &element) { |
134 | std::lock_guard<std::mutex> lock(mutex_); |
135 | |
136 | if (pos >= node_count_) { |
137 | return ErrorCode_RuntimeError; |
138 | } |
139 | |
140 | size_t block_index = pos / kNodeCountPerBlock; |
141 | size_t block_offset = (pos % kNodeCountPerBlock) * sizeof(T); |
142 | |
143 | std::string block_name = |
144 | ailego::StringHelper::Concat(DATA_BLOCK, block_index); |
145 | IndexBlockPtr data_block = storage_->get(block_name); |
146 | if (!data_block) { |
147 | return ErrorCode_InvalidIndexDataFormat; |
148 | } |
149 | |
150 | size_t write_len = data_block->write(block_offset, &element, sizeof(T)); |
151 | if (write_len != sizeof(T)) { |
152 | return ErrorCode_WriteData; |
153 | } |
154 | |
155 | return 0; |
156 | } |
157 | |
158 | //! Get an element by position |
159 | const T *at(size_t pos) const { |
160 | if (pos >= node_count_) { |
161 | return nullptr; |
162 | } |
163 | |
164 | size_t block_index = pos / kNodeCountPerBlock; |
165 | size_t block_offset = (pos % kNodeCountPerBlock) * sizeof(T); |
166 | |
167 | std::string block_name = |
168 | ailego::StringHelper::Concat(DATA_BLOCK, block_index); |
169 | IndexBlockPtr data_block = storage_->get(block_name); |
170 | if (!data_block) { |
171 | return nullptr; |
172 | } |
173 | |
174 | T *element; |
175 | size_t read_len = |
176 | data_block->read(block_offset, (const void **)&element, sizeof(T)); |
177 | if (read_len != sizeof(T)) { |
178 | return nullptr; |
179 | } |
180 | return element; |
181 | } |
182 | |
183 | //! Return node count |
184 | size_t count() const { |
185 | return node_count_; |
186 | } |
187 | |
188 | private: |
189 | int init_storage() { |
190 | int ret = storage_->append(HEADER_BLOCK, sizeof(Header)); |
191 | CHECK_RETURN(ret, 0); |
192 | |
193 | header_block_ = storage_->get(HEADER_BLOCK); |
194 | header_.block_count = 0U; |
195 | header_.total_size = 0U; |
196 | ret = update_header(); |
197 | |
198 | node_count_ = 0UL; |
199 | return ret; |
200 | } |
201 | |
202 | int load_storage() { |
203 | header_block_ = storage_->get(HEADER_BLOCK); |
204 | |
205 | Header *; |
206 | size_t read_len = |
207 | header_block_->read(0, (const void **)&header, sizeof(Header)); |
208 | if (read_len != sizeof(Header)) { |
209 | return ErrorCode_ReadData; |
210 | } |
211 | header_ = *header; |
212 | |
213 | for (uint32_t i = 0; i < header_.block_count; i++) { |
214 | std::string block_name = ailego::StringHelper::Concat(DATA_BLOCK, i); |
215 | auto data_block = storage_->get(block_name); |
216 | if (!data_block) { |
217 | return ErrorCode_ReadData; |
218 | } |
219 | data_blocks_.emplace_back(data_block); |
220 | } |
221 | |
222 | if (header_.block_count > 0) { |
223 | node_count_ = |
224 | (header_.block_count - 1) * kNodeCountPerBlock + |
225 | data_blocks_[header_.block_count - 1]->data_size() / sizeof(T); |
226 | } else { |
227 | node_count_ = 0U; |
228 | } |
229 | |
230 | return 0; |
231 | } |
232 | |
233 | int () { |
234 | size_t write_len = header_block_->write(0, &header_, sizeof(Header)); |
235 | if (write_len != sizeof(Header)) { |
236 | return ErrorCode_WriteData; |
237 | } |
238 | return 0; |
239 | } |
240 | |
241 | private: |
242 | static constexpr uint32_t kNodeCountPerBlock = 1UL * 1024UL * 1024UL; |
243 | |
244 | private: |
245 | IndexStoragePtr storage_{}; |
246 | |
247 | IndexBlockPtr {}; |
248 | std::vector<IndexBlockPtr> data_blocks_{}; |
249 | Header ; |
250 | |
251 | std::mutex mutex_{}; |
252 | std::atomic<uint64_t> node_count_{0UL}; |
253 | }; |
254 | |
255 | |
256 | } // end namespace index |
257 | } // namespace be |
258 | } // end namespace proxima |
259 | |