1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Oct 2020 |
18 | * \brief Implementation of collection manager |
19 | */ |
20 | |
#include "collection_manager.h"
#include <chrono>
#include <random>
#include <thread>
#include <unordered_set>
#include <ailego/utility/string_helper.h>
#include "binlog/mysql_handler.h"
#include "repository/repository_common/config.h"
#include "repository/repository_common/error_code.h"
#include "repository/repository_common/version.h"
30 | |
31 | namespace proxima { |
32 | namespace be { |
33 | namespace repository { |
34 | |
35 | //! Create collection |
36 | int CollectionManager::create_collection(const CollectionInfo &info) { |
37 | const std::string &collection_name = info.config().collection_name(); |
38 | const std::string &uuid = info.uuid(); |
39 | |
40 | if (collections_.find(uuid) != collections_.end()) { |
41 | LOG_ERROR("Create collection failed. uuid[%s]" , uuid.c_str()); |
42 | return ErrorCode_DuplicateCollection; |
43 | } |
44 | |
45 | LOG_INFO("Start to create a new collection. name[%s], uuid[%s]" , |
46 | collection_name.c_str(), uuid.c_str()); |
47 | |
48 | CollectionPtr current_collection = collection_creator_->create(info); |
49 | if (!current_collection) { |
50 | LOG_ERROR("Create Mysql collection object failed" ); |
51 | return ErrorCode_RuntimeError; |
52 | } |
53 | |
54 | int ret = current_collection->init(); |
55 | if (ret != 0) { |
56 | LOG_ERROR("Init collection failed. name[%s]" , collection_name.c_str()); |
57 | return ret; |
58 | } |
59 | |
60 | collections_[uuid] = current_collection; |
61 | uuid_name_map_[uuid] = collection_name; |
62 | current_collection->run(); |
63 | LOG_INFO("Create a new collection successfully. name[%s], uuid[%s]" , |
64 | collection_name.c_str(), uuid.c_str()); |
65 | return 0; |
66 | } |
67 | |
68 | //! Update collection |
69 | int CollectionManager::update_collection(const std::string &uuid) { |
70 | auto it = collections_.find(uuid); |
71 | if (it == collections_.end()) { |
72 | LOG_ERROR("Can't update not exist collection. uuid[%s]" , uuid.c_str()); |
73 | return ErrorCode_CollectionNotExist; |
74 | } |
75 | it->second->update(); |
76 | return 0; |
77 | } |
78 | |
79 | //! Drop collection |
80 | int CollectionManager::drop_collection(const std::string &uuid) { |
81 | auto it = collections_.find(uuid); |
82 | if (it == collections_.end()) { |
83 | LOG_ERROR("Can't drop not exist collection. uuid[%s]" , uuid.c_str()); |
84 | return ErrorCode_CollectionNotExist; |
85 | } |
86 | it->second->drop(); |
87 | return 0; |
88 | } |
89 | |
90 | void CollectionManager::clean_invalid_collections() { |
91 | std::vector<std::string> finished_collections; |
92 | for (auto &collection : collections_) { |
93 | if (collection.second->finished()) { |
94 | collection.second->stop(); |
95 | finished_collections.emplace_back(collection.first); |
96 | } |
97 | } |
98 | for (auto &finished_collection : finished_collections) { |
99 | LOG_INFO("Clean invalid collection. uuid[%s], name[%s]" , |
100 | finished_collection.c_str(), |
101 | uuid_name_map_[finished_collection].c_str()); |
102 | collections_.erase(finished_collection); |
103 | uuid_name_map_.erase(finished_collection); |
104 | } |
105 | } |
106 | |
107 | void CollectionManager::stop_collections() { |
108 | for (auto &collection : collections_) { |
109 | LOG_INFO("Stopping Collection. uuid[%s], name[%s]" , |
110 | collection.first.c_str(), |
111 | uuid_name_map_[collection.first].c_str()); |
112 | collection.second->stop(); |
113 | } |
114 | } |
115 | |
116 | void CollectionManager::load_config() { |
117 | index_server_uri_ = repository::Config::Instance().get_index_agent_uri(); |
118 | max_retry_ = repository::Config::Instance().get_max_retry(); |
119 | timeout_ms_ = repository::Config::Instance().get_timeout_ms(); |
120 | repository_name_ = repository::Config::Instance().get_repository_name(); |
121 | load_balance_ = repository::Config::Instance().get_load_balance(); |
122 | } |
123 | |
124 | int CollectionManager::cleanup() { |
125 | std::unique_lock<std::mutex> ul(mutex_); |
126 | stop_collections(); |
127 | collections_.clear(); |
128 | return 0; |
129 | } |
130 | |
131 | int CollectionManager::stop() { |
132 | LOG_INFO("Stopping Collection Manager." ); |
133 | stop_.store(true); |
134 | cleanup(); |
135 | return 0; |
136 | } |
137 | |
138 | int CollectionManager::get_all_collections( |
139 | std::vector<CollectionInfo> *collections) { |
140 | proto::ListCondition request; |
141 | proto::ListCollectionsResponse response; |
142 | proto::ProximaService_Stub stub(&channel_); |
143 | brpc::Controller cntl; |
144 | |
145 | request.set_repository_name(repository_name_); |
146 | |
147 | stub.list_collections(&cntl, &request, &response, NULL); |
148 | if (cntl.Failed()) { |
149 | LOG_ERROR("list_collections rpc failed. reason[%s]" , |
150 | cntl.ErrorText().c_str()); |
151 | return ErrorCode_RPCFailed; |
152 | } |
153 | // Check status |
154 | if (response.status().code() != 0) { |
155 | LOG_ERROR("Failed to get all collections. reason[%s]" , |
156 | response.status().reason().c_str()); |
157 | return response.status().code(); |
158 | } |
159 | |
160 | // auto &entity = response.entity(); |
161 | collections->insert(collections->end(), response.collections().begin(), |
162 | response.collections().end()); |
163 | return 0; |
164 | } |
165 | |
166 | void CollectionManager::create_collections( |
167 | const std::vector<CollectionInfo> &infos) { |
168 | for (auto &info : infos) { |
169 | int ret = create_collection(info); |
170 | if (ret != 0) { |
171 | // one collection error, just print error message and continue |
172 | LOG_ERROR( |
173 | "Failed to create collection: name[%s], uuid[%s], code[%d], msg[%s]" , |
174 | info.config().collection_name().c_str(), info.uuid().c_str(), ret, |
175 | ErrorCode::What(ret)); |
176 | } |
177 | } |
178 | } |
179 | |
180 | void CollectionManager::update_collections( |
181 | const std::vector<std::string> &uuids) { |
182 | for (auto &uuid : uuids) { |
183 | int ret = update_collection(uuid); |
184 | if (ret != 0) { |
185 | // one collection error, just print error message and continue |
186 | LOG_ERROR("Failed to update collection: uuid[%s], code[%d], msg[%s]" , |
187 | uuid.c_str(), ret, ErrorCode::What(ret)); |
188 | } |
189 | } |
190 | } |
191 | |
192 | void CollectionManager::drop_collections( |
193 | const std::vector<std::string> &uuids) { |
194 | for (auto &uuid : uuids) { |
195 | int ret = drop_collection(uuid); |
196 | if (ret != 0) { |
197 | // one collection error, just print error message and continue |
198 | LOG_ERROR("Failed to drop collection: uuid[%s], code[%d], msg[%s]" , |
199 | uuid.c_str(), ret, ErrorCode::What(ret)); |
200 | } |
201 | } |
202 | } |
203 | |
204 | bool CollectionManager::is_old_collection(const std::string &uuid, |
205 | uint32_t new_schema_revision) const { |
206 | auto it = collections_.find(uuid); |
207 | if (it == collections_.end()) { |
208 | return false; |
209 | } |
210 | CollectionPtr current_collection = it->second; |
211 | uint32_t schema_revision = current_collection->schema_revision(); |
212 | return schema_revision < new_schema_revision; |
213 | } |
214 | |
215 | void CollectionManager::classify_collections( |
216 | const std::vector<CollectionInfo> &infos, |
217 | std::vector<CollectionInfo> *new_collection_infos, |
218 | std::vector<std::string> *old_collection_uuids, |
219 | std::vector<std::string> *expired_collection_uuids) const { |
220 | for (auto &info : infos) { |
221 | const std::string &uuid = info.uuid(); |
222 | // todo<cdz>: Read schema revision from collection info when support update. |
223 | // current just send 0; |
224 | // uint32_t schema_revision = collection.schema_revision(); |
225 | uint32_t schema_revision = 0; |
226 | if (collections_.find(uuid) == collections_.end()) { |
227 | new_collection_infos->emplace_back(info); |
228 | } else if (is_old_collection(uuid, schema_revision)) { |
229 | old_collection_uuids->emplace_back(uuid); |
230 | } |
231 | } |
232 | for (auto &exist_collection : collections_) { |
233 | const std::string &uuid = exist_collection.first; |
234 | if (std::none_of( |
235 | infos.begin(), infos.end(), |
236 | [&](const CollectionInfo &info) { return info.uuid() == uuid; })) { |
237 | expired_collection_uuids->emplace_back(uuid); |
238 | } |
239 | } |
240 | } |
241 | |
242 | uint64_t CollectionManager::get_sleep_time() const { |
243 | std::mt19937 gen((std::random_device())()); |
244 | return (std::uniform_int_distribution<uint64_t>(0, 1000))(gen); |
245 | } |
246 | |
247 | void CollectionManager::filter_collections( |
248 | const std::vector<CollectionInfo> &collections, |
249 | std::vector<CollectionInfo> *valid_collections) const { |
250 | for (auto &collection : collections) { |
251 | if (collection.status() == proto::CollectionInfo::CS_SERVING) { |
252 | valid_collections->push_back(collection); |
253 | } |
254 | } |
255 | } |
256 | |
257 | //! Deal collections periodicity |
258 | void CollectionManager::start() { |
259 | LOG_INFO("Start Collection Manager." ); |
260 | std::vector<CollectionInfo> collection_infos; |
261 | std::vector<CollectionInfo> valid_collection_infos; |
262 | std::vector<CollectionInfo> new_collection_infos; |
263 | std::vector<std::string> old_collection_uuids; |
264 | std::vector<std::string> expired_collection_uuids; |
265 | while (true) { |
266 | collection_infos.clear(); |
267 | valid_collection_infos.clear(); |
268 | new_collection_infos.clear(); |
269 | old_collection_uuids.clear(); |
270 | expired_collection_uuids.clear(); |
271 | int ret = get_all_collections(&collection_infos); |
272 | if (ret != 0) { |
273 | // Retry |
274 | uint64_t sleep_time = get_sleep_time(); |
275 | std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); |
276 | if (stop_.load()) { |
277 | return; |
278 | } |
279 | continue; |
280 | } |
281 | filter_collections(collection_infos, &valid_collection_infos); |
282 | classify_collections(valid_collection_infos, &new_collection_infos, |
283 | &old_collection_uuids, &expired_collection_uuids); |
284 | std::unique_lock<std::mutex> ul(mutex_); |
285 | if (stop_.load()) { |
286 | return; |
287 | } |
288 | create_collections(new_collection_infos); |
289 | update_collections(old_collection_uuids); |
290 | drop_collections(expired_collection_uuids); |
291 | clean_invalid_collections(); |
292 | ul.unlock(); |
293 | std::this_thread::sleep_for(std::chrono::seconds(UPDATE_INTERVAL)); |
294 | } |
295 | } |
296 | |
297 | //! Init Collection Manager |
298 | int CollectionManager::init() { |
299 | // Load configurations |
300 | load_config(); |
301 | options_ = brpc::ChannelOptions(); |
302 | options_.max_retry = max_retry_; |
303 | options_.timeout_ms = timeout_ms_; |
304 | |
305 | repository_name_ = Config::Instance().get_repository_name(); |
306 | if (repository_name_.empty()) { |
307 | LOG_ERROR("Repository name is empty." ); |
308 | return ErrorCode_ConfigError; |
309 | } |
310 | |
311 | int ret = channel_.Init(index_server_uri_.c_str(), load_balance_.c_str(), |
312 | &options_); |
313 | if (ret != 0) { |
314 | LOG_ERROR("Failed to initialize channel. uri[%s]" , |
315 | index_server_uri_.c_str()); |
316 | return ErrorCode_InitChannel; |
317 | } |
318 | |
319 | // Check Proxima BE version |
320 | ret = check_server_version(); |
321 | if (ret != 0) { |
322 | LOG_ERROR("Check Proxima BE server version failed." ); |
323 | return ret; |
324 | } |
325 | |
326 | return 0; |
327 | } |
328 | |
329 | int CollectionManager::check_server_version() { |
330 | proto::ProximaService_Stub stub(&channel_); |
331 | brpc::Controller cntl; |
332 | proto::GetVersionRequest request; |
333 | proto::GetVersionResponse response; |
334 | |
335 | stub.get_version(&cntl, &request, &response, nullptr); |
336 | if (cntl.Failed()) { |
337 | LOG_ERROR("Get Proxima BE version rpc failed. reason[%s]" , |
338 | cntl.ErrorText().c_str()); |
339 | return ErrorCode_RPCFailed; |
340 | } |
341 | |
342 | if (response.status().code() != 0) { |
343 | LOG_ERROR("Get Proxima BE version failed. reason[%s]" , |
344 | response.status().reason().c_str()); |
345 | return response.status().code(); |
346 | } |
347 | |
348 | std::string server_version = response.version(); |
349 | std::string client_version = Version::String(); |
350 | LOG_INFO("server_version: %s" , server_version.c_str()); |
351 | LOG_INFO("mysql_repository_version: %s" , Version::String()); |
352 | if (server_version == client_version) { |
353 | return 0; |
354 | } |
355 | |
356 | // TODO @Dianzhang.Chen |
357 | // Temporarily we just use first two seq number of version string to compare |
358 | // For exp: version[0.1.2] match version[0.1.3] with "0.1" |
359 | std::vector<std::string> server_sub_seqs; |
360 | ailego::StringHelper::Split(server_version, '.', &server_sub_seqs); |
361 | std::vector<std::string> client_sub_seqs; |
362 | ailego::StringHelper::Split(client_version, '.', &client_sub_seqs); |
363 | |
364 | int compare_count = 2; |
365 | for (int i = 0; i < compare_count; i++) { |
366 | if (client_sub_seqs[i] != server_sub_seqs[i]) { |
367 | LOG_ERROR("Mysql repository version: %s not match server version: %s" , |
368 | Version::String(), server_version.c_str()); |
369 | return ErrorCode_MismatchedVersion; |
370 | } |
371 | } |
372 | |
373 | return 0; |
374 | } |
375 | |
376 | } // end namespace repository |
377 | } // namespace be |
378 | } // end namespace proxima |