1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include <gmock/gmock.h> |
22 | |
23 | #define private public |
24 | #define protected public |
25 | #include "repository/repository_common/config.h" |
26 | #undef private |
27 | #undef protected |
28 | |
29 | #include "repository/mysql_collection.h" |
30 | #include "mock_index_agent_server.h" |
31 | #include "mock_mysql_handler.h" |
32 | #include "port_helper.h" |
33 | |
34 | const std::string collection_name = "mysql_collection_test.info" ; |
35 | static int PORT = 8010; |
36 | static int PID = 0; |
37 | |
38 | //////////////////////////////////////////////////////////////////// |
39 | class MysqlCollectionRandomTest2 : public ::testing::Test { |
40 | protected: |
41 | MysqlCollectionRandomTest2() {} |
42 | ~MysqlCollectionRandomTest2() {} |
43 | void SetUp() { |
44 | PortHelper::GetPort(&PORT, &PID); |
45 | std::cout << "Server port: " << PORT << std::endl; |
46 | std::string index_uri = "127.0.0.1:" + std::to_string(PORT); |
47 | proxima::be::repository::Config::Instance() |
48 | .repository_config_.mutable_repository_config() |
49 | ->set_index_agent_addr(index_uri); |
50 | std::cout << "Set index addr: " << index_uri << std::endl; |
51 | |
52 | proxima::be::repository::Config::Instance() |
53 | .repository_config_.mutable_repository_config() |
54 | ->set_batch_interval(1000000); |
55 | std::cout << "Set batch_interval to 1s" << std::endl; |
56 | } |
57 | void TearDown() { |
58 | PortHelper::RemovePortFile(PID); |
59 | } |
60 | }; |
61 | |
62 | TEST_F(MysqlCollectionRandomTest2, TestGeneral) { |
63 | brpc::Server server_; |
64 | MockRandomProximaServiceImpl svc_; |
65 | brpc::ServerOptions options; |
66 | ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE)); |
67 | ASSERT_EQ(0, server_.Start(PORT, &options)); |
68 | { |
69 | proto::CollectionConfig config; |
70 | config.set_collection_name(collection_name); |
71 | |
72 | CollectionPtr collection{nullptr}; |
73 | MockMysqlHandlerPtr mysql_handler = |
74 | std::make_shared<MockMysqlHandler>(config); |
75 | EXPECT_CALL(*mysql_handler, init(_)) |
76 | .WillRepeatedly(Return(0)) |
77 | .RetiresOnSaturation(); |
78 | EXPECT_CALL(*mysql_handler, start(_)) |
79 | .WillRepeatedly(Return(0)) |
80 | .RetiresOnSaturation(); |
81 | |
82 | EXPECT_CALL(*mysql_handler, |
83 | get_next_row_data(Matcher<proto::WriteRequest::Row *>(_), |
84 | Matcher<LsnContext *>(_))) |
85 | .WillRepeatedly(Invoke( |
86 | [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int { |
87 | row_data->set_primary_key(1); |
88 | context->status = RowDataStatus::NORMAL; |
89 | return 0; |
90 | })) |
91 | .RetiresOnSaturation(); |
92 | |
93 | EXPECT_CALL(*mysql_handler, |
94 | reset_status(Matcher<ScanMode>(_), |
95 | Matcher<const proto::CollectionConfig &>(_), |
96 | Matcher<const LsnContext &>(_))) |
97 | .WillRepeatedly(Return(0)) |
98 | .RetiresOnSaturation(); |
99 | |
100 | EXPECT_CALL(*mysql_handler, get_fields_meta(_)) |
101 | .WillRepeatedly(Return(0)) |
102 | .RetiresOnSaturation(); |
103 | |
104 | EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _)) |
105 | .WillRepeatedly(Return(0)) |
106 | .RetiresOnSaturation(); |
107 | |
108 | collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler)); |
109 | |
110 | int ret = collection->init(); |
111 | ASSERT_EQ(ret, 0); |
112 | CollectionStatus current_state = collection->state(); |
113 | ASSERT_EQ(current_state, CollectionStatus::INIT); |
114 | collection->run(); |
115 | sleep(3); |
116 | |
117 | // check value |
118 | ASSERT_EQ(svc_.is_server_called(), true); |
119 | LOG_INFO("[test]: Server received records count [%zu]" , |
120 | (size_t)svc_.get_records_count()); |
121 | current_state = collection->state(); |
122 | ASSERT_NE(current_state, CollectionStatus::INIT); |
123 | collection->drop(); |
124 | sleep(3); |
125 | current_state = collection->state(); |
126 | ASSERT_EQ(current_state, CollectionStatus::FINISHED); |
127 | collection->stop(); |
128 | sleep(3); |
129 | LOG_INFO("[test]: Server received records count [%zu]" , |
130 | (size_t)svc_.get_records_count()); |
131 | } |
132 | |
133 | ASSERT_EQ(0, server_.Stop(0)); |
134 | ASSERT_EQ(0, server_.Join()); |
135 | } |
136 | |