1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
 * \brief Implementation of index service
19 */
20
21#include "index_service.h"
22#include "common/error_code.h"
23
24namespace proxima {
25namespace be {
26namespace index {
27
28IndexService::~IndexService() {
29 if (status_ == STARTED) {
30 this->stop_impl();
31 }
32 this->cleanup_impl();
33}
34
35int IndexService::create_collection(const std::string &collection_name,
36 const meta::CollectionMetaPtr &schema) {
37 CHECK_STATUS(status_, STARTED);
38
39 if (this->has_collection(collection_name)) {
40 LOG_ERROR("Collection already exists, create failed. collection[%s]",
41 collection_name.c_str());
42 return ErrorCode_DuplicateCollection;
43 }
44
45 ReadOptions read_options;
46 read_options.use_mmap = use_mmap_read_;
47
48 /// Notice we add a check action here:
49 /// a. the collection index file exists, then we just load it.
50 /// b. the collection index not exist, then we just create new one.
51 std::string collection_path = index_directory_ + "/" + collection_name;
52 std::string manifest_file_path =
53 FileHelper::MakeFilePath(collection_path, FileID::MANIFEST_FILE);
54 if (FileHelper::FileExists(manifest_file_path)) {
55 read_options.create_new = false;
56 } else {
57 read_options.create_new = true;
58 }
59
60 CollectionPtr collection;
61 int ret = Collection::CreateAndOpen(collection_name, index_directory_, schema,
62 concurrency_, thread_pool_.get(),
63 read_options, &collection);
64 CHECK_RETURN_WITH_LOG(ret, 0,
65 "Create and open new collection failed. collection[%s]",
66 collection_name.c_str());
67
68 collections_.emplace(collection_name, collection);
69 LOG_INFO("Create new collection success. collection[%s]",
70 collection_name.c_str());
71
72 return 0;
73}
74
75int IndexService::update_collection(const std::string &collection_name,
76 const meta::CollectionMetaPtr &new_schema) {
77 CHECK_STATUS(status_, STARTED);
78
79 if (!this->has_collection(collection_name)) {
80 LOG_ERROR("Collection not exist, update failed. collection[%s]",
81 collection_name.c_str());
82 return ErrorCode_InexistentCollection;
83 }
84
85 return collections_.get(collection_name)->update_schema(new_schema);
86}
87
88bool IndexService::has_collection(const std::string &collection_name) {
89 CHECK_STATUS(status_, STARTED);
90
91 return collections_.has(collection_name);
92}
93
94int IndexService::load_collections(
95 const std::vector<std::string> &collection_names,
96 const std::vector<meta::CollectionMetaPtr> &schemas) {
97 CHECK_STATUS(status_, STARTED);
98
99 ReadOptions read_options;
100 read_options.use_mmap = use_mmap_read_;
101 read_options.create_new = false;
102
103 int ret = 0;
104 for (size_t i = 0; i < collection_names.size() && i < schemas.size(); i++) {
105 CollectionPtr collection;
106 ret = Collection::CreateAndOpen(
107 collection_names[i], index_directory_, schemas[i], concurrency_,
108 thread_pool_.get(), read_options, &collection);
109 CHECK_RETURN_WITH_LOG(ret, 0, "Load collection failed. collection[%s]",
110 collection_names[i].c_str());
111
112 collections_.emplace(collection_names[i], collection);
113 LOG_INFO("Load collection success. collectoin[%s]",
114 collection_names[i].c_str());
115 }
116
117 return 0;
118}
119
120int IndexService::drop_collection(const std::string &collection_name) {
121 CHECK_STATUS(status_, STARTED);
122
123 if (!this->has_collection(collection_name)) {
124 LOG_ERROR("Collection not exist, drop failed. collection[%s]",
125 collection_name.c_str());
126 return ErrorCode_InexistentCollection;
127 }
128
129 collections_.get(collection_name)->close_and_cleanup();
130 collections_.erase(collection_name);
131
132 LOG_INFO("Drop collection success. collection[%s]", collection_name.c_str());
133 return 0;
134}
135
136int IndexService::list_collections(std::vector<std::string> *collection_names) {
137 CHECK_STATUS(status_, STARTED);
138
139 for (auto &it : collections_) {
140 collection_names->emplace_back(it.first);
141 }
142 return 0;
143}
144
145int IndexService::get_collection_stats(const std::string &collection_name,
146 CollectionStats *collection_stats) {
147 CHECK_STATUS(status_, STARTED);
148
149 if (!this->has_collection(collection_name)) {
150 LOG_ERROR("Collection not exist, get statistics failed. collection[%s]",
151 collection_name.c_str());
152 return ErrorCode_InexistentCollection;
153 }
154
155 return collections_.get(collection_name)->get_stats(collection_stats);
156}
157
158int IndexService::list_segments(const std::string &collection_name,
159 std::vector<SegmentPtr> *segments) {
160 CHECK_STATUS(status_, STARTED);
161
162 if (!this->has_collection(collection_name)) {
163 LOG_ERROR("Collection not exist, list segments failed. collection[%s]",
164 collection_name.c_str());
165 return ErrorCode_InexistentCollection;
166 }
167
168 return collections_.get(collection_name)->get_segments(segments);
169}
170
171int IndexService::get_latest_lsn(const std::string &collection_name,
172 uint64_t *lsn, std::string *lsn_context) {
173 CHECK_STATUS(status_, STARTED);
174
175 if (!this->has_collection(collection_name)) {
176 LOG_ERROR("Collection not exist, get latest lsn failed. collection[%s]",
177 collection_name.c_str());
178 return ErrorCode_InexistentCollection;
179 }
180
181 return collections_.get(collection_name)->get_latest_lsn(lsn, lsn_context);
182}
183
184int IndexService::write_records(const std::string &collection_name,
185 const CollectionDatasetPtr &records) {
186 CHECK_STATUS(status_, STARTED);
187
188 if (!this->has_collection(collection_name)) {
189 LOG_ERROR("Collection not exist, write records failed. collection[%s]",
190 collection_name.c_str());
191 return ErrorCode_InexistentCollection;
192 }
193
194 return collections_.get(collection_name)->write_records(*records);
195}
196
197int IndexService::init_impl() {
198 if (!load_config()) {
199 LOG_ERROR("Load config failed.");
200 return ErrorCode_LoadConfig;
201 }
202
203 thread_pool_ = std::make_shared<ThreadPool>(thread_count_, false);
204 if (!thread_pool_) {
205 LOG_ERROR("Create thread pool failed.");
206 return ErrorCode_RuntimeError;
207 }
208
209 LOG_INFO("IndexService initialize complete.");
210 return 0;
211}
212
213int IndexService::cleanup_impl() {
214 thread_count_ = 0U;
215 index_directory_ = "";
216 flush_internal_ = 0U;
217 concurrency_ = 0U;
218 use_mmap_read_ = false;
219
220 LOG_INFO("IndexService cleanup complete.");
221 return 0;
222}
223
224int IndexService::start_impl() {
225 if (flush_internal_ > 0U) {
226 thread_pool_->submit(
227 ailego::Closure::New(this, &IndexService::do_routine_flush));
228 }
229
230 if (optimize_internal_ > 0U) {
231 thread_pool_->submit(
232 ailego::Closure::New(this, &IndexService::do_routine_optimize));
233 }
234
235 LOG_INFO("IndexService start complete.");
236 return 0;
237}
238
239int IndexService::stop_impl() {
240 std::this_thread::sleep_for(std::chrono::milliseconds(100));
241
242 flush_flag_ = false;
243 flush_notifier_.notify();
244
245 optimize_flag_ = false;
246 optimize_notifier_.notify();
247
248 thread_pool_->stop();
249
250 for (auto &it : collections_) {
251 it.second->close();
252 }
253 collections_.clear();
254
255 LOG_INFO("IndexService stopped.");
256 return 0;
257}
258
259bool IndexService::load_config() {
260 auto &config = Config::Instance();
261 thread_count_ = config.get_index_dump_thread_count();
262 index_directory_ = config.get_index_directory();
263 flush_internal_ = config.get_index_flush_internal();
264 optimize_internal_ = config.get_index_optimize_internal();
265 concurrency_ =
266 config.get_index_build_thread_count() + config.get_query_thread_count();
267
268 use_mmap_read_ = true;
269 return true;
270}
271
272void IndexService::do_routine_flush() {
273 flush_flag_ = true;
274
275 while (true) {
276 if (!flush_flag_) {
277 LOG_INFO("Exited flush thread");
278 break;
279 }
280
281 for (auto it : collections_) {
282 it.second->flush();
283 }
284
285 flush_notifier_.wait_for(std::chrono::seconds(flush_internal_));
286 }
287}
288
289void IndexService::do_routine_optimize() {
290 optimize_flag_ = true;
291
292 while (true) {
293 if (!optimize_flag_) {
294 LOG_INFO("Exited optimize thread");
295 break;
296 }
297
298 for (auto it : collections_) {
299 it.second->optimize(thread_pool_);
300 }
301
302 optimize_notifier_.wait_for(std::chrono::seconds(optimize_internal_));
303 }
304}
305
306
307} // end namespace index
308} // namespace be
309} // end namespace proxima
310