1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Dec 2020
 * \brief Unit test for MysqlCollection, driven by a mocked MySQL handler and a mock index agent server
19 */
20
21#include <gmock/gmock.h>
22
23#define private public
24#define protected public
25#include "repository/repository_common/config.h"
26#undef private
27#undef protected
28
29#include "repository/mysql_collection.h"
30#include "mock_index_agent_server.h"
31#include "mock_mysql_handler.h"
32#include "port_helper.h"
33
34
// Collection name placed into the CollectionConfig used by the test case.
const std::string collection_name{"mysql_collection_test.info"};

// Port the mock index agent server listens on. 8010 is only the initial
// value; the fixture's SetUp() passes it to PortHelper::GetPort().
static int PORT{8010};

// Identifier filled in by PortHelper::GetPort() and handed back to
// PortHelper::RemovePortFile() in TearDown().
static int PID{0};
38
39////////////////////////////////////////////////////////////////////
40class MysqlCollectionTest2 : public ::testing::Test {
41 protected:
42 MysqlCollectionTest2() {}
43 ~MysqlCollectionTest2() {}
44 void SetUp() {
45 PortHelper::GetPort(&PORT, &PID);
46 std::cout << "Server port: " << PORT << std::endl;
47 std::string index_uri = "127.0.0.1:" + std::to_string(PORT);
48 proxima::be::repository::Config::Instance()
49 .repository_config_.mutable_repository_config()
50 ->set_index_agent_addr(index_uri);
51 std::cout << "Set index addr: " << index_uri << std::endl;
52
53 proxima::be::repository::Config::Instance()
54 .repository_config_.mutable_repository_config()
55 ->set_batch_interval(1000000);
56 std::cout << "Set batch_interval to 1s" << std::endl;
57 }
58 void TearDown() {
59 PortHelper::RemovePortFile(PID);
60 }
61};
62
63TEST_F(MysqlCollectionTest2, TestGeneral) {
64 brpc::Server server_;
65 MockGeneralProximaServiceImpl svc_;
66 brpc::ServerOptions options;
67 ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE));
68 ASSERT_EQ(0, server_.Start(PORT, &options));
69 {
70 proto::CollectionConfig config;
71 config.set_collection_name(collection_name);
72
73 CollectionPtr collection{nullptr};
74 MockMysqlHandlerPtr mysql_handler =
75 std::make_shared<MockMysqlHandler>(config);
76 EXPECT_CALL(*mysql_handler, init(_))
77 .WillRepeatedly(Return(0))
78 .RetiresOnSaturation();
79 EXPECT_CALL(*mysql_handler, start(_))
80 .WillRepeatedly(Return(0))
81 .RetiresOnSaturation();
82
83 EXPECT_CALL(*mysql_handler,
84 get_next_row_data(Matcher<proto::WriteRequest::Row *>(_),
85 Matcher<LsnContext *>(_)))
86 // EXPECT_CALL(*mysql_handler, get_next_row_data(_, _))
87 .WillOnce(Invoke(
88 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
89 row_data->set_primary_key(1);
90 row_data->mutable_lsn_context()->set_lsn(1);
91 row_data->set_operation_type(
92 proxima::be::proto::OperationType::OP_INSERT);
93 row_data->mutable_index_column_values()
94 ->add_values()
95 ->set_string_value("index_column1");
96 row_data->mutable_index_column_values()
97 ->add_values()
98 ->set_double_value(3.1415926);
99 row_data->mutable_forward_column_values()
100 ->add_values()
101 ->set_string_value("forward_column1");
102 row_data->mutable_forward_column_values()
103 ->add_values()
104 ->set_double_value(3.1415926);
105 context->status = RowDataStatus::NORMAL;
106 return 0;
107 }))
108 .WillOnce(Invoke(
109 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
110 row_data->set_primary_key(2);
111 row_data->mutable_lsn_context()->set_lsn(2);
112 row_data->set_operation_type(
113 proxima::be::proto::OperationType::OP_UPDATE);
114 row_data->mutable_index_column_values()
115 ->add_values()
116 ->set_string_value("index_column1");
117 row_data->mutable_index_column_values()
118 ->add_values()
119 ->set_double_value(3.1415926);
120 row_data->mutable_forward_column_values()
121 ->add_values()
122 ->set_string_value("forward_column1");
123 row_data->mutable_forward_column_values()
124 ->add_values()
125 ->set_double_value(3.1415926);
126 context->status = RowDataStatus::NORMAL;
127 return 0;
128 }))
129 .WillRepeatedly(Invoke(
130 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
131 row_data->set_primary_key(3);
132 context->status = RowDataStatus::NO_MORE_DATA;
133 return 0;
134 }))
135 .RetiresOnSaturation();
136
137 EXPECT_CALL(*mysql_handler,
138 reset_status(Matcher<ScanMode>(_),
139 Matcher<const proto::CollectionConfig &>(_),
140 Matcher<const LsnContext &>(_)))
141 // EXPECT_CALL(*mysql_handler, reset_status(_, _, _))
142 .WillRepeatedly(Return(0))
143 .RetiresOnSaturation();
144
145 EXPECT_CALL(*mysql_handler, get_fields_meta(_))
146 .WillRepeatedly(Return(0))
147 .RetiresOnSaturation();
148
149 EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _))
150 .WillRepeatedly(Return(0))
151 .RetiresOnSaturation();
152
153 collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler));
154
155 int ret = collection->init();
156 ASSERT_EQ(ret, 0);
157 CollectionStatus current_state = collection->state();
158 ASSERT_EQ(current_state, CollectionStatus::INIT);
159 collection->run();
160 sleep(1);
161
162 // check value
163 ASSERT_EQ(svc_.get_server_called_count(), 1);
164 proto::WriteRequest request;
165 std::string request_str = svc_.get_request_string(0);
166 ASSERT_EQ(request_str.empty(), false);
167 ret = request.ParseFromString(request_str);
168
169 ASSERT_EQ(ret, true);
170 auto rows = request.rows();
171 ASSERT_EQ(rows.size(), 2);
172 auto row1 = rows[0];
173 auto row2 = rows[1];
174 // row1
175 ASSERT_EQ(row1.primary_key(), 1);
176 ASSERT_EQ(row1.operation_type(),
177 proxima::be::proto::OperationType::OP_INSERT);
178 ASSERT_EQ(row1.lsn_context().lsn(), 1);
179 ASSERT_EQ(row1.index_column_values().values_size(), 2);
180 auto row1_index_columns = row1.index_column_values().values();
181 auto row1_index_column1 = row1_index_columns[0];
182 auto row1_index_column2 = row1_index_columns[1];
183 ASSERT_EQ(row1_index_column1.value_oneof_case(),
184 row1_index_column1.kStringValue);
185 ASSERT_EQ(row1_index_column2.value_oneof_case(),
186 row1_index_column2.kDoubleValue);
187 ASSERT_EQ(row1_index_column1.string_value(), "index_column1");
188 ASSERT_EQ(row1_index_column2.double_value(), 3.1415926);
189 ASSERT_EQ(row1.forward_column_values().values_size(), 2);
190 auto row1_forward_columns = row1.forward_column_values().values();
191 auto row1_forward_column1 = row1_forward_columns[0];
192 auto row1_forward_column2 = row1_forward_columns[1];
193 ASSERT_EQ(row1_forward_column1.value_oneof_case(),
194 row1_forward_column1.kStringValue);
195 ASSERT_EQ(row1_forward_column2.value_oneof_case(),
196 row1_forward_column2.kDoubleValue);
197 ASSERT_EQ(row1_forward_column1.string_value(), "forward_column1");
198 ASSERT_EQ(row1_forward_column2.double_value(), 3.1415926);
199 // row2
200 ASSERT_EQ(row2.primary_key(), 2);
201 ASSERT_EQ(row2.operation_type(),
202 proxima::be::proto::OperationType::OP_UPDATE);
203 ASSERT_EQ(row2.lsn_context().lsn(), 2);
204 ASSERT_EQ(row2.index_column_values().values_size(), 2);
205 auto row2_index_columns = row2.index_column_values().values();
206 auto row2_index_column1 = row2_index_columns[0];
207 auto row2_index_column2 = row2_index_columns[1];
208 ASSERT_EQ(row2_index_column1.value_oneof_case(),
209 row2_index_column1.kStringValue);
210 ASSERT_EQ(row2_index_column2.value_oneof_case(),
211 row2_index_column2.kDoubleValue);
212 ASSERT_EQ(row2_index_column1.string_value(), "index_column1");
213 ASSERT_EQ(row2_index_column2.double_value(), 3.1415926);
214 ASSERT_EQ(row2.forward_column_values().values_size(), 2);
215 auto row2_forward_columns = row2.forward_column_values().values();
216 auto row2_forward_column1 = row2_forward_columns[0];
217 auto row2_forward_column2 = row2_forward_columns[1];
218 ASSERT_EQ(row2_forward_column1.value_oneof_case(),
219 row2_forward_column1.kStringValue);
220 ASSERT_EQ(row2_forward_column2.value_oneof_case(),
221 row2_forward_column2.kDoubleValue);
222 ASSERT_EQ(row2_forward_column1.string_value(), "forward_column1");
223 ASSERT_EQ(row2_forward_column2.double_value(), 3.1415926);
224 // exit
225 collection->stop();
226 sleep(2);
227 LOG_INFO("MysqlCollectionTest2::TestGeneral PASS");
228 }
229
230 ASSERT_EQ(0, server_.Stop(0));
231 ASSERT_EQ(0, server_.Join());
232}
233