1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
 * \brief Unit tests for BinlogReader driven through mocked MySQL connectors
19 */
20
21#include <gtest/gtest.h>
22
23#define private public
24#include "repository/binlog/binlog_reader.h"
25#include "repository/binlog/info_fetcher.h"
26#include "repository/binlog/rows_event_parser.h"
27#include "mock_mysql_connector.h"
28#undef private
29#include "repository/repository_common/error_code.h"
30#include "event_builder.h"
31#include "mysql_result_builder.h"
32
33using namespace ::proxima::be;
34using namespace proxima::be::repository;
35
36class BinlogReaderTest : public testing::Test {
37 protected:
38 void SetUp() {
39 mgr_ = std::make_shared<MysqlConnectorManager>();
40 ASSERT_TRUE(mgr_);
41 connector1_ = std::make_shared<MockMysqlConnector>();
42 ASSERT_TRUE(connector1_);
43 mgr_->put(connector1_);
44 connector2_ = std::make_shared<MockMysqlConnector>();
45 ASSERT_TRUE(connector2_);
46 mgr_->put(connector2_);
47 InitTableSchema();
48
49 ctx_.position = 4;
50 ctx_.file_name = "binlog.000004";
51
52 table_name_ = builder_.table_name_;
53 }
54
55 void TearDown() {}
56
57 void InitTableSchema() {
58 builder_.BuildCollectionConfig();
59
60 // init
61 ailego::Uri test_uri = builder_.uri_;
62 EXPECT_CALL(*connector1_, uri())
63 .WillOnce(
64 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
65 .RetiresOnSaturation();
66 fetcher_ = std::make_shared<InfoFetcher>(builder_.config_, mgr_);
67 int ret = fetcher_->init();
68 ASSERT_EQ(ret, 0);
69 }
70
71 std::string BuildNoMoreDataEvent() {
72 std::string str(1, (char)254);
73 return str;
74 }
75
76 std::string BuildQueryEventStr(const std::string &query) {
77 return " " + EventBuilder::BuildQueryEvent(builder_.db_, query);
78 }
79
80 std::string BuildRotateEventStr(const std::string &file, bool has_crc) {
81 return " " + EventBuilder::BuildRotateEvent(file, 4, has_crc);
82 }
83
84 protected:
85 MysqlConnectorManagerPtr mgr_{};
86 MockMysqlConnectorPtr connector1_{};
87 MockMysqlConnectorPtr connector2_{};
88 std::string table_name_{};
89 InfoFetcherPtr fetcher_{};
90 TableSchemaPtr schema_;
91 LsnContext ctx_;
92 MysqlResultBuilder builder_{};
93};
94
95TEST_F(BinlogReaderTest, TestSimple) {
96 BinlogReader reader(table_name_, fetcher_, mgr_);
97 MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult();
98 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
99 // init table schema
100 EXPECT_CALL(*connector1_, execute_query(_, _, _))
101 .Times(2)
102 .WillOnce(Invoke([&result1](const std::string &,
103 MysqlResultWrapperPtr *out, bool) -> int {
104 *out = result1;
105 return 0;
106 }))
107 .WillOnce(Invoke([&result](const std::string &,
108 MysqlResultWrapperPtr *out, bool) -> int {
109 *out = result;
110 return 0;
111 }))
112 .RetiresOnSaturation();
113 int ret = reader.init();
114 ASSERT_EQ(ret, 0);
115
116 // first set check sum
117 EXPECT_CALL(*connector2_, execute_query(_, _, _))
118 .Times(2)
119 .WillOnce(::testing::Return(0))
120 .WillOnce(::testing::Return(0))
121 .RetiresOnSaturation();
122 // second request dump
123 EXPECT_CALL(*connector2_, execute_simple_command(_, _, _))
124 .WillOnce(::testing::Return(0))
125 .RetiresOnSaturation();
126 ret = reader.start(ctx_);
127 ASSERT_EQ(ret, 0);
128
129 // fetch data
130 std::string table_map_str = builder_.BuildTableMapEventStr();
131 std::vector<std::string> values = {"1", "name1", "30", "123.456",
132 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
133 std::string write_rows_str = builder_.BuildWriteRowsEventStr(values);
134 EXPECT_CALL(*connector2_, client_safe_read(_))
135 .Times(2)
136 .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int {
137 *len = table_map_str.size();
138 return 0;
139 }))
140 .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int {
141 *len = write_rows_str.size();
142 return 0;
143 }))
144 .RetiresOnSaturation();
145
146 EXPECT_CALL(*connector2_, data())
147 .Times(2)
148 .WillOnce(Invoke([&table_map_str]() -> unsigned char * {
149 return (unsigned char *)table_map_str.c_str();
150 }))
151 .WillOnce(Invoke([&write_rows_str]() -> unsigned char * {
152 return (unsigned char *)write_rows_str.c_str();
153 }))
154 .RetiresOnSaturation();
155
156 WriteRequest::Row row_data;
157 LsnContext context;
158 ret = reader.get_next_row_data(&row_data, &context);
159 ASSERT_EQ(ret, 0);
160 ASSERT_EQ(context.status, RowDataStatus::NORMAL);
161 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
162 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
163 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30);
164 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
165 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
166}
167
168TEST_F(BinlogReaderTest, TestGetNextRowData) {
169 BinlogReader reader(table_name_, fetcher_, mgr_);
170 MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult();
171 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
172 // init table schema
173 EXPECT_CALL(*connector1_, execute_query(_, _, _))
174 .Times(2)
175 .WillOnce(Invoke([&result1](const std::string &,
176 MysqlResultWrapperPtr *out, bool) -> int {
177 *out = result1;
178 return 0;
179 }))
180 .WillOnce(Invoke([&result](const std::string &,
181 MysqlResultWrapperPtr *out, bool) -> int {
182 *out = result;
183 return 0;
184 }))
185 .RetiresOnSaturation();
186
187 int ret = reader.init();
188 ASSERT_EQ(ret, 0);
189
190 // first set check sum
191 EXPECT_CALL(*connector2_, execute_query(_, _, _))
192 .Times(2)
193 .WillOnce(::testing::Return(0))
194 .WillOnce(::testing::Return(0))
195 .RetiresOnSaturation();
196 // second request dump
197 EXPECT_CALL(*connector2_, execute_simple_command(_, _, _))
198 .WillOnce(::testing::Return(0))
199 .RetiresOnSaturation();
200
201 ret = reader.start(ctx_);
202 ASSERT_EQ(ret, 0);
203
204 // fetch data
205 std::string rotate_event_str = BuildRotateEventStr(ctx_.file_name, false);
206 std::string rotate_event_str1 = BuildRotateEventStr(ctx_.file_name, true);
207 std::string query_event_str = BuildQueryEventStr("query event");
208 std::string query_event_str1 = BuildQueryEventStr("alter table mytest.");
209 std::string table_map_str = builder_.BuildTableMapEventStr();
210 std::vector<std::string> values = {"1", "name1", "30", "123.456",
211 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
212 size_t rows_count = 2;
213 std::string write_rows_str =
214 builder_.BuildWriteRowsEventStr(values, rows_count);
215 EXPECT_CALL(*connector2_, client_safe_read(_))
216 .Times(5)
217 .WillOnce(Invoke([&rotate_event_str](unsigned long *len) -> int {
218 *len = rotate_event_str.size();
219 return 0;
220 }))
221 .WillOnce(Invoke([&query_event_str](unsigned long *len) -> int {
222 *len = query_event_str.size();
223 return 0;
224 }))
225 .WillOnce(Invoke([&rotate_event_str1](unsigned long *len) -> int {
226 *len = rotate_event_str1.size();
227 return 0;
228 }))
229 .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int {
230 *len = table_map_str.size();
231 return 0;
232 }))
233 .WillOnce(Invoke([&write_rows_str](unsigned long *len) -> int {
234 *len = write_rows_str.size();
235 return 0;
236 }))
237 .RetiresOnSaturation();
238
239 EXPECT_CALL(*connector2_, data())
240 .Times(5)
241 .WillOnce(Invoke([&rotate_event_str]() -> unsigned char * {
242 return (unsigned char *)rotate_event_str.c_str();
243 }))
244 .WillOnce(Invoke([&query_event_str]() -> unsigned char * {
245 return (unsigned char *)query_event_str.c_str();
246 }))
247 .WillOnce(Invoke([&rotate_event_str1]() -> unsigned char * {
248 return (unsigned char *)rotate_event_str1.c_str();
249 }))
250 .WillOnce(Invoke([&table_map_str]() -> unsigned char * {
251 return (unsigned char *)table_map_str.c_str();
252 }))
253 .WillOnce(Invoke([&write_rows_str]() -> unsigned char * {
254 return (unsigned char *)write_rows_str.c_str();
255 }))
256 .RetiresOnSaturation();
257
258
259 for (size_t i = 0; i < rows_count; ++i) {
260 WriteRequest::Row row_data;
261 LsnContext context;
262 ret = reader.get_next_row_data(&row_data, &context);
263 ASSERT_EQ(ret, 0);
264 ASSERT_EQ(context.status, RowDataStatus::NORMAL);
265 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
266 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(),
267 "name1");
268 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30);
269 ASSERT_EQ(row_data.index_column_values().values(0).string_value(),
270 "1,2,3,4");
271 ASSERT_EQ(row_data.index_column_values().values(1).string_value(),
272 "1,2,3,5");
273 }
274
275 EXPECT_CALL(*connector2_, client_safe_read(_))
276 .WillOnce(Invoke([&query_event_str1](unsigned long *len) -> int {
277 *len = query_event_str1.size();
278 return 0;
279 }))
280 .RetiresOnSaturation();
281
282 EXPECT_CALL(*connector2_, data())
283 .WillOnce(Invoke([&query_event_str1]() -> unsigned char * {
284 return (unsigned char *)query_event_str1.c_str();
285 }))
286 .RetiresOnSaturation();
287
288 result1->reset();
289 EXPECT_CALL(*connector1_, execute_query(_, _, _))
290 .Times(2)
291 .WillOnce(Invoke([&result1](const std::string &,
292 MysqlResultWrapperPtr *out, bool) -> int {
293 *out = result1;
294 return 0;
295 }))
296 .WillOnce(Invoke([&result](const std::string &,
297 MysqlResultWrapperPtr *out, bool) -> int {
298 *out = result;
299 return ErrorCode_ExecuteMysql;
300 }))
301 .RetiresOnSaturation();
302
303 WriteRequest::Row row_data;
304 LsnContext context;
305 ret = reader.get_next_row_data(&row_data, &context);
306 ASSERT_EQ(ret, ErrorCode_ExecuteMysql);
307
308 result1->reset();
309 EXPECT_CALL(*connector1_, execute_query(_, _, _))
310 .Times(2)
311 .WillOnce(Invoke([&result1](const std::string &,
312 MysqlResultWrapperPtr *out, bool) -> int {
313 *out = result1;
314 return 0;
315 }))
316 .WillOnce(Invoke([&result](const std::string &,
317 MysqlResultWrapperPtr *out, bool) -> int {
318 *out = result;
319 return 0;
320 }))
321 .RetiresOnSaturation();
322
323 ret = reader.get_next_row_data(&row_data, &context);
324 ASSERT_EQ(ret, 0);
325 ASSERT_EQ(context.status, RowDataStatus::SCHEMA_CHANGED);
326}
327
328TEST_F(BinlogReaderTest, TestInitWithGetTableSchemaFailed) {
329 BinlogReader reader(table_name_, fetcher_, mgr_);
330 // init table schema
331 EXPECT_CALL(*connector1_, execute_query(_, _, _))
332 .Times(1)
333 .WillOnce(testing::Return(1))
334 .RetiresOnSaturation();
335 int ret = reader.init();
336 ASSERT_EQ(ret, ErrorCode_ExecuteMysql);
337}
338
339TEST_F(BinlogReaderTest, TestStartWithInitEventFetcherFailed) {
340 BinlogReader reader(table_name_, fetcher_, mgr_);
341
342 MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult();
343 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
344 // init table schema
345 EXPECT_CALL(*connector1_, execute_query(_, _, _))
346 .Times(2)
347 .WillOnce(Invoke([&result1](const std::string &,
348 MysqlResultWrapperPtr *out, bool) -> int {
349 *out = result1;
350 return 0;
351 }))
352 .WillOnce(Invoke([&result](const std::string &,
353 MysqlResultWrapperPtr *out, bool) -> int {
354 *out = result;
355 return 0;
356 }))
357 .RetiresOnSaturation();
358 int ret = reader.init();
359 ASSERT_EQ(ret, 0);
360
361 // first set check sum
362 EXPECT_CALL(*connector2_, execute_query(_, _, _))
363 .WillOnce(::testing::Return(1))
364 .RetiresOnSaturation();
365 ret = reader.start(ctx_);
366 ASSERT_EQ(ret, 1);
367}
368
369TEST_F(BinlogReaderTest, TestGetNextRowDataWithNoMoreData) {
370 BinlogReader reader(table_name_, fetcher_, mgr_);
371 MockMysqlResultWrapperPtr result = builder_.BuildQuerySchemaResult();
372 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
373 // init table schema
374 EXPECT_CALL(*connector1_, execute_query(_, _, _))
375 .Times(2)
376 .WillOnce(Invoke([&result1](const std::string &,
377 MysqlResultWrapperPtr *out, bool) -> int {
378 *out = result1;
379 return 0;
380 }))
381 .WillOnce(Invoke([&result](const std::string &,
382 MysqlResultWrapperPtr *out, bool) -> int {
383 *out = result;
384 return 0;
385 }))
386 .RetiresOnSaturation();
387
388 int ret = reader.init();
389 ASSERT_EQ(ret, 0);
390
391 // first set check sum
392 EXPECT_CALL(*connector2_, execute_query(_, _, _))
393 .Times(2)
394 .WillOnce(::testing::Return(0))
395 .WillOnce(::testing::Return(0))
396 .RetiresOnSaturation();
397 // second request dump
398 EXPECT_CALL(*connector2_, execute_simple_command(_, _, _))
399 .WillOnce(::testing::Return(0))
400 .RetiresOnSaturation();
401
402 ret = reader.start(ctx_);
403 ASSERT_EQ(ret, 0);
404
405 // fetch data
406 std::string no_more_event = BuildNoMoreDataEvent();
407 EXPECT_CALL(*connector2_, client_safe_read(_))
408 .WillOnce(Invoke([&no_more_event](unsigned long *len) -> int {
409 *len = no_more_event.size();
410 return 0;
411 }))
412 .RetiresOnSaturation();
413
414 EXPECT_CALL(*connector2_, data())
415 .WillOnce(Invoke([&no_more_event]() -> unsigned char * {
416 return (unsigned char *)no_more_event.c_str();
417 }))
418 .RetiresOnSaturation();
419
420 WriteRequest::Row row_data;
421 LsnContext context;
422 ret = reader.get_next_row_data(&row_data, &context);
423 ASSERT_EQ(ret, 0);
424 ASSERT_EQ(context.status, RowDataStatus::NO_MORE_DATA);
425}
426