1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author Dianzhang.Chen
17 * \date Nov 2020
18 * \brief Interface of mysql collection
19 */
20
21#pragma once
22
23#include <mutex>
24#include <brpc/channel.h>
25#include "binlog/mysql_handler.h"
26#include "common/macro_define.h"
27#include "collection.h"
28
29namespace proxima {
30namespace be {
31namespace repository {
32
33class MysqlCollection;
34using MysqlCollectionPtr = std::shared_ptr<MysqlCollection>;
35
36/*! Mysql Collection
37 */
38class MysqlCollection : public Collection {
39 public:
40 //! Constructor
41 MysqlCollection(const proto::CollectionConfig &config,
42 MysqlHandlerPtr mysql_handler)
43 : config_(config), mysql_handler_(std::move(mysql_handler)) {}
44
45 //! Destructor
46 ~MysqlCollection() = default;
47
48 public:
49 //! Initialize MySQL collection
50 int init() override;
51
52 //! Stop collection
53 void stop() override;
54
55 //! Start Collection
56 void run() override;
57
58 //! Update collection
59 void update() override;
60
61 //! Drop collection
62 void drop() override;
63
64 //! If collection is finished
65 bool finished() const override;
66
67 //! Get collection state
68 CollectionStatus state() const override;
69
70 //! Get schema revision
71 uint32_t schema_revision() const override;
72
73 private:
74 //! Implementation of fetch data
75 void fetch_impl();
76
77 //! Implementation of send data
78 void send_impl();
79
80 //! Check if collection should terminate
81 bool is_valid() const;
82
83 //! Clear data queue
84 void clear_fetch_data();
85
86 //! Send data to index agent
87 int send_data();
88
89 // Load lsn information
90 int load_lsn_info(bool is_retry = true);
91
92 //! Reset collection
93 int reset_collection();
94
95 //! Get collection state
96 CollectionStateFlag get_collection_flag();
97
98 //! Process event acording current collection state flag
99 void process_event(const CollectionStateFlag &flag);
100
101 //! Update collection
102 void process_update();
103
104 //! Drop collection
105 void process_drop();
106
107 //! Action when state flag is normal
108 void process_normal();
109
110 //! Load configs
111 void load_config();
112
113 //! Check if data is illegal and handle illegal data
114 int verify_and_handle(const LsnContext &context);
115
116 //! Get sending request
117 void get_sending_request();
118
119 //! Wait until prepared data is ready to send
120 bool wait_prepared_data();
121
122 //! Init brpc
123 int init_brpc();
124
125 //! Init mysql module
126 int init_mysql_module();
127
128 //! Handle no data case
129 void handle_no_data();
130
131 //! Handle schema changed case
132 void handle_schema_changed();
133
134 //! Update context
135 void update_context(const LsnContext &context);
136
137 //! Reset brpc request
138 int reset_request();
139
140 //! Update request meta information
141 int update_request_meta();
142
143 //! Check and update state
144 void update_state();
145
146 //! Check if is ready to send data
147 bool ready() const;
148
149 //! Get sleep time
150 uint64_t get_sleep_time() const;
151
152 //! Update action
153 void update_action();
154
155 //! Wait update command from manager
156 void wait_update_command();
157
158 //! Generate request id
159 std::string generate_request_id() const;
160
161 //! Update lsn map information of row data
162 void update_lsn_map_info(proto::WriteRequest::Row *row_data) const;
163
164 //! Check whether send request is empty
165 bool is_send_request_empty() const;
166
167 //! Reset fetch status
168 void reset_fetch_status();
169
170 //! Reset send status
171 void reset_send_status();
172
173 //! Check if need to wait until send thread finish send data
174 bool must_send(uint64_t start_time);
175
176 //! Get rod data from binlog handler
177 int get_request_row();
178
179 //! Print information of send data
180 void print_send_data_info() const;
181
182 private:
183 //! Disable copy constructor
184 MysqlCollection(const MysqlCollection &) = delete;
185
186 //! Disable assignment operator
187 MysqlCollection &operator=(const MysqlCollection &) = delete;
188
189 private:
190 uint32_t batch_size_{0};
191 uint32_t batch_interval_{0};
192
193 proto::CollectionConfig config_{};
194 uint64_t agent_timestamp_{0};
195
196 //! Members for state
197 std::thread fetch_thread_{};
198 std::thread send_thread_{};
199 std::atomic<CollectionStatus> state_{CollectionStatus::INIT};
200
201 std::atomic<uint32_t> prepared_data_size_{0};
202 std::atomic<bool> ready_{false};
203 std::atomic<bool> reset_{false};
204
205 std::mutex update_mutex_{}; // use for update collection
206 std::unique_ptr<proto::WriteRequest> fetch_request_{};
207 std::unique_ptr<proto::WriteRequest> send_request_{};
208
209 ScanMode pull_state_flag_{ScanMode::FULL};
210 std::atomic<CollectionStateFlag> collection_state_flag_{
211 CollectionStateFlag::NORMAL};
212
213 //! Members for brpc
214 int max_retry_{0};
215 int brpc_timeout_ms_{0};
216
217 std::string index_server_uri_{};
218 std::string load_balance_{};
219 brpc::Channel channel_{};
220 std::shared_ptr<proto::ProximaService_Stub> stub_{};
221
222 //! Members for lsn
223 uint64_t lsn_{0};
224 LsnContext context_{};
225 uint64_t start_time_{0};
226 MysqlHandlerPtr mysql_handler_{};
227};
228
229} // end namespace repository
230} // namespace be
231} // end namespace proxima