1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Nov 2020 |
18 | * \brief Implementation of mysql collection |
19 | */ |
20 | |
21 | #include "mysql_collection.h" |
22 | #include <sstream> |
23 | #include <vector> |
24 | #include "repository/repository_common/config.h" |
25 | #include "repository/repository_common/error_code.h" |
26 | #include "lsn_context_format.h" |
27 | |
28 | namespace proxima { |
29 | namespace be { |
30 | namespace repository { |
31 | |
32 | //! Start process |
33 | void MysqlCollection::run() { |
34 | fetch_thread_ = std::thread(&MysqlCollection::fetch_impl, this); |
35 | send_thread_ = std::thread(&MysqlCollection::send_impl, this); |
36 | LOG_INFO("Collection is running. name[%s]" , |
37 | config_.collection_name().c_str()); |
38 | } |
39 | |
40 | //! Update Collection |
41 | // todo<cdz>: Current version do not support update collection. |
42 | void MysqlCollection::update() { |
43 | collection_state_flag_.store(CollectionStateFlag::UPDATE); |
44 | } |
45 | |
46 | //! Drop Collection |
47 | void MysqlCollection::drop() { |
48 | collection_state_flag_.store(CollectionStateFlag::DROP); |
49 | } |
50 | |
51 | //! Reset Collection back to CONTINUE |
52 | int MysqlCollection::reset_collection() { |
53 | // Get lsn information |
54 | int ret = load_lsn_info(); |
55 | if (ret != 0) { |
56 | LOG_ERROR("Failed to update lsn information." ); |
57 | return ret; |
58 | } |
59 | |
60 | // LOG_INFO("Init mysql handler successed"); |
61 | if (lsn_ == 0) { |
62 | ret = mysql_handler_->get_table_snapshot(&context_.file_name, |
63 | &context_.position); |
64 | if (ret != 0) { |
65 | LOG_ERROR("Failed to get full table snapshot." ); |
66 | return ret; |
67 | } |
68 | LOG_INFO("Get table snapshot, file_name: %s position: %zu" , |
69 | context_.file_name.c_str(), (size_t)context_.position); |
70 | } |
71 | |
72 | // Reset binlog pull state |
73 | ret = mysql_handler_->reset_status(pull_state_flag_, config_, context_); |
74 | if (ret != 0) { |
75 | LOG_ERROR("Failed to reset mysql handler." ); |
76 | return ret; |
77 | } |
78 | |
79 | // Reset collection state |
80 | clear_fetch_data(); |
81 | reset_send_status(); |
82 | reset_fetch_status(); |
83 | return 0; |
84 | } |
85 | |
86 | uint32_t MysqlCollection::schema_revision() const { |
87 | // todo<cdz>: Read from collection info when add update support. |
88 | // return config_.schema_revision(); |
89 | return 0; |
90 | } |
91 | |
92 | bool MysqlCollection::finished() const { |
93 | return state() == CollectionStatus::FINISHED; |
94 | } |
95 | |
96 | CollectionStateFlag MysqlCollection::get_collection_flag() { |
97 | CollectionStateFlag status = CollectionStateFlag::UPDATE; |
98 | // In case update twice |
99 | bool is_update = collection_state_flag_.compare_exchange_strong( |
100 | status, CollectionStateFlag::NORMAL); |
101 | if (is_update) { |
102 | return CollectionStateFlag::UPDATE; |
103 | } else { |
104 | return status; |
105 | } |
106 | } |
107 | |
108 | void MysqlCollection::clear_fetch_data() { |
109 | fetch_request_->Clear(); |
110 | } |
111 | |
112 | void MysqlCollection::reset_fetch_status() { |
113 | prepared_data_size_.store(0); |
114 | reset_.store(true); |
115 | } |
116 | |
117 | void MysqlCollection::reset_send_status() { |
118 | ready_.store(false); |
119 | } |
120 | |
121 | void MysqlCollection::stop() { |
122 | const std::string &collection_name = config_.collection_name(); |
123 | state_.store(CollectionStatus::FINISHED); |
124 | if (fetch_thread_.joinable()) { |
125 | fetch_thread_.join(); |
126 | } |
127 | if (send_thread_.joinable()) { |
128 | send_thread_.join(); |
129 | } |
130 | LOG_INFO("Stop collection successed. name[%s]" , collection_name.c_str()); |
131 | } |
132 | |
133 | void MysqlCollection::update_action() { |
134 | const std::string &collection_name = config_.collection_name(); |
135 | LOG_INFO("Updating collection. name[%s]" , collection_name.c_str()); |
136 | std::unique_lock<std::mutex> lg(update_mutex_); |
137 | while (is_valid()) { |
138 | int ret = reset_collection(); |
139 | if (ret != 0) { |
140 | LOG_ERROR("Failed to reset collection. retry ..." ); |
141 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
142 | continue; |
143 | } |
144 | state_.store(CollectionStatus::RUNNING); |
145 | break; |
146 | } |
147 | } |
148 | |
149 | void MysqlCollection::process_update() { |
150 | switch (state_.load()) { |
151 | case CollectionStatus::INIT: |
152 | case CollectionStatus::RUNNING: |
153 | case CollectionStatus::UPDATING: |
154 | update_action(); |
155 | break; |
156 | case CollectionStatus::FINISHED: |
157 | break; |
158 | } |
159 | } |
160 | |
161 | void MysqlCollection::process_drop() { |
162 | state_.store(CollectionStatus::FINISHED); |
163 | LOG_INFO("Drop collection. name[%s]" , config_.collection_name().c_str()); |
164 | } |
165 | |
166 | void MysqlCollection::wait_update_command() { |
167 | while (is_valid()) { |
168 | CollectionStateFlag flag = get_collection_flag(); |
169 | switch (flag) { |
170 | case CollectionStateFlag::UPDATE: |
171 | return; |
172 | case CollectionStateFlag::DROP: |
173 | process_drop(); |
174 | break; |
175 | case CollectionStateFlag::NORMAL: |
176 | std::this_thread::sleep_for(std::chrono::microseconds(10)); |
177 | } |
178 | } |
179 | } |
180 | |
181 | void MysqlCollection::process_normal() { |
182 | auto current_state = state_.load(); |
183 | switch (current_state) { |
184 | case CollectionStatus::INIT: |
185 | state_.store(CollectionStatus::RUNNING); |
186 | break; |
187 | case CollectionStatus::RUNNING: |
188 | case CollectionStatus::FINISHED: |
189 | // If is running, or finished. do nothing |
190 | break; |
191 | case CollectionStatus::UPDATING: |
192 | // If current state is updating. wait until get update command and update |
193 | wait_update_command(); |
194 | process_update(); |
195 | } |
196 | return; |
197 | } |
198 | |
199 | void MysqlCollection::process_event(const CollectionStateFlag &flag) { |
200 | switch (flag) { |
201 | case CollectionStateFlag::UPDATE: |
202 | process_update(); |
203 | break; |
204 | case CollectionStateFlag::DROP: |
205 | process_drop(); |
206 | break; |
207 | case CollectionStateFlag::NORMAL: |
208 | process_normal(); |
209 | break; |
210 | } |
211 | } |
212 | |
213 | CollectionStatus MysqlCollection::state() const { |
214 | return state_.load(); |
215 | } |
216 | |
217 | int MysqlCollection::init_brpc() { |
218 | brpc::ChannelOptions options; |
219 | options.max_retry = max_retry_; |
220 | options.timeout_ms = brpc_timeout_ms_; |
221 | int ret = |
222 | channel_.Init(index_server_uri_.c_str(), load_balance_.c_str(), &options); |
223 | if (ret != 0) { |
224 | LOG_ERROR("Failed to initialize channel. uri[%s]" , |
225 | index_server_uri_.c_str()); |
226 | return ErrorCode_InitChannel; |
227 | } |
228 | stub_.reset(new (std::nothrow) proto::ProximaService_Stub(&channel_)); |
229 | fetch_request_.reset(new (std::nothrow) proto::WriteRequest()); |
230 | send_request_.reset(new (std::nothrow) proto::WriteRequest()); |
231 | reset_.store(true); |
232 | return 0; |
233 | } |
234 | |
235 | int MysqlCollection::init_mysql_module() { |
236 | if (mysql_handler_ == nullptr) { |
237 | LOG_ERROR("Invalid mysql handler" ); |
238 | return ErrorCode_InvalidMysqlHandler; |
239 | } |
240 | |
241 | int ret = load_lsn_info(false); |
242 | if (ret != 0) { |
243 | LOG_ERROR("Failed to load lsn map information" ); |
244 | return ret; |
245 | } |
246 | LOG_INFO("Load lsn info successed" ); |
247 | |
248 | ret = mysql_handler_->init(pull_state_flag_); |
249 | if (ret != 0) { |
250 | LOG_ERROR("Failed to init mysql handler" ); |
251 | return ret; |
252 | } |
253 | LOG_INFO("Init mysql handler successed" ); |
254 | |
255 | if (lsn_ == 0) { |
256 | ret = mysql_handler_->get_table_snapshot(&context_.file_name, |
257 | &context_.position); |
258 | if (ret != 0) { |
259 | LOG_ERROR("Failed to get full table snapshot." ); |
260 | return ret; |
261 | } |
262 | LOG_INFO("Get table snapshot successed, file_name: %s position: %zu" , |
263 | context_.file_name.c_str(), (size_t)context_.position); |
264 | } |
265 | |
266 | ret = mysql_handler_->start(context_); |
267 | if (ret != 0) { |
268 | LOG_ERROR("Failed to start mysql handler." ); |
269 | return ret; |
270 | } |
271 | |
272 | return 0; |
273 | } |
274 | |
275 | //! Initialize MySQL Collection |
276 | int MysqlCollection::init() { |
277 | load_config(); |
278 | //! Init brpc components |
279 | int ret = init_brpc(); |
280 | if (ret != 0) { |
281 | LOG_ERROR("Failed to init brpc components" ); |
282 | return ret; |
283 | } |
284 | LOG_INFO("Init brpc successed" ); |
285 | |
286 | //! Init mysql module |
287 | ret = init_mysql_module(); |
288 | if (ret != 0) { |
289 | LOG_ERROR("Failed to init mysql module" ); |
290 | return ret; |
291 | } |
292 | LOG_INFO("Init mysql module successed" ); |
293 | LOG_INFO("Init mysql collection successed" ); |
294 | return 0; |
295 | } |
296 | |
297 | void MysqlCollection::load_config() { |
298 | batch_size_ = repository::Config::Instance().get_batch_size(); |
299 | batch_interval_ = repository::Config::Instance().get_batch_interval(); |
300 | index_server_uri_ = repository::Config::Instance().get_index_agent_uri(); |
301 | load_balance_ = repository::Config::Instance().get_load_balance(); |
302 | max_retry_ = repository::Config::Instance().get_max_retry(); |
303 | brpc_timeout_ms_ = repository::Config::Instance().get_timeout_ms(); |
304 | } |
305 | |
306 | int MysqlCollection::load_lsn_info(bool is_retry) { |
307 | proto::CollectionName request; |
308 | proto::DescribeCollectionResponse response; |
309 | brpc::Controller cntl; |
310 | request.set_collection_name(config_.collection_name()); |
311 | while (true) { |
312 | if (!is_valid()) { |
313 | return ErrorCode_Terminate; |
314 | } |
315 | stub_->describe_collection(&cntl, &request, &response, NULL); |
316 | if (cntl.Failed()) { |
317 | LOG_ERROR("Failed to get collection from index agent. msg[%s]" , |
318 | cntl.ErrorText().c_str()); |
319 | if (is_retry) { |
320 | uint64_t sleep_time = get_sleep_time(); |
321 | std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); |
322 | cntl.Reset(); |
323 | continue; |
324 | } else { |
325 | return ErrorCode_RPCFailed; |
326 | } |
327 | } |
328 | break; |
329 | } |
330 | |
331 | // Debug |
332 | LOG_INFO("get_collection response : %s" , response.ShortDebugString().c_str()); |
333 | |
334 | // Get lsn |
335 | auto &info = response.collection(); |
336 | config_ = info.config(); |
337 | agent_timestamp_ = info.magic_number(); |
338 | auto &lsn_context = info.latest_lsn_context(); |
339 | lsn_ = lsn_context.lsn(); |
340 | if (lsn_ == 0) { |
341 | LOG_INFO("LSN is zero, use initial value." ); |
342 | pull_state_flag_ = ScanMode::FULL; |
343 | return 0; |
344 | } |
345 | |
346 | LsnContextFormat lsn_context_format; |
347 | int ret = lsn_context_format.parse_from_string(lsn_context.context()); |
348 | // int ret = string_to_context(lsn_context.context(), &lsn_context_format); |
349 | if (ret != 0) { |
350 | LOG_ERROR("Parse lsn context from string failed." ); |
351 | return ret; |
352 | } |
353 | |
354 | context_.file_name = lsn_context_format.file_name(); |
355 | context_.position = lsn_context_format.position(); |
356 | context_.seq_id = lsn_context_format.seq_id(); |
357 | pull_state_flag_ = lsn_context_format.mode(); |
358 | return 0; |
359 | } |
360 | |
361 | bool MysqlCollection::is_valid() const { |
362 | if (state_.load() == CollectionStatus::FINISHED || |
363 | collection_state_flag_.load() == CollectionStateFlag::DROP) { |
364 | return false; |
365 | } |
366 | return true; |
367 | } |
368 | |
369 | int MysqlCollection::update_request_meta() { |
370 | if (reset_.load() == true) { |
371 | start_time_ = ailego::Monotime::MilliSeconds(); |
372 | int ret = reset_request(); |
373 | if (ret != 0) { |
374 | LOG_ERROR("Reset fetch request failed" ); |
375 | return ret; |
376 | } |
377 | reset_.store(false); |
378 | } |
379 | return 0; |
380 | } |
381 | |
382 | int MysqlCollection::get_request_row() { |
383 | int ret = 0; |
384 | LsnContext current_context; |
385 | auto *next_row = fetch_request_->add_rows(); |
386 | ret = mysql_handler_->get_next_row_data(next_row, ¤t_context); |
387 | if (ret != 0) { |
388 | // Not fatal error, continue |
389 | LOG_ERROR("Get next row data failed. code[%d], msg[%s]" , ret, |
390 | ErrorCode::What(ret)); |
391 | // Remove the last row |
392 | fetch_request_->mutable_rows()->RemoveLast(); |
393 | std::this_thread::sleep_for(std::chrono::microseconds(10)); |
394 | return ret; |
395 | } |
396 | ret = verify_and_handle(current_context); |
397 | if (ret != 0) { |
398 | return ret; |
399 | } |
400 | update_lsn_map_info(next_row); |
401 | return 0; |
402 | } |
403 | |
404 | bool MysqlCollection::must_send(uint64_t start_time) { |
405 | if (ready_.load() == true) { |
406 | return true; |
407 | } |
408 | uint64_t current_time = ailego::Monotime::MicroSeconds(); |
409 | if ((current_time - start_time) >= (uint64_t)batch_interval_ && |
410 | prepared_data_size_.load() != 0) { |
411 | ready_.store(true); |
412 | return true; |
413 | } |
414 | return false; |
415 | } |
416 | |
417 | //! Start process |
418 | void MysqlCollection::fetch_impl() { |
419 | LOG_INFO("Start fetch thread" ); |
420 | uint64_t start_time = ailego::Monotime::MicroSeconds(); |
421 | start_time_ = start_time; // start_time_ just use to print info |
422 | while (is_valid()) { |
423 | std::this_thread::sleep_for(std::chrono::microseconds(2)); |
424 | std::unique_lock<std::mutex> ul(update_mutex_); |
425 | if (must_send(start_time)) { |
426 | start_time = ailego::Monotime::MicroSeconds(); |
427 | continue; |
428 | } |
429 | |
430 | int ret = update_request_meta(); |
431 | if (ret != 0) { |
432 | LOG_ERROR("Update request meta failed. code[%d], msg[%s]" , ret, |
433 | ErrorCode::What(ret)); |
434 | // If failed, continue |
435 | continue; |
436 | } |
437 | ret = get_request_row(); |
438 | if (ret != 0) { |
439 | // Get row data failed, just continue |
440 | continue; |
441 | } |
442 | |
443 | prepared_data_size_++; |
444 | if (prepared_data_size_ >= batch_size_) { |
445 | ready_.store(true); |
446 | } |
447 | } |
448 | } |
449 | |
450 | void MysqlCollection::update_lsn_map_info( |
451 | proto::WriteRequest::Row *row_data) const { |
452 | auto lsn_context = row_data->mutable_lsn_context(); |
453 | lsn_context->set_lsn(lsn_); |
454 | |
455 | LsnContextFormat current_context(context_.file_name, context_.position, |
456 | context_.seq_id, pull_state_flag_); |
457 | lsn_context->set_context(current_context.convert_to_string()); |
458 | } |
459 | |
460 | int MysqlCollection::verify_and_handle(const LsnContext &context) { |
461 | auto current_state = context.status; |
462 | switch (current_state) { |
463 | case RowDataStatus::NO_MORE_DATA: |
464 | handle_no_data(); |
465 | return ErrorCode_NoMoreData; |
466 | case RowDataStatus::SCHEMA_CHANGED: |
467 | handle_schema_changed(); |
468 | return ErrorCode_SchemaChanged; |
469 | default: |
470 | update_context(context); |
471 | lsn_++; |
472 | return 0; |
473 | } |
474 | } |
475 | |
476 | void MysqlCollection::handle_no_data() { |
477 | fetch_request_->mutable_rows()->RemoveLast(); |
478 | if (pull_state_flag_ == ScanMode::FULL) { |
479 | LOG_INFO("Scan mode need change" ); |
480 | pull_state_flag_ = ScanMode::INCREMENTAL; |
481 | context_.seq_id = 0; // seq_id set to invalid value |
482 | mysql_handler_->reset_status(ScanMode::INCREMENTAL, config_, context_); |
483 | ready_.store(true); |
484 | } |
485 | } |
486 | |
487 | void MysqlCollection::update_context(const LsnContext &context) { |
488 | if (pull_state_flag_ == ScanMode::FULL) { |
489 | context_.seq_id = context.seq_id; |
490 | } else { |
491 | context_.file_name = context.file_name; |
492 | context_.position = context.position; |
493 | } |
494 | context_.status = context.status; |
495 | } |
496 | |
497 | void MysqlCollection::handle_schema_changed() { |
498 | LOG_INFO("Schema changed" ); |
499 | fetch_request_->mutable_rows()->RemoveLast(); |
500 | ready_.store(true); |
501 | } |
502 | |
503 | //! Start process |
504 | void MysqlCollection::send_impl() { |
505 | LOG_INFO("Start send thread" ); |
506 | // Actual work here |
507 | while (!finished()) { |
508 | update_state(); |
509 | bool is_ready = wait_prepared_data(); |
510 | if (is_ready == false) { |
511 | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
512 | continue; |
513 | } |
514 | |
515 | int ret = send_data(); |
516 | if (ret != 0) { |
517 | LOG_ERROR("Failed to send data. code[%d], msg[%s]" , ret, |
518 | ErrorCode::What(ret)); |
519 | continue; |
520 | } |
521 | } |
522 | } |
523 | |
524 | void MysqlCollection::update_state() { |
525 | CollectionStateFlag flag = get_collection_flag(); |
526 | process_event(flag); |
527 | } |
528 | |
529 | uint64_t MysqlCollection::get_sleep_time() const { |
530 | std::mt19937 gen((std::random_device())()); |
531 | return (std::uniform_int_distribution<uint64_t>(500, 1000))(gen); |
532 | } |
533 | |
534 | void MysqlCollection::print_send_data_info() const { |
535 | uint64_t current_time = ailego::Monotime::MilliSeconds(); |
536 | uint64_t cost = current_time - start_time_; |
537 | auto row_size = send_request_->rows_size(); |
538 | auto lsn_min = send_request_->rows(0).lsn_context().lsn(); |
539 | auto lsn_max = send_request_->rows(row_size - 1).lsn_context().lsn(); |
540 | LOG_INFO("Send request. size[%d], cost[%zums], lsn_min[%zu], lsn_max[%zu]" , |
541 | row_size, (size_t)cost, (size_t)lsn_min, (size_t)lsn_max); |
542 | } |
543 | |
544 | int MysqlCollection::send_data() { |
545 | int ret = 0; |
546 | proto::Status response; |
547 | brpc::Controller cntl; |
548 | while (is_valid()) { |
549 | print_send_data_info(); |
550 | stub_->write(&cntl, send_request_.get(), &response, NULL); |
551 | if (cntl.Failed()) { |
552 | ret = ErrorCode_RPCFailed; |
553 | LOG_ERROR("Failed RPC. msg[%s]." , cntl.ErrorText().c_str()); |
554 | uint64_t sleep_time = get_sleep_time(); |
555 | std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); |
556 | cntl.Reset(); |
557 | continue; |
558 | } |
559 | |
560 | ret = (int)response.code(); |
561 | if (ret == ErrorCode_Success.value()) { |
562 | // If successed |
563 | return 0; |
564 | } else if (ret == ErrorCode_ExceedRateLimit.value()) { |
565 | // If exceed ratelimited, retry |
566 | LOG_INFO("Exceed rate limite. Retry ..." ); |
567 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
568 | cntl.Reset(); |
569 | continue; |
570 | } else if (ret == ErrorCode_MismatchedSchema.value()) { |
571 | // If schema revision mismatched, change state to updating, and wait |
572 | // update command later |
573 | LOG_INFO("Schema revision mismatch" ); |
574 | state_.store(CollectionStatus::UPDATING); |
575 | return 0; |
576 | } else if (ret == ErrorCode_MismatchedMagicNumber.value()) { |
577 | // If agent timestamp mismatch, update now |
578 | LOG_INFO("Agent timestamp mismatch" ); |
579 | state_.store(CollectionStatus::UPDATING); |
580 | process_update(); |
581 | return 0; |
582 | } else if (ret == ErrorCode_CollectionNotExist.value()) { |
583 | // If collection not exit |
584 | LOG_INFO("Collection not exist" ); |
585 | return ret; |
586 | } else { |
587 | // Other unknown response, just retry |
588 | LOG_ERROR("Send data failed, unknown response. response_code[%d]" , ret); |
589 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
590 | cntl.Reset(); |
591 | continue; |
592 | } |
593 | } |
594 | return ret; |
595 | } |
596 | |
597 | std::string MysqlCollection::generate_request_id() const { |
598 | std::random_device rd; |
599 | std::mt19937 gen(rd()); |
600 | std::uniform_int_distribution<uint64_t> distrib(0, UINT64_MAX); |
601 | return std::to_string(distrib(gen)); |
602 | } |
603 | |
604 | int MysqlCollection::reset_request() { |
605 | clear_fetch_data(); // make sure fetch request is clean |
606 | fetch_request_->set_request_id(generate_request_id()); |
607 | // todo<cdz>: Set schema revision when support update |
608 | // fetch_request_->set_schema_revision(config_.schema_revision()); |
609 | fetch_request_->set_magic_number(agent_timestamp_); |
610 | fetch_request_->set_collection_name(config_.collection_name()); |
611 | auto meta = fetch_request_->mutable_row_meta(); |
612 | int ret = mysql_handler_->get_fields_meta(meta); |
613 | if (ret != 0) { |
614 | LOG_ERROR("Failed to get fields meta. code[%d] msg[%s]" , ret, |
615 | ErrorCode::What(ret)); |
616 | return ret; |
617 | } |
618 | return 0; |
619 | } |
620 | |
621 | void MysqlCollection::get_sending_request() { |
622 | send_request_.swap(fetch_request_); |
623 | clear_fetch_data(); |
624 | reset_fetch_status(); // inform fetch thread reset request |
625 | reset_send_status(); // mark send thread get ready data |
626 | } |
627 | |
628 | bool MysqlCollection::ready() const { |
629 | return ready_.load() == true; |
630 | } |
631 | |
632 | bool MysqlCollection::wait_prepared_data() { |
633 | if (!ready()) { |
634 | return false; |
635 | } |
636 | get_sending_request(); |
637 | if (is_send_request_empty()) { |
638 | return false; |
639 | } |
640 | return true; |
641 | } |
642 | |
643 | bool MysqlCollection::is_send_request_empty() const { |
644 | auto row_size = send_request_->rows_size(); |
645 | return (row_size == 0); |
646 | } |
647 | |
648 | } // namespace repository |
649 | } // namespace be |
650 | } // namespace proxima |
651 | |