1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gtest/gtest.h> |
22 | |
23 | #define private public |
24 | #include "repository/binlog/mysql_handler.h" |
25 | #include "repository/binlog/table_reader.h" |
26 | #include "repository/repository_common/error_code.h" |
27 | #include "event_builder.h" |
28 | #include "mock_mysql_connector.h" |
29 | #include "mysql_result_builder.h" |
30 | |
31 | #undef private |
32 | |
33 | using namespace proxima::be; |
34 | using namespace proxima::be::repository; |
35 | |
36 | class MysqlHandlerTest : public testing::Test { |
37 | protected: |
38 | void SetUp() { |
39 | mgr_ = std::make_shared<MysqlConnectorManager>(); |
40 | ASSERT_TRUE(mgr_); |
41 | connector1_ = std::make_shared<MockMysqlConnector>(); |
42 | ASSERT_TRUE(connector1_); |
43 | mgr_->put(connector1_); |
44 | connector2_ = std::make_shared<MockMysqlConnector>(); |
45 | ASSERT_TRUE(connector2_); |
46 | mgr_->put(connector2_); |
47 | ctx_.seq_id = 1; |
48 | InitHandler(); |
49 | } |
50 | |
51 | void TearDown() {} |
52 | |
53 | void InitHandler() { |
54 | builder_.BuildCollectionConfig(); |
55 | handler_ = std::make_shared<MysqlHandler>(builder_.config_, mgr_); |
56 | } |
57 | |
58 | MockMysqlResultWrapperPtr BuildSnapshotResult() { |
59 | MockMysqlResultWrapperPtr result = |
60 | std::make_shared<MockMysqlResultWrapper>(); |
61 | result->append_field_meta("File" ); |
62 | result->append_field_meta("Position" ); |
63 | result->append_field_meta("Binlog_Do_DB" ); |
64 | result->append_field_meta("Binlog_Ignore_DB" ); |
65 | result->append_field_meta("Executed_Gtid_Set" ); |
66 | |
67 | std::vector<std::string> values1 = {"binlog.000001" , "10240" , "" , "" , "" }; |
68 | result->append_row_values(values1); |
69 | |
70 | return result; |
71 | } |
72 | |
73 | protected: |
74 | MysqlHandlerPtr handler_{}; |
75 | MysqlConnectorManagerPtr mgr_{}; |
76 | MockMysqlConnectorPtr connector1_{}; |
77 | MockMysqlConnectorPtr connector2_{}; |
78 | InfoFetcherPtr fetcher_{}; |
79 | std::vector<enum_field_types> column_types_{}; |
80 | std::vector<int32_t> column_metas_{}; |
81 | LsnContext ctx_; |
82 | MysqlResultBuilder builder_{}; |
83 | }; |
84 | |
85 | TEST_F(MysqlHandlerTest, TestGeneral) { |
86 | ScanMode mode = ScanMode::FULL; |
87 | |
88 | // for validator |
89 | MockMysqlResultWrapperPtr version_result = |
90 | builder_.BuildSelectVersionResult(); |
91 | |
92 | MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult(); |
93 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
94 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
95 | MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult(); |
96 | |
97 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
98 | // for fetcher |
99 | ailego::Uri test_uri = builder_.uri_; |
100 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
101 | .Times(3) |
102 | .WillOnce( |
103 | Invoke([&version_result](const std::string &, |
104 | MysqlResultWrapperPtr *out, bool) -> int { |
105 | *out = version_result; |
106 | return 0; |
107 | })) |
108 | .WillOnce( |
109 | Invoke([&binlog_result](const std::string &, |
110 | MysqlResultWrapperPtr *out, bool) -> int { |
111 | *out = binlog_result; |
112 | return 0; |
113 | })) |
114 | .WillOnce(Invoke([&db_result](const std::string &, |
115 | MysqlResultWrapperPtr *out, bool) -> int { |
116 | *out = db_result; |
117 | return 0; |
118 | })) |
119 | .RetiresOnSaturation(); |
120 | |
121 | EXPECT_CALL(*connector1_, uri()) |
122 | .WillOnce( |
123 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
124 | .RetiresOnSaturation(); |
125 | |
126 | EXPECT_CALL(*connector2_, uri()) |
127 | .WillOnce( |
128 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
129 | .RetiresOnSaturation(); |
130 | |
131 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
132 | .Times(2) |
133 | .WillOnce(Invoke([&result1](const std::string &, |
134 | MysqlResultWrapperPtr *out, bool) -> int { |
135 | *out = result1; |
136 | return 0; |
137 | })) |
138 | .WillOnce( |
139 | Invoke([&schema_result](const std::string &, |
140 | MysqlResultWrapperPtr *out, bool) -> int { |
141 | *out = schema_result; |
142 | return 0; |
143 | })) |
144 | .RetiresOnSaturation(); |
145 | |
146 | int ret = handler_->init(mode); |
147 | ASSERT_EQ(ret, 0); |
148 | |
149 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
150 | .Times(1) |
151 | .WillOnce(Invoke([&scan_result](const std::string &, |
152 | MysqlResultWrapperPtr *out, bool) -> int { |
153 | *out = scan_result; |
154 | return 0; |
155 | })) |
156 | .RetiresOnSaturation(); |
157 | ret = handler_->start(ctx_); |
158 | ASSERT_EQ(ret, 0); |
159 | |
160 | // RowData row_data; |
161 | proto::WriteRequest::Row row_data; |
162 | LsnContext ctx; |
163 | ret = handler_->get_next_row_data(&row_data, &ctx); |
164 | ASSERT_EQ(ret, 0); |
165 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
166 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
167 | row_data.Clear(); |
168 | ret = handler_->get_next_row_data(&row_data, &ctx); |
169 | ASSERT_EQ(ret, 0); |
170 | ASSERT_EQ(row_data.primary_key(), (uint64_t)2); |
171 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
172 | row_data.Clear(); |
173 | ret = handler_->get_next_row_data(&row_data, &ctx); |
174 | ASSERT_EQ(ret, 0); |
175 | ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA); |
176 | row_data.Clear(); |
177 | |
178 | // proxima::be::agent::proto::CollectionDataset dataset; |
179 | proto::WriteRequest::RowMeta meta; |
180 | ret = handler_->get_fields_meta(&meta); |
181 | ASSERT_EQ(ret, 0); |
182 | // ASSERT_EQ(dataset.index_tuples_size(), 2); |
183 | // ASSERT_EQ(dataset.index_tuples(0).field_name(), "vector1"); |
184 | // ASSERT_EQ(dataset.index_tuples(0).field_type(), |
185 | // GenericValueMeta::FT_STRING); |
186 | // ASSERT_EQ(dataset.index_tuples(1).field_name(), "vector2"); |
187 | // ASSERT_EQ(dataset.index_tuples(1).field_type(), |
188 | // GenericValueMeta::FT_STRING); ASSERT_EQ(dataset.forward_tuples_size(), 2); |
189 | // ASSERT_EQ(dataset.forward_tuples(0).field_name(), "name"); |
190 | // ASSERT_EQ(dataset.forward_tuples(1).field_name(), "age"); |
191 | ASSERT_EQ(meta.forward_column_names(0), "name" ); |
192 | ASSERT_EQ(meta.forward_column_names(1), "age" ); |
193 | ASSERT_EQ(meta.index_column_metas(0).column_name(), "vector1" ); |
194 | ASSERT_EQ(meta.index_column_metas(1).column_name(), "vector2" ); |
195 | // reset status |
196 | EXPECT_CALL(*connector2_, uri()) |
197 | .WillOnce( |
198 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
199 | .RetiresOnSaturation(); |
200 | |
201 | result1->reset(); |
202 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
203 | .Times(2) |
204 | .WillOnce(Invoke([&result1](const std::string &, |
205 | MysqlResultWrapperPtr *out, bool) -> int { |
206 | *out = result1; |
207 | return 0; |
208 | })) |
209 | .WillOnce( |
210 | Invoke([&schema_result](const std::string &, |
211 | MysqlResultWrapperPtr *out, bool) -> int { |
212 | *out = schema_result; |
213 | return 0; |
214 | })) |
215 | .RetiresOnSaturation(); |
216 | // first set check sum and update lsn info |
217 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
218 | .Times(2) |
219 | .WillOnce(::testing::Return(0)) |
220 | .WillOnce(::testing::Return(0)) |
221 | .RetiresOnSaturation(); |
222 | // second request dump |
223 | EXPECT_CALL(*connector1_, execute_simple_command(_, _, _)) |
224 | .WillOnce(::testing::Return(0)) |
225 | .RetiresOnSaturation(); |
226 | |
227 | mode = ScanMode::INCREMENTAL; |
228 | ctx_.file_name = "binlog.000003" ; |
229 | ctx_.position = 4; |
230 | ret = handler_->reset_status(mode, builder_.config_, ctx_); |
231 | ASSERT_EQ(ret, 0); |
232 | |
233 | // fetch data |
234 | std::string table_map_str = builder_.BuildTableMapEventStr(); |
235 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
236 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
237 | std::string write_rows_str = builder_.BuildWriteRowsEventStr(values); |
238 | EXPECT_CALL(*connector1_, client_safe_read(_)) |
239 | .Times(2) |
240 | .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int { |
241 | *len = table_map_str.size(); |
242 | return 0; |
243 | })) |
244 | .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int { |
245 | *len = write_rows_str.size(); |
246 | return 0; |
247 | })) |
248 | .RetiresOnSaturation(); |
249 | |
250 | EXPECT_CALL(*connector1_, data()) |
251 | .Times(2) |
252 | .WillOnce(Invoke([&table_map_str]() -> const void * { |
253 | return (const void *)table_map_str.c_str(); |
254 | })) |
255 | .WillOnce(Invoke([&write_rows_str]() -> const void * { |
256 | return (const void *)write_rows_str.c_str(); |
257 | })) |
258 | .RetiresOnSaturation(); |
259 | |
260 | row_data.Clear(); |
261 | ret = handler_->get_next_row_data(&row_data, &ctx); |
262 | ASSERT_EQ(ret, 0); |
263 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
264 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
265 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
266 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30); |
267 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
268 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
269 | } |
270 | |
271 | TEST_F(MysqlHandlerTest, TestInit) { |
272 | ScanMode mode = ScanMode::FULL; |
273 | |
274 | // connection manager init failed |
275 | { |
276 | auto config1 = builder_.config_; |
277 | config1.mutable_repository_config()->mutable_database()->set_connection_uri( |
278 | "invalid" ); |
279 | MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(config1); |
280 | int ret = handler->init(mode); |
281 | ASSERT_EQ(ret, ErrorCode_InvalidArgument); |
282 | } |
283 | |
284 | // validate mysql failed |
285 | { |
286 | MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(builder_.config_); |
287 | int ret = handler->init(mode); |
288 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
289 | } |
290 | |
291 | // success |
292 | { |
293 | MysqlHandlerPtr handler = |
294 | std::make_shared<MysqlHandler>(builder_.config_, mgr_); |
295 | |
296 | // for validator |
297 | MockMysqlResultWrapperPtr version_result = |
298 | builder_.BuildSelectVersionResult(); |
299 | |
300 | MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult(); |
301 | MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult(); |
302 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
303 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
304 | // for fetcher |
305 | ailego::Uri test_uri = builder_.uri_; |
306 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
307 | .Times(3) |
308 | .WillOnce( |
309 | Invoke([&version_result](const std::string &, |
310 | MysqlResultWrapperPtr *out, bool) -> int { |
311 | *out = version_result; |
312 | return 0; |
313 | })) |
314 | .WillOnce( |
315 | Invoke([&binlog_result](const std::string &, |
316 | MysqlResultWrapperPtr *out, bool) -> int { |
317 | *out = binlog_result; |
318 | return 0; |
319 | })) |
320 | .WillOnce(Invoke([&db_result](const std::string &, |
321 | MysqlResultWrapperPtr *out, bool) -> int { |
322 | *out = db_result; |
323 | return 0; |
324 | })) |
325 | .RetiresOnSaturation(); |
326 | |
327 | EXPECT_CALL(*connector1_, uri()) |
328 | .WillOnce( |
329 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
330 | .RetiresOnSaturation(); |
331 | |
332 | EXPECT_CALL(*connector2_, uri()) |
333 | .WillOnce( |
334 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
335 | .RetiresOnSaturation(); |
336 | |
337 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
338 | .Times(2) |
339 | .WillOnce(Invoke([&result1](const std::string &, |
340 | MysqlResultWrapperPtr *out, bool) -> int { |
341 | *out = result1; |
342 | return 0; |
343 | })) |
344 | .WillOnce( |
345 | Invoke([&schema_result](const std::string &, |
346 | MysqlResultWrapperPtr *out, bool) -> int { |
347 | *out = schema_result; |
348 | return 0; |
349 | })) |
350 | .RetiresOnSaturation(); |
351 | |
352 | int ret = handler_->init(mode); |
353 | ASSERT_EQ(ret, 0); |
354 | |
355 | ret = handler_->init(mode); |
356 | ASSERT_EQ(ret, ErrorCode_RepeatedInitialized); |
357 | } |
358 | } |
359 | |
360 | TEST_F(MysqlHandlerTest, TestStart) { |
361 | ScanMode mode = ScanMode::FULL; |
362 | |
363 | // connection manager init failed |
364 | { |
365 | auto config1 = builder_.config_; |
366 | config1.mutable_repository_config()->mutable_database()->set_connection_uri( |
367 | "invalid" ); |
368 | MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(config1); |
369 | int ret = handler->init(mode); |
370 | ASSERT_EQ(ret, ErrorCode_InvalidArgument); |
371 | } |
372 | |
373 | // validate mysql failed |
374 | { |
375 | MysqlHandlerPtr handler = std::make_shared<MysqlHandler>(builder_.config_); |
376 | int ret = handler->init(mode); |
377 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
378 | } |
379 | |
380 | // success |
381 | { |
382 | MysqlHandlerPtr handler = |
383 | std::make_shared<MysqlHandler>(builder_.config_, mgr_); |
384 | |
385 | // for validator |
386 | MockMysqlResultWrapperPtr version_result = |
387 | builder_.BuildSelectVersionResult(); |
388 | |
389 | MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult(); |
390 | MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult(); |
391 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
392 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
393 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
394 | |
395 | // for fetcher |
396 | ailego::Uri test_uri = builder_.uri_; |
397 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
398 | .Times(3) |
399 | .WillOnce( |
400 | Invoke([&version_result](const std::string &, |
401 | MysqlResultWrapperPtr *out, bool) -> int { |
402 | *out = version_result; |
403 | return 0; |
404 | })) |
405 | .WillOnce( |
406 | Invoke([&binlog_result](const std::string &, |
407 | MysqlResultWrapperPtr *out, bool) -> int { |
408 | *out = binlog_result; |
409 | return 0; |
410 | })) |
411 | .WillOnce(Invoke([&db_result](const std::string &, |
412 | MysqlResultWrapperPtr *out, bool) -> int { |
413 | *out = db_result; |
414 | return 0; |
415 | })) |
416 | .RetiresOnSaturation(); |
417 | |
418 | EXPECT_CALL(*connector1_, uri()) |
419 | .WillOnce( |
420 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
421 | .RetiresOnSaturation(); |
422 | |
423 | EXPECT_CALL(*connector2_, uri()) |
424 | .WillOnce( |
425 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
426 | .RetiresOnSaturation(); |
427 | |
428 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
429 | .Times(2) |
430 | .WillOnce(Invoke([&result1](const std::string &, |
431 | MysqlResultWrapperPtr *out, bool) -> int { |
432 | *out = result1; |
433 | return 0; |
434 | })) |
435 | .WillOnce( |
436 | Invoke([&schema_result](const std::string &, |
437 | MysqlResultWrapperPtr *out, bool) -> int { |
438 | *out = schema_result; |
439 | return 0; |
440 | })) |
441 | .RetiresOnSaturation(); |
442 | |
443 | int ret = handler_->init(mode); |
444 | ASSERT_EQ(ret, 0); |
445 | |
446 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
447 | .Times(1) |
448 | .WillOnce( |
449 | Invoke([&scan_result](const std::string &, |
450 | MysqlResultWrapperPtr *out, bool) -> int { |
451 | *out = scan_result; |
452 | return 0; |
453 | })) |
454 | .RetiresOnSaturation(); |
455 | ret = handler_->start(ctx_); |
456 | ASSERT_EQ(ret, 0); |
457 | } |
458 | } |
459 | |
460 | TEST_F(MysqlHandlerTest, TestValidateMysql) { |
461 | // validator init failed |
462 | MysqlConnectorManagerPtr mgr; |
463 | MysqlHandler handler1(builder_.config_, mgr); |
464 | int ret = handler1.validate_mysql(); |
465 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
466 | |
467 | MysqlHandler handler(builder_.config_, mgr_); |
468 | MockMysqlResultWrapperPtr version_result = |
469 | builder_.BuildSelectVersionResult(); |
470 | MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult(); |
471 | MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult(); |
472 | |
473 | // validate version failed |
474 | { |
475 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
476 | .Times(1) |
477 | .WillOnce(testing::Return(1)) |
478 | .RetiresOnSaturation(); |
479 | ret = handler.validate_mysql(); |
480 | ASSERT_EQ(ret, ErrorCode_UnsupportedMysqlVersion); |
481 | } |
482 | |
483 | // validate binlog format failed |
484 | { |
485 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
486 | .Times(2) |
487 | .WillOnce( |
488 | Invoke([&version_result](const std::string &, |
489 | MysqlResultWrapperPtr *out, bool) -> int { |
490 | *out = version_result; |
491 | return 0; |
492 | })) |
493 | .WillOnce( |
494 | Invoke([&binlog_result](const std::string &, |
495 | MysqlResultWrapperPtr *out, bool) -> int { |
496 | *out = binlog_result; |
497 | return 2; |
498 | })) |
499 | .RetiresOnSaturation(); |
500 | ret = handler.validate_mysql(); |
501 | ASSERT_EQ(ret, ErrorCode_UnsupportedBinlogFormat); |
502 | } |
503 | |
504 | // validate database exist failed |
505 | ailego::Uri test_uri = builder_.uri_; |
506 | EXPECT_CALL(*connector1_, uri()) |
507 | .WillOnce( |
508 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
509 | .RetiresOnSaturation(); |
510 | { |
511 | version_result->reset(); |
512 | binlog_result->reset(); |
513 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
514 | .Times(3) |
515 | .WillOnce( |
516 | Invoke([&version_result](const std::string &, |
517 | MysqlResultWrapperPtr *out, bool) -> int { |
518 | *out = version_result; |
519 | return 0; |
520 | })) |
521 | .WillOnce( |
522 | Invoke([&binlog_result](const std::string &, |
523 | MysqlResultWrapperPtr *out, bool) -> int { |
524 | *out = binlog_result; |
525 | return 0; |
526 | })) |
527 | .WillOnce(Invoke([&db_result](const std::string &, |
528 | MysqlResultWrapperPtr *out, bool) -> int { |
529 | *out = db_result; |
530 | return 3; |
531 | })) |
532 | .RetiresOnSaturation(); |
533 | ret = handler.validate_mysql(); |
534 | ASSERT_EQ(ret, ErrorCode_InvalidCollectionConfig); |
535 | } |
536 | |
537 | // success |
538 | version_result->reset(); |
539 | binlog_result->reset(); |
540 | db_result->reset(); |
541 | |
542 | EXPECT_CALL(*connector2_, uri()) |
543 | .WillOnce( |
544 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
545 | .RetiresOnSaturation(); |
546 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
547 | .Times(3) |
548 | .WillOnce( |
549 | Invoke([&version_result](const std::string &, |
550 | MysqlResultWrapperPtr *out, bool) -> int { |
551 | *out = version_result; |
552 | return 0; |
553 | })) |
554 | .WillOnce( |
555 | Invoke([&binlog_result](const std::string &, |
556 | MysqlResultWrapperPtr *out, bool) -> int { |
557 | *out = binlog_result; |
558 | return 0; |
559 | })) |
560 | .WillOnce(Invoke([&db_result](const std::string &, |
561 | MysqlResultWrapperPtr *out, bool) -> int { |
562 | *out = db_result; |
563 | return 0; |
564 | })) |
565 | .RetiresOnSaturation(); |
566 | |
567 | ret = handler.validate_mysql(); |
568 | ASSERT_EQ(ret, 0); |
569 | } |
570 | |
571 | TEST_F(MysqlHandlerTest, TestGetTableSnapshot) { |
572 | // not init |
573 | std::string binlog_file; |
574 | uint64_t position; |
575 | int ret = handler_->get_table_snapshot(&binlog_file, &position); |
576 | ASSERT_EQ(ret, ErrorCode_NoInitialized); |
577 | |
578 | |
579 | // |
580 | // for validator |
581 | MockMysqlResultWrapperPtr version_result = |
582 | builder_.BuildSelectVersionResult(); |
583 | |
584 | MockMysqlResultWrapperPtr binlog_result = builder_.BuildShowBinlogResult(); |
585 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
586 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
587 | MockMysqlResultWrapperPtr db_result = builder_.BuildSelectDbResult(); |
588 | |
589 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
590 | // for fetcher |
591 | ailego::Uri test_uri = builder_.uri_; |
592 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
593 | .Times(3) |
594 | .WillOnce( |
595 | Invoke([&version_result](const std::string &, |
596 | MysqlResultWrapperPtr *out, bool) -> int { |
597 | *out = version_result; |
598 | return 0; |
599 | })) |
600 | .WillOnce( |
601 | Invoke([&binlog_result](const std::string &, |
602 | MysqlResultWrapperPtr *out, bool) -> int { |
603 | *out = binlog_result; |
604 | return 0; |
605 | })) |
606 | .WillOnce(Invoke([&db_result](const std::string &, |
607 | MysqlResultWrapperPtr *out, bool) -> int { |
608 | *out = db_result; |
609 | return 0; |
610 | })) |
611 | .RetiresOnSaturation(); |
612 | |
613 | EXPECT_CALL(*connector1_, uri()) |
614 | .WillOnce( |
615 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
616 | .RetiresOnSaturation(); |
617 | |
618 | EXPECT_CALL(*connector2_, uri()) |
619 | .WillOnce( |
620 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
621 | .RetiresOnSaturation(); |
622 | |
623 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
624 | .Times(2) |
625 | .WillOnce(Invoke([&result1](const std::string &, |
626 | MysqlResultWrapperPtr *out, bool) -> int { |
627 | *out = result1; |
628 | return 0; |
629 | })) |
630 | .WillOnce( |
631 | Invoke([&schema_result](const std::string &, |
632 | MysqlResultWrapperPtr *out, bool) -> int { |
633 | *out = schema_result; |
634 | return 0; |
635 | })) |
636 | .RetiresOnSaturation(); |
637 | |
638 | ret = handler_->init(ScanMode::FULL); |
639 | ASSERT_EQ(ret, 0); |
640 | |
641 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
642 | .Times(1) |
643 | .WillOnce(Invoke([&scan_result](const std::string &, |
644 | MysqlResultWrapperPtr *out, bool) -> int { |
645 | *out = scan_result; |
646 | return 0; |
647 | })) |
648 | .RetiresOnSaturation(); |
649 | ret = handler_->start(ctx_); |
650 | ASSERT_EQ(ret, 0); |
651 | |
652 | |
653 | // get table snapshot |
654 | MysqlResultWrapperPtr snapshot_result = BuildSnapshotResult(); |
655 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
656 | .Times(3) |
657 | .WillOnce(testing::Return(0)) |
658 | .WillOnce( |
659 | Invoke([&snapshot_result](const std::string &, |
660 | MysqlResultWrapperPtr *out, bool) -> int { |
661 | *out = snapshot_result; |
662 | return 0; |
663 | })) |
664 | .WillOnce(testing::Return(0)) |
665 | .RetiresOnSaturation(); |
666 | |
667 | // success |
668 | ret = handler_->get_table_snapshot(&binlog_file, &position); |
669 | ASSERT_EQ(ret, 0); |
670 | ASSERT_EQ(binlog_file, "binlog.000001" ); |
671 | ASSERT_EQ(position, (uint64_t)10240); |
672 | |
673 | // get info fetcher failed |
674 | TableReader *reader = |
675 | (TableReader *)((MysqlHandler *)handler_.get())->mysql_reader_.get(); |
676 | reader->info_fetcher_ = nullptr; |
677 | ret = handler_->get_table_snapshot(&binlog_file, &position); |
678 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
679 | } |
680 | |
681 | |
682 | ///////////////////////////////// |
683 | |
684 | |
685 | // TEST_F(MysqlHandlerTest, TestSimple) { |
686 | // bool flag = true; |
687 | // CollectionConfig config; |
688 | // config.set_connection_uri("mysql://root:[email protected]:3306/mytest"); |
689 | // config.set_repository_table("mt933"); |
690 | // config.add_forward_columns("name"); |
691 | // config.add_forward_columns("age"); |
692 | // config.add_forward_columns("score"); |
693 | // config.add_forward_columns("f1"); |
694 | // config.add_forward_columns("f2"); |
695 | // config.add_forward_columns("f3"); |
696 | // config.add_forward_columns("f4"); |
697 | // config.add_forward_columns("f5"); |
698 | // config.add_forward_columns("f6"); |
699 | // config.add_forward_columns("f10"); |
700 | // config.add_forward_columns("f11"); |
701 | // config.add_forward_columns("f12"); |
702 | // config.add_forward_columns("f13"); |
703 | // if (flag) { |
704 | // config.add_forward_columns("f14"); |
705 | // } |
706 | |
707 | // config.add_index_columns("f9"); |
708 | |
709 | // MysqlHandler handler(config); |
710 | // LsnContext ctx; |
711 | // ctx.seq_id = 0; |
712 | |
713 | // int ret = handler.init(ScanMode::FULL, ctx); |
714 | // ASSERT_EQ(ret, 0); |
715 | |
716 | // ret = handler.get_table_snapshot(&(ctx.file_name), &(ctx.position)); |
717 | // printf("BinLog: %s %lu\n", ctx.file_name.c_str(), ctx.position); |
718 | |
719 | // uint64_t idx = 1; |
720 | // while (true) { |
721 | // RowData row_data; |
722 | // ret = handler.get_next_row_data(&row_data, &ctx); |
723 | // ASSERT_EQ(ret, 0); |
724 | // if (ctx.status == RowDataStatus::NORMAL) { |
725 | // EXPECT_EQ(row_data.primary_key(), idx); |
726 | // EXPECT_EQ(ctx.seq_id, idx); |
727 | // printf("seq_id: %lu\n", ctx.seq_id); |
728 | // idx += 1; |
729 | // } else { |
730 | // break; |
731 | // } |
732 | // printf("%s\n", row_data.ShortDebugString().c_str()); |
733 | // } |
734 | |
735 | // ctx.position = 4; |
736 | // ctx.file_name = "binlog.000001"; |
737 | |
738 | // ret = handler.reset_status(ScanMode::INCREMENTAL, config, ctx); |
739 | // ASSERT_EQ(ret, 0); |
740 | |
741 | // printf("////////////////////////////////////\n"); |
742 | // idx = 12; |
743 | // int32_t retry_times = 3; |
744 | // int32_t times = 0; |
745 | // while (true) { |
746 | // RowData row_data; |
747 | // ret = handler.get_next_row_data(&row_data, &ctx); |
748 | // if (ret != 0) { |
749 | // continue; |
750 | // } |
751 | // ASSERT_EQ(ret, 0); |
752 | // if (ctx.status == RowDataStatus::NORMAL) { |
753 | // EXPECT_EQ(row_data.primary_key(), idx); |
754 | // printf("file_name: %s position: %lu\n", |
755 | // ctx.file_name.c_str(), ctx.position); |
756 | // idx += 1; |
757 | // } else if (ctx.status == RowDataStatus::SCHEMA_CHANGED) { |
758 | // continue; |
759 | // } else { |
760 | // // sleep(1); |
761 | // // printf("sleep...\n"); |
762 | // // // if (++times < retry_times) { |
763 | // // continue; |
764 | // // // } else { |
765 | // // // break; |
766 | // // // } |
767 | // // break; |
768 | // continue; |
769 | // } |
770 | // // printf("%s\n", row_data.ShortDebugString().c_str()); |
771 | // // printf("primary_key: %lu, %s %d %f %s %d %lf %s %s %d %s %s %s %s |
772 | // %u %s\n", |
773 | // // row_data.primary_key(), |
774 | // // row_data.forward_columns().values(0).bytes_value().c_str(), |
775 | // // row_data.forward_columns().values(1).int32_value(), |
776 | // // row_data.forward_columns().values(2).float_value(), |
777 | // // row_data.forward_columns().values(3).bytes_value().c_str(), |
778 | // // row_data.forward_columns().values(4).int32_value(), |
779 | // // row_data.forward_columns().values(5).double_value(), |
780 | // // row_data.forward_columns().values(6).string_value().c_str(), |
781 | // // row_data.forward_columns().values(7).string_value().c_str(), |
782 | // // row_data.forward_columns().values(8).int32_value(), |
783 | // // row_data.forward_columns().values(9).string_value().c_str(), |
784 | // // row_data.forward_columns().values(10).string_value().c_str(), |
785 | // // row_data.forward_columns().values(11).string_value().c_str(), |
786 | // // row_data.forward_columns().values(12).string_value().c_str(), |
787 | // // row_data.forward_columns().values(13).uint32_value(), |
788 | // // row_data.index_columns(0).bytes_value().c_str()); |
789 | // } |
790 | // } |
791 | |