1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gmock/gmock.h>
22
23
24#define private public
25#define protected public
26#include "repository/repository_common/config.h"
27#undef private
28#undef protected
29
30#include "repository/mysql_collection.h"
31#include "mock_index_agent_server.h"
32#include "mock_mysql_handler.h"
33#include "port_helper.h"
34
// Name given to the CollectionConfig built by the test below.
const std::string collection_name{"mysql_collection_test.info"};
// Port the mock index agent server listens on. 8010 is only the initial
// value; the fixture's SetUp() passes this variable to PortHelper::GetPort().
static int PORT{8010};
// Identifier filled in by PortHelper::GetPort() and later handed to
// PortHelper::RemovePortFile() in TearDown().
static int PID{0};
38
39////////////////////////////////////////////////////////////////////
40class MysqlCollectionSchemaChangeTest2 : public ::testing::Test {
41 protected:
42 MysqlCollectionSchemaChangeTest2() {}
43 ~MysqlCollectionSchemaChangeTest2() {}
44 void SetUp() {
45 PortHelper::GetPort(&PORT, &PID);
46 std::cout << "Server port: " << PORT << std::endl;
47 std::string index_uri = "127.0.0.1:" + std::to_string(PORT);
48 proxima::be::repository::Config::Instance()
49 .repository_config_.mutable_repository_config()
50 ->set_index_agent_addr(index_uri);
51 std::cout << "Set index addr: " << index_uri << std::endl;
52
53 proxima::be::repository::Config::Instance()
54 .repository_config_.mutable_repository_config()
55 ->set_batch_interval(1000000);
56 std::cout << "Set batch_interval to 1s" << std::endl;
57 }
58 void TearDown() {
59 PortHelper::RemovePortFile(PID);
60 }
61};
62
63TEST_F(MysqlCollectionSchemaChangeTest2, TestGeneral) {
64 brpc::Server server_;
65 MockGeneralProximaServiceImpl svc_;
66 brpc::ServerOptions options;
67 ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE));
68 ASSERT_EQ(0, server_.Start(PORT, &options));
69 {
70 proto::CollectionConfig config;
71 config.set_collection_name(collection_name);
72
73 CollectionPtr collection{nullptr};
74 MockMysqlHandlerPtr mysql_handler =
75 std::make_shared<MockMysqlHandler>(config);
76 EXPECT_CALL(*mysql_handler, init(_))
77 .WillRepeatedly(Return(0))
78 .RetiresOnSaturation();
79 EXPECT_CALL(*mysql_handler, start(_))
80 .WillRepeatedly(Return(0))
81 .RetiresOnSaturation();
82
83 EXPECT_CALL(*mysql_handler,
84 get_next_row_data(Matcher<proto::WriteRequest::Row *>(_),
85 Matcher<LsnContext *>(_)))
86 .WillOnce(Invoke(
87 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
88 row_data->set_primary_key(1);
89 row_data->mutable_lsn_context()->set_lsn(1);
90 context->status = RowDataStatus::NORMAL;
91 return 0;
92 }))
93 .WillOnce(Invoke(
94 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
95 row_data->set_primary_key(2);
96 row_data->mutable_lsn_context()->set_lsn(2);
97 context->status = RowDataStatus::NORMAL;
98 return 0;
99 }))
100 .WillOnce(Invoke(
101 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
102 row_data->set_primary_key(3);
103 context->status = RowDataStatus::SCHEMA_CHANGED;
104 return 0;
105 }))
106 .WillOnce(Invoke(
107 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
108 row_data->set_primary_key(3);
109 row_data->mutable_lsn_context()->set_lsn(3);
110 context->status = RowDataStatus::NORMAL;
111 return 0;
112 }))
113 .WillOnce(Invoke(
114 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
115 row_data->set_primary_key(4);
116 row_data->mutable_lsn_context()->set_lsn(4);
117 context->status = RowDataStatus::NORMAL;
118 return 0;
119 }))
120 .WillRepeatedly(Invoke(
121 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
122 row_data->set_primary_key(3);
123 context->status = RowDataStatus::NO_MORE_DATA;
124 return 0;
125 }))
126 .RetiresOnSaturation();
127
128 EXPECT_CALL(*mysql_handler,
129 reset_status(Matcher<ScanMode>(_),
130 Matcher<const proto::CollectionConfig &>(_),
131 Matcher<const LsnContext &>(_)))
132 // EXPECT_CALL(*mysql_handler, reset_status(_, _, _))
133 .WillRepeatedly(Return(0))
134 .RetiresOnSaturation();
135
136 EXPECT_CALL(*mysql_handler, get_fields_meta(_))
137 .WillOnce(Invoke([](proto::WriteRequest::RowMeta *meta) -> int {
138 meta->add_index_column_metas()->set_column_name("index1");
139 meta->add_index_column_metas()->set_column_name("index2");
140 meta->add_forward_column_names("forward1");
141 meta->add_forward_column_names("forward2");
142 return 0;
143 }))
144 .WillOnce(Invoke([](proto::WriteRequest::RowMeta *meta) -> int {
145 meta->add_index_column_metas()->set_column_name("new-index1");
146 meta->add_index_column_metas()->set_column_name("new-index2");
147 meta->add_forward_column_names("new-forward1");
148 meta->add_forward_column_names("new-forward2");
149 return 0;
150 }))
151 .WillRepeatedly(Return(0))
152 .RetiresOnSaturation();
153
154 EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _))
155 .WillRepeatedly(Return(0))
156 .RetiresOnSaturation();
157
158 collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler));
159
160 int ret = collection->init();
161 ASSERT_EQ(ret, 0);
162 CollectionStatus current_state = collection->state();
163 ASSERT_EQ(current_state, CollectionStatus::INIT);
164 collection->run();
165 sleep(1);
166
167 // check value
168 ASSERT_EQ(svc_.get_server_called_count(), 2);
169 proto::WriteRequest request;
170 std::string request_str = svc_.get_request_string(0);
171 ASSERT_EQ(request_str.empty(), false);
172 ret = request.ParseFromString(request_str);
173
174 ASSERT_EQ(ret, true);
175 auto rows = request.rows();
176 ASSERT_EQ(rows.size(), 2);
177 auto row1 = rows[0];
178 auto row2 = rows[1];
179 // row1
180 ASSERT_EQ(row1.primary_key(), 1);
181 ASSERT_EQ(row1.lsn_context().lsn(), 1);
182 // row2
183 ASSERT_EQ(row2.primary_key(), 2);
184 ASSERT_EQ(row2.lsn_context().lsn(), 2);
185 // meta
186 auto meta = request.row_meta();
187 auto index_column_metas = meta.index_column_metas();
188 ASSERT_EQ(index_column_metas[0].column_name(), "index1");
189 ASSERT_EQ(index_column_metas[1].column_name(), "index2");
190 ASSERT_EQ(meta.index_column_metas_size(), 2);
191 auto forward_column_names = meta.forward_column_names();
192 ASSERT_EQ(forward_column_names[0], "forward1");
193 ASSERT_EQ(forward_column_names[1], "forward2");
194
195 proto::WriteRequest request2;
196 request_str = svc_.get_request_string(1);
197 ASSERT_EQ(request_str.empty(), false);
198 ret = request.ParseFromString(request_str);
199
200 ASSERT_EQ(ret, true);
201 rows = request.rows();
202 ASSERT_EQ(rows.size(), 2);
203 row1 = rows[0];
204 row2 = rows[1];
205 // row1
206 ASSERT_EQ(row1.primary_key(), 3);
207 ASSERT_EQ(row1.lsn_context().lsn(), 3);
208 // row2
209 ASSERT_EQ(row2.primary_key(), 4);
210 ASSERT_EQ(row2.lsn_context().lsn(), 4);
211 // meta
212 meta = request.row_meta();
213 index_column_metas = meta.index_column_metas();
214 ASSERT_EQ(index_column_metas[0].column_name(), "new-index1");
215 ASSERT_EQ(index_column_metas[1].column_name(), "new-index2");
216 ASSERT_EQ(meta.index_column_metas_size(), 2);
217 forward_column_names = meta.forward_column_names();
218 ASSERT_EQ(forward_column_names[0], "new-forward1");
219 ASSERT_EQ(forward_column_names[1], "new-forward2");
220 // exit
221 collection->stop();
222 sleep(2);
223 LOG_INFO("MysqlCollectionTest2::TestGeneral PASS");
224 }
225
226 ASSERT_EQ(0, server_.Stop(0));
227 ASSERT_EQ(0, server_.Join());
228}
229