1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gmock/gmock.h>
22
23
24#define private public
25#define protected public
26#include "repository/repository_common/config.h"
27#undef private
28#undef protected
29
30#include "repository/mysql_collection.h"
31#include "mock_index_agent_server.h"
32#include "mock_mysql_handler.h"
33#include "port_helper.h"
34
35const std::string collection_name = "mysql_collection_test.info";
36static int PORT = 8010;
37static int PID = 0;
38
39////////////////////////////////////////////////////////////////////
40class MysqlCollectionScanTest2 : public ::testing::Test {
41 protected:
42 MysqlCollectionScanTest2() {}
43 ~MysqlCollectionScanTest2() {}
44 void SetUp() {
45 PortHelper::GetPort(&PORT, &PID);
46 std::cout << "Server port: " << PORT << std::endl;
47 std::string index_uri = "127.0.0.1:" + std::to_string(PORT);
48 proxima::be::repository::Config::Instance()
49 .repository_config_.mutable_repository_config()
50 ->set_index_agent_addr(index_uri);
51 std::cout << "Set index addr: " << index_uri << std::endl;
52
53 proxima::be::repository::Config::Instance()
54 .repository_config_.mutable_repository_config()
55 ->set_batch_interval(1000000);
56 std::cout << "Set batch_interval to 1s" << std::endl;
57 }
58 void TearDown() {
59 PortHelper::RemovePortFile(PID);
60 }
61};
62
63TEST_F(MysqlCollectionScanTest2, TestGeneral) {
64 brpc::Server server_;
65 MockGeneralProximaServiceImpl svc_;
66 brpc::ServerOptions options;
67 ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE));
68 ASSERT_EQ(0, server_.Start(PORT, &options));
69 {
70 proto::CollectionConfig config;
71 config.set_collection_name(collection_name);
72
73 CollectionPtr collection{nullptr};
74 MockMysqlHandlerPtr mysql_handler =
75 std::make_shared<MockMysqlHandler>(config);
76 EXPECT_CALL(*mysql_handler, init(_))
77 .WillRepeatedly(Return(0))
78 .RetiresOnSaturation();
79 EXPECT_CALL(*mysql_handler, start(_))
80 .WillRepeatedly(Return(0))
81 .RetiresOnSaturation();
82
83 EXPECT_CALL(*mysql_handler,
84 get_next_row_data(Matcher<proto::WriteRequest::Row *>(_),
85 Matcher<LsnContext *>(_)))
86 .WillOnce(Invoke(
87 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
88 row_data->set_primary_key(1);
89 row_data->mutable_lsn_context()->set_lsn(1);
90 context->status = RowDataStatus::NORMAL;
91 return 0;
92 }))
93 .WillOnce(Invoke(
94 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
95 row_data->set_primary_key(2);
96 row_data->mutable_lsn_context()->set_lsn(2);
97 context->status = RowDataStatus::NORMAL;
98 return 0;
99 }))
100 .WillOnce(Invoke(
101 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
102 row_data->set_primary_key(3);
103 context->status = RowDataStatus::NO_MORE_DATA;
104 return 0;
105 }))
106 .WillOnce(Invoke(
107 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
108 row_data->set_primary_key(3);
109 row_data->mutable_lsn_context()->set_lsn(3);
110 context->status = RowDataStatus::NORMAL;
111 return 0;
112 }))
113 .WillOnce(Invoke(
114 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
115 row_data->set_primary_key(4);
116 row_data->mutable_lsn_context()->set_lsn(4);
117 context->status = RowDataStatus::NORMAL;
118 return 0;
119 }))
120 .WillRepeatedly(Invoke(
121 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
122 row_data->set_primary_key(3);
123 context->status = RowDataStatus::SCHEMA_CHANGED;
124 return 0;
125 }))
126 .RetiresOnSaturation();
127
128 EXPECT_CALL(*mysql_handler,
129 reset_status(Matcher<ScanMode>(_),
130 Matcher<const proto::CollectionConfig &>(_),
131 Matcher<const LsnContext &>(_)))
132 // EXPECT_CALL(*mysql_handler, reset_status(_, _, _))
133 .WillRepeatedly(Return(0))
134 .RetiresOnSaturation();
135
136 EXPECT_CALL(*mysql_handler, get_fields_meta(_))
137 .WillRepeatedly(Invoke([](proto::WriteRequest::RowMeta *meta) -> int {
138 meta->add_index_column_metas()->set_column_name("index1");
139 meta->add_index_column_metas()->set_column_name("index2");
140 meta->add_forward_column_names("forward1");
141 meta->add_forward_column_names("forward2");
142 return 0;
143 }))
144 .RetiresOnSaturation();
145
146 EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _))
147 .WillRepeatedly(Return(0))
148 .RetiresOnSaturation();
149
150 collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler));
151
152 int ret = collection->init();
153 ASSERT_EQ(ret, 0);
154 CollectionStatus current_state = collection->state();
155 ASSERT_EQ(current_state, CollectionStatus::INIT);
156 collection->run();
157 sleep(1);
158
159 // check value
160 ASSERT_EQ(svc_.get_server_called_count(), 2);
161 proto::WriteRequest request;
162 std::string request_str = svc_.get_request_string(0);
163 ASSERT_EQ(request_str.empty(), false);
164 ret = request.ParseFromString(request_str);
165
166 ASSERT_EQ(ret, true);
167 auto rows = request.rows();
168 ASSERT_EQ(rows.size(), 2);
169 auto row1 = rows[0];
170 auto row2 = rows[1];
171 // row1
172 ASSERT_EQ(row1.primary_key(), 1);
173 ASSERT_EQ(row1.lsn_context().lsn(), 1);
174 // row2
175 ASSERT_EQ(row2.primary_key(), 2);
176 ASSERT_EQ(row2.lsn_context().lsn(), 2);
177 // meta
178 auto meta = request.row_meta();
179 ASSERT_EQ(meta.index_column_metas_size(), 2);
180 auto index_column_metas = meta.index_column_metas();
181 ASSERT_EQ(index_column_metas[0].column_name(), "index1");
182 ASSERT_EQ(index_column_metas[1].column_name(), "index2");
183 ASSERT_EQ(meta.index_column_metas_size(), 2);
184 auto forward_column_names = meta.forward_column_names();
185 ASSERT_EQ(forward_column_names[0], "forward1");
186 ASSERT_EQ(forward_column_names[1], "forward2");
187
188 proto::WriteRequest request2;
189 request_str = svc_.get_request_string(1);
190 ASSERT_EQ(request_str.empty(), false);
191 ret = request.ParseFromString(request_str);
192
193 ASSERT_EQ(ret, true);
194 rows = request.rows();
195 ASSERT_EQ(rows.size(), 2);
196 row1 = rows[0];
197 row2 = rows[1];
198 // row1
199 ASSERT_EQ(row1.primary_key(), 3);
200 ASSERT_EQ(row1.lsn_context().lsn(), 3);
201 // row2
202 ASSERT_EQ(row2.primary_key(), 4);
203 ASSERT_EQ(row2.lsn_context().lsn(), 4);
204 // meta
205 meta = request.row_meta();
206 ASSERT_EQ(meta.index_column_metas_size(), 2);
207 index_column_metas = meta.index_column_metas();
208 ASSERT_EQ(index_column_metas[0].column_name(), "index1");
209 ASSERT_EQ(index_column_metas[1].column_name(), "index2");
210 ASSERT_EQ(meta.index_column_metas_size(), 2);
211 forward_column_names = meta.forward_column_names();
212 ASSERT_EQ(forward_column_names[0], "forward1");
213 ASSERT_EQ(forward_column_names[1], "forward2");
214 // exit
215 collection->stop();
216 sleep(2);
217 LOG_INFO("MysqlCollectionTest2::TestGeneral PASS");
218 }
219
220 ASSERT_EQ(0, server_.Stop(0));
221 ASSERT_EQ(0, server_.Join());
222}
223