1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gmock/gmock.h> |
22 | |
23 | #define private public |
24 | #define protected public |
25 | #include "repository/repository_common/config.h" |
26 | #undef private |
27 | #undef protected |
28 | |
29 | #include "repository/mysql_collection.h" |
30 | #include "mock_index_agent_server.h" |
31 | #include "mock_mysql_handler.h" |
32 | #include "port_helper.h" |
33 | |
34 | |
35 | const std::string collection_name = "mysql_collection_test.info" ; |
36 | static int PORT = 8010; |
37 | static int PID = 0; |
38 | |
39 | //////////////////////////////////////////////////////////////////// |
40 | class MysqlCollectionTest2 : public ::testing::Test { |
41 | protected: |
42 | MysqlCollectionTest2() {} |
43 | ~MysqlCollectionTest2() {} |
44 | void SetUp() { |
45 | PortHelper::GetPort(&PORT, &PID); |
46 | std::cout << "Server port: " << PORT << std::endl; |
47 | std::string index_uri = "127.0.0.1:" + std::to_string(PORT); |
48 | proxima::be::repository::Config::Instance() |
49 | .repository_config_.mutable_repository_config() |
50 | ->set_index_agent_addr(index_uri); |
51 | std::cout << "Set index addr: " << index_uri << std::endl; |
52 | |
53 | proxima::be::repository::Config::Instance() |
54 | .repository_config_.mutable_repository_config() |
55 | ->set_batch_interval(1000000); |
56 | std::cout << "Set batch_interval to 1s" << std::endl; |
57 | } |
58 | void TearDown() { |
59 | PortHelper::RemovePortFile(PID); |
60 | } |
61 | }; |
62 | |
63 | TEST_F(MysqlCollectionTest2, TestGeneral) { |
64 | brpc::Server server_; |
65 | MockGeneralProximaServiceImpl svc_; |
66 | brpc::ServerOptions options; |
67 | ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE)); |
68 | ASSERT_EQ(0, server_.Start(PORT, &options)); |
69 | { |
70 | proto::CollectionConfig config; |
71 | config.set_collection_name(collection_name); |
72 | |
73 | CollectionPtr collection{nullptr}; |
74 | MockMysqlHandlerPtr mysql_handler = |
75 | std::make_shared<MockMysqlHandler>(config); |
76 | EXPECT_CALL(*mysql_handler, init(_)) |
77 | .WillRepeatedly(Return(0)) |
78 | .RetiresOnSaturation(); |
79 | EXPECT_CALL(*mysql_handler, start(_)) |
80 | .WillRepeatedly(Return(0)) |
81 | .RetiresOnSaturation(); |
82 | |
83 | EXPECT_CALL(*mysql_handler, |
84 | get_next_row_data(Matcher<proto::WriteRequest::Row *>(_), |
85 | Matcher<LsnContext *>(_))) |
86 | // EXPECT_CALL(*mysql_handler, get_next_row_data(_, _)) |
87 | .WillOnce(Invoke( |
88 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
89 | row_data->set_primary_key(1); |
90 | row_data->mutable_lsn_context()->set_lsn(1); |
91 | row_data->set_operation_type( |
92 | proxima::be::proto::OperationType::OP_INSERT); |
93 | row_data->mutable_index_column_values() |
94 | ->add_values() |
95 | ->set_string_value("index_column1" ); |
96 | row_data->mutable_index_column_values() |
97 | ->add_values() |
98 | ->set_double_value(3.1415926); |
99 | row_data->mutable_forward_column_values() |
100 | ->add_values() |
101 | ->set_string_value("forward_column1" ); |
102 | row_data->mutable_forward_column_values() |
103 | ->add_values() |
104 | ->set_double_value(3.1415926); |
105 | context->status = RowDataStatus::NORMAL; |
106 | return 0; |
107 | })) |
108 | .WillOnce(Invoke( |
109 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
110 | row_data->set_primary_key(2); |
111 | row_data->mutable_lsn_context()->set_lsn(2); |
112 | row_data->set_operation_type( |
113 | proxima::be::proto::OperationType::OP_UPDATE); |
114 | row_data->mutable_index_column_values() |
115 | ->add_values() |
116 | ->set_string_value("index_column1" ); |
117 | row_data->mutable_index_column_values() |
118 | ->add_values() |
119 | ->set_double_value(3.1415926); |
120 | row_data->mutable_forward_column_values() |
121 | ->add_values() |
122 | ->set_string_value("forward_column1" ); |
123 | row_data->mutable_forward_column_values() |
124 | ->add_values() |
125 | ->set_double_value(3.1415926); |
126 | context->status = RowDataStatus::NORMAL; |
127 | return 0; |
128 | })) |
129 | .WillRepeatedly(Invoke( |
130 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
131 | row_data->set_primary_key(3); |
132 | context->status = RowDataStatus::NO_MORE_DATA; |
133 | return 0; |
134 | })) |
135 | .RetiresOnSaturation(); |
136 | |
137 | EXPECT_CALL(*mysql_handler, |
138 | reset_status(Matcher<ScanMode>(_), |
139 | Matcher<const proto::CollectionConfig &>(_), |
140 | Matcher<const LsnContext &>(_))) |
141 | // EXPECT_CALL(*mysql_handler, reset_status(_, _, _)) |
142 | .WillRepeatedly(Return(0)) |
143 | .RetiresOnSaturation(); |
144 | |
145 | EXPECT_CALL(*mysql_handler, get_fields_meta(_)) |
146 | .WillRepeatedly(Return(0)) |
147 | .RetiresOnSaturation(); |
148 | |
149 | EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _)) |
150 | .WillRepeatedly(Return(0)) |
151 | .RetiresOnSaturation(); |
152 | |
153 | collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler)); |
154 | |
155 | int ret = collection->init(); |
156 | ASSERT_EQ(ret, 0); |
157 | CollectionStatus current_state = collection->state(); |
158 | ASSERT_EQ(current_state, CollectionStatus::INIT); |
159 | collection->run(); |
160 | sleep(1); |
161 | |
162 | // check value |
163 | ASSERT_EQ(svc_.get_server_called_count(), 1); |
164 | proto::WriteRequest request; |
165 | std::string request_str = svc_.get_request_string(0); |
166 | ASSERT_EQ(request_str.empty(), false); |
167 | ret = request.ParseFromString(request_str); |
168 | |
169 | ASSERT_EQ(ret, true); |
170 | auto rows = request.rows(); |
171 | ASSERT_EQ(rows.size(), 2); |
172 | auto row1 = rows[0]; |
173 | auto row2 = rows[1]; |
174 | // row1 |
175 | ASSERT_EQ(row1.primary_key(), 1); |
176 | ASSERT_EQ(row1.operation_type(), |
177 | proxima::be::proto::OperationType::OP_INSERT); |
178 | ASSERT_EQ(row1.lsn_context().lsn(), 1); |
179 | ASSERT_EQ(row1.index_column_values().values_size(), 2); |
180 | auto row1_index_columns = row1.index_column_values().values(); |
181 | auto row1_index_column1 = row1_index_columns[0]; |
182 | auto row1_index_column2 = row1_index_columns[1]; |
183 | ASSERT_EQ(row1_index_column1.value_oneof_case(), |
184 | row1_index_column1.kStringValue); |
185 | ASSERT_EQ(row1_index_column2.value_oneof_case(), |
186 | row1_index_column2.kDoubleValue); |
187 | ASSERT_EQ(row1_index_column1.string_value(), "index_column1" ); |
188 | ASSERT_EQ(row1_index_column2.double_value(), 3.1415926); |
189 | ASSERT_EQ(row1.forward_column_values().values_size(), 2); |
190 | auto row1_forward_columns = row1.forward_column_values().values(); |
191 | auto row1_forward_column1 = row1_forward_columns[0]; |
192 | auto row1_forward_column2 = row1_forward_columns[1]; |
193 | ASSERT_EQ(row1_forward_column1.value_oneof_case(), |
194 | row1_forward_column1.kStringValue); |
195 | ASSERT_EQ(row1_forward_column2.value_oneof_case(), |
196 | row1_forward_column2.kDoubleValue); |
197 | ASSERT_EQ(row1_forward_column1.string_value(), "forward_column1" ); |
198 | ASSERT_EQ(row1_forward_column2.double_value(), 3.1415926); |
199 | // row2 |
200 | ASSERT_EQ(row2.primary_key(), 2); |
201 | ASSERT_EQ(row2.operation_type(), |
202 | proxima::be::proto::OperationType::OP_UPDATE); |
203 | ASSERT_EQ(row2.lsn_context().lsn(), 2); |
204 | ASSERT_EQ(row2.index_column_values().values_size(), 2); |
205 | auto row2_index_columns = row2.index_column_values().values(); |
206 | auto row2_index_column1 = row2_index_columns[0]; |
207 | auto row2_index_column2 = row2_index_columns[1]; |
208 | ASSERT_EQ(row2_index_column1.value_oneof_case(), |
209 | row2_index_column1.kStringValue); |
210 | ASSERT_EQ(row2_index_column2.value_oneof_case(), |
211 | row2_index_column2.kDoubleValue); |
212 | ASSERT_EQ(row2_index_column1.string_value(), "index_column1" ); |
213 | ASSERT_EQ(row2_index_column2.double_value(), 3.1415926); |
214 | ASSERT_EQ(row2.forward_column_values().values_size(), 2); |
215 | auto row2_forward_columns = row2.forward_column_values().values(); |
216 | auto row2_forward_column1 = row2_forward_columns[0]; |
217 | auto row2_forward_column2 = row2_forward_columns[1]; |
218 | ASSERT_EQ(row2_forward_column1.value_oneof_case(), |
219 | row2_forward_column1.kStringValue); |
220 | ASSERT_EQ(row2_forward_column2.value_oneof_case(), |
221 | row2_forward_column2.kDoubleValue); |
222 | ASSERT_EQ(row2_forward_column1.string_value(), "forward_column1" ); |
223 | ASSERT_EQ(row2_forward_column2.double_value(), 3.1415926); |
224 | // exit |
225 | collection->stop(); |
226 | sleep(2); |
227 | LOG_INFO("MysqlCollectionTest2::TestGeneral PASS" ); |
228 | } |
229 | |
230 | ASSERT_EQ(0, server_.Stop(0)); |
231 | ASSERT_EQ(0, server_.Join()); |
232 | } |
233 | |