1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author zhongyou.dlx |
17 | * \date Jun 2021 |
18 | * \brief Persist hashmap which is thread-safe for add/del operations |
19 | */ |
20 | |
21 | #pragma once |
22 | |
#include <cmath>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>
#include <ailego/parallel/lock.h>
#include <ailego/utility/string_helper.h>
#include "common/error_code.h"
#include "common/macro_define.h"
#include "constants.h"
#include "typedef.h"
29 | |
30 | namespace proxima { |
31 | namespace be { |
32 | namespace index { |
33 | |
34 | static constexpr uint32_t INVALID_NODE_ID{-1U}; |
35 | |
36 | /* |
37 | * A PersistHashMap represents block of hash data in persist storage. |
38 | */ |
39 | template <typename TKey, typename TValue, typename Hash = std::hash<TKey> > |
40 | class PersistHashMap { |
41 | struct { |
42 | uint32_t ; |
43 | uint32_t ; |
44 | uint32_t ; |
45 | uint32_t [13]; |
46 | }; |
47 | |
48 | static_assert(sizeof(BlockHeader) % 64 == 0, |
49 | "BlockHeader must be aligned with 64 bytes" ); |
50 | |
51 | struct NodeType { |
52 | NodeType(TKey key, TValue value) |
53 | : first(key), second(value), next(INVALID_NODE_ID) {} |
54 | |
55 | TKey first; |
56 | TValue second; |
57 | uint32_t next; |
58 | }; |
59 | |
60 | public: |
61 | PROXIMA_DISALLOW_COPY_AND_ASSIGN(PersistHashMap); |
62 | |
63 | //! Constructor |
64 | PersistHashMap() = default; |
65 | |
66 | //! Destructor |
67 | ~PersistHashMap() = default; |
68 | |
69 | public: |
70 | //! Mount persist storage |
71 | int mount(const IndexStoragePtr &stg) { |
72 | if (!stg) { |
73 | LOG_ERROR("Mount null storage" ); |
74 | return ErrorCode_RuntimeError; |
75 | } |
76 | |
77 | storage_ = stg; |
78 | for (int i = 0; /* No Limit */; ++i) { |
79 | std::string block_name = ailego::StringHelper::Concat(DATA_BLOCK, i); |
80 | auto block = storage_->get(block_name); |
81 | if (!block) break; |
82 | |
83 | const void *data = nullptr; |
84 | if (ailego_unlikely(block->read(0, &data, sizeof(BlockHeader)) != |
85 | sizeof(BlockHeader))) { |
86 | LOG_ERROR("Failed to read block header from block idx %d" , i); |
87 | return ErrorCode_ReadData; |
88 | } |
89 | BlockHeader = *static_cast<const BlockHeader *>(data); |
90 | |
91 | size_t bucket_count = block_header.bucket_count; |
92 | size_t node_count = bucket_count * kLoadFactor; |
93 | size_t block_size = sizeof(BlockHeader) + |
94 | bucket_count * sizeof(uint32_t) + |
95 | node_count * sizeof(NodeType); |
96 | |
97 | if (ailego_unlikely(block->capacity() < block_size)) { |
98 | return ErrorCode_ReadData; |
99 | } |
100 | |
101 | if (ailego_unlikely(block->data_size() != block_size)) { |
102 | LOG_DEBUG("Block need reinit" ); |
103 | int ret = init_block(block.get(), bucket_count); |
104 | if (ret < 0) { |
105 | return ret; |
106 | } |
107 | } |
108 | |
109 | blocks_.emplace_back(block); |
110 | blocks_header_.emplace_back(block_header); |
111 | } |
112 | |
113 | return ErrorCode_Success; |
114 | } |
115 | |
116 | //! Unmount persist storage |
117 | void unmount() { |
118 | storage_ = nullptr; |
119 | blocks_.clear(); |
120 | blocks_header_.clear(); |
121 | } |
122 | |
123 | //! Reserve |
124 | int reserve(size_t bucket_count) { |
125 | ailego::WriteLock wlock(mutex_); |
126 | std::lock_guard<ailego::WriteLock> signal_lock(wlock); |
127 | int ret = ErrorCode_Success; |
128 | if (blocks_.empty()) { |
129 | ret = add_block(bucket_count); |
130 | } |
131 | return ret; |
132 | } |
133 | |
134 | //! Emplace a key-value pair |
135 | int emplace(const TKey &key, const TValue &val) { |
136 | ailego::WriteLock wlock(mutex_); |
137 | std::lock_guard<ailego::WriteLock> signal_lock(wlock); |
138 | |
139 | size_t block_idx = -1UL; |
140 | for (int idx = blocks_.size() - 1; idx >= 0; --idx) { |
141 | if (blocks_header_[idx].free_header != INVALID_NODE_ID) { |
142 | block_idx = idx; |
143 | break; |
144 | } |
145 | } |
146 | |
147 | if (ailego_unlikely(block_idx == -1UL)) { |
148 | int ret = add_block(); |
149 | if (ret < 0) { |
150 | return ret; |
151 | } |
152 | block_idx = ret; |
153 | } |
154 | |
155 | int ret = emplace_in_block(block_idx, key, val); |
156 | return ret; |
157 | } |
158 | |
159 | int emplace_or_assign(const TKey &key, const TValue &val) { |
160 | ailego::WriteLock wlock(mutex_); |
161 | std::lock_guard<ailego::WriteLock> signal_lock(wlock); |
162 | int ret = find_key(key, [this, &val](const NodeType *node, |
163 | uint32_t node_idx, const NodeType *, |
164 | uint32_t, size_t, size_t block_idx) { |
165 | NodeType writable_node = *node; |
166 | writable_node.second = val; |
167 | size_t offset = |
168 | sizeof(BlockHeader) + |
169 | blocks_header_[block_idx].bucket_count * sizeof(uint32_t) + |
170 | node_idx * sizeof(NodeType); |
171 | if (ailego_unlikely(blocks_[block_idx]->write(offset, &writable_node, |
172 | sizeof(NodeType)) != |
173 | sizeof(NodeType))) { |
174 | LOG_ERROR("Failed to write node content for block idx %zu" , block_idx); |
175 | return ErrorCode_WriteData; |
176 | } |
177 | return ErrorCode_Success; |
178 | }); |
179 | |
180 | if (ret == ErrorCode_InexistentKey) { |
181 | ret = emplace(key, val); |
182 | } |
183 | return ret; |
184 | } |
185 | |
186 | //! Get value by key |
187 | int get(const TKey &key, TValue *value) const { |
188 | ailego::ReadLock rlock(mutex_); |
189 | std::lock_guard<ailego::ReadLock> signal_lock(rlock); |
190 | int ret = |
191 | find_key(key, [&value](const NodeType *node, uint32_t, const NodeType *, |
192 | uint32_t, size_t, size_t) { |
193 | *value = node->second; |
194 | return ErrorCode_Success; |
195 | }); |
196 | return ret; |
197 | } |
198 | |
199 | //! If has key |
200 | bool has(const TKey &key) const { |
201 | ailego::ReadLock rlock(mutex_); |
202 | std::lock_guard<ailego::ReadLock> signal_lock(rlock); |
203 | int ret = |
204 | find_key(key, [](const NodeType *, uint32_t, const NodeType *, uint32_t, |
205 | size_t, size_t) { return ErrorCode_Success; }); |
206 | return (ret == ErrorCode_Success); |
207 | } |
208 | |
209 | //! Return key-value pair count |
210 | size_t size() const { |
211 | ailego::ReadLock rlock(mutex_); |
212 | std::lock_guard<ailego::ReadLock> signal_lock(rlock); |
213 | size_t val = 0; |
214 | for (int i = 0; i < (int)blocks_header_.size(); ++i) { |
215 | val += blocks_header_[i].node_count; |
216 | } |
217 | return val; |
218 | } |
219 | |
220 | //! Erase a pair by key |
221 | int erase(const TKey &key) { |
222 | ailego::WriteLock wlock(mutex_); |
223 | std::lock_guard<ailego::WriteLock> signal_lock(wlock); |
224 | int ret = find_key(key, [this](const NodeType *node, uint32_t node_idx, |
225 | const NodeType *pre_node, |
226 | uint32_t pre_node_idx, size_t bucket_offset, |
227 | size_t block_idx) { |
228 | auto &block = blocks_[block_idx]; |
229 | |
230 | // free node |
231 | if (pre_node == nullptr) { |
232 | if (ailego_unlikely( |
233 | block->write(bucket_offset, &(node->next), sizeof(uint32_t)) != |
234 | sizeof(uint32_t))) { |
235 | LOG_ERROR("Failed to write bucket content for block idx %zu" , |
236 | block_idx); |
237 | return ErrorCode_WriteData; |
238 | } |
239 | } else { |
240 | NodeType writable_node = *pre_node; |
241 | writable_node.next = node->next; |
242 | size_t offset = |
243 | sizeof(BlockHeader) + |
244 | blocks_header_[block_idx].bucket_count * sizeof(uint32_t) + |
245 | pre_node_idx * sizeof(NodeType); |
246 | if (ailego_unlikely( |
247 | block->write(offset, &writable_node, sizeof(NodeType)) != |
248 | sizeof(NodeType))) { |
249 | LOG_ERROR("Failed to write node content for block idx %zu" , |
250 | block_idx); |
251 | return ErrorCode_WriteData; |
252 | } |
253 | } |
254 | |
255 | // recycle node |
256 | NodeType writable_node = *node; |
257 | writable_node.next = blocks_header_[block_idx].free_header; |
258 | size_t offset = |
259 | sizeof(BlockHeader) + |
260 | blocks_header_[block_idx].bucket_count * sizeof(uint32_t) + |
261 | node_idx * sizeof(NodeType); |
262 | if (ailego_unlikely(block->write(offset, &writable_node, |
263 | sizeof(NodeType)) != sizeof(NodeType))) { |
264 | LOG_ERROR("Failed to write node content for block idx %zu" , block_idx); |
265 | return ErrorCode_WriteData; |
266 | } |
267 | |
268 | blocks_header_[block_idx].free_header = node_idx; |
269 | blocks_header_[block_idx].node_count -= 1; |
270 | if (ailego_unlikely(block->write(0, &blocks_header_[block_idx], |
271 | sizeof(BlockHeader)) != |
272 | sizeof(BlockHeader))) { |
273 | LOG_ERROR("Failed to write block header for block idx %zu" , block_idx); |
274 | return ErrorCode_WriteData; |
275 | } |
276 | |
277 | return ErrorCode_Success; |
278 | }); |
279 | |
280 | return ret; |
281 | } |
282 | |
283 | private: |
284 | size_t constrain_hash(size_t hash, size_t block_capacity) const { |
285 | size_t slot = hash % block_capacity; |
286 | return sizeof(BlockHeader) + slot * sizeof(uint32_t); |
287 | } |
288 | |
289 | int find_key(const TKey &key, |
290 | std::function<int(const NodeType *, uint32_t, const NodeType *, |
291 | uint32_t, size_t, size_t)> |
292 | fun) const { |
293 | const uint64_t hash = hasher_(key); |
294 | for (int idx = blocks_.size() - 1; idx >= 0; --idx) { |
295 | auto &block = blocks_[idx]; |
296 | size_t bucket_offset = |
297 | constrain_hash(hash, blocks_header_[idx].bucket_count); |
298 | ailego_assert_with(bucket_offset < block->data_size(), "Invalid Offset" ); |
299 | |
300 | const NodeType *pre_node = nullptr; |
301 | uint32_t pre_node_idx = INVALID_NODE_ID; |
302 | |
303 | const void *data = nullptr; |
304 | if (ailego_unlikely(block->read(bucket_offset, &data, sizeof(uint32_t)) != |
305 | sizeof(uint32_t))) { |
306 | LOG_ERROR("Failed to read bucket content from block idx %d" , idx); |
307 | return ErrorCode_ReadData; |
308 | } |
309 | uint32_t bucket_idx = *static_cast<const uint32_t *>(data); |
310 | uint32_t next = bucket_idx; |
311 | while (next != INVALID_NODE_ID) { |
312 | size_t offset = sizeof(BlockHeader) + |
313 | blocks_header_[idx].bucket_count * sizeof(uint32_t) + |
314 | next * sizeof(NodeType); |
315 | if (ailego_unlikely(block->read(offset, &data, sizeof(NodeType)) != |
316 | sizeof(NodeType))) { |
317 | LOG_ERROR("Failed to read node content from block idx %d" , idx); |
318 | return ErrorCode_ReadData; |
319 | } |
320 | const NodeType *node = static_cast<const NodeType *>(data); |
321 | if (node->first == key) { |
322 | int ret = fun(node, next, pre_node, pre_node_idx, bucket_offset, idx); |
323 | if (ailego_unlikely(ret < 0)) { |
324 | return ret; |
325 | } |
326 | return ErrorCode_Success; |
327 | } else { |
328 | pre_node_idx = next; |
329 | pre_node = node; |
330 | next = node->next; |
331 | } |
332 | } |
333 | } |
334 | return ErrorCode_InexistentKey; |
335 | } |
336 | |
337 | int emplace_in_block(size_t block_idx, const TKey &key, const TValue &value) { |
338 | auto &block = blocks_[block_idx]; |
339 | |
340 | uint32_t free_idx = blocks_header_[block_idx].free_header; |
341 | uint32_t bucket_count = blocks_header_[block_idx].bucket_count; |
342 | |
343 | // alloc Node |
344 | const void *data = nullptr; |
345 | size_t free_offset = sizeof(BlockHeader) + bucket_count * sizeof(uint32_t) + |
346 | free_idx * sizeof(NodeType); |
347 | if (ailego_unlikely(block->read(free_offset, &data, sizeof(NodeType)) != |
348 | sizeof(NodeType))) { |
349 | LOG_ERROR("Failed to read node content from block idx %zu" , block_idx); |
350 | return ErrorCode_ReadData; |
351 | } |
352 | const NodeType *free_node = static_cast<const NodeType *>(data); |
353 | blocks_header_[block_idx].free_header = free_node->next; |
354 | blocks_header_[block_idx].node_count += 1; |
355 | |
356 | // write node |
357 | const uint64_t hash = hasher_(key); |
358 | size_t bucket_offset = |
359 | constrain_hash(hash, blocks_header_[block_idx].bucket_count); |
360 | if (ailego_unlikely(block->read(bucket_offset, &data, sizeof(uint32_t)) != |
361 | sizeof(uint32_t))) { |
362 | LOG_ERROR("Failed to read bucket content from block idx %zu" , block_idx); |
363 | return ErrorCode_ReadData; |
364 | } |
365 | NodeType node(key, value); |
366 | node.next = *static_cast<const uint32_t *>(data); |
367 | if (ailego_unlikely(block->write(free_offset, &node, sizeof(NodeType)) != |
368 | sizeof(NodeType))) { |
369 | LOG_ERROR("Failed to write node content for block idx %zu" , block_idx); |
370 | return ErrorCode_WriteData; |
371 | } |
372 | |
373 | // update bucket hash link |
374 | if (ailego_unlikely(block->write(bucket_offset, &free_idx, |
375 | sizeof(uint32_t)) != sizeof(uint32_t))) { |
376 | LOG_ERROR("Failed to write bucket content for block idx %zu" , block_idx); |
377 | return ErrorCode_WriteData; |
378 | } |
379 | |
380 | // update block header |
381 | if (ailego_unlikely( |
382 | block->write(0, &blocks_header_[block_idx], sizeof(BlockHeader)) != |
383 | sizeof(BlockHeader))) { |
384 | LOG_ERROR("Failed to write block header for block idx %zu" , block_idx); |
385 | return ErrorCode_WriteData; |
386 | } |
387 | |
388 | return ErrorCode_Success; |
389 | } |
390 | |
391 | int add_block(size_t bucket_count = 0) { |
392 | uint32_t block_idx = blocks_.size(); |
393 | std::string block_name = |
394 | ailego::StringHelper::Concat(DATA_BLOCK, block_idx); |
395 | if (bucket_count == 0) { |
396 | bucket_count = (block_idx == 0) |
397 | ? kInitBucketCount |
398 | : blocks_header_[block_idx - 1].bucket_count * 2; |
399 | } else { |
400 | bucket_count = pow(2, ceil(log2(bucket_count))); |
401 | } |
402 | size_t node_count = bucket_count * kLoadFactor; |
403 | |
404 | if (node_count >= INVALID_NODE_ID) { |
405 | bucket_count = pow(2, floor(log2(INVALID_NODE_ID / kLoadFactor))); |
406 | node_count = bucket_count * kLoadFactor; |
407 | } |
408 | |
409 | size_t block_size = sizeof(BlockHeader) + bucket_count * sizeof(uint32_t) + |
410 | node_count * sizeof(NodeType); |
411 | int ret = storage_->append(block_name, block_size); |
412 | if (ret != 0) { |
413 | LOG_ERROR("Failed to append block %s for %s, size %zu" , |
414 | block_name.c_str(), aitheta2::IndexError::What(ret), |
415 | block_size); |
416 | return ret; |
417 | } |
418 | IndexBlockPtr block = storage_->get(block_name); |
419 | |
420 | ret = init_block(block.get(), bucket_count); |
421 | if (ret != 0) { |
422 | LOG_ERROR("Failed to init new block" ); |
423 | return ret; |
424 | } |
425 | blocks_.emplace_back(block); |
426 | |
427 | BlockHeader ; |
428 | block_header.bucket_count = bucket_count; |
429 | block_header.node_count = 0; |
430 | block_header.free_header = 0; |
431 | blocks_header_.emplace_back(block_header); |
432 | |
433 | LOG_DEBUG("Add new block with bucket count[%zu]" , bucket_count); |
434 | return block_idx; |
435 | } |
436 | |
437 | int init_block(IndexBlock *block, uint32_t bucket_count) { |
438 | if (block == nullptr) { |
439 | return ErrorCode_WriteData; |
440 | } |
441 | LOG_DEBUG("Init block with bucket count[%u]" , bucket_count); |
442 | |
443 | BlockHeader ; |
444 | block_header.bucket_count = bucket_count; |
445 | block_header.node_count = 0; |
446 | block_header.free_header = 0; |
447 | if (ailego_unlikely(block->write(0, &block_header, sizeof(BlockHeader)) != |
448 | sizeof(BlockHeader))) { |
449 | LOG_ERROR("Failed to fill block header" ); |
450 | return ErrorCode_WriteData; |
451 | } |
452 | std::vector<uint32_t> buckets(bucket_count, INVALID_NODE_ID); |
453 | size_t buckets_size = buckets.size() * sizeof(uint32_t); |
454 | if (ailego_unlikely(block->write(sizeof(BlockHeader), buckets.data(), |
455 | buckets_size) != buckets_size)) { |
456 | LOG_ERROR("Failed to fill block buckets" ); |
457 | return ErrorCode_WriteData; |
458 | } |
459 | |
460 | size_t offset = sizeof(BlockHeader) + buckets_size; |
461 | NodeType empty(0, TValue()); |
462 | size_t empty_size = sizeof(NodeType); |
463 | size_t node_count = bucket_count * kLoadFactor; |
464 | for (size_t i = 0; i < node_count; ++i) { |
465 | empty.next = i == (node_count - 1) ? INVALID_NODE_ID : i + 1; |
466 | if (ailego_unlikely(block->write(offset, &empty, empty_size) != |
467 | empty_size)) { |
468 | LOG_ERROR("Failed to fill block nodes" ); |
469 | return ErrorCode_WriteData; |
470 | }; |
471 | offset += empty_size; |
472 | } |
473 | |
474 | return ErrorCode_Success; |
475 | } |
476 | |
477 | private: |
478 | static constexpr uint32_t kInitBucketCount{1U * 1024U}; |
479 | static constexpr double kLoadFactor{1.0}; |
480 | |
481 | private: |
482 | mutable ailego::SharedMutex mutex_{}; |
483 | |
484 | Hash hasher_{}; |
485 | IndexStoragePtr storage_{}; |
486 | std::vector<IndexBlockPtr> blocks_{}; |
487 | std::vector<BlockHeader> {}; |
488 | }; |
489 | |
490 | |
491 | } // end namespace index |
492 | } // namespace be |
493 | } // end namespace proxima |
494 | |