1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gtest/gtest.h> |
22 | |
23 | #define private public |
24 | #include "repository/binlog/binlog_reader.h" |
25 | #include "repository/binlog/info_fetcher.h" |
26 | #include "repository/binlog/rows_event_parser.h" |
27 | #include "mock_mysql_connector.h" |
28 | #undef private |
29 | #include "repository/repository_common/error_code.h" |
30 | #include "event_builder.h" |
31 | #include "mysql_result_builder.h" |
32 | |
33 | using namespace ::proxima::be; |
34 | using namespace proxima::be::repository; |
35 | |
36 | class BinlogReaderTest : public testing::Test { |
37 | protected: |
38 | void SetUp() { |
39 | mgr_ = std::make_shared<MysqlConnectorManager>(); |
40 | ASSERT_TRUE(mgr_); |
41 | connector1_ = std::make_shared<MockMysqlConnector>(); |
42 | ASSERT_TRUE(connector1_); |
43 | mgr_->put(connector1_); |
44 | connector2_ = std::make_shared<MockMysqlConnector>(); |
45 | ASSERT_TRUE(connector2_); |
46 | mgr_->put(connector2_); |
47 | InitTableSchema(); |
48 | |
49 | ctx_.position = 4; |
50 | ctx_.file_name = "binlog.000004" ; |
51 | |
52 | table_name_ = builder_.table_name_; |
53 | } |
54 | |
55 | void TearDown() {} |
56 | |
57 | void InitTableSchema() { |
58 | builder_.BuildCollectionConfig(); |
59 | |
60 | // init |
61 | ailego::Uri test_uri = builder_.uri_; |
62 | EXPECT_CALL(*connector1_, uri()) |
63 | .WillOnce( |
64 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
65 | .RetiresOnSaturation(); |
66 | fetcher_ = std::make_shared<InfoFetcher>(builder_.config_, mgr_); |
67 | int ret = fetcher_->init(); |
68 | ASSERT_EQ(ret, 0); |
69 | } |
70 | |
71 | std::string BuildNoMoreDataEvent() { |
72 | std::string str(1, (char)254); |
73 | return str; |
74 | } |
75 | |
76 | std::string BuildQueryEventStr(const std::string &query) { |
77 | return " " + EventBuilder::BuildQueryEvent(builder_.db_, query); |
78 | } |
79 | |
80 | std::string BuildRotateEventStr(const std::string &file, bool has_crc) { |
81 | return " " + EventBuilder::BuildRotateEvent(file, 4, has_crc); |
82 | } |
83 | |
84 | protected: |
85 | MysqlConnectorManagerPtr mgr_{}; |
86 | MockMysqlConnectorPtr connector1_{}; |
87 | MockMysqlConnectorPtr connector2_{}; |
88 | std::string table_name_{}; |
89 | InfoFetcherPtr fetcher_{}; |
90 | TableSchemaPtr schema_; |
91 | LsnContext ctx_; |
92 | MysqlResultBuilder builder_{}; |
93 | }; |
94 | |
95 | TEST_F(BinlogReaderTest, TestSimple) { |
96 | BinlogReader reader(table_name_, fetcher_, mgr_); |
97 | MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult(); |
98 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
99 | // init table schema |
100 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
101 | .Times(2) |
102 | .WillOnce(Invoke([&result1](const std::string &, |
103 | MysqlResultWrapperPtr *out, bool) -> int { |
104 | *out = result1; |
105 | return 0; |
106 | })) |
107 | .WillOnce(Invoke([&result](const std::string &, |
108 | MysqlResultWrapperPtr *out, bool) -> int { |
109 | *out = result; |
110 | return 0; |
111 | })) |
112 | .RetiresOnSaturation(); |
113 | int ret = reader.init(); |
114 | ASSERT_EQ(ret, 0); |
115 | |
116 | // first set check sum |
117 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
118 | .Times(2) |
119 | .WillOnce(::testing::Return(0)) |
120 | .WillOnce(::testing::Return(0)) |
121 | .RetiresOnSaturation(); |
122 | // second request dump |
123 | EXPECT_CALL(*connector2_, execute_simple_command(_, _, _)) |
124 | .WillOnce(::testing::Return(0)) |
125 | .RetiresOnSaturation(); |
126 | ret = reader.start(ctx_); |
127 | ASSERT_EQ(ret, 0); |
128 | |
129 | // fetch data |
130 | std::string table_map_str = builder_.BuildTableMapEventStr(); |
131 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
132 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
133 | std::string write_rows_str = builder_.BuildWriteRowsEventStr(values); |
134 | EXPECT_CALL(*connector2_, client_safe_read(_)) |
135 | .Times(2) |
136 | .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int { |
137 | *len = table_map_str.size(); |
138 | return 0; |
139 | })) |
140 | .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int { |
141 | *len = write_rows_str.size(); |
142 | return 0; |
143 | })) |
144 | .RetiresOnSaturation(); |
145 | |
146 | EXPECT_CALL(*connector2_, data()) |
147 | .Times(2) |
148 | .WillOnce(Invoke([&table_map_str]() -> unsigned char * { |
149 | return (unsigned char *)table_map_str.c_str(); |
150 | })) |
151 | .WillOnce(Invoke([&write_rows_str]() -> unsigned char * { |
152 | return (unsigned char *)write_rows_str.c_str(); |
153 | })) |
154 | .RetiresOnSaturation(); |
155 | |
156 | WriteRequest::Row row_data; |
157 | LsnContext context; |
158 | ret = reader.get_next_row_data(&row_data, &context); |
159 | ASSERT_EQ(ret, 0); |
160 | ASSERT_EQ(context.status, RowDataStatus::NORMAL); |
161 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
162 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
163 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30); |
164 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
165 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
166 | } |
167 | |
168 | TEST_F(BinlogReaderTest, TestGetNextRowData) { |
169 | BinlogReader reader(table_name_, fetcher_, mgr_); |
170 | MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult(); |
171 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
172 | // init table schema |
173 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
174 | .Times(2) |
175 | .WillOnce(Invoke([&result1](const std::string &, |
176 | MysqlResultWrapperPtr *out, bool) -> int { |
177 | *out = result1; |
178 | return 0; |
179 | })) |
180 | .WillOnce(Invoke([&result](const std::string &, |
181 | MysqlResultWrapperPtr *out, bool) -> int { |
182 | *out = result; |
183 | return 0; |
184 | })) |
185 | .RetiresOnSaturation(); |
186 | |
187 | int ret = reader.init(); |
188 | ASSERT_EQ(ret, 0); |
189 | |
190 | // first set check sum |
191 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
192 | .Times(2) |
193 | .WillOnce(::testing::Return(0)) |
194 | .WillOnce(::testing::Return(0)) |
195 | .RetiresOnSaturation(); |
196 | // second request dump |
197 | EXPECT_CALL(*connector2_, execute_simple_command(_, _, _)) |
198 | .WillOnce(::testing::Return(0)) |
199 | .RetiresOnSaturation(); |
200 | |
201 | ret = reader.start(ctx_); |
202 | ASSERT_EQ(ret, 0); |
203 | |
204 | // fetch data |
205 | std::string rotate_event_str = BuildRotateEventStr(ctx_.file_name, false); |
206 | std::string rotate_event_str1 = BuildRotateEventStr(ctx_.file_name, true); |
207 | std::string query_event_str = BuildQueryEventStr("query event" ); |
208 | std::string query_event_str1 = BuildQueryEventStr("alter table mytest." ); |
209 | std::string table_map_str = builder_.BuildTableMapEventStr(); |
210 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
211 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
212 | size_t rows_count = 2; |
213 | std::string write_rows_str = |
214 | builder_.BuildWriteRowsEventStr(values, rows_count); |
215 | EXPECT_CALL(*connector2_, client_safe_read(_)) |
216 | .Times(5) |
217 | .WillOnce(Invoke([&rotate_event_str](unsigned long *len) -> int { |
218 | *len = rotate_event_str.size(); |
219 | return 0; |
220 | })) |
221 | .WillOnce(Invoke([&query_event_str](unsigned long *len) -> int { |
222 | *len = query_event_str.size(); |
223 | return 0; |
224 | })) |
225 | .WillOnce(Invoke([&rotate_event_str1](unsigned long *len) -> int { |
226 | *len = rotate_event_str1.size(); |
227 | return 0; |
228 | })) |
229 | .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int { |
230 | *len = table_map_str.size(); |
231 | return 0; |
232 | })) |
233 | .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int { |
234 | *len = write_rows_str.size(); |
235 | return 0; |
236 | })) |
237 | .RetiresOnSaturation(); |
238 | |
239 | EXPECT_CALL(*connector2_, data()) |
240 | .Times(5) |
241 | .WillOnce(Invoke([&rotate_event_str]() -> unsigned char * { |
242 | return (unsigned char *)rotate_event_str.c_str(); |
243 | })) |
244 | .WillOnce(Invoke([&query_event_str]() -> unsigned char * { |
245 | return (unsigned char *)query_event_str.c_str(); |
246 | })) |
247 | .WillOnce(Invoke([&rotate_event_str1]() -> unsigned char * { |
248 | return (unsigned char *)rotate_event_str1.c_str(); |
249 | })) |
250 | .WillOnce(Invoke([&table_map_str]() -> unsigned char * { |
251 | return (unsigned char *)table_map_str.c_str(); |
252 | })) |
253 | .WillOnce(Invoke([&write_rows_str]() -> unsigned char * { |
254 | return (unsigned char *)write_rows_str.c_str(); |
255 | })) |
256 | .RetiresOnSaturation(); |
257 | |
258 | |
259 | for (size_t i = 0; i < rows_count; ++i) { |
260 | WriteRequest::Row row_data; |
261 | LsnContext context; |
262 | ret = reader.get_next_row_data(&row_data, &context); |
263 | ASSERT_EQ(ret, 0); |
264 | ASSERT_EQ(context.status, RowDataStatus::NORMAL); |
265 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
266 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), |
267 | "name1" ); |
268 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30); |
269 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), |
270 | "1,2,3,4" ); |
271 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), |
272 | "1,2,3,5" ); |
273 | } |
274 | |
275 | EXPECT_CALL(*connector2_, client_safe_read(_)) |
276 | .WillOnce(Invoke([&query_event_str1](unsigned long *len) -> int { |
277 | *len = query_event_str1.size(); |
278 | return 0; |
279 | })) |
280 | .RetiresOnSaturation(); |
281 | |
282 | EXPECT_CALL(*connector2_, data()) |
283 | .WillOnce(Invoke([&query_event_str1]() -> unsigned char * { |
284 | return (unsigned char *)query_event_str1.c_str(); |
285 | })) |
286 | .RetiresOnSaturation(); |
287 | |
288 | result1->reset(); |
289 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
290 | .Times(2) |
291 | .WillOnce(Invoke([&result1](const std::string &, |
292 | MysqlResultWrapperPtr *out, bool) -> int { |
293 | *out = result1; |
294 | return 0; |
295 | })) |
296 | .WillOnce(Invoke([&result](const std::string &, |
297 | MysqlResultWrapperPtr *out, bool) -> int { |
298 | *out = result; |
299 | return ErrorCode_ExecuteMysql; |
300 | })) |
301 | .RetiresOnSaturation(); |
302 | |
303 | WriteRequest::Row row_data; |
304 | LsnContext context; |
305 | ret = reader.get_next_row_data(&row_data, &context); |
306 | ASSERT_EQ(ret, ErrorCode_ExecuteMysql); |
307 | |
308 | result1->reset(); |
309 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
310 | .Times(2) |
311 | .WillOnce(Invoke([&result1](const std::string &, |
312 | MysqlResultWrapperPtr *out, bool) -> int { |
313 | *out = result1; |
314 | return 0; |
315 | })) |
316 | .WillOnce(Invoke([&result](const std::string &, |
317 | MysqlResultWrapperPtr *out, bool) -> int { |
318 | *out = result; |
319 | return 0; |
320 | })) |
321 | .RetiresOnSaturation(); |
322 | |
323 | ret = reader.get_next_row_data(&row_data, &context); |
324 | ASSERT_EQ(ret, 0); |
325 | ASSERT_EQ(context.status, RowDataStatus::SCHEMA_CHANGED); |
326 | } |
327 | |
328 | TEST_F(BinlogReaderTest, TestInitWithGetTableSchemaFailed) { |
329 | BinlogReader reader(table_name_, fetcher_, mgr_); |
330 | // init table schema |
331 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
332 | .Times(1) |
333 | .WillOnce(testing::Return(1)) |
334 | .RetiresOnSaturation(); |
335 | int ret = reader.init(); |
336 | ASSERT_EQ(ret, ErrorCode_ExecuteMysql); |
337 | } |
338 | |
339 | TEST_F(BinlogReaderTest, TestStartWithInitEventFetcherFailed) { |
340 | BinlogReader reader(table_name_, fetcher_, mgr_); |
341 | |
342 | MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult(); |
343 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
344 | // init table schema |
345 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
346 | .Times(2) |
347 | .WillOnce(Invoke([&result1](const std::string &, |
348 | MysqlResultWrapperPtr *out, bool) -> int { |
349 | *out = result1; |
350 | return 0; |
351 | })) |
352 | .WillOnce(Invoke([&result](const std::string &, |
353 | MysqlResultWrapperPtr *out, bool) -> int { |
354 | *out = result; |
355 | return 0; |
356 | })) |
357 | .RetiresOnSaturation(); |
358 | int ret = reader.init(); |
359 | ASSERT_EQ(ret, 0); |
360 | |
361 | // first set check sum |
362 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
363 | .WillOnce(::testing::Return(1)) |
364 | .RetiresOnSaturation(); |
365 | ret = reader.start(ctx_); |
366 | ASSERT_EQ(ret, 1); |
367 | } |
368 | |
369 | TEST_F(BinlogReaderTest, TestGetNextRowDataWithNoMoreData) { |
370 | BinlogReader reader(table_name_, fetcher_, mgr_); |
371 | MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult(); |
372 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
373 | // init table schema |
374 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
375 | .Times(2) |
376 | .WillOnce(Invoke([&result1](const std::string &, |
377 | MysqlResultWrapperPtr *out, bool) -> int { |
378 | *out = result1; |
379 | return 0; |
380 | })) |
381 | .WillOnce(Invoke([&result](const std::string &, |
382 | MysqlResultWrapperPtr *out, bool) -> int { |
383 | *out = result; |
384 | return 0; |
385 | })) |
386 | .RetiresOnSaturation(); |
387 | |
388 | int ret = reader.init(); |
389 | ASSERT_EQ(ret, 0); |
390 | |
391 | // first set check sum |
392 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
393 | .Times(2) |
394 | .WillOnce(::testing::Return(0)) |
395 | .WillOnce(::testing::Return(0)) |
396 | .RetiresOnSaturation(); |
397 | // second request dump |
398 | EXPECT_CALL(*connector2_, execute_simple_command(_, _, _)) |
399 | .WillOnce(::testing::Return(0)) |
400 | .RetiresOnSaturation(); |
401 | |
402 | ret = reader.start(ctx_); |
403 | ASSERT_EQ(ret, 0); |
404 | |
405 | // fetch data |
406 | std::string no_more_event = BuildNoMoreDataEvent(); |
407 | EXPECT_CALL(*connector2_, client_safe_read(_)) |
408 | .WillOnce(Invoke([&no_more_event](unsigned long *len) -> int { |
409 | *len = no_more_event.size(); |
410 | return 0; |
411 | })) |
412 | .RetiresOnSaturation(); |
413 | |
414 | EXPECT_CALL(*connector2_, data()) |
415 | .WillOnce(Invoke([&no_more_event]() -> unsigned char * { |
416 | return (unsigned char *)no_more_event.c_str(); |
417 | })) |
418 | .RetiresOnSaturation(); |
419 | |
420 | WriteRequest::Row row_data; |
421 | LsnContext context; |
422 | ret = reader.get_next_row_data(&row_data, &context); |
423 | ASSERT_EQ(ret, 0); |
424 | ASSERT_EQ(context.status, RowDataStatus::NO_MORE_DATA); |
425 | } |
426 | |