1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Nov 2020
18 * \brief Implementation of mysql collection
19 */
20
21#include "mysql_collection.h"
22#include <sstream>
23#include <vector>
24#include "repository/repository_common/config.h"
25#include "repository/repository_common/error_code.h"
26#include "lsn_context_format.h"
27
28namespace proxima {
29namespace be {
30namespace repository {
31
32//! Start process
33void MysqlCollection::run() {
34 fetch_thread_ = std::thread(&MysqlCollection::fetch_impl, this);
35 send_thread_ = std::thread(&MysqlCollection::send_impl, this);
36 LOG_INFO("Collection is running. name[%s]",
37 config_.collection_name().c_str());
38}
39
40//! Update Collection
41// todo<cdz>: Current version do not support update collection.
42void MysqlCollection::update() {
43 collection_state_flag_.store(CollectionStateFlag::UPDATE);
44}
45
46//! Drop Collection
47void MysqlCollection::drop() {
48 collection_state_flag_.store(CollectionStateFlag::DROP);
49}
50
51//! Reset Collection back to CONTINUE
52int MysqlCollection::reset_collection() {
53 // Get lsn information
54 int ret = load_lsn_info();
55 if (ret != 0) {
56 LOG_ERROR("Failed to update lsn information.");
57 return ret;
58 }
59
60 // LOG_INFO("Init mysql handler successed");
61 if (lsn_ == 0) {
62 ret = mysql_handler_->get_table_snapshot(&context_.file_name,
63 &context_.position);
64 if (ret != 0) {
65 LOG_ERROR("Failed to get full table snapshot.");
66 return ret;
67 }
68 LOG_INFO("Get table snapshot, file_name: %s position: %zu",
69 context_.file_name.c_str(), (size_t)context_.position);
70 }
71
72 // Reset binlog pull state
73 ret = mysql_handler_->reset_status(pull_state_flag_, config_, context_);
74 if (ret != 0) {
75 LOG_ERROR("Failed to reset mysql handler.");
76 return ret;
77 }
78
79 // Reset collection state
80 clear_fetch_data();
81 reset_send_status();
82 reset_fetch_status();
83 return 0;
84}
85
86uint32_t MysqlCollection::schema_revision() const {
87 // todo<cdz>: Read from collection info when add update support.
88 // return config_.schema_revision();
89 return 0;
90}
91
92bool MysqlCollection::finished() const {
93 return state() == CollectionStatus::FINISHED;
94}
95
96CollectionStateFlag MysqlCollection::get_collection_flag() {
97 CollectionStateFlag status = CollectionStateFlag::UPDATE;
98 // In case update twice
99 bool is_update = collection_state_flag_.compare_exchange_strong(
100 status, CollectionStateFlag::NORMAL);
101 if (is_update) {
102 return CollectionStateFlag::UPDATE;
103 } else {
104 return status;
105 }
106}
107
108void MysqlCollection::clear_fetch_data() {
109 fetch_request_->Clear();
110}
111
112void MysqlCollection::reset_fetch_status() {
113 prepared_data_size_.store(0);
114 reset_.store(true);
115}
116
117void MysqlCollection::reset_send_status() {
118 ready_.store(false);
119}
120
121void MysqlCollection::stop() {
122 const std::string &collection_name = config_.collection_name();
123 state_.store(CollectionStatus::FINISHED);
124 if (fetch_thread_.joinable()) {
125 fetch_thread_.join();
126 }
127 if (send_thread_.joinable()) {
128 send_thread_.join();
129 }
130 LOG_INFO("Stop collection successed. name[%s]", collection_name.c_str());
131}
132
133void MysqlCollection::update_action() {
134 const std::string &collection_name = config_.collection_name();
135 LOG_INFO("Updating collection. name[%s]", collection_name.c_str());
136 std::unique_lock<std::mutex> lg(update_mutex_);
137 while (is_valid()) {
138 int ret = reset_collection();
139 if (ret != 0) {
140 LOG_ERROR("Failed to reset collection. retry ...");
141 std::this_thread::sleep_for(std::chrono::milliseconds(10));
142 continue;
143 }
144 state_.store(CollectionStatus::RUNNING);
145 break;
146 }
147}
148
149void MysqlCollection::process_update() {
150 switch (state_.load()) {
151 case CollectionStatus::INIT:
152 case CollectionStatus::RUNNING:
153 case CollectionStatus::UPDATING:
154 update_action();
155 break;
156 case CollectionStatus::FINISHED:
157 break;
158 }
159}
160
161void MysqlCollection::process_drop() {
162 state_.store(CollectionStatus::FINISHED);
163 LOG_INFO("Drop collection. name[%s]", config_.collection_name().c_str());
164}
165
166void MysqlCollection::wait_update_command() {
167 while (is_valid()) {
168 CollectionStateFlag flag = get_collection_flag();
169 switch (flag) {
170 case CollectionStateFlag::UPDATE:
171 return;
172 case CollectionStateFlag::DROP:
173 process_drop();
174 break;
175 case CollectionStateFlag::NORMAL:
176 std::this_thread::sleep_for(std::chrono::microseconds(10));
177 }
178 }
179}
180
181void MysqlCollection::process_normal() {
182 auto current_state = state_.load();
183 switch (current_state) {
184 case CollectionStatus::INIT:
185 state_.store(CollectionStatus::RUNNING);
186 break;
187 case CollectionStatus::RUNNING:
188 case CollectionStatus::FINISHED:
189 // If is running, or finished. do nothing
190 break;
191 case CollectionStatus::UPDATING:
192 // If current state is updating. wait until get update command and update
193 wait_update_command();
194 process_update();
195 }
196 return;
197}
198
199void MysqlCollection::process_event(const CollectionStateFlag &flag) {
200 switch (flag) {
201 case CollectionStateFlag::UPDATE:
202 process_update();
203 break;
204 case CollectionStateFlag::DROP:
205 process_drop();
206 break;
207 case CollectionStateFlag::NORMAL:
208 process_normal();
209 break;
210 }
211}
212
213CollectionStatus MysqlCollection::state() const {
214 return state_.load();
215}
216
217int MysqlCollection::init_brpc() {
218 brpc::ChannelOptions options;
219 options.max_retry = max_retry_;
220 options.timeout_ms = brpc_timeout_ms_;
221 int ret =
222 channel_.Init(index_server_uri_.c_str(), load_balance_.c_str(), &options);
223 if (ret != 0) {
224 LOG_ERROR("Failed to initialize channel. uri[%s]",
225 index_server_uri_.c_str());
226 return ErrorCode_InitChannel;
227 }
228 stub_.reset(new (std::nothrow) proto::ProximaService_Stub(&channel_));
229 fetch_request_.reset(new (std::nothrow) proto::WriteRequest());
230 send_request_.reset(new (std::nothrow) proto::WriteRequest());
231 reset_.store(true);
232 return 0;
233}
234
235int MysqlCollection::init_mysql_module() {
236 if (mysql_handler_ == nullptr) {
237 LOG_ERROR("Invalid mysql handler");
238 return ErrorCode_InvalidMysqlHandler;
239 }
240
241 int ret = load_lsn_info(false);
242 if (ret != 0) {
243 LOG_ERROR("Failed to load lsn map information");
244 return ret;
245 }
246 LOG_INFO("Load lsn info successed");
247
248 ret = mysql_handler_->init(pull_state_flag_);
249 if (ret != 0) {
250 LOG_ERROR("Failed to init mysql handler");
251 return ret;
252 }
253 LOG_INFO("Init mysql handler successed");
254
255 if (lsn_ == 0) {
256 ret = mysql_handler_->get_table_snapshot(&context_.file_name,
257 &context_.position);
258 if (ret != 0) {
259 LOG_ERROR("Failed to get full table snapshot.");
260 return ret;
261 }
262 LOG_INFO("Get table snapshot successed, file_name: %s position: %zu",
263 context_.file_name.c_str(), (size_t)context_.position);
264 }
265
266 ret = mysql_handler_->start(context_);
267 if (ret != 0) {
268 LOG_ERROR("Failed to start mysql handler.");
269 return ret;
270 }
271
272 return 0;
273}
274
275//! Initialize MySQL Collection
276int MysqlCollection::init() {
277 load_config();
278 //! Init brpc components
279 int ret = init_brpc();
280 if (ret != 0) {
281 LOG_ERROR("Failed to init brpc components");
282 return ret;
283 }
284 LOG_INFO("Init brpc successed");
285
286 //! Init mysql module
287 ret = init_mysql_module();
288 if (ret != 0) {
289 LOG_ERROR("Failed to init mysql module");
290 return ret;
291 }
292 LOG_INFO("Init mysql module successed");
293 LOG_INFO("Init mysql collection successed");
294 return 0;
295}
296
297void MysqlCollection::load_config() {
298 batch_size_ = repository::Config::Instance().get_batch_size();
299 batch_interval_ = repository::Config::Instance().get_batch_interval();
300 index_server_uri_ = repository::Config::Instance().get_index_agent_uri();
301 load_balance_ = repository::Config::Instance().get_load_balance();
302 max_retry_ = repository::Config::Instance().get_max_retry();
303 brpc_timeout_ms_ = repository::Config::Instance().get_timeout_ms();
304}
305
306int MysqlCollection::load_lsn_info(bool is_retry) {
307 proto::CollectionName request;
308 proto::DescribeCollectionResponse response;
309 brpc::Controller cntl;
310 request.set_collection_name(config_.collection_name());
311 while (true) {
312 if (!is_valid()) {
313 return ErrorCode_Terminate;
314 }
315 stub_->describe_collection(&cntl, &request, &response, NULL);
316 if (cntl.Failed()) {
317 LOG_ERROR("Failed to get collection from index agent. msg[%s]",
318 cntl.ErrorText().c_str());
319 if (is_retry) {
320 uint64_t sleep_time = get_sleep_time();
321 std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
322 cntl.Reset();
323 continue;
324 } else {
325 return ErrorCode_RPCFailed;
326 }
327 }
328 break;
329 }
330
331 // Debug
332 LOG_INFO("get_collection response : %s", response.ShortDebugString().c_str());
333
334 // Get lsn
335 auto &info = response.collection();
336 config_ = info.config();
337 agent_timestamp_ = info.magic_number();
338 auto &lsn_context = info.latest_lsn_context();
339 lsn_ = lsn_context.lsn();
340 if (lsn_ == 0) {
341 LOG_INFO("LSN is zero, use initial value.");
342 pull_state_flag_ = ScanMode::FULL;
343 return 0;
344 }
345
346 LsnContextFormat lsn_context_format;
347 int ret = lsn_context_format.parse_from_string(lsn_context.context());
348 // int ret = string_to_context(lsn_context.context(), &lsn_context_format);
349 if (ret != 0) {
350 LOG_ERROR("Parse lsn context from string failed.");
351 return ret;
352 }
353
354 context_.file_name = lsn_context_format.file_name();
355 context_.position = lsn_context_format.position();
356 context_.seq_id = lsn_context_format.seq_id();
357 pull_state_flag_ = lsn_context_format.mode();
358 return 0;
359}
360
361bool MysqlCollection::is_valid() const {
362 if (state_.load() == CollectionStatus::FINISHED ||
363 collection_state_flag_.load() == CollectionStateFlag::DROP) {
364 return false;
365 }
366 return true;
367}
368
369int MysqlCollection::update_request_meta() {
370 if (reset_.load() == true) {
371 start_time_ = ailego::Monotime::MilliSeconds();
372 int ret = reset_request();
373 if (ret != 0) {
374 LOG_ERROR("Reset fetch request failed");
375 return ret;
376 }
377 reset_.store(false);
378 }
379 return 0;
380}
381
382int MysqlCollection::get_request_row() {
383 int ret = 0;
384 LsnContext current_context;
385 auto *next_row = fetch_request_->add_rows();
386 ret = mysql_handler_->get_next_row_data(next_row, &current_context);
387 if (ret != 0) {
388 // Not fatal error, continue
389 LOG_ERROR("Get next row data failed. code[%d], msg[%s]", ret,
390 ErrorCode::What(ret));
391 // Remove the last row
392 fetch_request_->mutable_rows()->RemoveLast();
393 std::this_thread::sleep_for(std::chrono::microseconds(10));
394 return ret;
395 }
396 ret = verify_and_handle(current_context);
397 if (ret != 0) {
398 return ret;
399 }
400 update_lsn_map_info(next_row);
401 return 0;
402}
403
404bool MysqlCollection::must_send(uint64_t start_time) {
405 if (ready_.load() == true) {
406 return true;
407 }
408 uint64_t current_time = ailego::Monotime::MicroSeconds();
409 if ((current_time - start_time) >= (uint64_t)batch_interval_ &&
410 prepared_data_size_.load() != 0) {
411 ready_.store(true);
412 return true;
413 }
414 return false;
415}
416
417//! Start process
418void MysqlCollection::fetch_impl() {
419 LOG_INFO("Start fetch thread");
420 uint64_t start_time = ailego::Monotime::MicroSeconds();
421 start_time_ = start_time; // start_time_ just use to print info
422 while (is_valid()) {
423 std::this_thread::sleep_for(std::chrono::microseconds(2));
424 std::unique_lock<std::mutex> ul(update_mutex_);
425 if (must_send(start_time)) {
426 start_time = ailego::Monotime::MicroSeconds();
427 continue;
428 }
429
430 int ret = update_request_meta();
431 if (ret != 0) {
432 LOG_ERROR("Update request meta failed. code[%d], msg[%s]", ret,
433 ErrorCode::What(ret));
434 // If failed, continue
435 continue;
436 }
437 ret = get_request_row();
438 if (ret != 0) {
439 // Get row data failed, just continue
440 continue;
441 }
442
443 prepared_data_size_++;
444 if (prepared_data_size_ >= batch_size_) {
445 ready_.store(true);
446 }
447 }
448}
449
450void MysqlCollection::update_lsn_map_info(
451 proto::WriteRequest::Row *row_data) const {
452 auto lsn_context = row_data->mutable_lsn_context();
453 lsn_context->set_lsn(lsn_);
454
455 LsnContextFormat current_context(context_.file_name, context_.position,
456 context_.seq_id, pull_state_flag_);
457 lsn_context->set_context(current_context.convert_to_string());
458}
459
460int MysqlCollection::verify_and_handle(const LsnContext &context) {
461 auto current_state = context.status;
462 switch (current_state) {
463 case RowDataStatus::NO_MORE_DATA:
464 handle_no_data();
465 return ErrorCode_NoMoreData;
466 case RowDataStatus::SCHEMA_CHANGED:
467 handle_schema_changed();
468 return ErrorCode_SchemaChanged;
469 default:
470 update_context(context);
471 lsn_++;
472 return 0;
473 }
474}
475
476void MysqlCollection::handle_no_data() {
477 fetch_request_->mutable_rows()->RemoveLast();
478 if (pull_state_flag_ == ScanMode::FULL) {
479 LOG_INFO("Scan mode need change");
480 pull_state_flag_ = ScanMode::INCREMENTAL;
481 context_.seq_id = 0; // seq_id set to invalid value
482 mysql_handler_->reset_status(ScanMode::INCREMENTAL, config_, context_);
483 ready_.store(true);
484 }
485}
486
487void MysqlCollection::update_context(const LsnContext &context) {
488 if (pull_state_flag_ == ScanMode::FULL) {
489 context_.seq_id = context.seq_id;
490 } else {
491 context_.file_name = context.file_name;
492 context_.position = context.position;
493 }
494 context_.status = context.status;
495}
496
497void MysqlCollection::handle_schema_changed() {
498 LOG_INFO("Schema changed");
499 fetch_request_->mutable_rows()->RemoveLast();
500 ready_.store(true);
501}
502
503//! Start process
504void MysqlCollection::send_impl() {
505 LOG_INFO("Start send thread");
506 // Actual work here
507 while (!finished()) {
508 update_state();
509 bool is_ready = wait_prepared_data();
510 if (is_ready == false) {
511 std::this_thread::sleep_for(std::chrono::milliseconds(1));
512 continue;
513 }
514
515 int ret = send_data();
516 if (ret != 0) {
517 LOG_ERROR("Failed to send data. code[%d], msg[%s]", ret,
518 ErrorCode::What(ret));
519 continue;
520 }
521 }
522}
523
524void MysqlCollection::update_state() {
525 CollectionStateFlag flag = get_collection_flag();
526 process_event(flag);
527}
528
529uint64_t MysqlCollection::get_sleep_time() const {
530 std::mt19937 gen((std::random_device())());
531 return (std::uniform_int_distribution<uint64_t>(500, 1000))(gen);
532}
533
534void MysqlCollection::print_send_data_info() const {
535 uint64_t current_time = ailego::Monotime::MilliSeconds();
536 uint64_t cost = current_time - start_time_;
537 auto row_size = send_request_->rows_size();
538 auto lsn_min = send_request_->rows(0).lsn_context().lsn();
539 auto lsn_max = send_request_->rows(row_size - 1).lsn_context().lsn();
540 LOG_INFO("Send request. size[%d], cost[%zums], lsn_min[%zu], lsn_max[%zu]",
541 row_size, (size_t)cost, (size_t)lsn_min, (size_t)lsn_max);
542}
543
544int MysqlCollection::send_data() {
545 int ret = 0;
546 proto::Status response;
547 brpc::Controller cntl;
548 while (is_valid()) {
549 print_send_data_info();
550 stub_->write(&cntl, send_request_.get(), &response, NULL);
551 if (cntl.Failed()) {
552 ret = ErrorCode_RPCFailed;
553 LOG_ERROR("Failed RPC. msg[%s].", cntl.ErrorText().c_str());
554 uint64_t sleep_time = get_sleep_time();
555 std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
556 cntl.Reset();
557 continue;
558 }
559
560 ret = (int)response.code();
561 if (ret == ErrorCode_Success.value()) {
562 // If successed
563 return 0;
564 } else if (ret == ErrorCode_ExceedRateLimit.value()) {
565 // If exceed ratelimited, retry
566 LOG_INFO("Exceed rate limite. Retry ...");
567 std::this_thread::sleep_for(std::chrono::milliseconds(10));
568 cntl.Reset();
569 continue;
570 } else if (ret == ErrorCode_MismatchedSchema.value()) {
571 // If schema revision mismatched, change state to updating, and wait
572 // update command later
573 LOG_INFO("Schema revision mismatch");
574 state_.store(CollectionStatus::UPDATING);
575 return 0;
576 } else if (ret == ErrorCode_MismatchedMagicNumber.value()) {
577 // If agent timestamp mismatch, update now
578 LOG_INFO("Agent timestamp mismatch");
579 state_.store(CollectionStatus::UPDATING);
580 process_update();
581 return 0;
582 } else if (ret == ErrorCode_CollectionNotExist.value()) {
583 // If collection not exit
584 LOG_INFO("Collection not exist");
585 return ret;
586 } else {
587 // Other unknown response, just retry
588 LOG_ERROR("Send data failed, unknown response. response_code[%d]", ret);
589 std::this_thread::sleep_for(std::chrono::milliseconds(10));
590 cntl.Reset();
591 continue;
592 }
593 }
594 return ret;
595}
596
597std::string MysqlCollection::generate_request_id() const {
598 std::random_device rd;
599 std::mt19937 gen(rd());
600 std::uniform_int_distribution<uint64_t> distrib(0, UINT64_MAX);
601 return std::to_string(distrib(gen));
602}
603
604int MysqlCollection::reset_request() {
605 clear_fetch_data(); // make sure fetch request is clean
606 fetch_request_->set_request_id(generate_request_id());
607 // todo<cdz>: Set schema revision when support update
608 // fetch_request_->set_schema_revision(config_.schema_revision());
609 fetch_request_->set_magic_number(agent_timestamp_);
610 fetch_request_->set_collection_name(config_.collection_name());
611 auto meta = fetch_request_->mutable_row_meta();
612 int ret = mysql_handler_->get_fields_meta(meta);
613 if (ret != 0) {
614 LOG_ERROR("Failed to get fields meta. code[%d] msg[%s]", ret,
615 ErrorCode::What(ret));
616 return ret;
617 }
618 return 0;
619}
620
621void MysqlCollection::get_sending_request() {
622 send_request_.swap(fetch_request_);
623 clear_fetch_data();
624 reset_fetch_status(); // inform fetch thread reset request
625 reset_send_status(); // mark send thread get ready data
626}
627
628bool MysqlCollection::ready() const {
629 return ready_.load() == true;
630}
631
632bool MysqlCollection::wait_prepared_data() {
633 if (!ready()) {
634 return false;
635 }
636 get_sending_request();
637 if (is_send_request_empty()) {
638 return false;
639 }
640 return true;
641}
642
643bool MysqlCollection::is_send_request_empty() const {
644 auto row_size = send_request_->rows_size();
645 return (row_size == 0);
646}
647
648} // namespace repository
649} // namespace be
650} // namespace proxima
651