1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Dec 2020
18 * \brief
19 */
20
21#include <gmock/gmock.h>
22
23#define private public
24#define protected public
25#include "repository/repository_common/config.h"
26#undef private
27#undef protected
28
29#include "repository/mysql_collection.h"
30#include "mock_index_agent_server.h"
31#include "mock_mysql_handler.h"
32#include "port_helper.h"
33
// Name of the collection that the tests in this file create and drive.
const std::string collection_name{"mysql_collection_test.info"};

// Port for the mock index agent server. 8010 is only the initial value; the
// fixture's SetUp() hands this variable to PortHelper::GetPort().
static int PORT{8010};

// Identifier handed to PortHelper::GetPort() in SetUp() and passed back to
// PortHelper::RemovePortFile() in TearDown().
static int PID{0};
37
38////////////////////////////////////////////////////////////////////
39class MysqlCollectionRandomTest2 : public ::testing::Test {
40 protected:
41 MysqlCollectionRandomTest2() {}
42 ~MysqlCollectionRandomTest2() {}
43 void SetUp() {
44 PortHelper::GetPort(&PORT, &PID);
45 std::cout << "Server port: " << PORT << std::endl;
46 std::string index_uri = "127.0.0.1:" + std::to_string(PORT);
47 proxima::be::repository::Config::Instance()
48 .repository_config_.mutable_repository_config()
49 ->set_index_agent_addr(index_uri);
50 std::cout << "Set index addr: " << index_uri << std::endl;
51
52 proxima::be::repository::Config::Instance()
53 .repository_config_.mutable_repository_config()
54 ->set_batch_interval(1000000);
55 std::cout << "Set batch_interval to 1s" << std::endl;
56 }
57 void TearDown() {
58 PortHelper::RemovePortFile(PID);
59 }
60};
61
62TEST_F(MysqlCollectionRandomTest2, TestGeneral) {
63 brpc::Server server_;
64 MockRandomProximaServiceImpl svc_;
65 brpc::ServerOptions options;
66 ASSERT_EQ(0, server_.AddService(&svc_, brpc::SERVER_DOESNT_OWN_SERVICE));
67 ASSERT_EQ(0, server_.Start(PORT, &options));
68 {
69 proto::CollectionConfig config;
70 config.set_collection_name(collection_name);
71
72 CollectionPtr collection{nullptr};
73 MockMysqlHandlerPtr mysql_handler =
74 std::make_shared<MockMysqlHandler>(config);
75 EXPECT_CALL(*mysql_handler, init(_))
76 .WillRepeatedly(Return(0))
77 .RetiresOnSaturation();
78 EXPECT_CALL(*mysql_handler, start(_))
79 .WillRepeatedly(Return(0))
80 .RetiresOnSaturation();
81
82 EXPECT_CALL(*mysql_handler,
83 get_next_row_data(Matcher<proto::WriteRequest::Row *>(_),
84 Matcher<LsnContext *>(_)))
85 .WillRepeatedly(Invoke(
86 [](proto::WriteRequest::Row *row_data, LsnContext *context) -> int {
87 row_data->set_primary_key(1);
88 context->status = RowDataStatus::NORMAL;
89 return 0;
90 }))
91 .RetiresOnSaturation();
92
93 EXPECT_CALL(*mysql_handler,
94 reset_status(Matcher<ScanMode>(_),
95 Matcher<const proto::CollectionConfig &>(_),
96 Matcher<const LsnContext &>(_)))
97 .WillRepeatedly(Return(0))
98 .RetiresOnSaturation();
99
100 EXPECT_CALL(*mysql_handler, get_fields_meta(_))
101 .WillRepeatedly(Return(0))
102 .RetiresOnSaturation();
103
104 EXPECT_CALL(*mysql_handler, get_table_snapshot(_, _))
105 .WillRepeatedly(Return(0))
106 .RetiresOnSaturation();
107
108 collection.reset(new (std::nothrow) MysqlCollection(config, mysql_handler));
109
110 int ret = collection->init();
111 ASSERT_EQ(ret, 0);
112 CollectionStatus current_state = collection->state();
113 ASSERT_EQ(current_state, CollectionStatus::INIT);
114 collection->run();
115 sleep(3);
116
117 // check value
118 ASSERT_EQ(svc_.is_server_called(), true);
119 LOG_INFO("[test]: Server received records count [%zu]",
120 (size_t)svc_.get_records_count());
121 current_state = collection->state();
122 ASSERT_NE(current_state, CollectionStatus::INIT);
123 collection->drop();
124 sleep(3);
125 current_state = collection->state();
126 ASSERT_EQ(current_state, CollectionStatus::FINISHED);
127 collection->stop();
128 sleep(3);
129 LOG_INFO("[test]: Server received records count [%zu]",
130 (size_t)svc_.get_records_count());
131 }
132
133 ASSERT_EQ(0, server_.Stop(0));
134 ASSERT_EQ(0, server_.Join());
135}
136