1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
 * \brief Append-only data store persisted to storage through mmap; appends and updates are thread-safe
19 */
20
21#pragma once
22
23#include <mutex>
24#include <ailego/utility/string_helper.h>
25#include "common/error_code.h"
26#include "common/macro_define.h"
27#include "constants.h"
28#include "typedef.h"
29
30namespace proxima {
31namespace be {
32namespace index {
33
34/*
35 * DeltaStore is a kind of linear storage in memory, and
36 * it can snapshot to disk at the same time with mmap.
37 * It can only increase at the tail part.
38 */
39template <typename T>
40class DeltaStore {
41 public:
42 /*
43 * Header segment struct
44 */
45 struct Header {
46 uint64_t block_count{0U};
47 uint64_t total_size{0U};
48 uint64_t reserved_[6];
49 };
50
51 static_assert(sizeof(Header) % 64 == 0,
52 "Header must be aligned with 64 bytes");
53
54 public:
55 PROXIMA_DISALLOW_COPY_AND_ASSIGN(DeltaStore);
56
57 //! Constructor
58 DeltaStore() = default;
59
60 //! Destructor
61 ~DeltaStore() = default;
62
63 //! Mount persist storage
64 int mount(const IndexStoragePtr &stg) {
65 if (!stg) {
66 LOG_ERROR("Mount null storage");
67 return ErrorCode_RuntimeError;
68 }
69
70 storage_ = stg;
71 header_block_ = storage_->get(HEADER_BLOCK);
72 int ret = 0;
73 if (!header_block_) {
74 ret = init_storage();
75 } else {
76 ret = load_storage();
77 }
78 CHECK_RETURN(ret, 0);
79
80 return 0;
81 }
82
83 //! Unmount persist storage
84 void unmount() {
85 storage_ = nullptr;
86 header_block_.reset();
87 data_blocks_.clear();
88 memset(&header_, 0, sizeof(header_));
89 node_count_ = 0U;
90 }
91
92 //! Append an element
93 int append(const T &element) {
94 std::lock_guard<std::mutex> lock(mutex_);
95
96 size_t block_offset = 0U;
97 IndexBlockPtr data_block;
98
99 uint32_t block_index = data_blocks_.size() - 1;
100 size_t block_size = kNodeCountPerBlock * sizeof(T);
101 if (block_index == -1U ||
102 data_blocks_[block_index]->data_size() >= block_size) {
103 block_index++;
104 std::string new_block_name =
105 ailego::StringHelper::Concat(DATA_BLOCK, block_index);
106
107 int ret = storage_->append(new_block_name, block_size);
108 CHECK_RETURN(ret, 0);
109
110 header_.block_count++;
111 header_.total_size += block_size;
112 ret = update_header();
113 CHECK_RETURN(ret, 0);
114
115 block_offset = 0U;
116 data_block = storage_->get(new_block_name);
117 data_blocks_.emplace_back(data_block);
118 } else {
119 data_block = data_blocks_[block_index];
120 block_offset = data_block->data_size();
121 }
122
123 size_t write_len = data_block->write(block_offset, &element, sizeof(T));
124 if (write_len != sizeof(T)) {
125 return ErrorCode_WriteData;
126 }
127
128 node_count_++;
129 return 0;
130 }
131
132 //! Update an element by position
133 int update(size_t pos, const T &element) {
134 std::lock_guard<std::mutex> lock(mutex_);
135
136 if (pos >= node_count_) {
137 return ErrorCode_RuntimeError;
138 }
139
140 size_t block_index = pos / kNodeCountPerBlock;
141 size_t block_offset = (pos % kNodeCountPerBlock) * sizeof(T);
142
143 std::string block_name =
144 ailego::StringHelper::Concat(DATA_BLOCK, block_index);
145 IndexBlockPtr data_block = storage_->get(block_name);
146 if (!data_block) {
147 return ErrorCode_InvalidIndexDataFormat;
148 }
149
150 size_t write_len = data_block->write(block_offset, &element, sizeof(T));
151 if (write_len != sizeof(T)) {
152 return ErrorCode_WriteData;
153 }
154
155 return 0;
156 }
157
158 //! Get an element by position
159 const T *at(size_t pos) const {
160 if (pos >= node_count_) {
161 return nullptr;
162 }
163
164 size_t block_index = pos / kNodeCountPerBlock;
165 size_t block_offset = (pos % kNodeCountPerBlock) * sizeof(T);
166
167 std::string block_name =
168 ailego::StringHelper::Concat(DATA_BLOCK, block_index);
169 IndexBlockPtr data_block = storage_->get(block_name);
170 if (!data_block) {
171 return nullptr;
172 }
173
174 T *element;
175 size_t read_len =
176 data_block->read(block_offset, (const void **)&element, sizeof(T));
177 if (read_len != sizeof(T)) {
178 return nullptr;
179 }
180 return element;
181 }
182
183 //! Return node count
184 size_t count() const {
185 return node_count_;
186 }
187
188 private:
189 int init_storage() {
190 int ret = storage_->append(HEADER_BLOCK, sizeof(Header));
191 CHECK_RETURN(ret, 0);
192
193 header_block_ = storage_->get(HEADER_BLOCK);
194 header_.block_count = 0U;
195 header_.total_size = 0U;
196 ret = update_header();
197
198 node_count_ = 0UL;
199 return ret;
200 }
201
202 int load_storage() {
203 header_block_ = storage_->get(HEADER_BLOCK);
204
205 Header *header;
206 size_t read_len =
207 header_block_->read(0, (const void **)&header, sizeof(Header));
208 if (read_len != sizeof(Header)) {
209 return ErrorCode_ReadData;
210 }
211 header_ = *header;
212
213 for (uint32_t i = 0; i < header_.block_count; i++) {
214 std::string block_name = ailego::StringHelper::Concat(DATA_BLOCK, i);
215 auto data_block = storage_->get(block_name);
216 if (!data_block) {
217 return ErrorCode_ReadData;
218 }
219 data_blocks_.emplace_back(data_block);
220 }
221
222 if (header_.block_count > 0) {
223 node_count_ =
224 (header_.block_count - 1) * kNodeCountPerBlock +
225 data_blocks_[header_.block_count - 1]->data_size() / sizeof(T);
226 } else {
227 node_count_ = 0U;
228 }
229
230 return 0;
231 }
232
233 int update_header() {
234 size_t write_len = header_block_->write(0, &header_, sizeof(Header));
235 if (write_len != sizeof(Header)) {
236 return ErrorCode_WriteData;
237 }
238 return 0;
239 }
240
241 private:
242 static constexpr uint32_t kNodeCountPerBlock = 1UL * 1024UL * 1024UL;
243
244 private:
245 IndexStoragePtr storage_{};
246
247 IndexBlockPtr header_block_{};
248 std::vector<IndexBlockPtr> data_blocks_{};
249 Header header_;
250
251 std::mutex mutex_{};
252 std::atomic<uint64_t> node_count_{0UL};
253};
254
255
256} // end namespace index
257} // namespace be
258} // end namespace proxima
259