/**
 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * \author zhongyou.dlx
 * \date Jun 2021
 * \brief Persistent hash map that is thread-safe for add/del operations
 */

#pragma once

#include <cmath>
#include <functional>
#include <mutex>
#include <string>
#include <vector>
#include <ailego/parallel/lock.h>
#include <ailego/utility/string_helper.h>
#include "common/error_code.h"
#include "common/macro_define.h"
#include "constants.h"
#include "typedef.h"

namespace proxima {
namespace be {
namespace index {

static constexpr uint32_t INVALID_NODE_ID{-1U};

/*
 * A PersistHashMap represents blocks of hash data in persistent storage.
 */
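/*
 * Each data block is laid out as:
 *   [BlockHeader][buckets: bucket_count x uint32_t]
 *   [nodes: bucket_count * kLoadFactor x NodeType]
 * A bucket holds the index of the first node in its chain, and unused nodes
 * are chained through NodeType::next starting at BlockHeader::free_header.
 *
 * Usage sketch (illustrative only; assumes `storage` is an already-opened
 * IndexStoragePtr):
 *
 *   PersistHashMap<uint64_t, uint64_t> map;
 *   map.mount(storage);
 *   map.emplace(1001U, 42U);
 *   uint64_t value = 0U;
 *   map.get(1001U, &value);
 *   map.unmount();
 */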
template <typename TKey, typename TValue, typename Hash = std::hash<TKey> >
class PersistHashMap {
  struct BlockHeader {
    //! Number of buckets in this block
    uint32_t bucket_count;
    //! Number of occupied nodes in this block
    uint32_t node_count;
    //! Index of the first node in the free list
    uint32_t free_header;
    //! Padding up to a multiple of 64 bytes
    uint32_t reserved[13];
  };

  static_assert(sizeof(BlockHeader) % 64 == 0,
                "sizeof(BlockHeader) must be a multiple of 64 bytes");

  struct NodeType {
    NodeType(TKey key, TValue value)
        : first(key), second(value), next(INVALID_NODE_ID) {}

    TKey first;
    TValue second;
    //! Index of the next node in the bucket chain or free list
    uint32_t next;
  };

 public:
  PROXIMA_DISALLOW_COPY_AND_ASSIGN(PersistHashMap);

  //! Constructor
  PersistHashMap() = default;

  //! Destructor
  ~PersistHashMap() = default;

 public:
  //! Mount persist storage
  int mount(const IndexStoragePtr &stg) {
    if (!stg) {
      LOG_ERROR("Mount null storage");
      return ErrorCode_RuntimeError;
    }

    storage_ = stg;
    for (int i = 0; /* No Limit */; ++i) {
      std::string block_name = ailego::StringHelper::Concat(DATA_BLOCK, i);
      auto block = storage_->get(block_name);
      if (!block) break;

      const void *data = nullptr;
      if (ailego_unlikely(block->read(0, &data, sizeof(BlockHeader)) !=
                          sizeof(BlockHeader))) {
        LOG_ERROR("Failed to read block header from block idx %d", i);
        return ErrorCode_ReadData;
      }
      BlockHeader block_header = *static_cast<const BlockHeader *>(data);

      size_t bucket_count = block_header.bucket_count;
      size_t node_count = bucket_count * kLoadFactor;
      size_t block_size = sizeof(BlockHeader) +
                          bucket_count * sizeof(uint32_t) +
                          node_count * sizeof(NodeType);

      if (ailego_unlikely(block->capacity() < block_size)) {
        return ErrorCode_ReadData;
      }

      if (ailego_unlikely(block->data_size() != block_size)) {
        LOG_DEBUG("Block needs reinit");
        int ret = init_block(block.get(), bucket_count);
        if (ret != 0) {
          return ret;
        }
        // Keep the cached header consistent with the freshly initialized block
        block_header.node_count = 0;
        block_header.free_header = 0;
      }

      blocks_.emplace_back(block);
      blocks_header_.emplace_back(block_header);
    }

    return ErrorCode_Success;
  }

  //! Unmount persist storage
  void unmount() {
    storage_ = nullptr;
    blocks_.clear();
    blocks_header_.clear();
  }

  //! Reserve an initial block with at least `bucket_count` buckets
  //! (no-op if blocks already exist)
  int reserve(size_t bucket_count) {
    ailego::WriteLock wlock(mutex_);
    std::lock_guard<ailego::WriteLock> signal_lock(wlock);
    int ret = ErrorCode_Success;
    if (blocks_.empty()) {
      ret = add_block(bucket_count);
    }
    return ret;
  }

  //! Emplace a key-value pair
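  //! Scans blocks from newest to oldest for a free node and appends a new
  //! block (with double the bucket count) when every block is full.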
  int emplace(const TKey &key, const TValue &val) {
    ailego::WriteLock wlock(mutex_);
    std::lock_guard<ailego::WriteLock> signal_lock(wlock);
    return emplace_impl(key, val);
  }

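  //! Emplace a key-value pair, or overwrite the value if the key exists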
  int emplace_or_assign(const TKey &key, const TValue &val) {
    ailego::WriteLock wlock(mutex_);
    std::lock_guard<ailego::WriteLock> signal_lock(wlock);
    int ret = find_key(key, [this, &val](const NodeType *node,
                                         uint32_t node_idx, const NodeType *,
                                         uint32_t, size_t, size_t block_idx) {
      NodeType writable_node = *node;
      writable_node.second = val;
      size_t offset =
          sizeof(BlockHeader) +
          blocks_header_[block_idx].bucket_count * sizeof(uint32_t) +
          node_idx * sizeof(NodeType);
      if (ailego_unlikely(blocks_[block_idx]->write(offset, &writable_node,
                                                    sizeof(NodeType)) !=
                          sizeof(NodeType))) {
        LOG_ERROR("Failed to write node content for block idx %zu", block_idx);
        return ErrorCode_WriteData;
      }
      return ErrorCode_Success;
    });

    if (ret == ErrorCode_InexistentKey) {
      // Insert directly while holding the write lock; calling emplace() here
      // would try to acquire the same lock again.
      ret = emplace_impl(key, val);
    }
    return ret;
  }

  //! Get value by key
  int get(const TKey &key, TValue *value) const {
    ailego::ReadLock rlock(mutex_);
    std::lock_guard<ailego::ReadLock> signal_lock(rlock);
    int ret =
        find_key(key, [&value](const NodeType *node, uint32_t, const NodeType *,
                               uint32_t, size_t, size_t) {
          *value = node->second;
          return ErrorCode_Success;
        });
    return ret;
  }

  //! Check whether the key exists
  bool has(const TKey &key) const {
    ailego::ReadLock rlock(mutex_);
    std::lock_guard<ailego::ReadLock> signal_lock(rlock);
    int ret =
        find_key(key, [](const NodeType *, uint32_t, const NodeType *, uint32_t,
                         size_t, size_t) { return ErrorCode_Success; });
    return (ret == ErrorCode_Success);
  }

  //! Return key-value pair count
  size_t size() const {
    ailego::ReadLock rlock(mutex_);
    std::lock_guard<ailego::ReadLock> signal_lock(rlock);
    size_t val = 0;
    for (size_t i = 0; i < blocks_header_.size(); ++i) {
      val += blocks_header_[i].node_count;
    }
    return val;
  }

  //! Erase a pair by key
  int erase(const TKey &key) {
    ailego::WriteLock wlock(mutex_);
    std::lock_guard<ailego::WriteLock> signal_lock(wlock);
    int ret = find_key(key, [this](const NodeType *node, uint32_t node_idx,
                                   const NodeType *pre_node,
                                   uint32_t pre_node_idx, size_t bucket_offset,
                                   size_t block_idx) {
      auto &block = blocks_[block_idx];

      // Unlink the node from its bucket chain
      if (pre_node == nullptr) {
        if (ailego_unlikely(
                block->write(bucket_offset, &(node->next), sizeof(uint32_t)) !=
                sizeof(uint32_t))) {
          LOG_ERROR("Failed to write bucket content for block idx %zu",
                    block_idx);
          return ErrorCode_WriteData;
        }
      } else {
        NodeType writable_node = *pre_node;
        writable_node.next = node->next;
        size_t offset =
            sizeof(BlockHeader) +
            blocks_header_[block_idx].bucket_count * sizeof(uint32_t) +
            pre_node_idx * sizeof(NodeType);
        if (ailego_unlikely(
                block->write(offset, &writable_node, sizeof(NodeType)) !=
                sizeof(NodeType))) {
          LOG_ERROR("Failed to write node content for block idx %zu",
                    block_idx);
          return ErrorCode_WriteData;
        }
      }

      // Recycle the node into the free list
      NodeType writable_node = *node;
      writable_node.next = blocks_header_[block_idx].free_header;
      size_t offset =
          sizeof(BlockHeader) +
          blocks_header_[block_idx].bucket_count * sizeof(uint32_t) +
          node_idx * sizeof(NodeType);
      if (ailego_unlikely(block->write(offset, &writable_node,
                                       sizeof(NodeType)) != sizeof(NodeType))) {
        LOG_ERROR("Failed to write node content for block idx %zu", block_idx);
        return ErrorCode_WriteData;
      }

      blocks_header_[block_idx].free_header = node_idx;
      blocks_header_[block_idx].node_count -= 1;
      if (ailego_unlikely(block->write(0, &blocks_header_[block_idx],
                                       sizeof(BlockHeader)) !=
                          sizeof(BlockHeader))) {
        LOG_ERROR("Failed to write block header for block idx %zu", block_idx);
        return ErrorCode_WriteData;
      }

      return ErrorCode_Success;
    });

    return ret;
  }

 private:
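  //! Map a hash value to the byte offset of its bucket slot within a block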
  size_t constrain_hash(size_t hash, size_t bucket_count) const {
    size_t slot = hash % bucket_count;
    return sizeof(BlockHeader) + slot * sizeof(uint32_t);
  }

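  //! Locate `key` by walking every block's bucket chain (newest block first)
  //! and invoke `fun` with (node, node_idx, pre_node, pre_node_idx,
  //! bucket_offset, block_idx) on the first match. Returns
  //! ErrorCode_InexistentKey when no block contains the key.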
  int find_key(const TKey &key,
               std::function<int(const NodeType *, uint32_t, const NodeType *,
                                 uint32_t, size_t, size_t)>
                   fun) const {
    const uint64_t hash = hasher_(key);
    for (int idx = blocks_.size() - 1; idx >= 0; --idx) {
      auto &block = blocks_[idx];
      size_t bucket_offset =
          constrain_hash(hash, blocks_header_[idx].bucket_count);
      ailego_assert_with(bucket_offset < block->data_size(), "Invalid Offset");

      const NodeType *pre_node = nullptr;
      uint32_t pre_node_idx = INVALID_NODE_ID;

      const void *data = nullptr;
      if (ailego_unlikely(block->read(bucket_offset, &data, sizeof(uint32_t)) !=
                          sizeof(uint32_t))) {
        LOG_ERROR("Failed to read bucket content from block idx %d", idx);
        return ErrorCode_ReadData;
      }
      uint32_t bucket_idx = *static_cast<const uint32_t *>(data);
      uint32_t next = bucket_idx;
      while (next != INVALID_NODE_ID) {
        size_t offset = sizeof(BlockHeader) +
                        blocks_header_[idx].bucket_count * sizeof(uint32_t) +
                        next * sizeof(NodeType);
        if (ailego_unlikely(block->read(offset, &data, sizeof(NodeType)) !=
                            sizeof(NodeType))) {
          LOG_ERROR("Failed to read node content from block idx %d", idx);
          return ErrorCode_ReadData;
        }
        const NodeType *node = static_cast<const NodeType *>(data);
        if (node->first == key) {
          int ret = fun(node, next, pre_node, pre_node_idx, bucket_offset, idx);
          if (ailego_unlikely(ret < 0)) {
            return ret;
          }
          return ErrorCode_Success;
        } else {
          pre_node_idx = next;
          pre_node = node;
          next = node->next;
        }
      }
    }
    return ErrorCode_InexistentKey;
  }

  //! Emplace without taking the write lock; the caller must hold it
  int emplace_impl(const TKey &key, const TValue &val) {
    size_t block_idx = -1UL;
    for (int idx = blocks_.size() - 1; idx >= 0; --idx) {
      if (blocks_header_[idx].free_header != INVALID_NODE_ID) {
        block_idx = idx;
        break;
      }
    }

    if (ailego_unlikely(block_idx == -1UL)) {
      int ret = add_block();
      if (ret < 0) {
        return ret;
      }
      block_idx = ret;
    }

    return emplace_in_block(block_idx, key, val);
  }

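  //! Pop a node from the block's free list, write the key-value pair into it,
  //! link it at the head of its bucket chain, and persist the updated header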
  int emplace_in_block(size_t block_idx, const TKey &key, const TValue &value) {
    auto &block = blocks_[block_idx];

    uint32_t free_idx = blocks_header_[block_idx].free_header;
    uint32_t bucket_count = blocks_header_[block_idx].bucket_count;

    // Pop the first node from the free list
    const void *data = nullptr;
    size_t free_offset = sizeof(BlockHeader) +
                         bucket_count * sizeof(uint32_t) +
                         free_idx * sizeof(NodeType);
    if (ailego_unlikely(block->read(free_offset, &data, sizeof(NodeType)) !=
                        sizeof(NodeType))) {
      LOG_ERROR("Failed to read node content from block idx %zu", block_idx);
      return ErrorCode_ReadData;
    }
    const NodeType *free_node = static_cast<const NodeType *>(data);
    blocks_header_[block_idx].free_header = free_node->next;
    blocks_header_[block_idx].node_count += 1;

    // Write the node, linking it to the current bucket head
    const uint64_t hash = hasher_(key);
    size_t bucket_offset =
        constrain_hash(hash, blocks_header_[block_idx].bucket_count);
    if (ailego_unlikely(block->read(bucket_offset, &data, sizeof(uint32_t)) !=
                        sizeof(uint32_t))) {
      LOG_ERROR("Failed to read bucket content from block idx %zu", block_idx);
      return ErrorCode_ReadData;
    }
    NodeType node(key, value);
    node.next = *static_cast<const uint32_t *>(data);
    if (ailego_unlikely(block->write(free_offset, &node, sizeof(NodeType)) !=
                        sizeof(NodeType))) {
      LOG_ERROR("Failed to write node content for block idx %zu", block_idx);
      return ErrorCode_WriteData;
    }

    // Update the bucket hash link
    if (ailego_unlikely(block->write(bucket_offset, &free_idx,
                                     sizeof(uint32_t)) != sizeof(uint32_t))) {
      LOG_ERROR("Failed to write bucket content for block idx %zu", block_idx);
      return ErrorCode_WriteData;
    }

    // Update the block header
    if (ailego_unlikely(
            block->write(0, &blocks_header_[block_idx], sizeof(BlockHeader)) !=
            sizeof(BlockHeader))) {
      LOG_ERROR("Failed to write block header for block idx %zu", block_idx);
      return ErrorCode_WriteData;
    }

    return ErrorCode_Success;
  }

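  //! Append a new data block whose bucket count doubles the previous block's
  //! (or rounds a requested count up to a power of two), capped so that node
  //! indices still fit in a uint32_t. Returns the new block index on success.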
  int add_block(size_t bucket_count = 0) {
    uint32_t block_idx = blocks_.size();
    std::string block_name =
        ailego::StringHelper::Concat(DATA_BLOCK, block_idx);
    if (bucket_count == 0) {
      bucket_count = (block_idx == 0)
                         ? kInitBucketCount
                         : blocks_header_[block_idx - 1].bucket_count * 2;
    } else {
      bucket_count = pow(2, ceil(log2(bucket_count)));
    }
    size_t node_count = bucket_count * kLoadFactor;

    // Cap the size so that node indices stay below INVALID_NODE_ID
    if (node_count >= INVALID_NODE_ID) {
      bucket_count = pow(2, floor(log2(INVALID_NODE_ID / kLoadFactor)));
      node_count = bucket_count * kLoadFactor;
    }

    size_t block_size = sizeof(BlockHeader) + bucket_count * sizeof(uint32_t) +
                        node_count * sizeof(NodeType);
    int ret = storage_->append(block_name, block_size);
    if (ret != 0) {
      LOG_ERROR("Failed to append block %s for %s, size %zu",
                block_name.c_str(), aitheta2::IndexError::What(ret),
                block_size);
      return ret;
    }
    IndexBlockPtr block = storage_->get(block_name);

    ret = init_block(block.get(), bucket_count);
    if (ret != 0) {
      LOG_ERROR("Failed to init new block");
      return ret;
    }
    blocks_.emplace_back(block);

    BlockHeader block_header{};
    block_header.bucket_count = bucket_count;
    block_header.node_count = 0;
    block_header.free_header = 0;
    blocks_header_.emplace_back(block_header);

    LOG_DEBUG("Add new block with bucket count[%zu]", bucket_count);
    return block_idx;
  }

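  //! Write a fresh header, empty buckets (INVALID_NODE_ID), and a free list
  //! chaining every node into the given block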
  int init_block(IndexBlock *block, uint32_t bucket_count) {
    if (block == nullptr) {
      return ErrorCode_WriteData;
    }
    LOG_DEBUG("Init block with bucket count[%u]", bucket_count);

    BlockHeader block_header{};
    block_header.bucket_count = bucket_count;
    block_header.node_count = 0;
    block_header.free_header = 0;
    if (ailego_unlikely(block->write(0, &block_header, sizeof(BlockHeader)) !=
                        sizeof(BlockHeader))) {
      LOG_ERROR("Failed to fill block header");
      return ErrorCode_WriteData;
    }

    // Mark every bucket as empty
    std::vector<uint32_t> buckets(bucket_count, INVALID_NODE_ID);
    size_t buckets_size = buckets.size() * sizeof(uint32_t);
    if (ailego_unlikely(block->write(sizeof(BlockHeader), buckets.data(),
                                     buckets_size) != buckets_size)) {
      LOG_ERROR("Failed to fill block buckets");
      return ErrorCode_WriteData;
    }

    // Chain all nodes into the free list: node i points to node i + 1
    size_t offset = sizeof(BlockHeader) + buckets_size;
    NodeType empty(TKey(), TValue());
    size_t empty_size = sizeof(NodeType);
    size_t node_count = bucket_count * kLoadFactor;
    for (size_t i = 0; i < node_count; ++i) {
      empty.next = (i == node_count - 1) ? INVALID_NODE_ID : i + 1;
      if (ailego_unlikely(block->write(offset, &empty, empty_size) !=
                          empty_size)) {
        LOG_ERROR("Failed to fill block nodes");
        return ErrorCode_WriteData;
      }
      offset += empty_size;
    }

    return ErrorCode_Success;
  }

 private:
  //! Bucket count of the first data block
  static constexpr uint32_t kInitBucketCount{1U * 1024U};
  //! Number of nodes allocated per bucket
  static constexpr double kLoadFactor{1.0};

 private:
  mutable ailego::SharedMutex mutex_{};

  Hash hasher_{};
  IndexStoragePtr storage_{};
  std::vector<IndexBlockPtr> blocks_{};
  std::vector<BlockHeader> blocks_header_{};
};

} // end namespace index
} // end namespace be
} // end namespace proxima