1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Hongqing.hu
17 * \date Mar 2021
18 * \brief Implements write request builder interface
19 */
20
21#include "write_request_builder.h"
22#include "common/types_helper.h"
23#include "proto_converter.h"
24
25namespace proxima {
26namespace be {
27namespace server {
28
29using RequestType = agent::WriteRequest::RequestType;
30
31int WriteRequestBuilder::build(const meta::CollectionMeta &meta,
32 const agent::ColumnOrder &column_order,
33 const proto::WriteRequest &pb_request,
34 agent::WriteRequest *write_request) {
35 // get if indexes and forwards sequence are strictly match collection meta
36 bool index_full_match = false;
37 bool forward_full_match = false;
38 get_index_and_forward_mode(pb_request, meta, &index_full_match,
39 &forward_full_match);
40 // validate indexes and forward values size
41 int ret = validate_request(pb_request, meta, column_order, index_full_match,
42 forward_full_match);
43 if (ret != 0) {
44 return ret;
45 }
46
47 RequestType request_type =
48 meta.repository() ? RequestType::PROXY : RequestType::DIRECT;
49 if (request_type == RequestType::PROXY) {
50 ret = build_proxy_request(meta, column_order, pb_request, index_full_match,
51 forward_full_match, write_request);
52 } else {
53 ret = build_direct_request(meta, column_order, pb_request, index_full_match,
54 forward_full_match, write_request);
55 }
56 if (ret != 0) {
57 LOG_ERROR("Build write request failed. collection[%s]",
58 pb_request.collection_name().c_str());
59 return ret;
60 }
61
62 write_request->set_request_type(request_type);
63
64 return 0;
65}
66
67void WriteRequestBuilder::get_index_and_forward_mode(
68 const proto::WriteRequest &request, const meta::CollectionMeta &meta,
69 bool *index_full_match, bool *forward_full_match) {
70 // check index columns size
71 auto &request_index_columns = request.row_meta().index_column_metas();
72 auto &meta_index_columns = meta.index_columns();
73 size_t index_column_size = static_cast<size_t>(request_index_columns.size());
74 size_t meta_columns_size = meta_index_columns.size();
75
76 // get index column full match
77 *index_full_match = false;
78 if (meta_columns_size == index_column_size) {
79 size_t i = 0;
80 for (i = 0; i < meta_columns_size; ++i) {
81 if (meta_index_columns[i]->name() !=
82 request_index_columns[i].column_name()) {
83 break;
84 }
85 }
86 if (i == meta_columns_size) {
87 *index_full_match = true;
88 }
89 }
90
91 // get forward column full match
92 auto &request_forward_columns = request.row_meta().forward_column_names();
93 auto &meta_forward_columns = meta.forward_columns();
94 size_t request_forward_size =
95 static_cast<size_t>(request_forward_columns.size());
96 size_t meta_forward_size = meta_forward_columns.size();
97 *forward_full_match = false;
98 if (meta_forward_size == request_forward_size) {
99 size_t i = 0;
100 for (i = 0; i < meta_forward_size; ++i) {
101 if (meta_forward_columns[i] != request_forward_columns[i]) {
102 break;
103 }
104 }
105 if (i == meta_forward_size) {
106 *forward_full_match = true;
107 }
108 }
109}
110
111int WriteRequestBuilder::validate_request(
112 const proto::WriteRequest &request, const meta::CollectionMeta &meta,
113 const agent::ColumnOrder &column_order, bool index_full_match,
114 bool forward_full_match) {
115 auto &collection = request.collection_name();
116 auto &request_index_metas = request.row_meta().index_column_metas();
117 size_t index_column_size = static_cast<size_t>(request_index_metas.size());
118 size_t meta_index_size = meta.index_columns().size();
119 // check request is empty
120 if (!request.rows_size()) {
121 LOG_ERROR("Write request is empty. collection[%s]", collection.c_str());
122 return ErrorCode_InvalidWriteRequest;
123 }
124
125 // check index
126 if (index_column_size != (size_t)request_index_metas.size()) {
127 LOG_ERROR(
128 "Collection index column meta size mismatched. "
129 "meta[%zu] index[%zu] collection[%s]",
130 index_column_size, (size_t)request_index_metas.size(),
131 collection.c_str());
132 return ErrorCode_InvalidWriteRequest;
133 }
134
135 if (meta_index_size < index_column_size) {
136 LOG_ERROR(
137 "Collection index columns size mismatched. meta[%zu] "
138 "request[%zu] collection[%s]",
139 meta_index_size, index_column_size, collection.c_str());
140 return ErrorCode_InvalidWriteRequest;
141 }
142 if (!index_full_match) {
143 auto &index_order = column_order.get_index_order();
144 for (size_t i = 0; i < index_column_size; ++i) {
145 auto &index_column = request_index_metas[i].column_name();
146 auto it = index_order.find(index_column);
147 if (it == index_order.end()) {
148 LOG_ERROR("Collection index field invalid. request[%s] collection[%s]",
149 index_column.c_str(), collection.c_str());
150 return ErrorCode_InvalidWriteRequest;
151 }
152 }
153 }
154 for (size_t i = 0; i < index_column_size; ++i) {
155 auto &index_column = request_index_metas[i].column_name();
156 auto column_meta = meta.column_by_name(index_column);
157 if (!column_meta) {
158 LOG_ERROR("Invalid index column. name[%s] collection[%s]",
159 index_column.c_str(), collection.c_str());
160 return ErrorCode_InvalidWriteRequest;
161 }
162 if (column_meta->dimension() != request_index_metas[i].dimension()) {
163 LOG_ERROR(
164 "Index column dimension mismatched. "
165 "meta[%u] request[%u] column[%s] collection[%s]",
166 column_meta->dimension(), request_index_metas[i].dimension(),
167 index_column.c_str(), collection.c_str());
168 return ErrorCode_InvalidWriteRequest;
169 }
170 }
171
172 // check forward
173 auto &request_forward_columns = request.row_meta().forward_column_names();
174 size_t request_forward_size =
175 static_cast<size_t>(request_forward_columns.size());
176 size_t meta_forward_size = meta.forward_columns().size();
177 if (meta_forward_size < request_forward_size) {
178 LOG_ERROR(
179 "Collection forward columns size mismatched. meta[%zu] "
180 "request[%zu] collection[%s]",
181 meta_forward_size, request_forward_size, collection.c_str());
182 return ErrorCode_InvalidWriteRequest;
183 }
184 if (!forward_full_match) {
185 auto &forward_order = column_order.get_forward_order();
186 for (auto &forward_column : request_forward_columns) {
187 auto it = forward_order.find(forward_column);
188 if (it == forward_order.end()) {
189 LOG_ERROR(
190 "Collection forward field invalid. request[%s] collection[%s]",
191 forward_column.c_str(), collection.c_str());
192 return ErrorCode_InvalidWriteRequest;
193 }
194 }
195 }
196
197 // check index&forward data
198 for (int i = 0; i < request.rows_size(); ++i) {
199 auto &row = request.rows(i);
200 if (row.operation_type() == proxima::be::proto::OP_DELETE) {
201 continue;
202 }
203 if (!index_column_size) {
204 LOG_ERROR("Row index column names is empty. collection[%s]",
205 collection.c_str());
206 return ErrorCode_InvalidWriteRequest;
207 }
208 size_t index_value_size =
209 static_cast<size_t>(row.index_column_values().values_size());
210 if (index_value_size != index_column_size) {
211 LOG_ERROR(
212 "Row index columns size mismatched. meta[%zu] "
213 "values[%zu] collection[%s]",
214 index_column_size, index_value_size, collection.c_str());
215 return ErrorCode_InvalidWriteRequest;
216 }
217
218 size_t forward_value_size =
219 static_cast<size_t>(row.forward_column_values().values_size());
220 if (forward_value_size != request_forward_size) {
221 LOG_ERROR(
222 "Row forward columns size mismatched. meta[%zu] "
223 "values[%zu] collection[%s]",
224 request_forward_size, forward_value_size, collection.c_str());
225 return ErrorCode_InvalidWriteRequest;
226 }
227 }
228
229 return 0;
230}
231
232int WriteRequestBuilder::build_proxy_request(
233 const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order,
234 const proto::WriteRequest &pb_request, bool index_full_match,
235 bool forward_full_match, agent::WriteRequest *write_request) {
236 auto &row_meta = pb_request.row_meta();
237 auto &collection = pb_request.collection_name();
238
239 for (int i = 0; i < pb_request.rows_size(); ++i) {
240 // schema revision default 0
241 index::CollectionDatasetPtr record =
242 std::make_shared<index::CollectionDataset>(0);
243 auto &row = pb_request.rows(i);
244 int ret = build_record(row, row_meta, meta, column_order, index_full_match,
245 forward_full_match, record.get());
246 if (ret != 0) {
247 LOG_ERROR("Build record failed. id[%d] collection[%s]", i,
248 collection.c_str());
249 return ret;
250 }
251 write_request->add_collection_dataset(record);
252 }
253
254 write_request->set_magic_number(pb_request.magic_number());
255 write_request->set_collection_name(collection);
256
257 return 0;
258}
259
260int WriteRequestBuilder::build_direct_request(
261 const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order,
262 const proto::WriteRequest &pb_request, bool index_full_match,
263 bool forward_full_match, agent::WriteRequest *write_request) {
264 auto &row_meta = pb_request.row_meta();
265 auto &collection = pb_request.collection_name();
266
267 // schema revision default 0
268 index::CollectionDatasetPtr dataset =
269 std::make_shared<index::CollectionDataset>(0);
270
271 for (int i = 0; i < pb_request.rows_size(); ++i) {
272 auto &row = pb_request.rows(i);
273 int ret = build_record(row, row_meta, meta, column_order, index_full_match,
274 forward_full_match, dataset.get());
275 if (ret != 0) {
276 LOG_ERROR("Build record failed. id[%d] collection[%s]", i,
277 collection.c_str());
278 return ret;
279 }
280 }
281
282 write_request->add_collection_dataset(dataset);
283 write_request->set_collection_name(collection);
284
285 return 0;
286}
287
288int WriteRequestBuilder::build_record(
289 const proto::WriteRequest::Row &row,
290 const proto::WriteRequest::RowMeta &row_meta,
291 const meta::CollectionMeta &meta, const agent::ColumnOrder &column_order,
292 bool index_full_match, bool forward_full_match,
293 index::CollectionDataset *dataset) {
294 auto *row_data = dataset->add_row_data();
295 row_data->primary_key = row.primary_key();
296
297 // set lsn context
298 if (meta.repository()) {
299 if (!row.has_lsn_context()) {
300 LOG_ERROR("Row not set lsn_context field. pk[%zu] collection[%s]",
301 (size_t)row.primary_key(), meta.name().c_str());
302 return ErrorCode_EmptyLsnContext;
303 }
304 row_data->lsn_check = true;
305 auto &lsn_ctx = row.lsn_context();
306 row_data->lsn = lsn_ctx.lsn();
307 row_data->lsn_context = lsn_ctx.context();
308 } else {
309 row_data->lsn_check = false;
310 }
311
312 row_data->operation_type = OperationTypesCodeBook::Get(row.operation_type());
313
314 if (row_data->operation_type == OperationTypes::DELETE) {
315 return 0;
316 }
317
318 // build forwards data
319 int ret = build_forwards_data(row, row_meta, column_order, meta,
320 forward_full_match, row_data);
321 if (ret != 0) {
322 LOG_ERROR("Build forwards data failed. collection[%s]",
323 meta.name().c_str());
324 return ret;
325 }
326
327 // build index data
328 ret = build_indexes_data(row, row_meta, meta, index_full_match, row_data);
329 if (ret != 0) {
330 LOG_ERROR("Build indexes data failed. collection[%s]", meta.name().c_str());
331 return ret;
332 }
333
334 return 0;
335}
336
337int WriteRequestBuilder::build_forwards_data(
338 const proto::WriteRequest::Row &row,
339 const proto::WriteRequest::RowMeta &row_meta,
340 const agent::ColumnOrder &column_order, const meta::CollectionMeta &meta,
341 bool forward_full_match, index::CollectionDataset::RowData *row_data) {
342 // if forward_full_match is true, direct serialized
343 auto *forward_data = &(row_data->forward_data);
344 if (forward_full_match) {
345 if (!row.forward_column_values().SerializeToString(forward_data)) {
346 LOG_ERROR("Forward columns serialize failed. collection[%s]",
347 meta.name().c_str());
348 return ErrorCode_SerializeError;
349 }
350 return 0;
351 }
352
353 // init the value list
354 proto::GenericValueList value_list;
355 auto &forward_order = column_order.get_forward_order();
356 size_t meta_forward_size = meta.forward_columns().size();
357 for (size_t i = 0; i < meta_forward_size; ++i) {
358 value_list.add_values();
359 }
360
361 // fill the value list
362 auto &request_forward_columns = row_meta.forward_column_names();
363 auto &forward_values = row.forward_column_values().values();
364 for (int i = 0; i < request_forward_columns.size(); ++i) {
365 auto &forward_column = request_forward_columns[i];
366 auto it = forward_order.find(forward_column);
367 if (it != forward_order.end()) {
368 if (it->second < meta_forward_size) {
369 value_list.mutable_values(it->second)->CopyFrom(forward_values[i]);
370 } else {
371 LOG_ERROR(
372 "Forward order invalid. forward[%s] index[%zu] "
373 "max_size[%zu] collection[%s]",
374 forward_column.c_str(), it->second, meta_forward_size,
375 meta.name().c_str());
376 return ErrorCode_RuntimeError;
377 }
378 } else {
379 LOG_ERROR("Find forward order failed. forward[%s] collection[%s]",
380 forward_column.c_str(), meta.name().c_str());
381 return ErrorCode_InvalidWriteRequest;
382 }
383 }
384
385 // copy forward data
386 if (!value_list.SerializeToString(forward_data)) {
387 LOG_ERROR("Forward columns serialize failed. collection[%s]",
388 meta.name().c_str());
389 return ErrorCode_SerializeError;
390 }
391
392 return 0;
393}
394
395int WriteRequestBuilder::build_indexes_data(
396 const proto::WriteRequest::Row &row,
397 const proto::WriteRequest::RowMeta &row_meta,
398 const meta::CollectionMeta &meta, bool index_full_match,
399 index::CollectionDataset::RowData *row_data) {
400 auto &index_column_metas = row_meta.index_column_metas();
401 int index_column_size = index_column_metas.size();
402 row_data->column_datas.resize(index_column_size);
403
404 auto &index_values = row.index_column_values().values();
405 auto &column_meta_list = meta.index_columns();
406 for (int i = 0; i < index_column_size; ++i) {
407 meta::ColumnMetaPtr column_meta;
408 if (index_full_match) {
409 column_meta = column_meta_list[i];
410 } else {
411 column_meta = meta.column_by_name(index_column_metas[i].column_name());
412 }
413 if (!column_meta) {
414 LOG_ERROR("Find index column failed. column[%s] collection[%s]",
415 index_column_metas[i].column_name().c_str(),
416 meta.name().c_str());
417 return ErrorCode_MismatchedIndexColumn;
418 }
419 auto value_type = index_values[i].value_oneof_case();
420 if ((column_meta->index_type() != IndexTypes::PROXIMA_GRAPH_INDEX) ||
421 (value_type != proto::GenericValue::ValueOneofCase::kStringValue &&
422 value_type != proto::GenericValue::ValueOneofCase::kBytesValue)) {
423 LOG_ERROR(
424 "Only support PROXIMA_GRAPH_INDEX && (string or bytes) type."
425 " collection[%s]",
426 meta.name().c_str());
427 return ErrorCode_MismatchedIndexColumn;
428 }
429
430 int ret = 0;
431 auto &column_data = row_data->column_datas[i];
432 if (value_type == proto::GenericValue::ValueOneofCase::kStringValue) {
433 ret = ProtoConverter::ConvertIndexData(
434 index_values[i].string_value(), *column_meta, index_column_metas[i],
435 false, &column_data);
436 } else {
437 ret = ProtoConverter::ConvertIndexData(
438 index_values[i].bytes_value(), *column_meta, index_column_metas[i],
439 true, &column_data);
440 }
441 if (ret != 0) {
442 LOG_ERROR("Convert collection index data failed. collection[%s]",
443 meta.name().c_str());
444 return ret;
445 }
446 }
447
448 return 0;
449}
450
451
452} // end namespace server
453} // namespace be
454} // end namespace proxima
455