1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
 * \brief Unit tests for RowsEventParser
19 */
20
21#include <gtest/gtest.h>
22
23#define private public
24#include "repository/binlog/info_fetcher.h"
25#include "repository/binlog/rows_event_parser.h"
26#include "mock_mysql_connector.h"
27#undef private
28#include "repository/repository_common/error_code.h"
29#include "event_builder.h"
30
31using namespace ::proxima::be;
32using namespace proxima::be::repository;
33
34class RowsEventParserTest : public testing::Test {
35 protected:
36 void SetUp() {
37 mgr_ = std::make_shared<MysqlConnectorManager>();
38 ASSERT_TRUE(mgr_);
39 connector_ = std::make_shared<MockMysqlConnector>();
40 ASSERT_TRUE(connector_);
41 mgr_->put(connector_);
42 connection_uri_ = "mysql://127.0.0.1:3306/mytest";
43 user_ = "root";
44 password_ = "root";
45 table_name_ = "table";
46 db_ = "mytest";
47 InitTableSchema();
48
49 table_id_ = 1000;
50 }
51
52 void TearDown() {}
53
54 MockMysqlResultWrapperPtr BuildQuerySchemaResult() {
55 MockMysqlResultWrapperPtr result =
56 std::make_shared<MockMysqlResultWrapper>();
57 result->append_field_meta("id", MYSQL_TYPE_LONG, 11, 0,
58 AUTO_INCREMENT_FLAG);
59 result->append_field_meta("name", MYSQL_TYPE_VAR_STRING, 100);
60 result->append_field_meta("age", MYSQL_TYPE_LONG, 11);
61 result->append_field_meta("score", MYSQL_TYPE_FLOAT, 12);
62 result->append_field_meta("vector1", MYSQL_TYPE_VAR_STRING, 1024);
63 result->append_field_meta("vector2", MYSQL_TYPE_VAR_STRING, 1024);
64 result->append_field_meta("vector3", MYSQL_TYPE_VAR_STRING, 1024);
65 column_types_.push_back(MYSQL_TYPE_LONG);
66 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
67 column_types_.push_back(MYSQL_TYPE_LONG);
68 column_types_.push_back(MYSQL_TYPE_FLOAT);
69 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
70 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
71 column_types_.push_back(MYSQL_TYPE_VAR_STRING);
72 column_metas_.push_back(0);
73 column_metas_.push_back(2);
74 column_metas_.push_back(0);
75 column_metas_.push_back(0);
76 column_metas_.push_back(2);
77 column_metas_.push_back(2);
78 column_metas_.push_back(2);
79
80 return result;
81 }
82
83 MockMysqlResultWrapperPtr BuildQueryCollationResult() {
84 MockMysqlResultWrapperPtr result =
85 std::make_shared<MockMysqlResultWrapper>();
86 result->append_field_meta("Field", MYSQL_TYPE_VAR_STRING, 11);
87 result->append_field_meta("Type", MYSQL_TYPE_VAR_STRING, 100);
88 result->append_field_meta("Collation", MYSQL_TYPE_VAR_STRING, 11);
89
90 std::vector<std::string> values1 = {"id", "", ""};
91 result->append_row_values(values1);
92 std::vector<std::string> values2 = {"name", "", "utf8_general_ci"};
93 result->append_row_values(values2);
94 std::vector<std::string> values3 = {"age", "", ""};
95 result->append_row_values(values3);
96 std::vector<std::string> values4 = {"score", "", "utf8_general_ci"};
97 result->append_row_values(values4);
98 std::vector<std::string> values5 = {"vector1", "", "utf8_general_ci"};
99 result->append_row_values(values5);
100 std::vector<std::string> values6 = {"vector2", "", "utf8_general_ci"};
101 result->append_row_values(values6);
102 std::vector<std::string> values7 = {"vector3", "", "utf8_general_ci"};
103 result->append_row_values(values7);
104
105 return result;
106 }
107
108 void BuildCollectionConfig(CollectionConfig &config) {
109 auto *repo = config.mutable_repository_config();
110 repo->set_repository_type(CollectionConfig::RepositoryConfig::RT_DATABASE);
111 repo->set_repository_name(table_name_);
112 auto *database = repo->mutable_database();
113 database->set_connection_uri(connection_uri_);
114 database->set_table_name(table_name_);
115 database->set_user(user_);
116 database->set_password(password_);
117
118 config.add_forward_column_names("name");
119 config.add_forward_column_names("age");
120 auto *index1 = config.add_index_column_params();
121 index1->set_column_name("vector1");
122 auto *index2 = config.add_index_column_params();
123 index2->set_column_name("vector2");
124 }
125
126 void InitTableSchema() {
127 CollectionConfig config;
128 BuildCollectionConfig(config);
129
130 // init
131 ailego::Uri test_uri;
132 ASSERT_TRUE(ailego::Uri::Parse(connection_uri_.c_str(), &test_uri));
133 EXPECT_CALL(*connector_, uri())
134 .WillOnce(
135 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
136 .RetiresOnSaturation();
137 fetcher_ = std::make_shared<InfoFetcher>(config, mgr_);
138 int ret = fetcher_->init();
139 ASSERT_EQ(ret, 0);
140
141 MockMysqlResultWrapperPtr result1 = BuildQueryCollationResult();
142 MockMysqlResultWrapperPtr result = BuildQuerySchemaResult();
143 EXPECT_CALL(*connector_, execute_query(_, _, _))
144 .Times(2)
145 .WillOnce(Invoke([&result1](const std::string &,
146 MysqlResultWrapperPtr *out, bool) -> int {
147 *out = result1;
148 return 0;
149 }))
150 .WillOnce(Invoke([&result](const std::string &,
151 MysqlResultWrapperPtr *out, bool) -> int {
152 *out = result;
153 return 0;
154 }))
155 .RetiresOnSaturation();
156
157 // get table schema
158 ret = fetcher_->get_table_schema(table_name_, &schema_);
159 ASSERT_EQ(ret, 0);
160 }
161
162 TableMapEventPtr BuildTableMapEvent() {
163 std::vector<bool> column_nulls(column_types_.size(), false);
164 column_nulls[column_types_.size() - 1] = true;
165 std::string table_map = EventBuilder::BuildTableMapEvent(
166 table_id_, db_, table_name_, column_types_, column_metas_,
167 column_nulls);
168 return std::make_shared<TableMapEvent>(table_map.data(), table_map.size());
169 }
170
171 RowsEventPtr BuildWriteRowsEvent(std::vector<std::string> &column_values,
172 const TableMapEventPtr &table_map) {
173 std::vector<bool> column_nulls(column_types_.size(), false);
174 std::string rows_str = EventBuilder::BuildWriteRowsEvent(
175 table_id_, column_nulls, column_types_, column_values, table_map);
176 return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size());
177 }
178
179 RowsEventPtr BuildDeleteRowsEvent(std::vector<std::string> &column_values,
180 const TableMapEventPtr &table_map) {
181 std::vector<bool> column_nulls(column_types_.size(), false);
182 std::string rows_str = EventBuilder::BuildDeleteRowsEvent(
183 table_id_, column_nulls, column_types_, column_values, table_map);
184 return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size());
185 }
186
187 RowsEventPtr BuildUpdateRowsEvent(std::vector<std::string> &old_values,
188 std::vector<std::string> &new_values,
189 const TableMapEventPtr &table_map) {
190 std::vector<bool> column_nulls(column_types_.size(), false);
191 std::string rows_str = EventBuilder::BuildUpdateRowsEvent(
192 table_id_, column_nulls, column_types_, old_values, new_values,
193 table_map);
194 return std::make_shared<RowsEvent>(rows_str.data(), rows_str.size());
195 }
196
197 protected:
198 MysqlConnectorManagerPtr mgr_{};
199 MockMysqlConnectorPtr connector_{};
200 std::string connection_uri_{};
201 std::string user_{};
202 std::string password_{};
203 std::string table_name_{};
204 std::string db_{};
205 InfoFetcherPtr fetcher_{};
206 TableSchemaPtr schema_;
207 uint64_t table_id_{0};
208 std::vector<enum_field_types> column_types_{};
209 std::vector<int32_t> column_metas_{};
210};
211
212TEST_F(RowsEventParserTest, TestSimple) {
213 RowsEventParser parser(schema_);
214 TableMapEventPtr table_map = BuildTableMapEvent();
215 std::vector<std::string> values = {"1", "name1", "30", "123.456",
216 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
217 RowsEventPtr event = BuildWriteRowsEvent(values, table_map);
218 ASSERT_TRUE(event);
219 event->table_map_ = table_map;
220 WriteRequest::Row row_data;
221 LsnContext ctx;
222 int ret = parser.parse(event.get(), &row_data, &ctx);
223 ASSERT_EQ(ret, 0);
224 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
225 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
226 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30);
227 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
228 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
229}
230
231TEST_F(RowsEventParserTest, TestParseWriteEventSuccess) {
232 RowsEventParser parser(schema_);
233 TableMapEventPtr table_map = BuildTableMapEvent();
234 std::vector<std::string> values = {"1", "name1", "30", "123.456",
235 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
236 RowsEventPtr event = BuildWriteRowsEvent(values, table_map);
237 ASSERT_TRUE(event);
238 event->table_map_ = table_map;
239 WriteRequest::Row row_data;
240 LsnContext ctx;
241 int ret = parser.parse(event.get(), &row_data, &ctx);
242 ASSERT_EQ(ret, 0);
243 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
244 ASSERT_EQ(row_data.operation_type(), proto::OP_INSERT);
245 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
246 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 30);
247 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
248 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
249}
250
251TEST_F(RowsEventParserTest, TestParseDeleteEventSuccess) {
252 RowsEventParser parser(schema_);
253 TableMapEventPtr table_map = BuildTableMapEvent();
254 std::vector<std::string> values = {"1", "name1", "30", "123.456",
255 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
256 RowsEventPtr event = BuildDeleteRowsEvent(values, table_map);
257 ASSERT_TRUE(event);
258 event->table_map_ = table_map;
259 WriteRequest::Row row_data;
260 LsnContext ctx;
261 int ret = parser.parse(event.get(), &row_data, &ctx);
262 ASSERT_EQ(ret, 0);
263 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
264 ASSERT_EQ(row_data.operation_type(), proto::OP_DELETE);
265}
266
267TEST_F(RowsEventParserTest, TestParseUpdateEventSuccess) {
268 RowsEventParser parser(schema_);
269 TableMapEventPtr table_map = BuildTableMapEvent();
270 std::vector<std::string> old_values = {
271 "1", "name1", "30", "123.456", "1,2,3,4", "1,2,3,5", "1,2,3,6"};
272 std::vector<std::string> new_values = {
273 "1", "name2", "40", "123.456", "2,2,3,4", "2,2,3,5", "1,2,3,6"};
274 RowsEventPtr event = BuildUpdateRowsEvent(old_values, new_values, table_map);
275 ASSERT_TRUE(event);
276 event->table_map_ = table_map;
277 WriteRequest::Row row_data;
278 LsnContext ctx;
279 int ret = parser.parse(event.get(), &row_data, &ctx);
280 ASSERT_EQ(ret, 0);
281 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
282 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2");
283 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 40);
284 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4");
285 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5");
286}
287
288TEST_F(RowsEventParserTest, TestParseFailedWithSchemaMismatched) {
289 schema_->add_field(FieldPtr());
290 RowsEventParser parser(schema_);
291 TableMapEventPtr table_map = BuildTableMapEvent();
292 std::vector<std::string> old_values = {
293 "1", "name1", "30", "123.456", "1,2,3,4", "1,2,3,5", "1,2,3,6"};
294 std::vector<std::string> new_values = {
295 "1", "name2", "40", "123.456", "2,2,3,4", "2,2,3,5", "1,2,3,6"};
296 RowsEventPtr event = BuildUpdateRowsEvent(old_values, new_values, table_map);
297 ASSERT_TRUE(event);
298 event->table_map_ = table_map;
299 WriteRequest::Row row_data;
300 LsnContext ctx;
301 int ret = parser.parse(event.get(), &row_data, &ctx);
302 ASSERT_EQ(ret, ErrorCode_InvalidRowData);
303}
304
305TEST_F(RowsEventParserTest, TestParseFailedWithParseRowData) {
306 RowsEventParser parser(schema_);
307 TableMapEventPtr table_map = BuildTableMapEvent();
308 std::vector<std::string> values = {"1", "name1", "30", "123.456",
309 "1,2,3,4", "1,2,3,5", "1,2,3,6"};
310 RowsEventPtr event = BuildDeleteRowsEvent(values, table_map);
311 ASSERT_TRUE(event);
312 event->cur_buf_ = nullptr;
313 event->table_map_ = table_map;
314 WriteRequest::Row row_data;
315 LsnContext ctx;
316 int ret = parser.parse(event.get(), &row_data, &ctx);
317 ASSERT_EQ(ret, ErrorCode_InvalidRowData);
318}
319
320TEST_F(RowsEventParserTest, TestGetAutoIncrementId) {
321 RowsEventParser parser(schema_);
322 uint64_t id;
323 GenericValue value;
324 {
325 value.set_int32_value(100);
326 id = parser.get_auto_increment_id(value);
327 ASSERT_EQ(id, 100);
328 }
329 {
330 value.set_int64_value(1000);
331 id = parser.get_auto_increment_id(value);
332 ASSERT_EQ(id, 1000);
333 }
334 {
335 value.set_uint32_value(100);
336 id = parser.get_auto_increment_id(value);
337 ASSERT_EQ(id, 100);
338 }
339 {
340 value.set_uint64_value(100);
341 id = parser.get_auto_increment_id(value);
342 ASSERT_EQ(id, 100);
343 }
344 {
345 value.set_bytes_value("100");
346 id = parser.get_auto_increment_id(value);
347 ASSERT_EQ(id, INVALID_PRIMARY_KEY);
348 }
349}
350