1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gtest/gtest.h> |
22 | |
23 | #define private public |
24 | #include "repository/binlog/table_reader.h" |
25 | #include "mock_mysql_connector.h" |
26 | #include "mysql_result_builder.h" |
27 | #undef private |
28 | |
29 | #include "repository/repository_common/error_code.h" |
30 | |
31 | using namespace ::proxima::be; |
32 | using namespace proxima::be::repository; |
33 | |
34 | class TableReaderTest : public testing::Test { |
35 | protected: |
36 | void SetUp() { |
37 | mgr_ = std::make_shared<MysqlConnectorManager>(); |
38 | ASSERT_TRUE(mgr_); |
39 | connector1_ = std::make_shared<MockMysqlConnector>(); |
40 | ASSERT_TRUE(connector1_); |
41 | mgr_->put(connector1_); |
42 | connector2_ = std::make_shared<MockMysqlConnector>(); |
43 | ASSERT_TRUE(connector2_); |
44 | mgr_->put(connector2_); |
45 | table_name_ = builder_.table_name_; |
46 | ctx_.seq_id = 1; |
47 | InitFetcher(); |
48 | } |
49 | |
50 | void TearDown() {} |
51 | |
52 | void InitFetcher() { |
53 | builder_.BuildCollectionConfig(); |
54 | |
55 | // init |
56 | ailego::Uri test_uri = builder_.uri_; |
57 | EXPECT_CALL(*connector1_, uri()) |
58 | .WillOnce( |
59 | Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; })) |
60 | .RetiresOnSaturation(); |
61 | fetcher_ = std::make_shared<InfoFetcher>(builder_.config_, mgr_); |
62 | int ret = fetcher_->init(); |
63 | ASSERT_EQ(ret, 0); |
64 | ASSERT_EQ(fetcher_->database(), "mytest" ); |
65 | } |
66 | |
67 | protected: |
68 | MysqlConnectorManagerPtr mgr_{}; |
69 | MockMysqlConnectorPtr connector1_{}; |
70 | MockMysqlConnectorPtr connector2_{}; |
71 | std::string table_name_{}; |
72 | InfoFetcherPtr fetcher_{}; |
73 | LsnContext ctx_; |
74 | MysqlResultBuilder builder_{}; |
75 | }; |
76 | |
77 | TEST_F(TableReaderTest, TestGeneral) { |
78 | TableReader reader(table_name_, fetcher_, mgr_); |
79 | |
80 | // build schema result |
81 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
82 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
83 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
84 | .Times(2) |
85 | .WillOnce(Invoke([&result1](const std::string &, |
86 | MysqlResultWrapperPtr *out, bool) -> int { |
87 | *out = result1; |
88 | return 0; |
89 | })) |
90 | .WillOnce( |
91 | Invoke([&schema_result](const std::string &, |
92 | MysqlResultWrapperPtr *out, bool) -> int { |
93 | *out = schema_result; |
94 | return 0; |
95 | })) |
96 | .RetiresOnSaturation(); |
97 | // init table reader |
98 | int ret = reader.init(); |
99 | ASSERT_EQ(ret, 0); |
100 | |
101 | // build scan table result |
102 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
103 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
104 | .WillOnce(Invoke([&scan_result](const std::string &, |
105 | MysqlResultWrapperPtr *out, bool) -> int { |
106 | *out = scan_result; |
107 | return 0; |
108 | })) |
109 | .RetiresOnSaturation(); |
110 | ret = reader.start(ctx_); |
111 | ASSERT_EQ(ret, 0); |
112 | |
113 | // get next row data |
114 | WriteRequest::Row row_data; |
115 | LsnContext ctx; |
116 | ret = reader.get_next_row_data(&row_data, &ctx); |
117 | ASSERT_EQ(ret, 0); |
118 | ASSERT_EQ(ctx.seq_id, (uint64_t)1); |
119 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
120 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
121 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
122 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
123 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18); |
124 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
125 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
126 | |
127 | row_data.Clear(); |
128 | ret = reader.get_next_row_data(&row_data, &ctx); |
129 | ASSERT_EQ(ret, 0); |
130 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
131 | ASSERT_EQ(ctx.seq_id, (uint64_t)2); |
132 | ASSERT_EQ(row_data.primary_key(), (uint64_t)2); |
133 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
134 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2" ); |
135 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19); |
136 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4" ); |
137 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5" ); |
138 | |
139 | row_data.Clear(); |
140 | ret = reader.get_next_row_data(&row_data, &ctx); |
141 | ASSERT_EQ(ret, 0); |
142 | ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA); |
143 | } |
144 | |
145 | TEST_F(TableReaderTest, TestInitSuccess) { |
146 | TableReader reader(table_name_, fetcher_, mgr_); |
147 | |
148 | // build schema result |
149 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
150 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
151 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
152 | .Times(2) |
153 | .WillOnce(Invoke([&result1](const std::string &, |
154 | MysqlResultWrapperPtr *out, bool) -> int { |
155 | *out = result1; |
156 | return 0; |
157 | })) |
158 | .WillOnce( |
159 | Invoke([&schema_result](const std::string &, |
160 | MysqlResultWrapperPtr *out, bool) -> int { |
161 | *out = schema_result; |
162 | return 0; |
163 | })) |
164 | .RetiresOnSaturation(); |
165 | // init table reader |
166 | int ret = reader.init(); |
167 | ASSERT_EQ(ret, 0); |
168 | |
169 | // build scan table result |
170 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
171 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
172 | .WillOnce(Invoke([&scan_result](const std::string &, |
173 | MysqlResultWrapperPtr *out, bool) -> int { |
174 | *out = scan_result; |
175 | return 0; |
176 | })) |
177 | .RetiresOnSaturation(); |
178 | ret = reader.start(ctx_); |
179 | ASSERT_EQ(ret, 0); |
180 | } |
181 | |
182 | TEST_F(TableReaderTest, TestInitFailedWithGetConnector) { |
183 | MysqlConnectorManagerPtr mgr = std::make_shared<MysqlConnectorManager>(); |
184 | MysqlConnectorPtr connector; |
185 | mgr->put(connector); |
186 | TableReader reader(table_name_, fetcher_, mgr); |
187 | int ret = reader.init(); |
188 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
189 | } |
190 | |
191 | TEST_F(TableReaderTest, TestInitFailedWithGetTableSchema) { |
192 | TableReader reader(table_name_, fetcher_, mgr_); |
193 | |
194 | // build schema result |
195 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
196 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
197 | .WillOnce( |
198 | Invoke([&schema_result](const std::string &, |
199 | MysqlResultWrapperPtr *out, bool) -> int { |
200 | *out = schema_result; |
201 | return ErrorCode_ExecuteMysql; |
202 | })) |
203 | .RetiresOnSaturation(); |
204 | // init table reader |
205 | int ret = reader.init(); |
206 | ASSERT_EQ(ret, ErrorCode_ExecuteMysql); |
207 | } |
208 | |
209 | TEST_F(TableReaderTest, TestStartFailedWithPrepareReader) { |
210 | TableReader reader(table_name_, fetcher_, mgr_); |
211 | |
212 | // build schema result |
213 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
214 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
215 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
216 | .Times(2) |
217 | .WillOnce(Invoke([&result1](const std::string &, |
218 | MysqlResultWrapperPtr *out, bool) -> int { |
219 | *out = result1; |
220 | return 0; |
221 | })) |
222 | .WillOnce( |
223 | Invoke([&schema_result](const std::string &, |
224 | MysqlResultWrapperPtr *out, bool) -> int { |
225 | *out = schema_result; |
226 | return 0; |
227 | })) |
228 | .RetiresOnSaturation(); |
229 | // init table reader |
230 | int ret = reader.init(); |
231 | ASSERT_EQ(ret, 0); |
232 | |
233 | // build scan table result |
234 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
235 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
236 | .WillOnce(Invoke([&scan_result](const std::string &, |
237 | MysqlResultWrapperPtr *out, bool) -> int { |
238 | *out = scan_result; |
239 | return ErrorCode_ExecuteMysql; |
240 | })) |
241 | .RetiresOnSaturation(); |
242 | ret = reader.start(ctx_); |
243 | ASSERT_EQ(ret, ErrorCode_ExecuteMysql); |
244 | } |
245 | |
246 | TEST_F(TableReaderTest, TestGetNextRowDataSuccess) { |
247 | TableReader reader(table_name_, fetcher_, mgr_); |
248 | |
249 | // build schema result |
250 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
251 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
252 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
253 | .Times(2) |
254 | .WillOnce(Invoke([&result1](const std::string &, |
255 | MysqlResultWrapperPtr *out, bool) -> int { |
256 | *out = result1; |
257 | return 0; |
258 | })) |
259 | .WillOnce( |
260 | Invoke([&schema_result](const std::string &, |
261 | MysqlResultWrapperPtr *out, bool) -> int { |
262 | *out = schema_result; |
263 | return 0; |
264 | })) |
265 | .RetiresOnSaturation(); |
266 | |
267 | // init table reader |
268 | int ret = reader.init(); |
269 | ASSERT_EQ(ret, 0); |
270 | |
271 | // build scan table result |
272 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
273 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
274 | .WillOnce(Invoke([&scan_result](const std::string &, |
275 | MysqlResultWrapperPtr *out, bool) -> int { |
276 | *out = scan_result; |
277 | return 0; |
278 | })) |
279 | .RetiresOnSaturation(); |
280 | ret = reader.start(ctx_); |
281 | ASSERT_EQ(ret, 0); |
282 | |
283 | // get next row data |
284 | WriteRequest::Row row_data; |
285 | LsnContext ctx; |
286 | ret = reader.get_next_row_data(&row_data, &ctx); |
287 | ASSERT_EQ(ret, 0); |
288 | ASSERT_EQ(ctx.seq_id, (uint64_t)1); |
289 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
290 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
291 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
292 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
293 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18); |
294 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
295 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
296 | |
297 | row_data.Clear(); |
298 | ret = reader.get_next_row_data(&row_data, &ctx); |
299 | ASSERT_EQ(ret, 0); |
300 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
301 | ASSERT_EQ(ctx.seq_id, (uint64_t)2); |
302 | ASSERT_EQ(row_data.primary_key(), (uint64_t)2); |
303 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
304 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2" ); |
305 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19); |
306 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4" ); |
307 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5" ); |
308 | |
309 | row_data.Clear(); |
310 | ret = reader.get_next_row_data(&row_data, &ctx); |
311 | ASSERT_EQ(ret, 0); |
312 | ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA); |
313 | } |
314 | |
315 | TEST_F(TableReaderTest, TestGetNextRowDataFailed) { |
316 | TableReader reader(table_name_, fetcher_, mgr_); |
317 | |
318 | // build schema result |
319 | MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult(); |
320 | MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult(); |
321 | EXPECT_CALL(*connector1_, execute_query(_, _, _)) |
322 | .Times(2) |
323 | .WillOnce(Invoke([&result1](const std::string &, |
324 | MysqlResultWrapperPtr *out, bool) -> int { |
325 | *out = result1; |
326 | return 0; |
327 | })) |
328 | .WillOnce( |
329 | Invoke([&schema_result](const std::string &, |
330 | MysqlResultWrapperPtr *out, bool) -> int { |
331 | *out = schema_result; |
332 | return 0; |
333 | })) |
334 | .RetiresOnSaturation(); |
335 | |
336 | // init table reader |
337 | int ret = reader.init(); |
338 | ASSERT_EQ(ret, 0); |
339 | |
340 | // build scan table result |
341 | MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult(); |
342 | EXPECT_CALL(*connector2_, execute_query(_, _, _)) |
343 | .WillOnce(Invoke([&scan_result](const std::string &, |
344 | MysqlResultWrapperPtr *out, bool) -> int { |
345 | *out = scan_result; |
346 | return 0; |
347 | })) |
348 | .RetiresOnSaturation(); |
349 | |
350 | ret = reader.start(ctx_); |
351 | ASSERT_EQ(ret, 0); |
352 | |
353 | // get next row data |
354 | WriteRequest::Row row_data; |
355 | LsnContext ctx; |
356 | ret = reader.get_next_row_data(&row_data, &ctx); |
357 | ASSERT_EQ(ret, 0); |
358 | ASSERT_EQ(ctx.seq_id, (uint64_t)1); |
359 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
360 | ASSERT_EQ(row_data.primary_key(), (uint64_t)1); |
361 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
362 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1" ); |
363 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18); |
364 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4" ); |
365 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5" ); |
366 | |
367 | row_data.Clear(); |
368 | ret = reader.get_next_row_data(&row_data, &ctx); |
369 | ASSERT_EQ(ret, 0); |
370 | ASSERT_EQ(ctx.status, RowDataStatus::NORMAL); |
371 | ASSERT_EQ(ctx.seq_id, (uint64_t)2); |
372 | ASSERT_EQ(row_data.primary_key(), (uint64_t)2); |
373 | ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT); |
374 | ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2" ); |
375 | ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19); |
376 | ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4" ); |
377 | ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5" ); |
378 | |
379 | row_data.Clear(); |
380 | scan_result->set_has_error(true); |
381 | ret = reader.get_next_row_data(&row_data, &ctx); |
382 | ASSERT_EQ(ret, ErrorCode_FetchMysqlResult); |
383 | } |
384 | |