1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
 * \brief Implementation of index service
19 | */ |
20 | |
21 | #include "index_service.h" |
22 | #include "common/error_code.h" |
23 | |
24 | namespace proxima { |
25 | namespace be { |
26 | namespace index { |
27 | |
28 | IndexService::~IndexService() { |
29 | if (status_ == STARTED) { |
30 | this->stop_impl(); |
31 | } |
32 | this->cleanup_impl(); |
33 | } |
34 | |
35 | int IndexService::create_collection(const std::string &collection_name, |
36 | const meta::CollectionMetaPtr &schema) { |
37 | CHECK_STATUS(status_, STARTED); |
38 | |
39 | if (this->has_collection(collection_name)) { |
40 | LOG_ERROR("Collection already exists, create failed. collection[%s]" , |
41 | collection_name.c_str()); |
42 | return ErrorCode_DuplicateCollection; |
43 | } |
44 | |
45 | ReadOptions read_options; |
46 | read_options.use_mmap = use_mmap_read_; |
47 | |
48 | /// Notice we add a check action here: |
49 | /// a. the collection index file exists, then we just load it. |
50 | /// b. the collection index not exist, then we just create new one. |
51 | std::string collection_path = index_directory_ + "/" + collection_name; |
52 | std::string manifest_file_path = |
53 | FileHelper::MakeFilePath(collection_path, FileID::MANIFEST_FILE); |
54 | if (FileHelper::FileExists(manifest_file_path)) { |
55 | read_options.create_new = false; |
56 | } else { |
57 | read_options.create_new = true; |
58 | } |
59 | |
60 | CollectionPtr collection; |
61 | int ret = Collection::CreateAndOpen(collection_name, index_directory_, schema, |
62 | concurrency_, thread_pool_.get(), |
63 | read_options, &collection); |
64 | CHECK_RETURN_WITH_LOG(ret, 0, |
65 | "Create and open new collection failed. collection[%s]" , |
66 | collection_name.c_str()); |
67 | |
68 | collections_.emplace(collection_name, collection); |
69 | LOG_INFO("Create new collection success. collection[%s]" , |
70 | collection_name.c_str()); |
71 | |
72 | return 0; |
73 | } |
74 | |
75 | int IndexService::update_collection(const std::string &collection_name, |
76 | const meta::CollectionMetaPtr &new_schema) { |
77 | CHECK_STATUS(status_, STARTED); |
78 | |
79 | if (!this->has_collection(collection_name)) { |
80 | LOG_ERROR("Collection not exist, update failed. collection[%s]" , |
81 | collection_name.c_str()); |
82 | return ErrorCode_InexistentCollection; |
83 | } |
84 | |
85 | return collections_.get(collection_name)->update_schema(new_schema); |
86 | } |
87 | |
88 | bool IndexService::has_collection(const std::string &collection_name) { |
89 | CHECK_STATUS(status_, STARTED); |
90 | |
91 | return collections_.has(collection_name); |
92 | } |
93 | |
94 | int IndexService::load_collections( |
95 | const std::vector<std::string> &collection_names, |
96 | const std::vector<meta::CollectionMetaPtr> &schemas) { |
97 | CHECK_STATUS(status_, STARTED); |
98 | |
99 | ReadOptions read_options; |
100 | read_options.use_mmap = use_mmap_read_; |
101 | read_options.create_new = false; |
102 | |
103 | int ret = 0; |
104 | for (size_t i = 0; i < collection_names.size() && i < schemas.size(); i++) { |
105 | CollectionPtr collection; |
106 | ret = Collection::CreateAndOpen( |
107 | collection_names[i], index_directory_, schemas[i], concurrency_, |
108 | thread_pool_.get(), read_options, &collection); |
109 | CHECK_RETURN_WITH_LOG(ret, 0, "Load collection failed. collection[%s]" , |
110 | collection_names[i].c_str()); |
111 | |
112 | collections_.emplace(collection_names[i], collection); |
113 | LOG_INFO("Load collection success. collectoin[%s]" , |
114 | collection_names[i].c_str()); |
115 | } |
116 | |
117 | return 0; |
118 | } |
119 | |
120 | int IndexService::drop_collection(const std::string &collection_name) { |
121 | CHECK_STATUS(status_, STARTED); |
122 | |
123 | if (!this->has_collection(collection_name)) { |
124 | LOG_ERROR("Collection not exist, drop failed. collection[%s]" , |
125 | collection_name.c_str()); |
126 | return ErrorCode_InexistentCollection; |
127 | } |
128 | |
129 | collections_.get(collection_name)->close_and_cleanup(); |
130 | collections_.erase(collection_name); |
131 | |
132 | LOG_INFO("Drop collection success. collection[%s]" , collection_name.c_str()); |
133 | return 0; |
134 | } |
135 | |
136 | int IndexService::list_collections(std::vector<std::string> *collection_names) { |
137 | CHECK_STATUS(status_, STARTED); |
138 | |
139 | for (auto &it : collections_) { |
140 | collection_names->emplace_back(it.first); |
141 | } |
142 | return 0; |
143 | } |
144 | |
145 | int IndexService::get_collection_stats(const std::string &collection_name, |
146 | CollectionStats *collection_stats) { |
147 | CHECK_STATUS(status_, STARTED); |
148 | |
149 | if (!this->has_collection(collection_name)) { |
150 | LOG_ERROR("Collection not exist, get statistics failed. collection[%s]" , |
151 | collection_name.c_str()); |
152 | return ErrorCode_InexistentCollection; |
153 | } |
154 | |
155 | return collections_.get(collection_name)->get_stats(collection_stats); |
156 | } |
157 | |
158 | int IndexService::list_segments(const std::string &collection_name, |
159 | std::vector<SegmentPtr> *segments) { |
160 | CHECK_STATUS(status_, STARTED); |
161 | |
162 | if (!this->has_collection(collection_name)) { |
163 | LOG_ERROR("Collection not exist, list segments failed. collection[%s]" , |
164 | collection_name.c_str()); |
165 | return ErrorCode_InexistentCollection; |
166 | } |
167 | |
168 | return collections_.get(collection_name)->get_segments(segments); |
169 | } |
170 | |
171 | int IndexService::get_latest_lsn(const std::string &collection_name, |
172 | uint64_t *lsn, std::string *lsn_context) { |
173 | CHECK_STATUS(status_, STARTED); |
174 | |
175 | if (!this->has_collection(collection_name)) { |
176 | LOG_ERROR("Collection not exist, get latest lsn failed. collection[%s]" , |
177 | collection_name.c_str()); |
178 | return ErrorCode_InexistentCollection; |
179 | } |
180 | |
181 | return collections_.get(collection_name)->get_latest_lsn(lsn, lsn_context); |
182 | } |
183 | |
184 | int IndexService::write_records(const std::string &collection_name, |
185 | const CollectionDatasetPtr &records) { |
186 | CHECK_STATUS(status_, STARTED); |
187 | |
188 | if (!this->has_collection(collection_name)) { |
189 | LOG_ERROR("Collection not exist, write records failed. collection[%s]" , |
190 | collection_name.c_str()); |
191 | return ErrorCode_InexistentCollection; |
192 | } |
193 | |
194 | return collections_.get(collection_name)->write_records(*records); |
195 | } |
196 | |
197 | int IndexService::init_impl() { |
198 | if (!load_config()) { |
199 | LOG_ERROR("Load config failed." ); |
200 | return ErrorCode_LoadConfig; |
201 | } |
202 | |
203 | thread_pool_ = std::make_shared<ThreadPool>(thread_count_, false); |
204 | if (!thread_pool_) { |
205 | LOG_ERROR("Create thread pool failed." ); |
206 | return ErrorCode_RuntimeError; |
207 | } |
208 | |
209 | LOG_INFO("IndexService initialize complete." ); |
210 | return 0; |
211 | } |
212 | |
213 | int IndexService::cleanup_impl() { |
214 | thread_count_ = 0U; |
215 | index_directory_ = "" ; |
216 | flush_internal_ = 0U; |
217 | concurrency_ = 0U; |
218 | use_mmap_read_ = false; |
219 | |
220 | LOG_INFO("IndexService cleanup complete." ); |
221 | return 0; |
222 | } |
223 | |
224 | int IndexService::start_impl() { |
225 | if (flush_internal_ > 0U) { |
226 | thread_pool_->submit( |
227 | ailego::Closure::New(this, &IndexService::do_routine_flush)); |
228 | } |
229 | |
230 | if (optimize_internal_ > 0U) { |
231 | thread_pool_->submit( |
232 | ailego::Closure::New(this, &IndexService::do_routine_optimize)); |
233 | } |
234 | |
235 | LOG_INFO("IndexService start complete." ); |
236 | return 0; |
237 | } |
238 | |
239 | int IndexService::stop_impl() { |
240 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
241 | |
242 | flush_flag_ = false; |
243 | flush_notifier_.notify(); |
244 | |
245 | optimize_flag_ = false; |
246 | optimize_notifier_.notify(); |
247 | |
248 | thread_pool_->stop(); |
249 | |
250 | for (auto &it : collections_) { |
251 | it.second->close(); |
252 | } |
253 | collections_.clear(); |
254 | |
255 | LOG_INFO("IndexService stopped." ); |
256 | return 0; |
257 | } |
258 | |
259 | bool IndexService::load_config() { |
260 | auto &config = Config::Instance(); |
261 | thread_count_ = config.get_index_dump_thread_count(); |
262 | index_directory_ = config.get_index_directory(); |
263 | flush_internal_ = config.get_index_flush_internal(); |
264 | optimize_internal_ = config.get_index_optimize_internal(); |
265 | concurrency_ = |
266 | config.get_index_build_thread_count() + config.get_query_thread_count(); |
267 | |
268 | use_mmap_read_ = true; |
269 | return true; |
270 | } |
271 | |
272 | void IndexService::do_routine_flush() { |
273 | flush_flag_ = true; |
274 | |
275 | while (true) { |
276 | if (!flush_flag_) { |
277 | LOG_INFO("Exited flush thread" ); |
278 | break; |
279 | } |
280 | |
281 | for (auto it : collections_) { |
282 | it.second->flush(); |
283 | } |
284 | |
285 | flush_notifier_.wait_for(std::chrono::seconds(flush_internal_)); |
286 | } |
287 | } |
288 | |
289 | void IndexService::do_routine_optimize() { |
290 | optimize_flag_ = true; |
291 | |
292 | while (true) { |
293 | if (!optimize_flag_) { |
294 | LOG_INFO("Exited optimize thread" ); |
295 | break; |
296 | } |
297 | |
298 | for (auto it : collections_) { |
299 | it.second->optimize(thread_pool_); |
300 | } |
301 | |
302 | optimize_notifier_.wait_for(std::chrono::seconds(optimize_internal_)); |
303 | } |
304 | } |
305 | |
306 | |
307 | } // end namespace index |
308 | } // namespace be |
309 | } // end namespace proxima |
310 | |