1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gtest/gtest.h> |
22 | |
23 | #define private public |
24 | #include "repository/binlog/info_fetcher.h" |
25 | #include "repository/binlog/rows_event_parser.h" |
26 | #include "mock_mysql_connector.h" |
27 | #undef private |
28 | #include "repository/repository_common/error_code.h" |
29 | #include "event_builder.h" |
30 | |
31 | using namespace ::proxima::be; |
32 | using namespace proxima::be::repository; |
33 | |
34 | class RowsEventParserTest : public testing::Test { |
35 | protected: |
36 | void SetUp() { |
37 | mgr_ = std::make_shared<MysqlConnectorManager>(); |
38 | ASSERT_TRUE(mgr_); |
39 | connector_ = std::make_shared<MockMysqlConnector>(); |
40 | ASSERT_TRUE(connector_); |
41 | mgr_->put(connector_); |
42 | connection_uri_ = "mysql://127.0.0.1:3306/mytest" ; |
43 | user_ = "root" ; |
44 | password_ = "root" ; |
45 | table_name_ = "table" ; |
46 | db_ = "mytest" ; |
47 | InitTableSchema(); |
48 | |
49 | table_id_ = 1000; |
50 | } |
51 | |
52 | void TearDown() {} |
53 | |
54 | MockMysqlResultWrapperPtr BuildQuerySchemaResult() { |
55 | MockMysqlResultWrapperPtr result = |
56 | std::make_shared<MockMysqlResultWrapper>(); |
57 | result->append_field_meta("id" , MYSQL_TYPE_LONG, 11, 0, |
58 | AUTO_INCREMENT_FLAG); |
59 | result->append_field_meta("name" , MYSQL_TYPE_VAR_STRING, 100); |
60 | result->append_field_meta("age" , MYSQL_TYPE_LONG, 11); |
61 | result->append_field_meta("score" , MYSQL_TYPE_FLOAT, 12); |
62 | result->append_field_meta("vector1" , MYSQL_TYPE_VAR_STRING, 1024); |
63 | result->append_field_meta("vector2" , MYSQL_TYPE_VAR_STRING, 1024); |
64 | result->append_field_meta("vector3" , MYSQL_TYPE_VAR_STRING, 1024); |
65 | column_types_.push_back(MYSQL_TYPE_LONG); |
66 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
67 | column_types_.push_back(MYSQL_TYPE_LONG); |
68 | column_types_.push_back(MYSQL_TYPE_FLOAT); |
69 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
70 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
71 | column_types_.push_back(MYSQL_TYPE_VAR_STRING); |
72 | column_metas_.push_back(0); |
73 | column_metas_.push_back(2); |
74 | column_metas_.push_back(0); |
75 | column_metas_.push_back(0); |
76 | column_metas_.push_back(2); |
77 | column_metas_.push_back(2); |
78 | column_metas_.push_back(2); |
79 | |
80 | return result; |
81 | } |
82 | |
83 | MockMysqlResultWrapperPtr BuildQueryCollationResult() { |
84 | MockMysqlResultWrapperPtr result = |
85 | std::make_shared<MockMysqlResultWrapper>(); |
86 | result->append_field_meta("Field" , MYSQL_TYPE_VAR_STRING, 11); |
87 | result->append_field_meta("Type" , MYSQL_TYPE_VAR_STRING, 100); |
88 | result->append_field_meta("Collation" , MYSQL_TYPE_VAR_STRING, 11); |
89 | |
90 | std::vector<std::string> values1 = {"id" , "" , "" }; |
91 | result->append_row_values(values1); |
92 | std::vector<std::string> values2 = {"name" , "" , "utf8_general_ci" }; |
93 | result->append_row_values(values2); |
94 | std::vector<std::string> values3 = {"age" , "" , "" }; |
95 | result->append_row_values(values3); |
96 | std::vector<std::string> values4 = {"score" , "" , "utf8_general_ci" }; |
97 | result->append_row_values(values4); |
98 | std::vector<std::string> values5 = {"vector1" , "" , "utf8_general_ci" }; |
99 | result->append_row_values(values5); |
100 | std::vector<std::string> values6 = {"vector2" , "" , "utf8_general_ci" }; |
101 | result->append_row_values(values6); |
102 | std::vector<std::string> values7 = {"vector3" , "" , "utf8_general_ci" }; |
103 | result->append_row_values(values7); |
104 | |
105 | return result; |
106 | } |
107 | |
108 | void BuildCollectionConfig(CollectionConfig &config) { |
109 | auto *repo = config.mutable_repository_config(); |
110 | repo->set_repository_type(CollectionConfig::RepositoryConfig::RT_DATABASE); |
111 | repo->set_repository_name(table_name_); |
112 | auto *database = repo->mutable_database(); |
113 | database->set_connection_uri(connection_uri_); |
114 | database->set_table_name(table_name_); |
115 | database->set_user(user_); |
116 | database->set_password(password_); |
117 | |
118 | config.add_forward_column_names("name" ); |
119 | config.add_forward_column_names("age" ); |
120 | auto *index1 = config.add_index_column_params(); |
121 | index1->set_column_name("vector1" ); |
122 | auto *index2 = config.add_index_column_params(); |
123 | index2->set_column_name("vector2" ); |
124 | } |
125 | |
126 | void InitTableSchema() { |
127 | CollectionConfig config; |
128 | BuildCollectionConfig(config); |
129 | |
130 | // init |
131 | ailego::Uri test_uri; |
132 | ASSERT_TRUE(ailego::Uri::Parse(connection_uri_.c_str(), &test_uri)); |
133 | EXPECT_CALL(*connector_, uri()) |
134 | .WillOnce( |
135 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
136 | .RetiresOnSaturation(); |
137 | fetcher_ = std::make_shared<InfoFetcher>(config, mgr_); |
138 | int ret = fetcher_->init(); |
139 | ASSERT_EQ(ret, 0); |
140 | |
141 | MockMysqlResultWrapperPtr result1 = BuildQueryCollationResult(); |
142 | MockMysqlResultWrapperPtr result = BuildQuerySchemaResult(); |
143 | EXPECT_CALL(*connector_, execute_query(_, _, _)) |
144 | .Times(2) |
145 | .WillOnce(Invoke([&result1](const std::string &, |
146 | MysqlResultWrapperPtr *out, bool) -> int { |
147 | *out = result1; |
148 | return 0; |
149 | })) |
150 | .WillOnce(Invoke([&result](const std::string &, |
151 | MysqlResultWrapperPtr *out, bool) -> int { |
152 | *out = result; |
153 | return 0; |
154 | })) |
155 | .RetiresOnSaturation(); |
156 | |
157 | // get table schema |
158 | ret = fetcher_->get_table_schema(table_name_, &schema_); |
159 | ASSERT_EQ(ret, 0); |
160 | } |
161 | |
162 | TableMapEventPtr BuildTableMapEvent() { |
163 | std::vector<bool> column_nulls(column_types_.size(), false); |
164 | column_nulls[column_types_.size() - 1] = true; |
165 | std::string table_map = EventBuilder::BuildTableMapEvent( |
166 | table_id_, db_, table_name_, column_types_, column_metas_, |
167 | column_nulls); |
168 | return std::make_shared<TableMapEvent>(table_map.data(), table_map.size()); |
169 | } |
170 | |
171 | RowsEventPtr BuildWriteRowsEvent(std::vector<std::string> &column_values, |
172 | const TableMapEventPtr &table_map) { |
173 | std::vector<bool> column_nulls(column_types_.size(), false); |
174 | std::string rows_str = EventBuilder::BuildWriteRowsEvent( |
175 | table_id_, column_nulls, column_types_, column_values, table_map); |
176 | return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size()); |
177 | } |
178 | |
179 | RowsEventPtr BuildDeleteRowsEvent(std::vector<std::string> &column_values, |
180 | const TableMapEventPtr &table_map) { |
181 | std::vector<bool> column_nulls(column_types_.size(), false); |
182 | std::string rows_str = EventBuilder::BuildDeleteRowsEvent( |
183 | table_id_, column_nulls, column_types_, column_values, table_map); |
184 | return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size()); |
185 | } |
186 | |
187 | RowsEventPtr BuildUpdateRowsEvent(std::vector<std::string> &old_values, |
188 | std::vector<std::string> &new_values, |
189 | const TableMapEventPtr &table_map) { |
190 | std::vector<bool> column_nulls(column_types_.size(), false); |
191 | std::string rows_str = EventBuilder::BuildUpdateRowsEvent( |
192 | table_id_, column_nulls, column_types_, old_values, new_values, |
193 | table_map); |
194 | return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size()); |
195 | } |
196 | |
197 | protected: |
198 | MysqlConnectorManagerPtr mgr_{}; |
199 | MockMysqlConnectorPtr connector_{}; |
200 | std::string connection_uri_{}; |
201 | std::string user_{}; |
202 | std::string password_{}; |
203 | std::string table_name_{}; |
204 | std::string db_{}; |
205 | InfoFetcherPtr fetcher_{}; |
206 | TableSchemaPtr schema_; |
207 | uint64_t table_id_{0}; |
208 | std::vector<enum_field_types> column_types_{}; |
209 | std::vector<int32_t> column_metas_{}; |
210 | }; |
211 | |
212 | TEST_F(RowsEventParserTest, TestSimple) { |
213 | RowsEventParser parser(schema_); |
214 | TableMapEventPtr table_map = BuildTableMapEvent(); |
215 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
216 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
217 | RowsEventPtr event = BuildWriteRowsEvent(values, table_map); |
218 | ASSERT_TRUE(event); |
219 | event->table_map_ = table_map; |
220 | WriteRequest::Row row_data; |
221 | LsnContext ctx; |
222 | int ret = parser.parse(event.get(), &row_data, &ctx); |
223 | ASSERT_EQ(ret, 0); |
224 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
225 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
226 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30); |
227 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
228 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
229 | } |
230 | |
231 | TEST_F(RowsEventParserTest, TestParseWriteEventSuccess) { |
232 | RowsEventParser parser(schema_); |
233 | TableMapEventPtr table_map = BuildTableMapEvent(); |
234 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
235 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
236 | RowsEventPtr event = BuildWriteRowsEvent(values, table_map); |
237 | ASSERT_TRUE(event); |
238 | event->table_map_ = table_map; |
239 | WriteRequest::Row row_data; |
240 | LsnContext ctx; |
241 | int ret = parser.parse(event.get(), &row_data, &ctx); |
242 | ASSERT_EQ(ret, 0); |
243 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
244 | ASSERT_EQ(row_data.operation_type(), proto::OP_INSERT); |
245 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
246 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30); |
247 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
248 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
249 | } |
250 | |
251 | TEST_F(RowsEventParserTest, TestParseDeleteEventSuccess) { |
252 | RowsEventParser parser(schema_); |
253 | TableMapEventPtr table_map = BuildTableMapEvent(); |
254 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
255 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
256 | RowsEventPtr event = BuildDeleteRowsEvent(values, table_map); |
257 | ASSERT_TRUE(event); |
258 | event->table_map_ = table_map; |
259 | WriteRequest::Row row_data; |
260 | LsnContext ctx; |
261 | int ret = parser.parse(event.get(), &row_data, &ctx); |
262 | ASSERT_EQ(ret, 0); |
263 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
264 | ASSERT_EQ(row_data.operation_type(), proto::OP_DELETE); |
265 | } |
266 | |
267 | TEST_F(RowsEventParserTest, TestParseUpdateEventSuccess) { |
268 | RowsEventParser parser(schema_); |
269 | TableMapEventPtr table_map = BuildTableMapEvent(); |
270 | std::vector<std::string> old_values = { |
271 | "1" , "name1" , "30" , "123.456" , "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
272 | std::vector<std::string> new_values = { |
273 | "1" , "name2" , "40" , "123.456" , "2,2,3,4" , "2,2,3,5" , "1,2,3,6" }; |
274 | RowsEventPtr event = BuildUpdateRowsEvent(old_values, new_values, table_map); |
275 | ASSERT_TRUE(event); |
276 | event->table_map_ = table_map; |
277 | WriteRequest::Row row_data; |
278 | LsnContext ctx; |
279 | int ret = parser.parse(event.get(), &row_data, &ctx); |
280 | ASSERT_EQ(ret, 0); |
281 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
282 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2" ); |
283 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 40); |
284 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4" ); |
285 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5" ); |
286 | } |
287 | |
288 | TEST_F(RowsEventParserTest, TestParseFailedWithSchemaMismatched) { |
289 | schema_->add_field(FieldPtr()); |
290 | RowsEventParser parser(schema_); |
291 | TableMapEventPtr table_map = BuildTableMapEvent(); |
292 | std::vector<std::string> old_values = { |
293 | "1" , "name1" , "30" , "123.456" , "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
294 | std::vector<std::string> new_values = { |
295 | "1" , "name2" , "40" , "123.456" , "2,2,3,4" , "2,2,3,5" , "1,2,3,6" }; |
296 | RowsEventPtr event = BuildUpdateRowsEvent(old_values, new_values, table_map); |
297 | ASSERT_TRUE(event); |
298 | event->table_map_ = table_map; |
299 | WriteRequest::Row row_data; |
300 | LsnContext ctx; |
301 | int ret = parser.parse(event.get(), &row_data, &ctx); |
302 | ASSERT_EQ(ret, ErrorCode_InvalidRowData); |
303 | } |
304 | |
305 | TEST_F(RowsEventParserTest, TestParseFailedWithParseRowData) { |
306 | RowsEventParser parser(schema_); |
307 | TableMapEventPtr table_map = BuildTableMapEvent(); |
308 | std::vector<std::string> values = {"1" , "name1" , "30" , "123.456" , |
309 | "1,2,3,4" , "1,2,3,5" , "1,2,3,6" }; |
310 | RowsEventPtr event = BuildDeleteRowsEvent(values, table_map); |
311 | ASSERT_TRUE(event); |
312 | event->cur_buf_ = nullptr; |
313 | event->table_map_ = table_map; |
314 | WriteRequest::Row row_data; |
315 | LsnContext ctx; |
316 | int ret = parser.parse(event.get(), &row_data, &ctx); |
317 | ASSERT_EQ(ret, ErrorCode_InvalidRowData); |
318 | } |
319 | |
320 | TEST_F(RowsEventParserTest, TestGetAutoIncrementId) { |
321 | RowsEventParser parser(schema_); |
322 | uint64_t id; |
323 | GenericValue value; |
324 | { |
325 | value.set_int32_value(100); |
326 | id = parser.get_auto_increment_id(value); |
327 | ASSERT_EQ(id, 100); |
328 | } |
329 | { |
330 | value.set_int64_value(1000); |
331 | id = parser.get_auto_increment_id(value); |
332 | ASSERT_EQ(id, 1000); |
333 | } |
334 | { |
335 | value.set_uint32_value(100); |
336 | id = parser.get_auto_increment_id(value); |
337 | ASSERT_EQ(id, 100); |
338 | } |
339 | { |
340 | value.set_uint64_value(100); |
341 | id = parser.get_auto_increment_id(value); |
342 | ASSERT_EQ(id, 100); |
343 | } |
344 | { |
345 | value.set_bytes_value("100" ); |
346 | id = parser.get_auto_increment_id(value); |
347 | ASSERT_EQ(id, INVALID_PRIMARY_KEY); |
348 | } |
349 | } |
350 | |