1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Hongqing.hu |
17 | * \date Mar 2021 |
18 | * \brief Implements write request builder interface |
19 | */ |
20 | |
21 | #include "write_request_builder.h" |
22 | #include "common/types_helper.h" |
23 | #include "proto_converter.h" |
24 | |
25 | namespace proxima { |
26 | namespace be { |
27 | namespace server { |
28 | |
29 | using RequestType = agent::WriteRequest::RequestType; |
30 | |
31 | int WriteRequestBuilder::build(const meta::CollectionMeta &meta, |
32 | const agent::ColumnOrder &column_order, |
33 | const proto::WriteRequest &pb_request, |
34 | agent::WriteRequest *write_request) { |
35 | // get if indexes and forwards sequence are strictly match collection meta |
36 | bool index_full_match = false; |
37 | bool forward_full_match = false; |
38 | get_index_and_forward_mode(pb_request, meta, &index_full_match, |
39 | &forward_full_match); |
40 | // validate indexes and forward values size |
41 | int ret = validate_request(pb_request, meta, column_order, index_full_match, |
42 | forward_full_match); |
43 | if (ret != 0) { |
44 | return ret; |
45 | } |
46 | |
47 | RequestType request_type = |
48 | meta.repository() ? RequestType::PROXY : RequestType::DIRECT; |
49 | if (request_type == RequestType::PROXY) { |
50 | ret = build_proxy_request(meta, column_order, pb_request, index_full_match, |
51 | forward_full_match, write_request); |
52 | } else { |
53 | ret = build_direct_request(meta, column_order, pb_request, index_full_match, |
54 | forward_full_match, write_request); |
55 | } |
56 | if (ret != 0) { |
57 | LOG_ERROR("Build write request failed. collection[%s]" , |
58 | pb_request.collection_name().c_str()); |
59 | return ret; |
60 | } |
61 | |
62 | write_request->set_request_type(request_type); |
63 | |
64 | return 0; |
65 | } |
66 | |
67 | void WriteRequestBuilder::get_index_and_forward_mode( |
68 | const proto::WriteRequest &request, const meta::CollectionMeta &meta, |
69 | bool *index_full_match, bool *forward_full_match) { |
70 | // check index columns size |
71 | auto &request_index_columns = request.row_meta().index_column_metas(); |
72 | auto &meta_index_columns = meta.index_columns(); |
73 | size_t index_column_size = static_cast<size_t>(request_index_columns.size()); |
74 | size_t meta_columns_size = meta_index_columns.size(); |
75 | |
76 | // get index column full match |
77 | *index_full_match = false; |
78 | if (meta_columns_size == index_column_size) { |
79 | size_t i = 0; |
80 | for (i = 0; i < meta_columns_size; ++i) { |
81 | if (meta_index_columns[i]->name() != |
82 | request_index_columns[i].column_name()) { |
83 | break; |
84 | } |
85 | } |
86 | if (i == meta_columns_size) { |
87 | *index_full_match = true; |
88 | } |
89 | } |
90 | |
91 | // get forward column full match |
92 | auto &request_forward_columns = request.row_meta().forward_column_names(); |
93 | auto &meta_forward_columns = meta.forward_columns(); |
94 | size_t request_forward_size = |
95 | static_cast<size_t>(request_forward_columns.size()); |
96 | size_t meta_forward_size = meta_forward_columns.size(); |
97 | *forward_full_match = false; |
98 | if (meta_forward_size == request_forward_size) { |
99 | size_t i = 0; |
100 | for (i = 0; i < meta_forward_size; ++i) { |
101 | if (meta_forward_columns[i] != request_forward_columns[i]) { |
102 | break; |
103 | } |
104 | } |
105 | if (i == meta_forward_size) { |
106 | *forward_full_match = true; |
107 | } |
108 | } |
109 | } |
110 | |
111 | int WriteRequestBuilder::validate_request( |
112 | const proto::WriteRequest &request, const meta::CollectionMeta &meta, |
113 | const agent::ColumnOrder &column_order, bool index_full_match, |
114 | bool forward_full_match) { |
115 | auto &collection = request.collection_name(); |
116 | auto &request_index_metas = request.row_meta().index_column_metas(); |
117 | size_t index_column_size = static_cast<size_t>(request_index_metas.size()); |
118 | size_t meta_index_size = meta.index_columns().size(); |
119 | // check request is empty |
120 | if (!request.rows_size()) { |
121 | LOG_ERROR("Write request is empty. collection[%s]" , collection.c_str()); |
122 | return ErrorCode_InvalidWriteRequest; |
123 | } |
124 | |
125 | // check index |
126 | if (index_column_size != (size_t)request_index_metas.size()) { |
127 | LOG_ERROR( |
128 | "Collection index column meta size mismatched. " |
129 | "meta[%zu] index[%zu] collection[%s]" , |
130 | index_column_size, (size_t)request_index_metas.size(), |
131 | collection.c_str()); |
132 | return ErrorCode_InvalidWriteRequest; |
133 | } |
134 | |
135 | if (meta_index_size < index_column_size) { |
136 | LOG_ERROR( |
137 | "Collection index columns size mismatched. meta[%zu] " |
138 | "request[%zu] collection[%s]" , |
139 | meta_index_size, index_column_size, collection.c_str()); |
140 | return ErrorCode_InvalidWriteRequest; |
141 | } |
142 | if (!index_full_match) { |
143 | auto &index_order = column_order.get_index_order(); |
144 | for (size_t i = 0; i < index_column_size; ++i) { |
145 | auto &index_column = request_index_metas[i].column_name(); |
146 | auto it = index_order.find(index_column); |
147 | if (it == index_order.end()) { |
148 | LOG_ERROR("Collection index field invalid. request[%s] collection[%s]" , |
149 | index_column.c_str(), collection.c_str()); |
150 | return ErrorCode_InvalidWriteRequest; |
151 | } |
152 | } |
153 | } |
154 | for (size_t i = 0; i < index_column_size; ++i) { |
155 | auto &index_column = request_index_metas[i].column_name(); |
156 | auto column_meta = meta.column_by_name(index_column); |
157 | if (!column_meta) { |
158 | LOG_ERROR("Invalid index column. name[%s] collection[%s]" , |
159 | index_column.c_str(), collection.c_str()); |
160 | return ErrorCode_InvalidWriteRequest; |
161 | } |
162 | if (column_meta->dimension() != request_index_metas[i].dimension()) { |
163 | LOG_ERROR( |
164 | "Index column dimension mismatched. " |
165 | "meta[%u] request[%u] column[%s] collection[%s]" , |
166 | column_meta->dimension(), request_index_metas[i].dimension(), |
167 | index_column.c_str(), collection.c_str()); |
168 | return ErrorCode_InvalidWriteRequest; |
169 | } |
170 | } |
171 | |
172 | // check forward |
173 | auto &request_forward_columns = request.row_meta().forward_column_names(); |
174 | size_t request_forward_size = |
175 | static_cast<size_t>(request_forward_columns.size()); |
176 | size_t meta_forward_size = meta.forward_columns().size(); |
177 | if (meta_forward_size < request_forward_size) { |
178 | LOG_ERROR( |
179 | "Collection forward columns size mismatched. meta[%zu] " |
180 | "request[%zu] collection[%s]" , |
181 | meta_forward_size, request_forward_size, collection.c_str()); |
182 | return ErrorCode_InvalidWriteRequest; |
183 | } |
184 | if (!forward_full_match) { |
185 | auto &forward_order = column_order.get_forward_order(); |
186 | for (auto &forward_column : request_forward_columns) { |
187 | auto it = forward_order.find(forward_column); |
188 | if (it == forward_order.end()) { |
189 | LOG_ERROR( |
190 | "Collection forward field invalid. request[%s] collection[%s]" , |
191 | forward_column.c_str(), collection.c_str()); |
192 | return ErrorCode_InvalidWriteRequest; |
193 | } |
194 | } |
195 | } |
196 | |
197 | // check index&forward data |
198 | for (int i = 0; i < request.rows_size(); ++i) { |
199 | auto &row = request.rows(i); |
200 | if (row.operation_type() == proxima::be::proto::OP_DELETE) { |
201 | continue; |
202 | } |
203 | if (!index_column_size) { |
204 | LOG_ERROR("Row index column names is empty. collection[%s]" , |
205 | collection.c_str()); |
206 | return ErrorCode_InvalidWriteRequest; |
207 | } |
208 | size_t index_value_size = |
209 | static_cast<size_t>(row.index_column_values().values_size()); |
210 | if (index_value_size != index_column_size) { |
211 | LOG_ERROR( |
212 | "Row index columns size mismatched. meta[%zu] " |
213 | "values[%zu] collection[%s]" , |
214 | index_column_size, index_value_size, collection.c_str()); |
215 | return ErrorCode_InvalidWriteRequest; |
216 | } |
217 | |
218 | size_t forward_value_size = |
219 | static_cast<size_t>(row.forward_column_values().values_size()); |
220 | if (forward_value_size != request_forward_size) { |
221 | LOG_ERROR( |
222 | "Row forward columns size mismatched. meta[%zu] " |
223 | "values[%zu] collection[%s]" , |
224 | request_forward_size, forward_value_size, collection.c_str()); |
225 | return ErrorCode_InvalidWriteRequest; |
226 | } |
227 | } |
228 | |
229 | return 0; |
230 | } |
231 | |
232 | int WriteRequestBuilder::build_proxy_request( |
233 | const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order, |
234 | const proto::WriteRequest &pb_request, bool index_full_match, |
235 | bool forward_full_match, agent::WriteRequest *write_request) { |
236 | auto &row_meta = pb_request.row_meta(); |
237 | auto &collection = pb_request.collection_name(); |
238 | |
239 | for (int i = 0; i < pb_request.rows_size(); ++i) { |
240 | // schema revision default 0 |
241 | index::CollectionDatasetPtr record = |
242 | std::make_shared<index::CollectionDataset>(0); |
243 | auto &row = pb_request.rows(i); |
244 | int ret = build_record(row, row_meta, meta, column_order, index_full_match, |
245 | forward_full_match, record.get()); |
246 | if (ret != 0) { |
247 | LOG_ERROR("Build record failed. id[%d] collection[%s]" , i, |
248 | collection.c_str()); |
249 | return ret; |
250 | } |
251 | write_request->add_collection_dataset(record); |
252 | } |
253 | |
254 | write_request->set_magic_number(pb_request.magic_number()); |
255 | write_request->set_collection_name(collection); |
256 | |
257 | return 0; |
258 | } |
259 | |
260 | int WriteRequestBuilder::build_direct_request( |
261 | const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order, |
262 | const proto::WriteRequest &pb_request, bool index_full_match, |
263 | bool forward_full_match, agent::WriteRequest *write_request) { |
264 | auto &row_meta = pb_request.row_meta(); |
265 | auto &collection = pb_request.collection_name(); |
266 | |
267 | // schema revision default 0 |
268 | index::CollectionDatasetPtr dataset = |
269 | std::make_shared<index::CollectionDataset>(0); |
270 | |
271 | for (int i = 0; i < pb_request.rows_size(); ++i) { |
272 | auto &row = pb_request.rows(i); |
273 | int ret = build_record(row, row_meta, meta, column_order, index_full_match, |
274 | forward_full_match, dataset.get()); |
275 | if (ret != 0) { |
276 | LOG_ERROR("Build record failed. id[%d] collection[%s]" , i, |
277 | collection.c_str()); |
278 | return ret; |
279 | } |
280 | } |
281 | |
282 | write_request->add_collection_dataset(dataset); |
283 | write_request->set_collection_name(collection); |
284 | |
285 | return 0; |
286 | } |
287 | |
288 | int WriteRequestBuilder::build_record( |
289 | const proto::WriteRequest::Row &row, |
290 | const proto::WriteRequest::RowMeta &row_meta, |
291 | const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order, |
292 | bool index_full_match, bool forward_full_match, |
293 | index::CollectionDataset *dataset) { |
294 | auto *row_data = dataset->add_row_data(); |
295 | row_data->primary_key = row.primary_key(); |
296 | |
297 | // set lsn context |
298 | if (meta.repository()) { |
299 | if (!row.has_lsn_context()) { |
300 | LOG_ERROR("Row not set lsn_context field. pk[%zu] collection[%s]" , |
301 | (size_t)row.primary_key(), meta.name().c_str()); |
302 | return ErrorCode_EmptyLsnContext; |
303 | } |
304 | row_data->lsn_check = true; |
305 | auto &lsn_ctx = row.lsn_context(); |
306 | row_data->lsn = lsn_ctx.lsn(); |
307 | row_data->lsn_context = lsn_ctx.context(); |
308 | } else { |
309 | row_data->lsn_check = false; |
310 | } |
311 | |
312 | row_data->operation_type = OperationTypesCodeBook::Get(row.operation_type()); |
313 | |
314 | if (row_data->operation_type == OperationTypes::DELETE) { |
315 | return 0; |
316 | } |
317 | |
318 | // build forwards data |
319 | int ret = build_forwards_data(row, row_meta, column_order, meta, |
320 | forward_full_match, row_data); |
321 | if (ret != 0) { |
322 | LOG_ERROR("Build forwards data failed. collection[%s]" , |
323 | meta.name().c_str()); |
324 | return ret; |
325 | } |
326 | |
327 | // build index data |
328 | ret = build_indexes_data(row, row_meta, meta, index_full_match, row_data); |
329 | if (ret != 0) { |
330 | LOG_ERROR("Build indexes data failed. collection[%s]" , meta.name().c_str()); |
331 | return ret; |
332 | } |
333 | |
334 | return 0; |
335 | } |
336 | |
337 | int WriteRequestBuilder::build_forwards_data( |
338 | const proto::WriteRequest::Row &row, |
339 | const proto::WriteRequest::RowMeta &row_meta, |
340 | const agent::ColumnOrder &column_order, const meta::CollectionMeta &meta, |
341 | bool forward_full_match, index::CollectionDataset::RowData *row_data) { |
342 | // if forward_full_match is true, direct serialized |
343 | auto *forward_data = &(row_data->forward_data); |
344 | if (forward_full_match) { |
345 | if (!row.forward_column_values().SerializeToString(forward_data)) { |
346 | LOG_ERROR("Forward columns serialize failed. collection[%s]" , |
347 | meta.name().c_str()); |
348 | return ErrorCode_SerializeError; |
349 | } |
350 | return 0; |
351 | } |
352 | |
353 | // init the value list |
354 | proto::GenericValueList value_list; |
355 | auto &forward_order = column_order.get_forward_order(); |
356 | size_t meta_forward_size = meta.forward_columns().size(); |
357 | for (size_t i = 0; i < meta_forward_size; ++i) { |
358 | value_list.add_values(); |
359 | } |
360 | |
361 | // fill the value list |
362 | auto &request_forward_columns = row_meta.forward_column_names(); |
363 | auto &forward_values = row.forward_column_values().values(); |
364 | for (int i = 0; i < request_forward_columns.size(); ++i) { |
365 | auto &forward_column = request_forward_columns[i]; |
366 | auto it = forward_order.find(forward_column); |
367 | if (it != forward_order.end()) { |
368 | if (it->second < meta_forward_size) { |
369 | value_list.mutable_values(it->second)->CopyFrom(forward_values[i]); |
370 | } else { |
371 | LOG_ERROR( |
372 | "Forward order invalid. forward[%s] index[%zu] " |
373 | "max_size[%zu] collection[%s]" , |
374 | forward_column.c_str(), it->second, meta_forward_size, |
375 | meta.name().c_str()); |
376 | return ErrorCode_RuntimeError; |
377 | } |
378 | } else { |
379 | LOG_ERROR("Find forward order failed. forward[%s] collection[%s]" , |
380 | forward_column.c_str(), meta.name().c_str()); |
381 | return ErrorCode_InvalidWriteRequest; |
382 | } |
383 | } |
384 | |
385 | // copy forward data |
386 | if (!value_list.SerializeToString(forward_data)) { |
387 | LOG_ERROR("Forward columns serialize failed. collection[%s]" , |
388 | meta.name().c_str()); |
389 | return ErrorCode_SerializeError; |
390 | } |
391 | |
392 | return 0; |
393 | } |
394 | |
395 | int WriteRequestBuilder::build_indexes_data( |
396 | const proto::WriteRequest::Row &row, |
397 | const proto::WriteRequest::RowMeta &row_meta, |
398 | const meta::CollectionMeta &meta, bool index_full_match, |
399 | index::CollectionDataset::RowData *row_data) { |
400 | auto &index_column_metas = row_meta.index_column_metas(); |
401 | int index_column_size = index_column_metas.size(); |
402 | row_data->column_datas.resize(index_column_size); |
403 | |
404 | auto &index_values = row.index_column_values().values(); |
405 | auto &column_meta_list = meta.index_columns(); |
406 | for (int i = 0; i < index_column_size; ++i) { |
407 | meta::ColumnMetaPtr column_meta; |
408 | if (index_full_match) { |
409 | column_meta = column_meta_list[i]; |
410 | } else { |
411 | column_meta = meta.column_by_name(index_column_metas[i].column_name()); |
412 | } |
413 | if (!column_meta) { |
414 | LOG_ERROR("Find index column failed. column[%s] collection[%s]" , |
415 | index_column_metas[i].column_name().c_str(), |
416 | meta.name().c_str()); |
417 | return ErrorCode_MismatchedIndexColumn; |
418 | } |
419 | auto value_type = index_values[i].value_oneof_case(); |
420 | if ((column_meta->index_type() != IndexTypes::PROXIMA_GRAPH_INDEX) || |
421 | (value_type != proto::GenericValue::ValueOneofCase::kStringValue && |
422 | value_type != proto::GenericValue::ValueOneofCase::kBytesValue)) { |
423 | LOG_ERROR( |
424 | "Only support PROXIMA_GRAPH_INDEX && (string or bytes) type." |
425 | " collection[%s]" , |
426 | meta.name().c_str()); |
427 | return ErrorCode_MismatchedIndexColumn; |
428 | } |
429 | |
430 | int ret = 0; |
431 | auto &column_data = row_data->column_datas[i]; |
432 | if (value_type == proto::GenericValue::ValueOneofCase::kStringValue) { |
433 | ret = ProtoConverter::ConvertIndexData( |
434 | index_values[i].string_value(), *column_meta, index_column_metas[i], |
435 | false, &column_data); |
436 | } else { |
437 | ret = ProtoConverter::ConvertIndexData( |
438 | index_values[i].bytes_value(), *column_meta, index_column_metas[i], |
439 | true, &column_data); |
440 | } |
441 | if (ret != 0) { |
442 | LOG_ERROR("Convert collection index data failed. collection[%s]" , |
443 | meta.name().c_str()); |
444 | return ret; |
445 | } |
446 | } |
447 | |
448 | return 0; |
449 | } |
450 | |
451 | |
452 | } // end namespace server |
453 | } // namespace be |
454 | } // end namespace proxima |
455 | |