1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gtest/gtest.h>
22
23#define private public
24#include "repository/binlog/table_reader.h"
25#include "mock_mysql_connector.h"
26#include "mysql_result_builder.h"
27#undef private
28
29#include "repository/repository_common/error_code.h"
30
31using namespace ::proxima::be;
32using namespace proxima::be::repository;
33
34class TableReaderTest : public testing::Test {
35 protected:
36 void SetUp() {
37 mgr_ = std::make_shared<MysqlConnectorManager>();
38 ASSERT_TRUE(mgr_);
39 connector1_ = std::make_shared<MockMysqlConnector>();
40 ASSERT_TRUE(connector1_);
41 mgr_->put(connector1_);
42 connector2_ = std::make_shared<MockMysqlConnector>();
43 ASSERT_TRUE(connector2_);
44 mgr_->put(connector2_);
45 table_name_ = builder_.table_name_;
46 ctx_.seq_id = 1;
47 InitFetcher();
48 }
49
50 void TearDown() {}
51
52 void InitFetcher() {
53 builder_.BuildCollectionConfig();
54
55 // init
56 ailego::Uri test_uri = builder_.uri_;
57 EXPECT_CALL(*connector1_, uri())
58 .WillOnce(
59 Invoke([&test_uri]() -> const ailego::Uri & { return test_uri; }))
60 .RetiresOnSaturation();
61 fetcher_ = std::make_shared<InfoFetcher>(builder_.config_, mgr_);
62 int ret = fetcher_->init();
63 ASSERT_EQ(ret, 0);
64 ASSERT_EQ(fetcher_->database(), "mytest");
65 }
66
67 protected:
68 MysqlConnectorManagerPtr mgr_{};
69 MockMysqlConnectorPtr connector1_{};
70 MockMysqlConnectorPtr connector2_{};
71 std::string table_name_{};
72 InfoFetcherPtr fetcher_{};
73 LsnContext ctx_;
74 MysqlResultBuilder builder_{};
75};
76
77TEST_F(TableReaderTest, TestGeneral) {
78 TableReader reader(table_name_, fetcher_, mgr_);
79
80 // build schema result
81 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
82 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
83 EXPECT_CALL(*connector1_, execute_query(_, _, _))
84 .Times(2)
85 .WillOnce(Invoke([&result1](const std::string &,
86 MysqlResultWrapperPtr *out, bool) -> int {
87 *out = result1;
88 return 0;
89 }))
90 .WillOnce(
91 Invoke([&schema_result](const std::string &,
92 MysqlResultWrapperPtr *out, bool) -> int {
93 *out = schema_result;
94 return 0;
95 }))
96 .RetiresOnSaturation();
97 // init table reader
98 int ret = reader.init();
99 ASSERT_EQ(ret, 0);
100
101 // build scan table result
102 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
103 EXPECT_CALL(*connector2_, execute_query(_, _, _))
104 .WillOnce(Invoke([&scan_result](const std::string &,
105 MysqlResultWrapperPtr *out, bool) -> int {
106 *out = scan_result;
107 return 0;
108 }))
109 .RetiresOnSaturation();
110 ret = reader.start(ctx_);
111 ASSERT_EQ(ret, 0);
112
113 // get next row data
114 WriteRequest::Row row_data;
115 LsnContext ctx;
116 ret = reader.get_next_row_data(&row_data, &ctx);
117 ASSERT_EQ(ret, 0);
118 ASSERT_EQ(ctx.seq_id, (uint64_t)1);
119 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
120 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
121 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
122 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
123 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18);
124 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
125 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
126
127 row_data.Clear();
128 ret = reader.get_next_row_data(&row_data, &ctx);
129 ASSERT_EQ(ret, 0);
130 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
131 ASSERT_EQ(ctx.seq_id, (uint64_t)2);
132 ASSERT_EQ(row_data.primary_key(), (uint64_t)2);
133 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
134 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2");
135 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19);
136 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4");
137 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5");
138
139 row_data.Clear();
140 ret = reader.get_next_row_data(&row_data, &ctx);
141 ASSERT_EQ(ret, 0);
142 ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA);
143}
144
145TEST_F(TableReaderTest, TestInitSuccess) {
146 TableReader reader(table_name_, fetcher_, mgr_);
147
148 // build schema result
149 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
150 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
151 EXPECT_CALL(*connector1_, execute_query(_, _, _))
152 .Times(2)
153 .WillOnce(Invoke([&result1](const std::string &,
154 MysqlResultWrapperPtr *out, bool) -> int {
155 *out = result1;
156 return 0;
157 }))
158 .WillOnce(
159 Invoke([&schema_result](const std::string &,
160 MysqlResultWrapperPtr *out, bool) -> int {
161 *out = schema_result;
162 return 0;
163 }))
164 .RetiresOnSaturation();
165 // init table reader
166 int ret = reader.init();
167 ASSERT_EQ(ret, 0);
168
169 // build scan table result
170 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
171 EXPECT_CALL(*connector2_, execute_query(_, _, _))
172 .WillOnce(Invoke([&scan_result](const std::string &,
173 MysqlResultWrapperPtr *out, bool) -> int {
174 *out = scan_result;
175 return 0;
176 }))
177 .RetiresOnSaturation();
178 ret = reader.start(ctx_);
179 ASSERT_EQ(ret, 0);
180}
181
182TEST_F(TableReaderTest, TestInitFailedWithGetConnector) {
183 MysqlConnectorManagerPtr mgr = std::make_shared<MysqlConnectorManager>();
184 MysqlConnectorPtr connector;
185 mgr->put(connector);
186 TableReader reader(table_name_, fetcher_, mgr);
187 int ret = reader.init();
188 ASSERT_EQ(ret, ErrorCode_RuntimeError);
189}
190
191TEST_F(TableReaderTest, TestInitFailedWithGetTableSchema) {
192 TableReader reader(table_name_, fetcher_, mgr_);
193
194 // build schema result
195 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
196 EXPECT_CALL(*connector1_, execute_query(_, _, _))
197 .WillOnce(
198 Invoke([&schema_result](const std::string &,
199 MysqlResultWrapperPtr *out, bool) -> int {
200 *out = schema_result;
201 return ErrorCode_ExecuteMysql;
202 }))
203 .RetiresOnSaturation();
204 // init table reader
205 int ret = reader.init();
206 ASSERT_EQ(ret, ErrorCode_ExecuteMysql);
207}
208
209TEST_F(TableReaderTest, TestStartFailedWithPrepareReader) {
210 TableReader reader(table_name_, fetcher_, mgr_);
211
212 // build schema result
213 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
214 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
215 EXPECT_CALL(*connector1_, execute_query(_, _, _))
216 .Times(2)
217 .WillOnce(Invoke([&result1](const std::string &,
218 MysqlResultWrapperPtr *out, bool) -> int {
219 *out = result1;
220 return 0;
221 }))
222 .WillOnce(
223 Invoke([&schema_result](const std::string &,
224 MysqlResultWrapperPtr *out, bool) -> int {
225 *out = schema_result;
226 return 0;
227 }))
228 .RetiresOnSaturation();
229 // init table reader
230 int ret = reader.init();
231 ASSERT_EQ(ret, 0);
232
233 // build scan table result
234 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
235 EXPECT_CALL(*connector2_, execute_query(_, _, _))
236 .WillOnce(Invoke([&scan_result](const std::string &,
237 MysqlResultWrapperPtr *out, bool) -> int {
238 *out = scan_result;
239 return ErrorCode_ExecuteMysql;
240 }))
241 .RetiresOnSaturation();
242 ret = reader.start(ctx_);
243 ASSERT_EQ(ret, ErrorCode_ExecuteMysql);
244}
245
246TEST_F(TableReaderTest, TestGetNextRowDataSuccess) {
247 TableReader reader(table_name_, fetcher_, mgr_);
248
249 // build schema result
250 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
251 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
252 EXPECT_CALL(*connector1_, execute_query(_, _, _))
253 .Times(2)
254 .WillOnce(Invoke([&result1](const std::string &,
255 MysqlResultWrapperPtr *out, bool) -> int {
256 *out = result1;
257 return 0;
258 }))
259 .WillOnce(
260 Invoke([&schema_result](const std::string &,
261 MysqlResultWrapperPtr *out, bool) -> int {
262 *out = schema_result;
263 return 0;
264 }))
265 .RetiresOnSaturation();
266
267 // init table reader
268 int ret = reader.init();
269 ASSERT_EQ(ret, 0);
270
271 // build scan table result
272 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
273 EXPECT_CALL(*connector2_, execute_query(_, _, _))
274 .WillOnce(Invoke([&scan_result](const std::string &,
275 MysqlResultWrapperPtr *out, bool) -> int {
276 *out = scan_result;
277 return 0;
278 }))
279 .RetiresOnSaturation();
280 ret = reader.start(ctx_);
281 ASSERT_EQ(ret, 0);
282
283 // get next row data
284 WriteRequest::Row row_data;
285 LsnContext ctx;
286 ret = reader.get_next_row_data(&row_data, &ctx);
287 ASSERT_EQ(ret, 0);
288 ASSERT_EQ(ctx.seq_id, (uint64_t)1);
289 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
290 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
291 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
292 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
293 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18);
294 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
295 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
296
297 row_data.Clear();
298 ret = reader.get_next_row_data(&row_data, &ctx);
299 ASSERT_EQ(ret, 0);
300 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
301 ASSERT_EQ(ctx.seq_id, (uint64_t)2);
302 ASSERT_EQ(row_data.primary_key(), (uint64_t)2);
303 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
304 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2");
305 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19);
306 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4");
307 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5");
308
309 row_data.Clear();
310 ret = reader.get_next_row_data(&row_data, &ctx);
311 ASSERT_EQ(ret, 0);
312 ASSERT_EQ(ctx.status, RowDataStatus::NO_MORE_DATA);
313}
314
315TEST_F(TableReaderTest, TestGetNextRowDataFailed) {
316 TableReader reader(table_name_, fetcher_, mgr_);
317
318 // build schema result
319 MockMysqlResultWrapperPtr schema_result = builder_.BuildQuerySchemaResult();
320 MockMysqlResultWrapperPtr result1 = builder_.BuildQueryCollationResult();
321 EXPECT_CALL(*connector1_, execute_query(_, _, _))
322 .Times(2)
323 .WillOnce(Invoke([&result1](const std::string &,
324 MysqlResultWrapperPtr *out, bool) -> int {
325 *out = result1;
326 return 0;
327 }))
328 .WillOnce(
329 Invoke([&schema_result](const std::string &,
330 MysqlResultWrapperPtr *out, bool) -> int {
331 *out = schema_result;
332 return 0;
333 }))
334 .RetiresOnSaturation();
335
336 // init table reader
337 int ret = reader.init();
338 ASSERT_EQ(ret, 0);
339
340 // build scan table result
341 MockMysqlResultWrapperPtr scan_result = builder_.BuildScanTableResult();
342 EXPECT_CALL(*connector2_, execute_query(_, _, _))
343 .WillOnce(Invoke([&scan_result](const std::string &,
344 MysqlResultWrapperPtr *out, bool) -> int {
345 *out = scan_result;
346 return 0;
347 }))
348 .RetiresOnSaturation();
349
350 ret = reader.start(ctx_);
351 ASSERT_EQ(ret, 0);
352
353 // get next row data
354 WriteRequest::Row row_data;
355 LsnContext ctx;
356 ret = reader.get_next_row_data(&row_data, &ctx);
357 ASSERT_EQ(ret, 0);
358 ASSERT_EQ(ctx.seq_id, (uint64_t)1);
359 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
360 ASSERT_EQ(row_data.primary_key(), (uint64_t)1);
361 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
362 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name1");
363 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 18);
364 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "1,2,3,4");
365 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "1,2,3,5");
366
367 row_data.Clear();
368 ret = reader.get_next_row_data(&row_data, &ctx);
369 ASSERT_EQ(ret, 0);
370 ASSERT_EQ(ctx.status, RowDataStatus::NORMAL);
371 ASSERT_EQ(ctx.seq_id, (uint64_t)2);
372 ASSERT_EQ(row_data.primary_key(), (uint64_t)2);
373 ASSERT_EQ(row_data.operation_type(), ::proxima::be::proto::OP_INSERT);
374 ASSERT_EQ(row_data.forward_column_values().values(0).string_value(), "name2");
375 ASSERT_EQ(row_data.forward_column_values().values(1).int32_value(), 19);
376 ASSERT_EQ(row_data.index_column_values().values(0).string_value(), "2,2,3,4");
377 ASSERT_EQ(row_data.index_column_values().values(1).string_value(), "2,2,3,5");
378
379 row_data.Clear();
380 scan_result->set_has_error(true);
381 ret = reader.get_next_row_data(&row_data, &ctx);
382 ASSERT_EQ(ret, ErrorCode_FetchMysqlResult);
383}
384