1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gtest/gtest.h>
22
23#define private public
24#include "repository/binlog/event_fetcher.h"
25#include "event_builder.h"
26#include "mock_mysql_connector.h"
27#include "mysql_result_builder.h"
28#undef private
29#include "repository/repository_common/error_code.h"
30#include "event_builder.h"
31
32using namespace ::proxima::be;
33using namespace proxima::be::repository;
34
35class EventFetcherTest : public testing::Test {
36 protected:
37 void SetUp() {
38 mgr_ = std::make_shared<MysqlConnectorManager>();
39 ASSERT_TRUE(mgr_);
40 connector_ = std::make_shared<MockMysqlConnector>();
41 ASSERT_TRUE(connector_);
42 mgr_->put(connector_);
43 connection_uri_ = "mysql://root:[email protected]:3306/mytest";
44 ASSERT_TRUE(ailego::Uri::Parse(connection_uri_.c_str(), &uri_));
45 table_name_ = "table";
46 db_ = "mytest";
47 file_name_ = "binlog.000004";
48 BuildSchemaInfo();
49 table_id_ = 1000;
50 }
51
52 void TearDown() {}
53
54 void BuildSchemaInfo() {
55 column_types_.push_back(MYSQL_TYPE_LONG);
56 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
57 column_types_.push_back(MYSQL_TYPE_LONG);
58 column_types_.push_back(MYSQL_TYPE_FLOAT);
59 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
60 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
61 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
62 column_metas_.push_back(0);
63 column_metas_.push_back(2);
64 column_metas_.push_back(0);
65 column_metas_.push_back(0);
66 column_metas_.push_back(2);
67 column_metas_.push_back(2);
68 column_metas_.push_back(2);
69 }
70
71 void InitFetcher() {
72 // init
73 fetcher_ = std::make_shared<EventFetcher>(mgr_);
74 // set checksum
75 EXPECT_CALL(*connector_, execute_query(_, _, _))
76 .Times(2)
77 .WillOnce(testing::Return(0))
78 .WillOnce(testing::Return(0))
79 .RetiresOnSaturation();
80
81 // request dump
82 EXPECT_CALL(*connector_, execute_simple_command(_, _, _))
83 .WillOnce(testing::Return(0))
84 .RetiresOnSaturation();
85
86 int ret = fetcher_->init(file_name_, 4);
87 ASSERT_EQ(ret, 0);
88 }
89
90 std::string BuildTableMapEventStr() {
91 std::vector<bool> column_nulls(column_types_.size(), false);
92 column_nulls[column_types_.size() - 1] = true;
93 std::string table_map = EventBuilder::BuildTableMapEvent(
94 table_id_, db_, table_name_, column_types_, column_metas_,
95 column_nulls);
96 return " " + table_map;
97 }
98
99 std::string BuildNoMoreDataEvent() {
100 std::string str(1, (char)254);
101 return str;
102 }
103
104 std::string BuildOtherEventStr(EventType type) {
105 return " " + EventBuilder::BuildOtherEvent(type);
106 }
107
108 std::string BuildQueryEventStr(const std::string &query) {
109 return " " + EventBuilder::BuildQueryEvent(db_, query);
110 }
111
112 std::string BuildRotateEventStr(const std::string &file, bool has_crc) {
113 return " " + EventBuilder::BuildRotateEvent(file, 4, has_crc);
114 }
115
116 std::string BuildWriteRowsEventStr(std::vector<std::string> &column_values) {
117 std::string event_str = BuildTableMapEventStr();
118 TableMapEventPtr table_map = std::make_shared<TableMapEvent>(
119 event_str.substr(1).c_str(), event_str.size() - 1);
120 std::vector<bool> column_nulls(column_types_.size(), false);
121 std::string rows_str = EventBuilder::BuildWriteRowsEvent(
122 table_id_, column_nulls, column_types_, column_values, table_map);
123 return " " + rows_str;
124 }
125
126 protected:
127 MysqlConnectorManagerPtr mgr_{};
128 MockMysqlConnectorPtr connector_{};
129 std::string connection_uri_{};
130 ailego::Uri uri_{};
131 std::string table_name_{};
132 EventFetcherPtr fetcher_{};
133 std::string file_name_{};
134 std::string db_{};
135 uint64_t table_id_{0};
136 std::vector<enum_field_types> column_types_{};
137 std::vector<int32_t> column_metas_{};
138};
139
140TEST_F(EventFetcherTest, TestGeneral) {
141 InitFetcher();
142
143 std::string table_map_str = BuildTableMapEventStr();
144 BasicEventPtr event;
145 EXPECT_CALL(*connector_, client_safe_read(_))
146 .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int {
147 *len = table_map_str.size();
148 return 0;
149 }))
150 .RetiresOnSaturation();
151
152 EXPECT_CALL(*connector_, data())
153 .WillOnce(Invoke([&table_map_str]() -> unsigned char * {
154 return (unsigned char *)table_map_str.c_str();
155 }))
156 .RetiresOnSaturation();
157
158 int ret = fetcher_->fetch(&event);
159 ASSERT_EQ(ret, 0);
160 ASSERT_EQ(event->type(), TABLE_MAP_EVENT);
161}
162
163// TEST_F(EventFetcherTest, TestFetchWithReadDataFailed) {
164// InitFetcher();
165// fetcher_->need_reconnect_ = true;
166
167// BasicEventPtr event;
168// EXPECT_CALL(*connector_, reconnect())
169// .Times(1)
170// .WillOnce(testing::Return(false))
171// .RetiresOnSaturation();
172
173// int ret = fetcher_->fetch(&event);
174// ASSERT_EQ(ret, ErrorCode_ConnectMysql);
175// }
176
177// TEST_F(EventFetcherTest, TestFetchWithNoMoreData) {
178// InitFetcher();
179
180// std::string event_str = BuildNoMoreDataEvent();
181// BasicEventPtr event;
182// EXPECT_CALL(*connector_, client_safe_read(_))
183// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
184// *len = event_str.size();
185// return 0;
186// }))
187// .RetiresOnSaturation();
188
189// EXPECT_CALL(*connector_, data())
190// .WillOnce(Invoke([&event_str]() -> unsigned char * {
191// return (unsigned char *)event_str.c_str();
192// }))
193// .RetiresOnSaturation();
194
195// int ret = fetcher_->fetch(&event);
196// ASSERT_EQ(ret, ErrorCode_BinlogNoMoreData);
197// }
198
199// TEST_F(EventFetcherTest, TestFetchWithQueryEvent) {
200// InitFetcher();
201
202// std::string event_str = BuildQueryEventStr("test query");
203// BasicEventPtr event;
204// EXPECT_CALL(*connector_, client_safe_read(_))
205// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
206// *len = event_str.size();
207// return 0;
208// }))
209// .RetiresOnSaturation();
210
211// EXPECT_CALL(*connector_, data())
212// .WillOnce(Invoke([&event_str]() -> unsigned char * {
213// return (unsigned char *)event_str.c_str();
214// }))
215// .RetiresOnSaturation();
216
217// int ret = fetcher_->fetch(&event);
218// ASSERT_EQ(ret, 0);
219// ASSERT_EQ(event->type(), QUERY_EVENT);
220// }
221
222// TEST_F(EventFetcherTest, TestFetchWithRotateEvent) {
223// InitFetcher();
224
225// std::string event_str = BuildRotateEventStr(file_name_, true);
226// BasicEventPtr event;
227// EXPECT_CALL(*connector_, client_safe_read(_))
228// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
229// *len = event_str.size();
230// return 0;
231// }))
232// .RetiresOnSaturation();
233
234// EXPECT_CALL(*connector_, data())
235// .WillOnce(Invoke([&event_str]() -> unsigned char * {
236// return (unsigned char *)event_str.c_str();
237// }))
238// .RetiresOnSaturation();
239
240// int ret = fetcher_->fetch(&event);
241// ASSERT_EQ(ret, 0);
242// ASSERT_EQ(event->type(), ROTATE_EVENT);
243// }
244
245// TEST_F(EventFetcherTest, TestFetchWithTableMapEvent) {
246// InitFetcher();
247
248// std::string event_str = BuildTableMapEventStr();
249// BasicEventPtr event;
250// EXPECT_CALL(*connector_, client_safe_read(_))
251// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
252// *len = event_str.size();
253// return 0;
254// }))
255// .RetiresOnSaturation();
256
257// EXPECT_CALL(*connector_, data())
258// .WillOnce(Invoke([&event_str]() -> unsigned char * {
259// return (unsigned char *)event_str.c_str();
260// }))
261// .RetiresOnSaturation();
262
263// int ret = fetcher_->fetch(&event);
264// ASSERT_EQ(ret, 0);
265// ASSERT_EQ(event->type(), TABLE_MAP_EVENT);
266// }
267
268// TEST_F(EventFetcherTest, TestFetchWithWriteRowsEvent) {
269// InitFetcher();
270
271// std::vector<std::string> values = {"1", "name1", "30", "123.456",
272// "1,2,3,4", "1,2,3,5", "1,2,3,6"};
273// std::string event_str = BuildWriteRowsEventStr(values);
274// BasicEventPtr event;
275// EXPECT_CALL(*connector_, client_safe_read(_))
276// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
277// *len = event_str.size();
278// return 0;
279// }))
280// .RetiresOnSaturation();
281
282// EXPECT_CALL(*connector_, data())
283// .WillOnce(Invoke([&event_str]() -> unsigned char * {
284// return (unsigned char *)event_str.c_str();
285// }))
286// .RetiresOnSaturation();
287
288// int ret = fetcher_->fetch(&event);
289// ASSERT_EQ(ret, 0);
290// ASSERT_EQ(event->type(), WRITE_ROWS_EVENT_V1);
291// }
292
293// TEST_F(EventFetcherTest, TestFetchWithOtherEvent) {
294// InitFetcher();
295
296// std::string event_str = BuildOtherEventStr(HEARTBEAT_LOG_EVENT);
297// BasicEventPtr event;
298// EXPECT_CALL(*connector_, client_safe_read(_))
299// .WillOnce(Invoke([&event_str](unsigned long *len) -> int {
300// *len = event_str.size();
301// return 0;
302// }))
303// .RetiresOnSaturation();
304
305// EXPECT_CALL(*connector_, data())
306// .WillOnce(Invoke([&event_str]() -> unsigned char * {
307// return (unsigned char *)event_str.c_str();
308// }))
309// .RetiresOnSaturation();
310
311// int ret = fetcher_->fetch(&event);
312// ASSERT_EQ(ret, 0);
313// ASSERT_EQ(event->type(), HEARTBEAT_LOG_EVENT);
314// }
315
316// TEST_F(EventFetcherTest, TestUpdateLsnInfo) {
317// InitFetcher();
318
319// std::string file_name("binlog.000001");
320// uint64_t position = 4;
321
322// // lsn valid
323// EXPECT_CALL(*connector_, execute_query(_, _, _))
324// .Times(1)
325// .WillOnce(testing::Return(0))
326// .RetiresOnSaturation();
327
328// int ret = fetcher_->update_lsn_info(file_name, position);
329// ASSERT_EQ(ret, 0);
330
331// ASSERT_EQ(fetcher_->file_name_, file_name);
332// ASSERT_EQ(fetcher_->position_, position);
333
334// MysqlResultBuilder builder;
335// MysqlResultWrapperPtr result = builder.BuildShowBinaryLogsResult();
336// // lsn invalid
337// EXPECT_CALL(*connector_, execute_query(_, _, _))
338// .Times(2)
339// .WillOnce(testing::Return(1))
340// .WillOnce(Invoke([&result](const std::string &,
341// MysqlResultWrapperPtr *out, bool) -> int {
342// *out = result;
343// return 0;
344// }))
345// .RetiresOnSaturation();
346
347
348// ret = fetcher_->update_lsn_info(file_name, position);
349// ASSERT_EQ(ret, 0);
350// ASSERT_EQ(fetcher_->file_name_, "binlog.000004");
351// ASSERT_EQ(fetcher_->position_, 4);
352// }
353