1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gtest/gtest.h> |
22 | |
23 | #define private public |
24 | #include "repository/binlog/event_fetcher.h" |
25 | #include "event_builder.h" |
26 | #include "mock_mysql_connector.h" |
27 | #include "mysql_result_builder.h" |
28 | #undef private |
29 | #include "repository/repository_common/error_code.h" |
30 | #include "event_builder.h" |
31 | |
32 | using namespace ::proxima::be; |
33 | using namespace proxima::be::repository; |
34 | |
35 | class EventFetcherTest : public testing::Test { |
36 | protected: |
37 | void SetUp() { |
38 | mgr_ = std::make_shared<MysqlConnectorManager>(); |
39 | ASSERT_TRUE(mgr_); |
40 | connector_ = std::make_shared<MockMysqlConnector>(); |
41 | ASSERT_TRUE(connector_); |
42 | mgr_->put(connector_); |
43 | connection_uri_ = "mysql://root:[email protected]:3306/mytest" ; |
44 | ASSERT_TRUE(ailego::Uri::Parse(connection_uri_.c_str(), &uri_)); |
45 | table_name_ = "table" ; |
46 | db_ = "mytest" ; |
47 | file_name_ = "binlog.000004" ; |
48 | BuildSchemaInfo(); |
49 | table_id_ = 1000; |
50 | } |
51 | |
52 | void TearDown() {} |
53 | |
54 | void BuildSchemaInfo() { |
55 | column_types_.push_back(MYSQL_TYPE_LONG); |
56 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
57 | column_types_.push_back(MYSQL_TYPE_LONG); |
58 | column_types_.push_back(MYSQL_TYPE_FLOAT); |
59 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
60 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
61 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
62 | column_metas_.push_back(0); |
63 | column_metas_.push_back(2); |
64 | column_metas_.push_back(0); |
65 | column_metas_.push_back(0); |
66 | column_metas_.push_back(2); |
67 | column_metas_.push_back(2); |
68 | column_metas_.push_back(2); |
69 | } |
70 | |
71 | void InitFetcher() { |
72 | // init |
73 | fetcher_ = std::make_shared<EventFetcher>(mgr_); |
74 | // set checksum |
75 | EXPECT_CALL(*connector_, execute_query(_, _, _)) |
76 | .Times(2) |
77 | .WillOnce(testing::Return(0)) |
78 | .WillOnce(testing::Return(0)) |
79 | .RetiresOnSaturation(); |
80 | |
81 | // request dump |
82 | EXPECT_CALL(*connector_, execute_simple_command(_, _, _)) |
83 | .WillOnce(testing::Return(0)) |
84 | .RetiresOnSaturation(); |
85 | |
86 | int ret = fetcher_->init(file_name_, 4); |
87 | ASSERT_EQ(ret, 0); |
88 | } |
89 | |
90 | std::string BuildTableMapEventStr() { |
91 | std::vector<bool> column_nulls(column_types_.size(), false); |
92 | column_nulls[column_types_.size() - 1] = true; |
93 | std::string table_map = EventBuilder::BuildTableMapEvent( |
94 | table_id_, db_, table_name_, column_types_, column_metas_, |
95 | column_nulls); |
96 | return " " + table_map; |
97 | } |
98 | |
99 | std::string BuildNoMoreDataEvent() { |
100 | std::string str(1, (char)254); |
101 | return str; |
102 | } |
103 | |
104 | std::string BuildOtherEventStr(EventType type) { |
105 | return " " + EventBuilder::BuildOtherEvent(type); |
106 | } |
107 | |
108 | std::string BuildQueryEventStr(const std::string &query) { |
109 | return " " + EventBuilder::BuildQueryEvent(db_, query); |
110 | } |
111 | |
112 | std::string BuildRotateEventStr(const std::string &file, bool has_crc) { |
113 | return " " + EventBuilder::BuildRotateEvent(file, 4, has_crc); |
114 | } |
115 | |
116 | std::string BuildWriteRowsEventStr(std::vector<std::string> &column_values) { |
117 | std::string event_str = BuildTableMapEventStr(); |
118 | TableMapEventPtr table_map = std::make_shared<TableMapEvent>( |
119 | event_str.substr(1).c_str(), event_str.size() - 1); |
120 | std::vector<bool> column_nulls(column_types_.size(), false); |
121 | std::string rows_str = EventBuilder::BuildWriteRowsEvent( |
122 | table_id_, column_nulls, column_types_, column_values, table_map); |
123 | return " " + rows_str; |
124 | } |
125 | |
126 | protected: |
127 | MysqlConnectorManagerPtr mgr_{}; |
128 | MockMysqlConnectorPtr connector_{}; |
129 | std::string connection_uri_{}; |
130 | ailego::Uri uri_{}; |
131 | std::string table_name_{}; |
132 | EventFetcherPtr fetcher_{}; |
133 | std::string file_name_{}; |
134 | std::string db_{}; |
135 | uint64_t table_id_{0}; |
136 | std::vector<enum_field_types> column_types_{}; |
137 | std::vector<int32_t> column_metas_{}; |
138 | }; |
139 | |
140 | TEST_F(EventFetcherTest, TestGeneral) { |
141 | InitFetcher(); |
142 | |
143 | std::string table_map_str = BuildTableMapEventStr(); |
144 | BasicEventPtr event; |
145 | EXPECT_CALL(*connector_, client_safe_read(_)) |
146 | .WillOnce(Invoke([&table_map_str](unsigned long *len) -> int { |
147 | *len = table_map_str.size(); |
148 | return 0; |
149 | })) |
150 | .RetiresOnSaturation(); |
151 | |
152 | EXPECT_CALL(*connector_, data()) |
153 | .WillOnce(Invoke([&table_map_str]() -> unsigned char * { |
154 | return (unsigned char *)table_map_str.c_str(); |
155 | })) |
156 | .RetiresOnSaturation(); |
157 | |
158 | int ret = fetcher_->fetch(&event); |
159 | ASSERT_EQ(ret, 0); |
160 | ASSERT_EQ(event->type(), TABLE_MAP_EVENT); |
161 | } |
162 | |
163 | // TEST_F(EventFetcherTest, TestFetchWithReadDataFailed) { |
164 | // InitFetcher(); |
165 | // fetcher_->need_reconnect_ = true; |
166 | |
167 | // BasicEventPtr event; |
168 | // EXPECT_CALL(*connector_, reconnect()) |
169 | // .Times(1) |
170 | // .WillOnce(testing::Return(false)) |
171 | // .RetiresOnSaturation(); |
172 | |
173 | // int ret = fetcher_->fetch(&event); |
174 | // ASSERT_EQ(ret, ErrorCode_ConnectMysql); |
175 | // } |
176 | |
177 | // TEST_F(EventFetcherTest, TestFetchWithNoMoreData) { |
178 | // InitFetcher(); |
179 | |
180 | // std::string event_str = BuildNoMoreDataEvent(); |
181 | // BasicEventPtr event; |
182 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
183 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
184 | // *len = event_str.size(); |
185 | // return 0; |
186 | // })) |
187 | // .RetiresOnSaturation(); |
188 | |
189 | // EXPECT_CALL(*connector_, data()) |
190 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
191 | // return (unsigned char *)event_str.c_str(); |
192 | // })) |
193 | // .RetiresOnSaturation(); |
194 | |
195 | // int ret = fetcher_->fetch(&event); |
196 | // ASSERT_EQ(ret, ErrorCode_BinlogNoMoreData); |
197 | // } |
198 | |
199 | // TEST_F(EventFetcherTest, TestFetchWithQueryEvent) { |
200 | // InitFetcher(); |
201 | |
202 | // std::string event_str = BuildQueryEventStr("test query"); |
203 | // BasicEventPtr event; |
204 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
205 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
206 | // *len = event_str.size(); |
207 | // return 0; |
208 | // })) |
209 | // .RetiresOnSaturation(); |
210 | |
211 | // EXPECT_CALL(*connector_, data()) |
212 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
213 | // return (unsigned char *)event_str.c_str(); |
214 | // })) |
215 | // .RetiresOnSaturation(); |
216 | |
217 | // int ret = fetcher_->fetch(&event); |
218 | // ASSERT_EQ(ret, 0); |
219 | // ASSERT_EQ(event->type(), QUERY_EVENT); |
220 | // } |
221 | |
222 | // TEST_F(EventFetcherTest, TestFetchWithRotateEvent) { |
223 | // InitFetcher(); |
224 | |
225 | // std::string event_str = BuildRotateEventStr(file_name_, true); |
226 | // BasicEventPtr event; |
227 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
228 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
229 | // *len = event_str.size(); |
230 | // return 0; |
231 | // })) |
232 | // .RetiresOnSaturation(); |
233 | |
234 | // EXPECT_CALL(*connector_, data()) |
235 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
236 | // return (unsigned char *)event_str.c_str(); |
237 | // })) |
238 | // .RetiresOnSaturation(); |
239 | |
240 | // int ret = fetcher_->fetch(&event); |
241 | // ASSERT_EQ(ret, 0); |
242 | // ASSERT_EQ(event->type(), ROTATE_EVENT); |
243 | // } |
244 | |
245 | // TEST_F(EventFetcherTest, TestFetchWithTableMapEvent) { |
246 | // InitFetcher(); |
247 | |
248 | // std::string event_str = BuildTableMapEventStr(); |
249 | // BasicEventPtr event; |
250 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
251 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
252 | // *len = event_str.size(); |
253 | // return 0; |
254 | // })) |
255 | // .RetiresOnSaturation(); |
256 | |
257 | // EXPECT_CALL(*connector_, data()) |
258 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
259 | // return (unsigned char *)event_str.c_str(); |
260 | // })) |
261 | // .RetiresOnSaturation(); |
262 | |
263 | // int ret = fetcher_->fetch(&event); |
264 | // ASSERT_EQ(ret, 0); |
265 | // ASSERT_EQ(event->type(), TABLE_MAP_EVENT); |
266 | // } |
267 | |
268 | // TEST_F(EventFetcherTest, TestFetchWithWriteRowsEvent) { |
269 | // InitFetcher(); |
270 | |
271 | // std::vector<std::string> values = {"1", "name1", "30", "123.456", |
272 | // "1,2,3,4", "1,2,3,5", "1,2,3,6"}; |
273 | // std::string event_str = BuildWriteRowsEventStr(values); |
274 | // BasicEventPtr event; |
275 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
276 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
277 | // *len = event_str.size(); |
278 | // return 0; |
279 | // })) |
280 | // .RetiresOnSaturation(); |
281 | |
282 | // EXPECT_CALL(*connector_, data()) |
283 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
284 | // return (unsigned char *)event_str.c_str(); |
285 | // })) |
286 | // .RetiresOnSaturation(); |
287 | |
288 | // int ret = fetcher_->fetch(&event); |
289 | // ASSERT_EQ(ret, 0); |
290 | // ASSERT_EQ(event->type(), WRITE_ROWS_EVENT_V1); |
291 | // } |
292 | |
293 | // TEST_F(EventFetcherTest, TestFetchWithOtherEvent) { |
294 | // InitFetcher(); |
295 | |
296 | // std::string event_str = BuildOtherEventStr(HEARTBEAT_LOG_EVENT); |
297 | // BasicEventPtr event; |
298 | // EXPECT_CALL(*connector_, client_safe_read(_)) |
299 | // .WillOnce(Invoke([&event_str](unsigned long *len) -> int { |
300 | // *len = event_str.size(); |
301 | // return 0; |
302 | // })) |
303 | // .RetiresOnSaturation(); |
304 | |
305 | // EXPECT_CALL(*connector_, data()) |
306 | // .WillOnce(Invoke([&event_str]() -> unsigned char * { |
307 | // return (unsigned char *)event_str.c_str(); |
308 | // })) |
309 | // .RetiresOnSaturation(); |
310 | |
311 | // int ret = fetcher_->fetch(&event); |
312 | // ASSERT_EQ(ret, 0); |
313 | // ASSERT_EQ(event->type(), HEARTBEAT_LOG_EVENT); |
314 | // } |
315 | |
316 | // TEST_F(EventFetcherTest, TestUpdateLsnInfo) { |
317 | // InitFetcher(); |
318 | |
319 | // std::string file_name("binlog.000001"); |
320 | // uint64_t position = 4; |
321 | |
322 | // // lsn valid |
323 | // EXPECT_CALL(*connector_, execute_query(_, _, _)) |
324 | // .Times(1) |
325 | // .WillOnce(testing::Return(0)) |
326 | // .RetiresOnSaturation(); |
327 | |
328 | // int ret = fetcher_->update_lsn_info(file_name, position); |
329 | // ASSERT_EQ(ret, 0); |
330 | |
331 | // ASSERT_EQ(fetcher_->file_name_, file_name); |
332 | // ASSERT_EQ(fetcher_->position_, position); |
333 | |
334 | // MysqlResultBuilder builder; |
335 | // MysqlResultWrapperPtr result = builder.BuildShowBinaryLogsResult(); |
336 | // // lsn invalid |
337 | // EXPECT_CALL(*connector_, execute_query(_, _, _)) |
338 | // .Times(2) |
339 | // .WillOnce(testing::Return(1)) |
340 | // .WillOnce(Invoke([&result](const std::string &, |
341 | // MysqlResultWrapperPtr *out, bool) -> int { |
342 | // *out = result; |
343 | // return 0; |
344 | // })) |
345 | // .RetiresOnSaturation(); |
346 | |
347 | |
348 | // ret = fetcher_->update_lsn_info(file_name, position); |
349 | // ASSERT_EQ(ret, 0); |
350 | // ASSERT_EQ(fetcher_->file_name_, "binlog.000004"); |
351 | // ASSERT_EQ(fetcher_->position_, 4); |
352 | // } |
353 | |