1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gmock/gmock.h> |
22 | |
23 | |
24 | #define private public |
25 | #define protected public |
26 | #include "repository/repository_common/config.h" |
27 | #undef private |
28 | #undef protected |
29 | |
30 | #include "repository/mysql_collection.h" |
31 | #include "mock_index_agent_server.h" |
32 | #include "mock_mysql_handler.h" |
33 | #include "port_helper.h" |
34 | |
35 | const std::string collection_name = "mysql_collection_test.info" ; |
36 | static int PORT = 8010; |
37 | static int PID = 0; |
38 | |
39 | //////////////////////////////////////////////////////////////////// |
40 | class MysqlCollectionSchemaChangeTest2 : public ::testing::Test { |
41 | protected: |
42 | MysqlCollectionSchemaChangeTest2() {} |
43 | ~MysqlCollectionSchemaChangeTest2() {} |
44 | void SetUp() { |
45 | PortHelper::GetPort(&PORT, &PID); |
46 | std::cout << "Server port: " << PORT << std::endl; |
47 | std::string index_uri = "127.0.0.1:" + std::to_string(PORT); |
48 | proxima::be::repository::Config::Instance() |
49 | .repository_config_.mutable_repository_config() |
50 | ->set_index_agent_addr(index_uri); |
51 | std::cout << "Set index addr: " << index_uri << std::endl; |
52 | |
53 | proxima::be::repository::Config::Instance() |
54 | .repository_config_.mutable_repository_config() |
55 | ->set_batch_interval(1000000); |
56 | std::cout << "Set batch_interval to 1s" << std::endl; |
57 | } |
58 | void TearDown() { |
59 | PortHelper::RemovePortFile(PID); |
60 | } |
61 | }; |
62 | |
63 | TEST_F(MysqlCollectionSchemaChangeTest2, TestGeneral) { |
64 | brpc::Server server_; |
65 | MockGeneralProximaServiceImpl svc_; |
66 | brpc::ServerOptions options; |
67 | ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE)); |
68 | ASSERT_EQ(0, server_.Start(PORT, &options)); |
69 | { |
70 | proto::CollectionConfig config; |
71 | config.set_collection_name(collection_name); |
72 | |
73 | CollectionPtr collection{nullptr}; |
74 | MockMysqlHandlerPtr mysql_handler = |
75 | std::make_shared<MockMysqlHandler>(config); |
76 | EXPECT_CALL(*mysql_handler, init(_)) |
77 | .WillRepeatedly(Return(0)) |
78 | .RetiresOnSaturation(); |
79 | EXPECT_CALL(*mysql_handler, start(_)) |
80 | .WillRepeatedly(Return(0)) |
81 | .RetiresOnSaturation(); |
82 | |
83 | EXPECT_CALL(*mysql_handler, |
84 | get_next_row_data(Matcher<proto::WriteRequest::Row *>(_), |
85 | Matcher<LsnContext *>(_))) |
86 | .WillOnce(Invoke( |
87 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
88 | row_data->set_primary_key(1); |
89 | row_data->mutable_lsn_context()->set_lsn(1); |
90 | context->status = RowDataStatus::NORMAL; |
91 | return 0; |
92 | })) |
93 | .WillOnce(Invoke( |
94 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
95 | row_data->set_primary_key(2); |
96 | row_data->mutable_lsn_context()->set_lsn(2); |
97 | context->status = RowDataStatus::NORMAL; |
98 | return 0; |
99 | })) |
100 | .WillOnce(Invoke( |
101 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
102 | row_data->set_primary_key(3); |
103 | context->status = RowDataStatus::SCHEMA_CHANGED; |
104 | return 0; |
105 | })) |
106 | .WillOnce(Invoke( |
107 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
108 | row_data->set_primary_key(3); |
109 | row_data->mutable_lsn_context()->set_lsn(3); |
110 | context->status = RowDataStatus::NORMAL; |
111 | return 0; |
112 | })) |
113 | .WillOnce(Invoke( |
114 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
115 | row_data->set_primary_key(4); |
116 | row_data->mutable_lsn_context()->set_lsn(4); |
117 | context->status = RowDataStatus::NORMAL; |
118 | return 0; |
119 | })) |
120 | .WillRepeatedly(Invoke( |
121 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
122 | row_data->set_primary_key(3); |
123 | context->status = RowDataStatus::NO_MORE_DATA; |
124 | return 0; |
125 | })) |
126 | .RetiresOnSaturation(); |
127 | |
128 | EXPECT_CALL(*mysql_handler, |
129 | reset_status(Matcher<ScanMode>(_), |
130 | Matcher<const proto::CollectionConfig &>(_), |
131 | Matcher<const LsnContext &>(_))) |
132 | // EXPECT_CALL(*mysql_handler, reset_status(_, _, _)) |
133 | .WillRepeatedly(Return(0)) |
134 | .RetiresOnSaturation(); |
135 | |
136 | EXPECT_CALL(*mysql_handler, get_fields_meta(_)) |
137 | .WillOnce(Invoke([](proto::WriteRequest::RowMeta *meta) -> int { |
138 | meta->add_index_column_metas()->set_column_name("index1" ); |
139 | meta->add_index_column_metas()->set_column_name("index2" ); |
140 | meta->add_forward_column_names("forward1" ); |
141 | meta->add_forward_column_names("forward2" ); |
142 | return 0; |
143 | })) |
144 | .WillOnce(Invoke([](proto::WriteRequest::RowMeta *meta) -> int { |
145 | meta->add_index_column_metas()->set_column_name("new-index1" ); |
146 | meta->add_index_column_metas()->set_column_name("new-index2" ); |
147 | meta->add_forward_column_names("new-forward1" ); |
148 | meta->add_forward_column_names("new-forward2" ); |
149 | return 0; |
150 | })) |
151 | .WillRepeatedly(Return(0)) |
152 | .RetiresOnSaturation(); |
153 | |
154 | EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _)) |
155 | .WillRepeatedly(Return(0)) |
156 | .RetiresOnSaturation(); |
157 | |
158 | collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler)); |
159 | |
160 | int ret = collection->init(); |
161 | ASSERT_EQ(ret, 0); |
162 | CollectionStatus current_state = collection->state(); |
163 | ASSERT_EQ(current_state, CollectionStatus::INIT); |
164 | collection->run(); |
165 | sleep(1); |
166 | |
167 | // check value |
168 | ASSERT_EQ(svc_.get_server_called_count(), 2); |
169 | proto::WriteRequest request; |
170 | std::string request_str = svc_.get_request_string(0); |
171 | ASSERT_EQ(request_str.empty(), false); |
172 | ret = request.ParseFromString(request_str); |
173 | |
174 | ASSERT_EQ(ret, true); |
175 | auto rows = request.rows(); |
176 | ASSERT_EQ(rows.size(), 2); |
177 | auto row1 = rows[0]; |
178 | auto row2 = rows[1]; |
179 | // row1 |
180 | ASSERT_EQ(row1.primary_key(), 1); |
181 | ASSERT_EQ(row1.lsn_context().lsn(), 1); |
182 | // row2 |
183 | ASSERT_EQ(row2.primary_key(), 2); |
184 | ASSERT_EQ(row2.lsn_context().lsn(), 2); |
185 | // meta |
186 | auto meta = request.row_meta(); |
187 | auto index_column_metas = meta.index_column_metas(); |
188 | ASSERT_EQ(index_column_metas[0].column_name(), "index1" ); |
189 | ASSERT_EQ(index_column_metas[1].column_name(), "index2" ); |
190 | ASSERT_EQ(meta.index_column_metas_size(), 2); |
191 | auto forward_column_names = meta.forward_column_names(); |
192 | ASSERT_EQ(forward_column_names[0], "forward1" ); |
193 | ASSERT_EQ(forward_column_names[1], "forward2" ); |
194 | |
195 | proto::WriteRequest request2; |
196 | request_str = svc_.get_request_string(1); |
197 | ASSERT_EQ(request_str.empty(), false); |
198 | ret = request.ParseFromString(request_str); |
199 | |
200 | ASSERT_EQ(ret, true); |
201 | rows = request.rows(); |
202 | ASSERT_EQ(rows.size(), 2); |
203 | row1 = rows[0]; |
204 | row2 = rows[1]; |
205 | // row1 |
206 | ASSERT_EQ(row1.primary_key(), 3); |
207 | ASSERT_EQ(row1.lsn_context().lsn(), 3); |
208 | // row2 |
209 | ASSERT_EQ(row2.primary_key(), 4); |
210 | ASSERT_EQ(row2.lsn_context().lsn(), 4); |
211 | // meta |
212 | meta = request.row_meta(); |
213 | index_column_metas = meta.index_column_metas(); |
214 | ASSERT_EQ(index_column_metas[0].column_name(), "new-index1" ); |
215 | ASSERT_EQ(index_column_metas[1].column_name(), "new-index2" ); |
216 | ASSERT_EQ(meta.index_column_metas_size(), 2); |
217 | forward_column_names = meta.forward_column_names(); |
218 | ASSERT_EQ(forward_column_names[0], "new-forward1" ); |
219 | ASSERT_EQ(forward_column_names[1], "new-forward2" ); |
220 | // exit |
221 | collection->stop(); |
222 | sleep(2); |
223 | LOG_INFO("MysqlCollectionTest2::TestGeneral PASS" ); |
224 | } |
225 | |
226 | ASSERT_EQ(0, server_.Stop(0)); |
227 | ASSERT_EQ(0, server_.Join()); |
228 | } |
229 | |