1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gtest/gtest.h>
22
23#define private public
24#include "repository/binlog/mysql_handler.h"
25#include "repository/binlog/table_reader.h"
26#include "repository/repository_common/error_code.h"
27#include "event_builder.h"
28#include "mock_mysql_connector.h"
29#include "mysql_result_builder.h"
30
31#undef private
32
33using namespace proxima::be;
34using namespace proxima::be::repository;
35
36class MysqlHandlerTest : public testing::Test {
37 protected:
38 void SetUp() {
39 mgr_ = std::make_shared<MysqlConnectorManager>();
40 ASSERT_TRUE(mgr_);
41 connector1_ = std::make_shared<MockMysqlConnector>();
42 ASSERT_TRUE(connector1_);
43 mgr_->put(connector1_);
44 connector2_ = std::make_shared<MockMysqlConnector>();
45 ASSERT_TRUE(connector2_);
46 mgr_->put(connector2_);
47 ctx_.seq_id = 1;
48 InitHandler();
49 }
50
51 void TearDown() {}
52
53 void InitHandler() {
54 builder_.BuildCollectionConfig();
55 handler_ = std::make_shared<MysqlHandler>(builder_.config_, mgr_);
56 }
57
58 MockMysqlResultWrapperPtr BuildSnapshotResult() {
59 MockMysqlResultWrapperPtr result =
60 std::make_shared<MockMysqlResultWrapper>();
61 result->append_field_meta("File");
62 result->append_field_meta("Position");
63 result->append_field_meta("Binlog_Do_DB");
64 result->append_field_meta("Binlog_Ignore_DB");
65 result->append_field_meta("Executed_Gtid_Set");
66
67 std::vector<std::string> values1 = {"binlog.000001", "10240", "", "", ""};
68 result->append_row_values(values1);
69
70 return result;
71 }
72
73 protected:
74 MysqlHandlerPtr handler_{};
75 MysqlConnectorManagerPtr mgr_{};
76 MockMysqlConnectorPtr connector1_{};
77 MockMysqlConnectorPtr connector2_{};
78 InfoFetcherPtr fetcher_{};
79 std::vector<enum_field_types> column_types_{};
80 std::vector<int32_t> column_metas_{};
81 LsnContext ctx_;
82 MysqlResultBuilder builder_{};
83};
84
85TEST_F(MysqlHandlerTest, TestGeneral) {
86 ScanMode mode = ScanMode::FULL;
87
88 // for validator
89 MockMysqlResultWrapperPtr version_result =
90 builder_.BuildSelectVersionResult();
91
92 MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult();
93 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
94 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
95 MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult();
96
97 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
98 // for fetcher
99 ailego::Uri test_uri = builder_.uri_;
100 EXPECT_CALL(*connector1_, execute_query(_, _, _))
101 .Times(3)
102 .WillOnce(
103 Invoke([&version_result](const std::string &,
104 MysqlResultWrapperPtr *out, bool) -> int {
105 *out = version_result;
106 return 0;
107 }))
108 .WillOnce(
109 Invoke([&binlog_result](const std::string &,
110 MysqlResultWrapperPtr *out, bool) -> int {
111 *out = binlog_result;
112 return 0;
113 }))
114 .WillOnce(Invoke([&db_result](const std::string &,
115 MysqlResultWrapperPtr *out, bool) -> int {
116 *out = db_result;
117 return 0;
118 }))
119 .RetiresOnSaturation();
120
121 EXPECT_CALL(*connector1_, uri())
122 .WillOnce(
123 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
124 .RetiresOnSaturation();
125
126 EXPECT_CALL(*connector2_, uri())
127 .WillOnce(
128 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
129 .RetiresOnSaturation();
130
131 EXPECT_CALL(*connector2_, execute_query(_, _, _))
132 .Times(2)
133 .WillOnce(Invoke([&result1](const std::string &,
134 MysqlResultWrapperPtr *out, bool) -> int {
135 *out = result1;
136 return 0;
137 }))
138 .WillOnce(
139 Invoke([&schema_result](const std::string &,
140 MysqlResultWrapperPtr *out, bool) -> int {
141 *out = schema_result;
142 return 0;
143 }))
144 .RetiresOnSaturation();
145
146 int ret = handler_->init(mode);
147 ASSERT_EQ(ret, 0);
148
149 EXPECT_CALL(*connector1_, execute_query(_, _, _))
150 .Times(1)
151 .WillOnce(Invoke([&scan_result](const std::string &,
152 MysqlResultWrapperPtr *out, bool) -> int {
153 *out = scan_result;
154 return 0;
155 }))
156 .RetiresOnSaturation();
157 ret = handler_->start(ctx_);
158 ASSERT_EQ(ret, 0);
159
160 // RowData row_data;
161 proto::WriteRequest::Row row_data;
162 LsnContext ctx;
163 ret = handler_->get_next_row_data(&row_data, &ctx);
164 ASSERT_EQ(ret, 0);
165 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
166 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
167 row_data.Clear();
168 ret = handler_->get_next_row_data(&row_data, &ctx);
169 ASSERT_EQ(ret, 0);
170 ASSERT_EQ(row_data.primary_key(), (uint64_t)2);
171 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
172 row_data.Clear();
173 ret = handler_->get_next_row_data(&row_data, &ctx);
174 ASSERT_EQ(ret, 0);
175 ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA);
176 row_data.Clear();
177
178 // proxima::be::agent::proto::CollectionDataset dataset;
179 proto::WriteRequest::RowMeta meta;
180 ret = handler_->get_fields_meta(&meta);
181 ASSERT_EQ(ret, 0);
182 // ASSERT_EQ(dataset.index_tuples_size(), 2);
183 // ASSERT_EQ(dataset.index_tuples(0).field_name(), "vector1");
184 // ASSERT_EQ(dataset.index_tuples(0).field_type(),
185 // GenericValueMeta::FT_STRING);
186 // ASSERT_EQ(dataset.index_tuples(1).field_name(), "vector2");
187 // ASSERT_EQ(dataset.index_tuples(1).field_type(),
188 // GenericValueMeta::FT_STRING); ASSERT_EQ(dataset.forward_tuples_size(), 2);
189 // ASSERT_EQ(dataset.forward_tuples(0).field_name(), "name");
190 // ASSERT_EQ(dataset.forward_tuples(1).field_name(), "age");
191 ASSERT_EQ(meta.forward_column_names(0), "name");
192 ASSERT_EQ(meta.forward_column_names(1), "age");
193 ASSERT_EQ(meta.index_column_metas(0).column_name(), "vector1");
194 ASSERT_EQ(meta.index_column_metas(1).column_name(), "vector2");
195 // reset status
196 EXPECT_CALL(*connector2_, uri())
197 .WillOnce(
198 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
199 .RetiresOnSaturation();
200
201 result1->reset();
202 EXPECT_CALL(*connector2_, execute_query(_, _, _))
203 .Times(2)
204 .WillOnce(Invoke([&result1](const std::string &,
205 MysqlResultWrapperPtr *out, bool) -> int {
206 *out = result1;
207 return 0;
208 }))
209 .WillOnce(
210 Invoke([&schema_result](const std::string &,
211 MysqlResultWrapperPtr *out, bool) -> int {
212 *out = schema_result;
213 return 0;
214 }))
215 .RetiresOnSaturation();
216 // first set check sum and update lsn info
217 EXPECT_CALL(*connector1_, execute_query(_, _, _))
218 .Times(2)
219 .WillOnce(::testing::Return(0))
220 .WillOnce(::testing::Return(0))
221 .RetiresOnSaturation();
222 // second request dump
223 EXPECT_CALL(*connector1_, execute_simple_command(_, _, _))
224 .WillOnce(::testing::Return(0))
225 .RetiresOnSaturation();
226
227 mode = ScanMode::INCREMENTAL;
228 ctx_.file_name = "binlog.000003";
229 ctx_.position = 4;
230 ret = handler_->reset_status(mode, builder_.config_, ctx_);
231 ASSERT_EQ(ret, 0);
232
233 // fetch data
234 std::string table_map_str = builder_.BuildTableMapEventStr();
235 std::vector<std::string> values = {"1", "name1", "30", "123.456",
236 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
237 std::string write_rows_str = builder_.BuildWriteRowsEventStr(values);
238 EXPECT_CALL(*connector1_, client_safe_read(_))
239 .Times(2)
240 .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int {
241 *len = table_map_str.size();
242 return 0;
243 }))
244 .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int {
245 *len = write_rows_str.size();
246 return 0;
247 }))
248 .RetiresOnSaturation();
249
250 EXPECT_CALL(*connector1_, data())
251 .Times(2)
252 .WillOnce(Invoke([&table_map_str]() -> const void * {
253 return (const void *)table_map_str.c_str();
254 }))
255 .WillOnce(Invoke([&write_rows_str]() -> const void * {
256 return (const void *)write_rows_str.c_str();
257 }))
258 .RetiresOnSaturation();
259
260 row_data.Clear();
261 ret = handler_->get_next_row_data(&row_data, &ctx);
262 ASSERT_EQ(ret, 0);
263 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
264 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
265 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
266 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30);
267 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
268 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
269}
270
271TEST_F(MysqlHandlerTest, TestInit) {
272 ScanMode mode = ScanMode::FULL;
273
274 // connection manager init failed
275 {
276 auto config1 = builder_.config_;
277 config1.mutable_repository_config()->mutable_database()->set_connection_uri(
278 "invalid");
279 MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(config1);
280 int ret = handler->init(mode);
281 ASSERT_EQ(ret, ErrorCode_InvalidArgument);
282 }
283
284 // validate mysql failed
285 {
286 MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(builder_.config_);
287 int ret = handler->init(mode);
288 ASSERT_EQ(ret, ErrorCode_RuntimeError);
289 }
290
291 // success
292 {
293 MysqlHandlerPtr handler =
294 std::make_shared<MysqlHandler>(builder_.config_, mgr_);
295
296 // for validator
297 MockMysqlResultWrapperPtr version_result =
298 builder_.BuildSelectVersionResult();
299
300 MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult();
301 MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult();
302 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
303 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
304 // for fetcher
305 ailego::Uri test_uri = builder_.uri_;
306 EXPECT_CALL(*connector1_, execute_query(_, _, _))
307 .Times(3)
308 .WillOnce(
309 Invoke([&version_result](const std::string &,
310 MysqlResultWrapperPtr *out, bool) -> int {
311 *out = version_result;
312 return 0;
313 }))
314 .WillOnce(
315 Invoke([&binlog_result](const std::string &,
316 MysqlResultWrapperPtr *out, bool) -> int {
317 *out = binlog_result;
318 return 0;
319 }))
320 .WillOnce(Invoke([&db_result](const std::string &,
321 MysqlResultWrapperPtr *out, bool) -> int {
322 *out = db_result;
323 return 0;
324 }))
325 .RetiresOnSaturation();
326
327 EXPECT_CALL(*connector1_, uri())
328 .WillOnce(
329 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
330 .RetiresOnSaturation();
331
332 EXPECT_CALL(*connector2_, uri())
333 .WillOnce(
334 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
335 .RetiresOnSaturation();
336
337 EXPECT_CALL(*connector2_, execute_query(_, _, _))
338 .Times(2)
339 .WillOnce(Invoke([&result1](const std::string &,
340 MysqlResultWrapperPtr *out, bool) -> int {
341 *out = result1;
342 return 0;
343 }))
344 .WillOnce(
345 Invoke([&schema_result](const std::string &,
346 MysqlResultWrapperPtr *out, bool) -> int {
347 *out = schema_result;
348 return 0;
349 }))
350 .RetiresOnSaturation();
351
352 int ret = handler_->init(mode);
353 ASSERT_EQ(ret, 0);
354
355 ret = handler_->init(mode);
356 ASSERT_EQ(ret, ErrorCode_RepeatedInitialized);
357 }
358}
359
360TEST_F(MysqlHandlerTest, TestStart) {
361 ScanMode mode = ScanMode::FULL;
362
363 // connection manager init failed
364 {
365 auto config1 = builder_.config_;
366 config1.mutable_repository_config()->mutable_database()->set_connection_uri(
367 "invalid");
368 MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(config1);
369 int ret = handler->init(mode);
370 ASSERT_EQ(ret, ErrorCode_InvalidArgument);
371 }
372
373 // validate mysql failed
374 {
375 MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(builder_.config_);
376 int ret = handler->init(mode);
377 ASSERT_EQ(ret, ErrorCode_RuntimeError);
378 }
379
380 // success
381 {
382 MysqlHandlerPtr handler =
383 std::make_shared<MysqlHandler>(builder_.config_, mgr_);
384
385 // for validator
386 MockMysqlResultWrapperPtr version_result =
387 builder_.BuildSelectVersionResult();
388
389 MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult();
390 MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult();
391 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
392 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
393 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
394
395 // for fetcher
396 ailego::Uri test_uri = builder_.uri_;
397 EXPECT_CALL(*connector1_, execute_query(_, _, _))
398 .Times(3)
399 .WillOnce(
400 Invoke([&version_result](const std::string &,
401 MysqlResultWrapperPtr *out, bool) -> int {
402 *out = version_result;
403 return 0;
404 }))
405 .WillOnce(
406 Invoke([&binlog_result](const std::string &,
407 MysqlResultWrapperPtr *out, bool) -> int {
408 *out = binlog_result;
409 return 0;
410 }))
411 .WillOnce(Invoke([&db_result](const std::string &,
412 MysqlResultWrapperPtr *out, bool) -> int {
413 *out = db_result;
414 return 0;
415 }))
416 .RetiresOnSaturation();
417
418 EXPECT_CALL(*connector1_, uri())
419 .WillOnce(
420 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
421 .RetiresOnSaturation();
422
423 EXPECT_CALL(*connector2_, uri())
424 .WillOnce(
425 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
426 .RetiresOnSaturation();
427
428 EXPECT_CALL(*connector2_, execute_query(_, _, _))
429 .Times(2)
430 .WillOnce(Invoke([&result1](const std::string &,
431 MysqlResultWrapperPtr *out, bool) -> int {
432 *out = result1;
433 return 0;
434 }))
435 .WillOnce(
436 Invoke([&schema_result](const std::string &,
437 MysqlResultWrapperPtr *out, bool) -> int {
438 *out = schema_result;
439 return 0;
440 }))
441 .RetiresOnSaturation();
442
443 int ret = handler_->init(mode);
444 ASSERT_EQ(ret, 0);
445
446 EXPECT_CALL(*connector1_, execute_query(_, _, _))
447 .Times(1)
448 .WillOnce(
449 Invoke([&scan_result](const std::string &,
450 MysqlResultWrapperPtr *out, bool) -> int {
451 *out = scan_result;
452 return 0;
453 }))
454 .RetiresOnSaturation();
455 ret = handler_->start(ctx_);
456 ASSERT_EQ(ret, 0);
457 }
458}
459
460TEST_F(MysqlHandlerTest, TestValidateMysql) {
461 // validator init failed
462 MysqlConnectorManagerPtr mgr;
463 MysqlHandler handler1(builder_.config_, mgr);
464 int ret = handler1.validate_mysql();
465 ASSERT_EQ(ret, ErrorCode_RuntimeError);
466
467 MysqlHandler handler(builder_.config_, mgr_);
468 MockMysqlResultWrapperPtr version_result =
469 builder_.BuildSelectVersionResult();
470 MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult();
471 MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult();
472
473 // validate version failed
474 {
475 EXPECT_CALL(*connector1_, execute_query(_, _, _))
476 .Times(1)
477 .WillOnce(testing::Return(1))
478 .RetiresOnSaturation();
479 ret = handler.validate_mysql();
480 ASSERT_EQ(ret, ErrorCode_UnsupportedMysqlVersion);
481 }
482
483 // validate binlog format failed
484 {
485 EXPECT_CALL(*connector2_, execute_query(_, _, _))
486 .Times(2)
487 .WillOnce(
488 Invoke([&version_result](const std::string &,
489 MysqlResultWrapperPtr *out, bool) -> int {
490 *out = version_result;
491 return 0;
492 }))
493 .WillOnce(
494 Invoke([&binlog_result](const std::string &,
495 MysqlResultWrapperPtr *out, bool) -> int {
496 *out = binlog_result;
497 return 2;
498 }))
499 .RetiresOnSaturation();
500 ret = handler.validate_mysql();
501 ASSERT_EQ(ret, ErrorCode_UnsupportedBinlogFormat);
502 }
503
504 // validate database exist failed
505 ailego::Uri test_uri = builder_.uri_;
506 EXPECT_CALL(*connector1_, uri())
507 .WillOnce(
508 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
509 .RetiresOnSaturation();
510 {
511 version_result->reset();
512 binlog_result->reset();
513 EXPECT_CALL(*connector1_, execute_query(_, _, _))
514 .Times(3)
515 .WillOnce(
516 Invoke([&version_result](const std::string &,
517 MysqlResultWrapperPtr *out, bool) -> int {
518 *out = version_result;
519 return 0;
520 }))
521 .WillOnce(
522 Invoke([&binlog_result](const std::string &,
523 MysqlResultWrapperPtr *out, bool) -> int {
524 *out = binlog_result;
525 return 0;
526 }))
527 .WillOnce(Invoke([&db_result](const std::string &,
528 MysqlResultWrapperPtr *out, bool) -> int {
529 *out = db_result;
530 return 3;
531 }))
532 .RetiresOnSaturation();
533 ret = handler.validate_mysql();
534 ASSERT_EQ(ret, ErrorCode_InvalidCollectionConfig);
535 }
536
537 // success
538 version_result->reset();
539 binlog_result->reset();
540 db_result->reset();
541
542 EXPECT_CALL(*connector2_, uri())
543 .WillOnce(
544 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
545 .RetiresOnSaturation();
546 EXPECT_CALL(*connector2_, execute_query(_, _, _))
547 .Times(3)
548 .WillOnce(
549 Invoke([&version_result](const std::string &,
550 MysqlResultWrapperPtr *out, bool) -> int {
551 *out = version_result;
552 return 0;
553 }))
554 .WillOnce(
555 Invoke([&binlog_result](const std::string &,
556 MysqlResultWrapperPtr *out, bool) -> int {
557 *out = binlog_result;
558 return 0;
559 }))
560 .WillOnce(Invoke([&db_result](const std::string &,
561 MysqlResultWrapperPtr *out, bool) -> int {
562 *out = db_result;
563 return 0;
564 }))
565 .RetiresOnSaturation();
566
567 ret = handler.validate_mysql();
568 ASSERT_EQ(ret, 0);
569}
570
571TEST_F(MysqlHandlerTest, TestGetTableSnapshot) {
572 // not init
573 std::string binlog_file;
574 uint64_t position;
575 int ret = handler_->get_table_snapshot(&binlog_file, &position);
576 ASSERT_EQ(ret, ErrorCode_NoInitialized);
577
578
579 //
580 // for validator
581 MockMysqlResultWrapperPtr version_result =
582 builder_.BuildSelectVersionResult();
583
584 MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult();
585 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
586 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
587 MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult();
588
589 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
590 // for fetcher
591 ailego::Uri test_uri = builder_.uri_;
592 EXPECT_CALL(*connector1_, execute_query(_, _, _))
593 .Times(3)
594 .WillOnce(
595 Invoke([&version_result](const std::string &,
596 MysqlResultWrapperPtr *out, bool) -> int {
597 *out = version_result;
598 return 0;
599 }))
600 .WillOnce(
601 Invoke([&binlog_result](const std::string &,
602 MysqlResultWrapperPtr *out, bool) -> int {
603 *out = binlog_result;
604 return 0;
605 }))
606 .WillOnce(Invoke([&db_result](const std::string &,
607 MysqlResultWrapperPtr *out, bool) -> int {
608 *out = db_result;
609 return 0;
610 }))
611 .RetiresOnSaturation();
612
613 EXPECT_CALL(*connector1_, uri())
614 .WillOnce(
615 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
616 .RetiresOnSaturation();
617
618 EXPECT_CALL(*connector2_, uri())
619 .WillOnce(
620 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
621 .RetiresOnSaturation();
622
623 EXPECT_CALL(*connector2_, execute_query(_, _, _))
624 .Times(2)
625 .WillOnce(Invoke([&result1](const std::string &,
626 MysqlResultWrapperPtr *out, bool) -> int {
627 *out = result1;
628 return 0;
629 }))
630 .WillOnce(
631 Invoke([&schema_result](const std::string &,
632 MysqlResultWrapperPtr *out, bool) -> int {
633 *out = schema_result;
634 return 0;
635 }))
636 .RetiresOnSaturation();
637
638 ret = handler_->init(ScanMode::FULL);
639 ASSERT_EQ(ret, 0);
640
641 EXPECT_CALL(*connector1_, execute_query(_, _, _))
642 .Times(1)
643 .WillOnce(Invoke([&scan_result](const std::string &,
644 MysqlResultWrapperPtr *out, bool) -> int {
645 *out = scan_result;
646 return 0;
647 }))
648 .RetiresOnSaturation();
649 ret = handler_->start(ctx_);
650 ASSERT_EQ(ret, 0);
651
652
653 // get table snapshot
654 MysqlResultWrapperPtr snapshot_result = BuildSnapshotResult();
655 EXPECT_CALL(*connector2_, execute_query(_, _, _))
656 .Times(3)
657 .WillOnce(testing::Return(0))
658 .WillOnce(
659 Invoke([&snapshot_result](const std::string &,
660 MysqlResultWrapperPtr *out, bool) -> int {
661 *out = snapshot_result;
662 return 0;
663 }))
664 .WillOnce(testing::Return(0))
665 .RetiresOnSaturation();
666
667 // success
668 ret = handler_->get_table_snapshot(&binlog_file, &position);
669 ASSERT_EQ(ret, 0);
670 ASSERT_EQ(binlog_file, "binlog.000001");
671 ASSERT_EQ(position, (uint64_t)10240);
672
673 // get info fetcher failed
674 TableReader *reader =
675 (TableReader *)((MysqlHandler *)handler_.get())->mysql_reader_.get();
676 reader->info_fetcher_ = nullptr;
677 ret = handler_->get_table_snapshot(&binlog_file, &position);
678 ASSERT_EQ(ret, ErrorCode_RuntimeError);
679}
680
681
682/////////////////////////////////
683
684
685// TEST_F(MysqlHandlerTest, TestSimple) {
686// bool flag = true;
687// CollectionConfig config;
688// config.set_connection_uri("mysql://root:[email protected]:3306/mytest");
689// config.set_repository_table("mt933");
690// config.add_forward_columns("name");
691// config.add_forward_columns("age");
692// config.add_forward_columns("score");
693// config.add_forward_columns("f1");
694// config.add_forward_columns("f2");
695// config.add_forward_columns("f3");
696// config.add_forward_columns("f4");
697// config.add_forward_columns("f5");
698// config.add_forward_columns("f6");
699// config.add_forward_columns("f10");
700// config.add_forward_columns("f11");
701// config.add_forward_columns("f12");
702// config.add_forward_columns("f13");
703// if (flag) {
704// config.add_forward_columns("f14");
705// }
706
707// config.add_index_columns("f9");
708
709// MysqlHandler handler(config);
710// LsnContext ctx;
711// ctx.seq_id = 0;
712
713// int ret = handler.init(ScanMode::FULL, ctx);
714// ASSERT_EQ(ret, 0);
715
716// ret = handler.get_table_snapshot(&(ctx.file_name), &(ctx.position));
717// printf("BinLog: %s %lu\n", ctx.file_name.c_str(), ctx.position);
718
719// uint64_t idx = 1;
720// while (true) {
721// RowData row_data;
722// ret = handler.get_next_row_data(&row_data, &ctx);
723// ASSERT_EQ(ret, 0);
724// if (ctx.status == RowDataStatus::NORMAL) {
725// EXPECT_EQ(row_data.primary_key(), idx);
726// EXPECT_EQ(ctx.seq_id, idx);
727// printf("seq_id: %lu\n", ctx.seq_id);
728// idx += 1;
729// } else {
730// break;
731// }
732// printf("%s\n", row_data.ShortDebugString().c_str());
733// }
734
735// ctx.position = 4;
736// ctx.file_name = "binlog.000001";
737
738// ret = handler.reset_status(ScanMode::INCREMENTAL, config, ctx);
739// ASSERT_EQ(ret, 0);
740
741// printf("////////////////////////////////////\n");
742// idx = 12;
743// int32_t retry_times = 3;
744// int32_t times = 0;
745// while (true) {
746// RowData row_data;
747// ret = handler.get_next_row_data(&row_data, &ctx);
748// if (ret != 0) {
749// continue;
750// }
751// ASSERT_EQ(ret, 0);
752// if (ctx.status == RowDataStatus::NORMAL) {
753// EXPECT_EQ(row_data.primary_key(), idx);
754// printf("file_name: %s position: %lu\n",
755// ctx.file_name.c_str(), ctx.position);
756// idx += 1;
757// } else if (ctx.status == RowDataStatus::SCHEMA_CHANGED) {
758// continue;
759// } else {
760// // sleep(1);
761// // printf("sleep...\n");
762// // // if (++times < retry_times) {
763// // continue;
764// // // } else {
765// // // break;
766// // // }
767// // break;
768// continue;
769// }
770// // printf("%s\n", row_data.ShortDebugString().c_str());
771// // printf("primary_key: %lu, %s %d %f %s %d %lf %s %s %d %s %s %s %s
772// %u %s\n",
773// // row_data.primary_key(),
774// // row_data.forward_columns().values(0).bytes_value().c_str(),
775// // row_data.forward_columns().values(1).int32_value(),
776// // row_data.forward_columns().values(2).float_value(),
777// // row_data.forward_columns().values(3).bytes_value().c_str(),
778// // row_data.forward_columns().values(4).int32_value(),
779// // row_data.forward_columns().values(5).double_value(),
780// // row_data.forward_columns().values(6).string_value().c_str(),
781// // row_data.forward_columns().values(7).string_value().c_str(),
782// // row_data.forward_columns().values(8).int32_value(),
783// // row_data.forward_columns().values(9).string_value().c_str(),
784// // row_data.forward_columns().values(10).string_value().c_str(),
785// // row_data.forward_columns().values(11).string_value().c_str(),
786// // row_data.forward_columns().values(12).string_value().c_str(),
787// // row_data.forward_columns().values(13).uint32_value(),
788// // row_data.index_columns(0).bytes_value().c_str());
789// }
790// }
791