1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author DianZhang.Chen
17 * \date Oct 2020
 *   \brief    MySQL interface definition for bilin engine
19 */
20
21#pragma once
22
23//!!! undef VERSION must be after include my_global.h
24#include <myisampack.h>
25#include <mysql.h>
26#include <my_global.h>
27#undef VERSION
28
29#include <ailego/utility/string_helper.h>
30#include "repository/binlog/binlog_event.h"
31
32using namespace proxima::be::repository;
33using namespace ::testing;
34
35namespace proxima {
36namespace be {
37namespace repository {
38
39/*! EventBuilder
40 */
41class EventBuilder {
42 public:
43 static void BuildBasicEvent(EventType type, uchar **data) {
44 uchar *buf = *data;
45 uint32_t timestamp = (uint32_t)std::time(nullptr);
46 int4store(buf, timestamp);
47 buf += 4;
48 *buf = (uchar)type;
49 buf += 1;
50 uint32_t server_id = 10000;
51 int4store(buf, server_id);
52 buf += 4;
53 uint32_t event_size = 0;
54 int4store(buf, event_size);
55 buf += 4;
56 uint32_t log_pos = 0;
57 int4store(buf, log_pos);
58 buf += 4;
59 uint16_t flags = 0;
60 int2store(buf, flags);
61 buf += 2;
62
63 *data = buf;
64 }
65
66 static std::string BuildQueryEvent(const std::string &db,
67 const std::string &query) {
68 std::string buffer(10240, 0);
69 uchar *data = (uchar *)(&(buffer[0]));
70 uchar *start = data;
71 BuildBasicEvent(QUERY_EVENT, &data);
72 data += 8; // slave_proxy_id + execution time
73 uint8_t schema_length = (uint8_t)db.size();
74 *data = (uchar)schema_length;
75 data += 1;
76 data += 2; // error-code
77 uint16_t status_var_len = 0;
78 int2store(data, status_var_len);
79 data += 2;
80 data += status_var_len; // status vars
81 memcpy(data, db.data(), db.size());
82 data += schema_length;
83 *data = '\0';
84 data += 1;
85 memcpy(data, query.data(), query.size());
86 data += query.size();
87 data += 4; // crc
88 size_t len = data - start;
89 buffer.resize(len);
90 return buffer;
91 }
92
93 static std::string BuildRotateEvent(const std::string &file_name,
94 uint64_t position, bool has_crc = true) {
95 std::string buffer(10240, 0);
96 uchar *data = (uchar *)(&(buffer[0]));
97 uchar *start = data;
98 BuildBasicEvent(ROTATE_EVENT, &data);
99 int8store(data, position);
100 data += 8;
101 memcpy(data, file_name.data(), file_name.size());
102 data += file_name.size();
103 if (has_crc) {
104 data += 4;
105 }
106 size_t len = data - start;
107 buffer.resize(len);
108 return buffer;
109 }
110
111 static void SaveColumnMeta(uchar **data,
112 const std::vector<enum_field_types> &column_types,
113 const std::vector<int32_t> &column_metas) {
114 std::string buffer(1024, 0);
115 uchar *ptr = (uchar *)(&(buffer[0]));
116 uchar *start = ptr;
117 for (size_t i = 0; i < column_types.size(); ++i) {
118 switch (column_types[i]) {
119 case MYSQL_TYPE_TINY_BLOB:
120 case MYSQL_TYPE_BLOB:
121 case MYSQL_TYPE_MEDIUM_BLOB:
122 case MYSQL_TYPE_LONG_BLOB:
123 case MYSQL_TYPE_DOUBLE:
124 case MYSQL_TYPE_FLOAT:
125 case MYSQL_TYPE_GEOMETRY:
126 case MYSQL_TYPE_JSON:
127 *ptr = (uchar)column_metas[i];
128 ptr += 1;
129 break;
130 case MYSQL_TYPE_STRING:
131 *(ptr + 1) = (uchar)column_metas[i];
132 ptr += 2;
133 break;
134 case MYSQL_TYPE_BIT:
135 int2store(ptr, (uint16_t)column_metas[i]);
136 ptr += 2;
137 break;
138 case MYSQL_TYPE_VARCHAR:
139 case MYSQL_TYPE_VAR_STRING:
140 int2store(ptr, (uint16_t)column_metas[i]);
141 ptr += 2;
142 break;
143 case MYSQL_TYPE_NEWDECIMAL:
144 int2store(ptr, (uint16_t)column_metas[i]);
145 ptr += 2;
146 break;
147 case MYSQL_TYPE_TIME2:
148 case MYSQL_TYPE_DATETIME2:
149 case MYSQL_TYPE_TIMESTAMP2:
150 *ptr = (uchar)column_metas[i];
151 ptr += 1;
152 break;
153 default:
154 break;
155 }
156 }
157 uint32_t meta_len = ptr - start;
158 uchar *cur = *data;
159 *cur = (uchar)meta_len;
160 cur += 1;
161 memcpy(cur, start, meta_len);
162 *data = cur + meta_len;
163 }
164
165 static std::string BuildTableMapEvent(
166 uint64_t table_id, const std::string &db, const std::string &table,
167 std::vector<enum_field_types> &column_types,
168 std::vector<int32_t> &column_metas, std::vector<bool> &column_nullables) {
169 std::string buffer(10240, 0);
170 uchar *data = (uchar *)(&(buffer[0]));
171 uchar *start = data;
172 BuildBasicEvent(TABLE_MAP_EVENT, &data);
173 // table id
174 int4store(data, (uint32_t)table_id);
175 data += 4;
176 int2store(data, (uint16_t)(table_id >> 32));
177 data += 2;
178 // flags
179 data += 2;
180 // db
181 *data = (uchar)db.size();
182 data += 1;
183 memcpy(data, db.data(), db.size());
184 data += db.size();
185 data += 1;
186 // table
187 *data = (uchar)table.size();
188 data += 1;
189 memcpy(data, table.data(), table.size());
190 data += table.size();
191 data += 1;
192 // column count
193 *data = (uchar)column_types.size();
194 data += 1;
195 // column types
196 memcpy(data, column_types.data(), column_types.size());
197 data += column_types.size();
198 // column meta
199 SaveColumnMeta(&data, column_types, column_metas);
200 uint32_t null_bytes = (column_nullables.size() + 7) / 8;
201 memset(data, 0, null_bytes);
202 for (size_t i = 0; i < column_nullables.size(); ++i) {
203 if (column_nullables[i]) {
204 *(data + i / 8) = *(data + i / 8) & (0x01 << (i % 8));
205 }
206 }
207 data += null_bytes;
208 // crc
209 data += 4;
210 size_t len = data - start;
211 buffer.resize(len);
212 return buffer;
213 }
214
215 static void BuildFieldsValue(
216 const std::vector<bool> &column_null,
217 const std::vector<enum_field_types> &column_types,
218 const std::vector<std::string> &column_values,
219 const TableMapEventPtr &table_map, uchar **data) {
220 uchar *ptr = *data;
221 // null bits
222 uint32_t bytes = (column_null.size() + 7) / 8;
223 memset(ptr, 0, bytes);
224 uint32_t column_count = column_null.size();
225 for (uint32_t i = 0; i < column_count; ++i) {
226 if (column_null[i]) {
227 *(ptr + i / 8) = *(ptr + i / 8) & (0x01 << (i % 8));
228 }
229 }
230 ptr += bytes;
231
232 for (size_t i = 0; i < column_types.size(); ++i) {
233 int32_t meta = table_map->column_info(i).meta;
234 if (column_values[i].empty()) {
235 continue;
236 }
237 switch (column_types[i]) {
238 case MYSQL_TYPE_TINY: {
239 int8_t value;
240 ailego::StringHelper::ToInt8(column_values[i], &value);
241 *((int8_t *)ptr) = value;
242 ptr += 1;
243 } break;
244 case MYSQL_TYPE_SHORT: {
245 int16_t value;
246 ailego::StringHelper::ToInt16(column_values[i], &value);
247 int2store(ptr, (uint16_t)value);
248 ptr += 2;
249 } break;
250 case MYSQL_TYPE_LONG: {
251 int32_t value;
252 ailego::StringHelper::ToInt32(column_values[i], &value);
253 int4store(ptr, (uint32_t)value);
254 ptr += 4;
255 } break;
256 case MYSQL_TYPE_FLOAT: {
257 float value;
258 ailego::StringHelper::ToFloat(column_values[i], &value);
259 *(float *)ptr = value;
260 ptr += 4;
261 } break;
262 case MYSQL_TYPE_DOUBLE: {
263 double value;
264 ailego::StringHelper::ToDouble(column_values[i], &value);
265 *(double *)ptr = value;
266 ptr += 8;
267 } break;
268 case MYSQL_TYPE_TIMESTAMP:
269 case MYSQL_TYPE_TIMESTAMP2: {
270 uint32_t value;
271 ailego::StringHelper::ToUint32(column_values[i], &value);
272 mi_int4store(ptr, value);
273 ptr += 4;
274 } break;
275 case MYSQL_TYPE_LONGLONG: {
276 int64_t value;
277 ailego::StringHelper::ToInt64(column_values[i], &value);
278 int8store(ptr, value);
279 ptr += 8;
280 } break;
281 case MYSQL_TYPE_INT24: {
282 int32_t value;
283 ailego::StringHelper::ToInt32(column_values[i], &value);
284 int2store(ptr, (uint16_t)value);
285 *(ptr + 2) = (uchar)(((uint32_t)value) >> 16);
286 ptr += 3;
287 } break;
288 case MYSQL_TYPE_DATE: {
289 uint32_t value;
290 ailego::StringHelper::ToUint32(column_values[i], &value);
291 int2store(ptr, (uint16_t)value);
292 *(ptr + 2) = (uchar)(value >> 16);
293 ptr += 3;
294 } break;
295 case MYSQL_TYPE_TIME:
296 case MYSQL_TYPE_TIME2: {
297 int64_t value;
298 ailego::StringHelper::ToInt64(column_values[i], &value);
299 value += TIME_INT_OFS;
300 mi_int3store(ptr, value);
301 ptr += 3;
302 } break;
303 case MYSQL_TYPE_DATETIME:
304 case MYSQL_TYPE_DATETIME2: {
305 int64_t value;
306 ailego::StringHelper::ToInt64(column_values[i], &value);
307 value += DATETIMEF_INT_OFS;
308 mi_int5store(ptr, value);
309 ptr += 5;
310 } break;
311 case MYSQL_TYPE_YEAR: {
312 uint8_t value;
313 ailego::StringHelper::ToUint8(column_values[i], &value);
314 *(uint8_t *)ptr = value;
315 ptr += 1;
316 } break;
317 case MYSQL_TYPE_BIT:
318 break;
319 case MYSQL_TYPE_JSON:
320 break;
321 case MYSQL_TYPE_NEWDECIMAL:
322 break;
323 case MYSQL_TYPE_BLOB:
324 break;
325 case MYSQL_TYPE_VARCHAR:
326 case MYSQL_TYPE_VAR_STRING: {
327 if (meta < 256) {
328 *(uint8_t *)ptr = (uint8_t)column_values[i].size();
329 ptr += 1;
330 } else {
331 int2store(ptr, (uint16_t)column_values[i].size());
332 ptr += 2;
333 }
334 memcpy(ptr, column_values[i].data(), column_values[i].size());
335 ptr += column_values[i].size();
336 } break;
337 case MYSQL_TYPE_STRING: {
338 memcpy(ptr, column_values[i].data(), column_values[i].size());
339 ptr += column_values[i].size();
340 } break;
341 case MYSQL_TYPE_GEOMETRY:
342 break;
343 default:
344 break;
345 }
346 }
347 *data = ptr;
348 }
349
350 static std::string BuildWriteRowsEvent(
351 uint64_t table_id, const std::vector<bool> &column_null,
352 const std::vector<enum_field_types> &column_types,
353 const std::vector<std::string> &column_values,
354 const TableMapEventPtr &table_map,
355 EventType event_type = WRITE_ROWS_EVENT_V1, size_t rows_count = 1) {
356 std::string buffer(10240, 0);
357 uchar *data = (uchar *)(&(buffer[0]));
358 uchar *start = data;
359 BuildBasicEvent(event_type, &data);
360 // table id
361 int4store(data, (uint32_t)table_id);
362 data += 4;
363 int2store(data, (uint16_t)(table_id >> 32));
364 data += 2;
365 // flags
366 data += 2;
367 // extra data
368 int2store(data, (uint16_t)2);
369 data += 2;
370 // columns count
371 uint32_t column_count = column_null.size();
372 *data = (uchar)column_count;
373 data += 1;
374 // present columns
375 uint32_t bytes = (column_count + 7) / 8;
376 for (uint32_t i = 0; i < bytes; ++i) {
377 *(data + i) = 0xFF;
378 }
379 data += bytes;
380
381 // rows buffer
382 for (size_t i = 0; i < rows_count; ++i) {
383 BuildFieldsValue(column_null, column_types, column_values, table_map,
384 &data);
385 }
386
387 // crc
388 data += 4;
389 size_t len = data - start;
390 buffer.resize(len);
391 return buffer;
392 }
393
394 static std::string BuildDeleteRowsEvent(
395 uint64_t table_id, const std::vector<bool> &column_null,
396 const std::vector<enum_field_types> &column_types,
397 const std::vector<std::string> &values,
398 const TableMapEventPtr &table_map) {
399 return BuildWriteRowsEvent(table_id, column_null, column_types, values,
400 table_map, DELETE_ROWS_EVENT_V1);
401 }
402
403 static std::string BuildUpdateRowsEvent(
404 uint64_t table_id, const std::vector<bool> &column_null,
405 const std::vector<enum_field_types> &column_types,
406 const std::vector<std::string> &old_values,
407 const std::vector<std::string> &new_values,
408 const TableMapEventPtr &table_map) {
409 std::string buffer(10240, 0);
410 uchar *data = (uchar *)(&(buffer[0]));
411 uchar *start = data;
412 BuildBasicEvent(UPDATE_ROWS_EVENT_V1, &data);
413 // table id
414 int4store(data, (uint32_t)table_id);
415 data += 4;
416 int2store(data, (uint16_t)(table_id >> 32));
417 data += 2;
418 // flags
419 data += 2;
420 // extra data
421 int2store(data, (uint16_t)2);
422 data += 2;
423 // columns count
424 uint32_t column_count = column_null.size();
425 *data = (uchar)column_count;
426 data += 1;
427 // present columns update
428 uint32_t bytes = (column_count + 7) / 8;
429 for (uint32_t i = 0; i < bytes; ++i) {
430 *(data + i) = 0xFF;
431 }
432 data += bytes;
433 for (uint32_t i = 0; i < bytes; ++i) {
434 *(data + i) = 0xFF;
435 }
436 data += bytes;
437
438 // rows buffer
439 BuildFieldsValue(column_null, column_types, old_values, table_map, &data);
440
441 BuildFieldsValue(column_null, column_types, new_values, table_map, &data);
442
443 // crc
444 data += 4;
445 size_t len = data - start;
446 buffer.resize(len);
447 return buffer;
448 }
449
450 static std::string BuildOtherEvent(EventType type) {
451 std::string buffer(10240, 0);
452 uchar *data = (uchar *)(&(buffer[0]));
453 uchar *start = data;
454 BuildBasicEvent(type, &data);
455 data += 4;
456
457 size_t len = data - start;
458 buffer.resize(len);
459 return buffer;
460 }
461
462 private:
463 const static int64_t TIME_INT_OFS = 0x800000;
464 const static uint64_t DATETIMEF_INT_OFS = 0x8000000000;
465};
466
467} // namespace repository
468} // namespace be
469} // namespace proxima
470