1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author Dianzhang.Chen |
17 | * \date Nov 2020 |
18 | * \brief Interface of mysql collection |
19 | */ |
20 | |
21 | #pragma once |
22 | |
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <brpc/channel.h>
#include "binlog/mysql_handler.h"
#include "common/macro_define.h"
#include "collection.h"
28 | |
29 | namespace proxima { |
30 | namespace be { |
31 | namespace repository { |
32 | |
//! Forward declaration so the pointer alias can name the class before its
//! full definition
class MysqlCollection;
//! Shared-ownership pointer to a MysqlCollection
using MysqlCollectionPtr = std::shared_ptr<MysqlCollection>;
35 | |
36 | /*! Mysql Collection |
37 | */ |
38 | class MysqlCollection : public Collection { |
39 | public: |
40 | //! Constructor |
41 | MysqlCollection(const proto::CollectionConfig &config, |
42 | MysqlHandlerPtr mysql_handler) |
43 | : config_(config), mysql_handler_(std::move(mysql_handler)) {} |
44 | |
45 | //! Destructor |
46 | ~MysqlCollection() = default; |
47 | |
48 | public: |
49 | //! Initialize MySQL collection |
50 | int init() override; |
51 | |
52 | //! Stop collection |
53 | void stop() override; |
54 | |
55 | //! Start Collection |
56 | void run() override; |
57 | |
58 | //! Update collection |
59 | void update() override; |
60 | |
61 | //! Drop collection |
62 | void drop() override; |
63 | |
64 | //! If collection is finished |
65 | bool finished() const override; |
66 | |
67 | //! Get collection state |
68 | CollectionStatus state() const override; |
69 | |
70 | //! Get schema revision |
71 | uint32_t schema_revision() const override; |
72 | |
73 | private: |
74 | //! Implementation of fetch data |
75 | void fetch_impl(); |
76 | |
77 | //! Implementation of send data |
78 | void send_impl(); |
79 | |
80 | //! Check if collection should terminate |
81 | bool is_valid() const; |
82 | |
83 | //! Clear data queue |
84 | void clear_fetch_data(); |
85 | |
86 | //! Send data to index agent |
87 | int send_data(); |
88 | |
89 | // Load lsn information |
90 | int load_lsn_info(bool is_retry = true); |
91 | |
92 | //! Reset collection |
93 | int reset_collection(); |
94 | |
95 | //! Get collection state |
96 | CollectionStateFlag get_collection_flag(); |
97 | |
98 | //! Process event acording current collection state flag |
99 | void process_event(const CollectionStateFlag &flag); |
100 | |
101 | //! Update collection |
102 | void process_update(); |
103 | |
104 | //! Drop collection |
105 | void process_drop(); |
106 | |
107 | //! Action when state flag is normal |
108 | void process_normal(); |
109 | |
110 | //! Load configs |
111 | void load_config(); |
112 | |
113 | //! Check if data is illegal and handle illegal data |
114 | int verify_and_handle(const LsnContext &context); |
115 | |
116 | //! Get sending request |
117 | void get_sending_request(); |
118 | |
119 | //! Wait until prepared data is ready to send |
120 | bool wait_prepared_data(); |
121 | |
122 | //! Init brpc |
123 | int init_brpc(); |
124 | |
125 | //! Init mysql module |
126 | int init_mysql_module(); |
127 | |
128 | //! Handle no data case |
129 | void handle_no_data(); |
130 | |
131 | //! Handle schema changed case |
132 | void handle_schema_changed(); |
133 | |
134 | //! Update context |
135 | void update_context(const LsnContext &context); |
136 | |
137 | //! Reset brpc request |
138 | int reset_request(); |
139 | |
140 | //! Update request meta information |
141 | int update_request_meta(); |
142 | |
143 | //! Check and update state |
144 | void update_state(); |
145 | |
146 | //! Check if is ready to send data |
147 | bool ready() const; |
148 | |
149 | //! Get sleep time |
150 | uint64_t get_sleep_time() const; |
151 | |
152 | //! Update action |
153 | void update_action(); |
154 | |
155 | //! Wait update command from manager |
156 | void wait_update_command(); |
157 | |
158 | //! Generate request id |
159 | std::string generate_request_id() const; |
160 | |
161 | //! Update lsn map information of row data |
162 | void update_lsn_map_info(proto::WriteRequest::Row *row_data) const; |
163 | |
164 | //! Check whether send request is empty |
165 | bool is_send_request_empty() const; |
166 | |
167 | //! Reset fetch status |
168 | void reset_fetch_status(); |
169 | |
170 | //! Reset send status |
171 | void reset_send_status(); |
172 | |
173 | //! Check if need to wait until send thread finish send data |
174 | bool must_send(uint64_t start_time); |
175 | |
176 | //! Get rod data from binlog handler |
177 | int get_request_row(); |
178 | |
179 | //! Print information of send data |
180 | void print_send_data_info() const; |
181 | |
182 | private: |
183 | //! Disable copy constructor |
184 | MysqlCollection(const MysqlCollection &) = delete; |
185 | |
186 | //! Disable assignment operator |
187 | MysqlCollection &operator=(const MysqlCollection &) = delete; |
188 | |
189 | private: |
190 | uint32_t batch_size_{0}; |
191 | uint32_t batch_interval_{0}; |
192 | |
193 | proto::CollectionConfig config_{}; |
194 | uint64_t agent_timestamp_{0}; |
195 | |
196 | //! Members for state |
197 | std::thread fetch_thread_{}; |
198 | std::thread send_thread_{}; |
199 | std::atomic<CollectionStatus> state_{CollectionStatus::INIT}; |
200 | |
201 | std::atomic<uint32_t> prepared_data_size_{0}; |
202 | std::atomic<bool> ready_{false}; |
203 | std::atomic<bool> reset_{false}; |
204 | |
205 | std::mutex update_mutex_{}; // use for update collection |
206 | std::unique_ptr<proto::WriteRequest> fetch_request_{}; |
207 | std::unique_ptr<proto::WriteRequest> send_request_{}; |
208 | |
209 | ScanMode pull_state_flag_{ScanMode::FULL}; |
210 | std::atomic<CollectionStateFlag> collection_state_flag_{ |
211 | CollectionStateFlag::NORMAL}; |
212 | |
213 | //! Members for brpc |
214 | int max_retry_{0}; |
215 | int brpc_timeout_ms_{0}; |
216 | |
217 | std::string index_server_uri_{}; |
218 | std::string load_balance_{}; |
219 | brpc::Channel channel_{}; |
220 | std::shared_ptr<proto::ProximaService_Stub> stub_{}; |
221 | |
222 | //! Members for lsn |
223 | uint64_t lsn_{0}; |
224 | LsnContext context_{}; |
225 | uint64_t start_time_{0}; |
226 | MysqlHandlerPtr mysql_handler_{}; |
227 | }; |
228 | |
229 | } // end namespace repository |
230 | } // namespace be |
231 | } // end namespace proxima |