1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Oct 2020
18 * \brief Implementation of collection manager
19 */
20
#include "collection_manager.h"
#include <chrono>
#include <random>
#include <thread>
#include <unordered_set>
#include <ailego/utility/string_helper.h>
#include "binlog/mysql_handler.h"
#include "repository/repository_common/config.h"
#include "repository/repository_common/error_code.h"
#include "repository/repository_common/version.h"
30
31namespace proxima {
32namespace be {
33namespace repository {
34
35//! Create collection
36int CollectionManager::create_collection(const CollectionInfo &info) {
37 const std::string &collection_name = info.config().collection_name();
38 const std::string &uuid = info.uuid();
39
40 if (collections_.find(uuid) != collections_.end()) {
41 LOG_ERROR("Create collection failed. uuid[%s]", uuid.c_str());
42 return ErrorCode_DuplicateCollection;
43 }
44
45 LOG_INFO("Start to create a new collection. name[%s], uuid[%s]",
46 collection_name.c_str(), uuid.c_str());
47
48 CollectionPtr current_collection = collection_creator_->create(info);
49 if (!current_collection) {
50 LOG_ERROR("Create Mysql collection object failed");
51 return ErrorCode_RuntimeError;
52 }
53
54 int ret = current_collection->init();
55 if (ret != 0) {
56 LOG_ERROR("Init collection failed. name[%s]", collection_name.c_str());
57 return ret;
58 }
59
60 collections_[uuid] = current_collection;
61 uuid_name_map_[uuid] = collection_name;
62 current_collection->run();
63 LOG_INFO("Create a new collection successfully. name[%s], uuid[%s]",
64 collection_name.c_str(), uuid.c_str());
65 return 0;
66}
67
68//! Update collection
69int CollectionManager::update_collection(const std::string &uuid) {
70 auto it = collections_.find(uuid);
71 if (it == collections_.end()) {
72 LOG_ERROR("Can't update not exist collection. uuid[%s]", uuid.c_str());
73 return ErrorCode_CollectionNotExist;
74 }
75 it->second->update();
76 return 0;
77}
78
79//! Drop collection
80int CollectionManager::drop_collection(const std::string &uuid) {
81 auto it = collections_.find(uuid);
82 if (it == collections_.end()) {
83 LOG_ERROR("Can't drop not exist collection. uuid[%s]", uuid.c_str());
84 return ErrorCode_CollectionNotExist;
85 }
86 it->second->drop();
87 return 0;
88}
89
90void CollectionManager::clean_invalid_collections() {
91 std::vector<std::string> finished_collections;
92 for (auto &collection : collections_) {
93 if (collection.second->finished()) {
94 collection.second->stop();
95 finished_collections.emplace_back(collection.first);
96 }
97 }
98 for (auto &finished_collection : finished_collections) {
99 LOG_INFO("Clean invalid collection. uuid[%s], name[%s]",
100 finished_collection.c_str(),
101 uuid_name_map_[finished_collection].c_str());
102 collections_.erase(finished_collection);
103 uuid_name_map_.erase(finished_collection);
104 }
105}
106
107void CollectionManager::stop_collections() {
108 for (auto &collection : collections_) {
109 LOG_INFO("Stopping Collection. uuid[%s], name[%s]",
110 collection.first.c_str(),
111 uuid_name_map_[collection.first].c_str());
112 collection.second->stop();
113 }
114}
115
116void CollectionManager::load_config() {
117 index_server_uri_ = repository::Config::Instance().get_index_agent_uri();
118 max_retry_ = repository::Config::Instance().get_max_retry();
119 timeout_ms_ = repository::Config::Instance().get_timeout_ms();
120 repository_name_ = repository::Config::Instance().get_repository_name();
121 load_balance_ = repository::Config::Instance().get_load_balance();
122}
123
124int CollectionManager::cleanup() {
125 std::unique_lock<std::mutex> ul(mutex_);
126 stop_collections();
127 collections_.clear();
128 return 0;
129}
130
131int CollectionManager::stop() {
132 LOG_INFO("Stopping Collection Manager.");
133 stop_.store(true);
134 cleanup();
135 return 0;
136}
137
138int CollectionManager::get_all_collections(
139 std::vector<CollectionInfo> *collections) {
140 proto::ListCondition request;
141 proto::ListCollectionsResponse response;
142 proto::ProximaService_Stub stub(&channel_);
143 brpc::Controller cntl;
144
145 request.set_repository_name(repository_name_);
146
147 stub.list_collections(&cntl, &request, &response, NULL);
148 if (cntl.Failed()) {
149 LOG_ERROR("list_collections rpc failed. reason[%s]",
150 cntl.ErrorText().c_str());
151 return ErrorCode_RPCFailed;
152 }
153 // Check status
154 if (response.status().code() != 0) {
155 LOG_ERROR("Failed to get all collections. reason[%s]",
156 response.status().reason().c_str());
157 return response.status().code();
158 }
159
160 // auto &entity = response.entity();
161 collections->insert(collections->end(), response.collections().begin(),
162 response.collections().end());
163 return 0;
164}
165
166void CollectionManager::create_collections(
167 const std::vector<CollectionInfo> &infos) {
168 for (auto &info : infos) {
169 int ret = create_collection(info);
170 if (ret != 0) {
171 // one collection error, just print error message and continue
172 LOG_ERROR(
173 "Failed to create collection: name[%s], uuid[%s], code[%d], msg[%s]",
174 info.config().collection_name().c_str(), info.uuid().c_str(), ret,
175 ErrorCode::What(ret));
176 }
177 }
178}
179
180void CollectionManager::update_collections(
181 const std::vector<std::string> &uuids) {
182 for (auto &uuid : uuids) {
183 int ret = update_collection(uuid);
184 if (ret != 0) {
185 // one collection error, just print error message and continue
186 LOG_ERROR("Failed to update collection: uuid[%s], code[%d], msg[%s]",
187 uuid.c_str(), ret, ErrorCode::What(ret));
188 }
189 }
190}
191
192void CollectionManager::drop_collections(
193 const std::vector<std::string> &uuids) {
194 for (auto &uuid : uuids) {
195 int ret = drop_collection(uuid);
196 if (ret != 0) {
197 // one collection error, just print error message and continue
198 LOG_ERROR("Failed to drop collection: uuid[%s], code[%d], msg[%s]",
199 uuid.c_str(), ret, ErrorCode::What(ret));
200 }
201 }
202}
203
204bool CollectionManager::is_old_collection(const std::string &uuid,
205 uint32_t new_schema_revision) const {
206 auto it = collections_.find(uuid);
207 if (it == collections_.end()) {
208 return false;
209 }
210 CollectionPtr current_collection = it->second;
211 uint32_t schema_revision = current_collection->schema_revision();
212 return schema_revision < new_schema_revision;
213}
214
215void CollectionManager::classify_collections(
216 const std::vector<CollectionInfo> &infos,
217 std::vector<CollectionInfo> *new_collection_infos,
218 std::vector<std::string> *old_collection_uuids,
219 std::vector<std::string> *expired_collection_uuids) const {
220 for (auto &info : infos) {
221 const std::string &uuid = info.uuid();
222 // todo<cdz>: Read schema revision from collection info when support update.
223 // current just send 0;
224 // uint32_t schema_revision = collection.schema_revision();
225 uint32_t schema_revision = 0;
226 if (collections_.find(uuid) == collections_.end()) {
227 new_collection_infos->emplace_back(info);
228 } else if (is_old_collection(uuid, schema_revision)) {
229 old_collection_uuids->emplace_back(uuid);
230 }
231 }
232 for (auto &exist_collection : collections_) {
233 const std::string &uuid = exist_collection.first;
234 if (std::none_of(
235 infos.begin(), infos.end(),
236 [&](const CollectionInfo &info) { return info.uuid() == uuid; })) {
237 expired_collection_uuids->emplace_back(uuid);
238 }
239 }
240}
241
242uint64_t CollectionManager::get_sleep_time() const {
243 std::mt19937 gen((std::random_device())());
244 return (std::uniform_int_distribution<uint64_t>(0, 1000))(gen);
245}
246
247void CollectionManager::filter_collections(
248 const std::vector<CollectionInfo> &collections,
249 std::vector<CollectionInfo> *valid_collections) const {
250 for (auto &collection : collections) {
251 if (collection.status() == proto::CollectionInfo::CS_SERVING) {
252 valid_collections->push_back(collection);
253 }
254 }
255}
256
257//! Deal collections periodicity
258void CollectionManager::start() {
259 LOG_INFO("Start Collection Manager.");
260 std::vector<CollectionInfo> collection_infos;
261 std::vector<CollectionInfo> valid_collection_infos;
262 std::vector<CollectionInfo> new_collection_infos;
263 std::vector<std::string> old_collection_uuids;
264 std::vector<std::string> expired_collection_uuids;
265 while (true) {
266 collection_infos.clear();
267 valid_collection_infos.clear();
268 new_collection_infos.clear();
269 old_collection_uuids.clear();
270 expired_collection_uuids.clear();
271 int ret = get_all_collections(&collection_infos);
272 if (ret != 0) {
273 // Retry
274 uint64_t sleep_time = get_sleep_time();
275 std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
276 if (stop_.load()) {
277 return;
278 }
279 continue;
280 }
281 filter_collections(collection_infos, &valid_collection_infos);
282 classify_collections(valid_collection_infos, &new_collection_infos,
283 &old_collection_uuids, &expired_collection_uuids);
284 std::unique_lock<std::mutex> ul(mutex_);
285 if (stop_.load()) {
286 return;
287 }
288 create_collections(new_collection_infos);
289 update_collections(old_collection_uuids);
290 drop_collections(expired_collection_uuids);
291 clean_invalid_collections();
292 ul.unlock();
293 std::this_thread::sleep_for(std::chrono::seconds(UPDATE_INTERVAL));
294 }
295}
296
297//! Init Collection Manager
298int CollectionManager::init() {
299 // Load configurations
300 load_config();
301 options_ = brpc::ChannelOptions();
302 options_.max_retry = max_retry_;
303 options_.timeout_ms = timeout_ms_;
304
305 repository_name_ = Config::Instance().get_repository_name();
306 if (repository_name_.empty()) {
307 LOG_ERROR("Repository name is empty.");
308 return ErrorCode_ConfigError;
309 }
310
311 int ret = channel_.Init(index_server_uri_.c_str(), load_balance_.c_str(),
312 &options_);
313 if (ret != 0) {
314 LOG_ERROR("Failed to initialize channel. uri[%s]",
315 index_server_uri_.c_str());
316 return ErrorCode_InitChannel;
317 }
318
319 // Check Proxima BE version
320 ret = check_server_version();
321 if (ret != 0) {
322 LOG_ERROR("Check Proxima BE server version failed.");
323 return ret;
324 }
325
326 return 0;
327}
328
329int CollectionManager::check_server_version() {
330 proto::ProximaService_Stub stub(&channel_);
331 brpc::Controller cntl;
332 proto::GetVersionRequest request;
333 proto::GetVersionResponse response;
334
335 stub.get_version(&cntl, &request, &response, nullptr);
336 if (cntl.Failed()) {
337 LOG_ERROR("Get Proxima BE version rpc failed. reason[%s]",
338 cntl.ErrorText().c_str());
339 return ErrorCode_RPCFailed;
340 }
341
342 if (response.status().code() != 0) {
343 LOG_ERROR("Get Proxima BE version failed. reason[%s]",
344 response.status().reason().c_str());
345 return response.status().code();
346 }
347
348 std::string server_version = response.version();
349 std::string client_version = Version::String();
350 LOG_INFO("server_version: %s", server_version.c_str());
351 LOG_INFO("mysql_repository_version: %s", Version::String());
352 if (server_version == client_version) {
353 return 0;
354 }
355
356 // TODO @Dianzhang.Chen
357 // Temporarily we just use first two seq number of version string to compare
358 // For exp: version[0.1.2] match version[0.1.3] with "0.1"
359 std::vector<std::string> server_sub_seqs;
360 ailego::StringHelper::Split(server_version, '.', &server_sub_seqs);
361 std::vector<std::string> client_sub_seqs;
362 ailego::StringHelper::Split(client_version, '.', &client_sub_seqs);
363
364 int compare_count = 2;
365 for (int i = 0; i < compare_count; i++) {
366 if (client_sub_seqs[i] != server_sub_seqs[i]) {
367 LOG_ERROR("Mysql repository version: %s not match server version: %s",
368 Version::String(), server_version.c_str());
369 return ErrorCode_MismatchedVersion;
370 }
371 }
372
373 return 0;
374}
375
376} // end namespace repository
377} // namespace be
378} // end namespace proxima