1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author DianZhang.Chen |
17 | * \date Oct 2020 |
18 | * \brief Mysql Mysql interface definition for bilin engine |
19 | */ |
20 | |
21 | #pragma once |
22 | |
23 | //!!! undef VERSION must be after include my_global.h |
24 | #include <myisampack.h> |
25 | #include <mysql.h> |
26 | #include <my_global.h> |
27 | #undef VERSION |
28 | |
29 | #include <ailego/utility/string_helper.h> |
30 | #include "repository/binlog/binlog_event.h" |
31 | |
32 | using namespace proxima::be::repository; |
33 | using namespace ::testing; |
34 | |
35 | namespace proxima { |
36 | namespace be { |
37 | namespace repository { |
38 | |
39 | /*! EventBuilder |
40 | */ |
41 | class EventBuilder { |
42 | public: |
43 | static void BuildBasicEvent(EventType type, uchar **data) { |
44 | uchar *buf = *data; |
45 | uint32_t timestamp = (uint32_t)std::time(nullptr); |
46 | int4store(buf, timestamp); |
47 | buf += 4; |
48 | *buf = (uchar)type; |
49 | buf += 1; |
50 | uint32_t server_id = 10000; |
51 | int4store(buf, server_id); |
52 | buf += 4; |
53 | uint32_t event_size = 0; |
54 | int4store(buf, event_size); |
55 | buf += 4; |
56 | uint32_t log_pos = 0; |
57 | int4store(buf, log_pos); |
58 | buf += 4; |
59 | uint16_t flags = 0; |
60 | int2store(buf, flags); |
61 | buf += 2; |
62 | |
63 | *data = buf; |
64 | } |
65 | |
66 | static std::string BuildQueryEvent(const std::string &db, |
67 | const std::string &query) { |
68 | std::string buffer(10240, 0); |
69 | uchar *data = (uchar *)(&(buffer[0])); |
70 | uchar *start = data; |
71 | BuildBasicEvent(QUERY_EVENT, &data); |
72 | data += 8; // slave_proxy_id + execution time |
73 | uint8_t schema_length = (uint8_t)db.size(); |
74 | *data = (uchar)schema_length; |
75 | data += 1; |
76 | data += 2; // error-code |
77 | uint16_t status_var_len = 0; |
78 | int2store(data, status_var_len); |
79 | data += 2; |
80 | data += status_var_len; // status vars |
81 | memcpy(data, db.data(), db.size()); |
82 | data += schema_length; |
83 | *data = '\0'; |
84 | data += 1; |
85 | memcpy(data, query.data(), query.size()); |
86 | data += query.size(); |
87 | data += 4; // crc |
88 | size_t len = data - start; |
89 | buffer.resize(len); |
90 | return buffer; |
91 | } |
92 | |
93 | static std::string BuildRotateEvent(const std::string &file_name, |
94 | uint64_t position, bool has_crc = true) { |
95 | std::string buffer(10240, 0); |
96 | uchar *data = (uchar *)(&(buffer[0])); |
97 | uchar *start = data; |
98 | BuildBasicEvent(ROTATE_EVENT, &data); |
99 | int8store(data, position); |
100 | data += 8; |
101 | memcpy(data, file_name.data(), file_name.size()); |
102 | data += file_name.size(); |
103 | if (has_crc) { |
104 | data += 4; |
105 | } |
106 | size_t len = data - start; |
107 | buffer.resize(len); |
108 | return buffer; |
109 | } |
110 | |
111 | static void SaveColumnMeta(uchar **data, |
112 | const std::vector<enum_field_types> &column_types, |
113 | const std::vector<int32_t> &column_metas) { |
114 | std::string buffer(1024, 0); |
115 | uchar *ptr = (uchar *)(&(buffer[0])); |
116 | uchar *start = ptr; |
117 | for (size_t i = 0; i < column_types.size(); ++i) { |
118 | switch (column_types[i]) { |
119 | case MYSQL_TYPE_TINY_BLOB: |
120 | case MYSQL_TYPE_BLOB: |
121 | case MYSQL_TYPE_MEDIUM_BLOB: |
122 | case MYSQL_TYPE_LONG_BLOB: |
123 | case MYSQL_TYPE_DOUBLE: |
124 | case MYSQL_TYPE_FLOAT: |
125 | case MYSQL_TYPE_GEOMETRY: |
126 | case MYSQL_TYPE_JSON: |
127 | *ptr = (uchar)column_metas[i]; |
128 | ptr += 1; |
129 | break; |
130 | case MYSQL_TYPE_STRING: |
131 | *(ptr + 1) = (uchar)column_metas[i]; |
132 | ptr += 2; |
133 | break; |
134 | case MYSQL_TYPE_BIT: |
135 | int2store(ptr, (uint16_t)column_metas[i]); |
136 | ptr += 2; |
137 | break; |
138 | case MYSQL_TYPE_VARCHAR: |
139 | case MYSQL_TYPE_VAR_STRING: |
140 | int2store(ptr, (uint16_t)column_metas[i]); |
141 | ptr += 2; |
142 | break; |
143 | case MYSQL_TYPE_NEWDECIMAL: |
144 | int2store(ptr, (uint16_t)column_metas[i]); |
145 | ptr += 2; |
146 | break; |
147 | case MYSQL_TYPE_TIME2: |
148 | case MYSQL_TYPE_DATETIME2: |
149 | case MYSQL_TYPE_TIMESTAMP2: |
150 | *ptr = (uchar)column_metas[i]; |
151 | ptr += 1; |
152 | break; |
153 | default: |
154 | break; |
155 | } |
156 | } |
157 | uint32_t meta_len = ptr - start; |
158 | uchar *cur = *data; |
159 | *cur = (uchar)meta_len; |
160 | cur += 1; |
161 | memcpy(cur, start, meta_len); |
162 | *data = cur + meta_len; |
163 | } |
164 | |
165 | static std::string BuildTableMapEvent( |
166 | uint64_t table_id, const std::string &db, const std::string &table, |
167 | std::vector<enum_field_types> &column_types, |
168 | std::vector<int32_t> &column_metas, std::vector<bool> &column_nullables) { |
169 | std::string buffer(10240, 0); |
170 | uchar *data = (uchar *)(&(buffer[0])); |
171 | uchar *start = data; |
172 | BuildBasicEvent(TABLE_MAP_EVENT, &data); |
173 | // table id |
174 | int4store(data, (uint32_t)table_id); |
175 | data += 4; |
176 | int2store(data, (uint16_t)(table_id >> 32)); |
177 | data += 2; |
178 | // flags |
179 | data += 2; |
180 | // db |
181 | *data = (uchar)db.size(); |
182 | data += 1; |
183 | memcpy(data, db.data(), db.size()); |
184 | data += db.size(); |
185 | data += 1; |
186 | // table |
187 | *data = (uchar)table.size(); |
188 | data += 1; |
189 | memcpy(data, table.data(), table.size()); |
190 | data += table.size(); |
191 | data += 1; |
192 | // column count |
193 | *data = (uchar)column_types.size(); |
194 | data += 1; |
195 | // column types |
196 | memcpy(data, column_types.data(), column_types.size()); |
197 | data += column_types.size(); |
198 | // column meta |
199 | SaveColumnMeta(&data, column_types, column_metas); |
200 | uint32_t null_bytes = (column_nullables.size() + 7) / 8; |
201 | memset(data, 0, null_bytes); |
202 | for (size_t i = 0; i < column_nullables.size(); ++i) { |
203 | if (column_nullables[i]) { |
204 | *(data + i / 8) = *(data + i / 8) & (0x01 << (i % 8)); |
205 | } |
206 | } |
207 | data += null_bytes; |
208 | // crc |
209 | data += 4; |
210 | size_t len = data - start; |
211 | buffer.resize(len); |
212 | return buffer; |
213 | } |
214 | |
215 | static void BuildFieldsValue( |
216 | const std::vector<bool> &column_null, |
217 | const std::vector<enum_field_types> &column_types, |
218 | const std::vector<std::string> &column_values, |
219 | const TableMapEventPtr &table_map, uchar **data) { |
220 | uchar *ptr = *data; |
221 | // null bits |
222 | uint32_t bytes = (column_null.size() + 7) / 8; |
223 | memset(ptr, 0, bytes); |
224 | uint32_t column_count = column_null.size(); |
225 | for (uint32_t i = 0; i < column_count; ++i) { |
226 | if (column_null[i]) { |
227 | *(ptr + i / 8) = *(ptr + i / 8) & (0x01 << (i % 8)); |
228 | } |
229 | } |
230 | ptr += bytes; |
231 | |
232 | for (size_t i = 0; i < column_types.size(); ++i) { |
233 | int32_t meta = table_map->column_info(i).meta; |
234 | if (column_values[i].empty()) { |
235 | continue; |
236 | } |
237 | switch (column_types[i]) { |
238 | case MYSQL_TYPE_TINY: { |
239 | int8_t value; |
240 | ailego::StringHelper::ToInt8(column_values[i], &value); |
241 | *((int8_t *)ptr) = value; |
242 | ptr += 1; |
243 | } break; |
244 | case MYSQL_TYPE_SHORT: { |
245 | int16_t value; |
246 | ailego::StringHelper::ToInt16(column_values[i], &value); |
247 | int2store(ptr, (uint16_t)value); |
248 | ptr += 2; |
249 | } break; |
250 | case MYSQL_TYPE_LONG: { |
251 | int32_t value; |
252 | ailego::StringHelper::ToInt32(column_values[i], &value); |
253 | int4store(ptr, (uint32_t)value); |
254 | ptr += 4; |
255 | } break; |
256 | case MYSQL_TYPE_FLOAT: { |
257 | float value; |
258 | ailego::StringHelper::ToFloat(column_values[i], &value); |
259 | *(float *)ptr = value; |
260 | ptr += 4; |
261 | } break; |
262 | case MYSQL_TYPE_DOUBLE: { |
263 | double value; |
264 | ailego::StringHelper::ToDouble(column_values[i], &value); |
265 | *(double *)ptr = value; |
266 | ptr += 8; |
267 | } break; |
268 | case MYSQL_TYPE_TIMESTAMP: |
269 | case MYSQL_TYPE_TIMESTAMP2: { |
270 | uint32_t value; |
271 | ailego::StringHelper::ToUint32(column_values[i], &value); |
272 | mi_int4store(ptr, value); |
273 | ptr += 4; |
274 | } break; |
275 | case MYSQL_TYPE_LONGLONG: { |
276 | int64_t value; |
277 | ailego::StringHelper::ToInt64(column_values[i], &value); |
278 | int8store(ptr, value); |
279 | ptr += 8; |
280 | } break; |
281 | case MYSQL_TYPE_INT24: { |
282 | int32_t value; |
283 | ailego::StringHelper::ToInt32(column_values[i], &value); |
284 | int2store(ptr, (uint16_t)value); |
285 | *(ptr + 2) = (uchar)(((uint32_t)value) >> 16); |
286 | ptr += 3; |
287 | } break; |
288 | case MYSQL_TYPE_DATE: { |
289 | uint32_t value; |
290 | ailego::StringHelper::ToUint32(column_values[i], &value); |
291 | int2store(ptr, (uint16_t)value); |
292 | *(ptr + 2) = (uchar)(value >> 16); |
293 | ptr += 3; |
294 | } break; |
295 | case MYSQL_TYPE_TIME: |
296 | case MYSQL_TYPE_TIME2: { |
297 | int64_t value; |
298 | ailego::StringHelper::ToInt64(column_values[i], &value); |
299 | value += TIME_INT_OFS; |
300 | mi_int3store(ptr, value); |
301 | ptr += 3; |
302 | } break; |
303 | case MYSQL_TYPE_DATETIME: |
304 | case MYSQL_TYPE_DATETIME2: { |
305 | int64_t value; |
306 | ailego::StringHelper::ToInt64(column_values[i], &value); |
307 | value += DATETIMEF_INT_OFS; |
308 | mi_int5store(ptr, value); |
309 | ptr += 5; |
310 | } break; |
311 | case MYSQL_TYPE_YEAR: { |
312 | uint8_t value; |
313 | ailego::StringHelper::ToUint8(column_values[i], &value); |
314 | *(uint8_t *)ptr = value; |
315 | ptr += 1; |
316 | } break; |
317 | case MYSQL_TYPE_BIT: |
318 | break; |
319 | case MYSQL_TYPE_JSON: |
320 | break; |
321 | case MYSQL_TYPE_NEWDECIMAL: |
322 | break; |
323 | case MYSQL_TYPE_BLOB: |
324 | break; |
325 | case MYSQL_TYPE_VARCHAR: |
326 | case MYSQL_TYPE_VAR_STRING: { |
327 | if (meta < 256) { |
328 | *(uint8_t *)ptr = (uint8_t)column_values[i].size(); |
329 | ptr += 1; |
330 | } else { |
331 | int2store(ptr, (uint16_t)column_values[i].size()); |
332 | ptr += 2; |
333 | } |
334 | memcpy(ptr, column_values[i].data(), column_values[i].size()); |
335 | ptr += column_values[i].size(); |
336 | } break; |
337 | case MYSQL_TYPE_STRING: { |
338 | memcpy(ptr, column_values[i].data(), column_values[i].size()); |
339 | ptr += column_values[i].size(); |
340 | } break; |
341 | case MYSQL_TYPE_GEOMETRY: |
342 | break; |
343 | default: |
344 | break; |
345 | } |
346 | } |
347 | *data = ptr; |
348 | } |
349 | |
350 | static std::string BuildWriteRowsEvent( |
351 | uint64_t table_id, const std::vector<bool> &column_null, |
352 | const std::vector<enum_field_types> &column_types, |
353 | const std::vector<std::string> &column_values, |
354 | const TableMapEventPtr &table_map, |
355 | EventType event_type = WRITE_ROWS_EVENT_V1, size_t rows_count = 1) { |
356 | std::string buffer(10240, 0); |
357 | uchar *data = (uchar *)(&(buffer[0])); |
358 | uchar *start = data; |
359 | BuildBasicEvent(event_type, &data); |
360 | // table id |
361 | int4store(data, (uint32_t)table_id); |
362 | data += 4; |
363 | int2store(data, (uint16_t)(table_id >> 32)); |
364 | data += 2; |
365 | // flags |
366 | data += 2; |
367 | // extra data |
368 | int2store(data, (uint16_t)2); |
369 | data += 2; |
370 | // columns count |
371 | uint32_t column_count = column_null.size(); |
372 | *data = (uchar)column_count; |
373 | data += 1; |
374 | // present columns |
375 | uint32_t bytes = (column_count + 7) / 8; |
376 | for (uint32_t i = 0; i < bytes; ++i) { |
377 | *(data + i) = 0xFF; |
378 | } |
379 | data += bytes; |
380 | |
381 | // rows buffer |
382 | for (size_t i = 0; i < rows_count; ++i) { |
383 | BuildFieldsValue(column_null, column_types, column_values, table_map, |
384 | &data); |
385 | } |
386 | |
387 | // crc |
388 | data += 4; |
389 | size_t len = data - start; |
390 | buffer.resize(len); |
391 | return buffer; |
392 | } |
393 | |
394 | static std::string BuildDeleteRowsEvent( |
395 | uint64_t table_id, const std::vector<bool> &column_null, |
396 | const std::vector<enum_field_types> &column_types, |
397 | const std::vector<std::string> &values, |
398 | const TableMapEventPtr &table_map) { |
399 | return BuildWriteRowsEvent(table_id, column_null, column_types, values, |
400 | table_map, DELETE_ROWS_EVENT_V1); |
401 | } |
402 | |
403 | static std::string BuildUpdateRowsEvent( |
404 | uint64_t table_id, const std::vector<bool> &column_null, |
405 | const std::vector<enum_field_types> &column_types, |
406 | const std::vector<std::string> &old_values, |
407 | const std::vector<std::string> &new_values, |
408 | const TableMapEventPtr &table_map) { |
409 | std::string buffer(10240, 0); |
410 | uchar *data = (uchar *)(&(buffer[0])); |
411 | uchar *start = data; |
412 | BuildBasicEvent(UPDATE_ROWS_EVENT_V1, &data); |
413 | // table id |
414 | int4store(data, (uint32_t)table_id); |
415 | data += 4; |
416 | int2store(data, (uint16_t)(table_id >> 32)); |
417 | data += 2; |
418 | // flags |
419 | data += 2; |
420 | // extra data |
421 | int2store(data, (uint16_t)2); |
422 | data += 2; |
423 | // columns count |
424 | uint32_t column_count = column_null.size(); |
425 | *data = (uchar)column_count; |
426 | data += 1; |
427 | // present columns update |
428 | uint32_t bytes = (column_count + 7) / 8; |
429 | for (uint32_t i = 0; i < bytes; ++i) { |
430 | *(data + i) = 0xFF; |
431 | } |
432 | data += bytes; |
433 | for (uint32_t i = 0; i < bytes; ++i) { |
434 | *(data + i) = 0xFF; |
435 | } |
436 | data += bytes; |
437 | |
438 | // rows buffer |
439 | BuildFieldsValue(column_null, column_types, old_values, table_map, &data); |
440 | |
441 | BuildFieldsValue(column_null, column_types, new_values, table_map, &data); |
442 | |
443 | // crc |
444 | data += 4; |
445 | size_t len = data - start; |
446 | buffer.resize(len); |
447 | return buffer; |
448 | } |
449 | |
450 | static std::string BuildOtherEvent(EventType type) { |
451 | std::string buffer(10240, 0); |
452 | uchar *data = (uchar *)(&(buffer[0])); |
453 | uchar *start = data; |
454 | BuildBasicEvent(type, &data); |
455 | data += 4; |
456 | |
457 | size_t len = data - start; |
458 | buffer.resize(len); |
459 | return buffer; |
460 | } |
461 | |
462 | private: |
463 | const static int64_t TIME_INT_OFS = 0x800000; |
464 | const static uint64_t DATETIMEF_INT_OFS = 0x8000000000; |
465 | }; |
466 | |
467 | } // namespace repository |
468 | } // namespace be |
469 | } // namespace proxima |
470 | |