1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation with grpc protocol protocol
19 */
20
21#include "grpc_client.h"
22#include <iostream>
23#include <ailego/utility/string_helper.h>
24#include "version.h"
25
26namespace proxima {
27namespace be {
28
29Status GrpcProximaSearchClient::connect(const ChannelOptions &options) {
30 Status status;
31
32 brpc::ChannelOptions brpc_options;
33 brpc_options.protocol = "h2:grpc";
34 brpc_options.timeout_ms = options.timeout_ms;
35 brpc_options.max_retry = options.max_retry;
36
37 int ret = client_channel_.Init("rr", &brpc_options);
38 if (ret != 0) {
39 status.code = ErrorCode_InitChannel;
40 status.reason = "Init client channel failed";
41 return status;
42 }
43
44 for (uint32_t i = 0; i < options.connection_count; i++) {
45 auto *sub_channel = new brpc::Channel;
46 brpc_options.connection_group =
47 std::string("group").append(std::to_string(i));
48 ret = sub_channel->Init(options.host.c_str(), &brpc_options);
49 if (ret != 0) {
50 status.code = ErrorCode_InitChannel;
51 status.reason = "Init sub client channel failed.";
52 return status;
53 }
54
55 ret = client_channel_.AddChannel(sub_channel, nullptr);
56 if (ret != 0) {
57 status.code = ErrorCode_InitChannel;
58 status.reason = "Add sub channel failed.";
59 return status;
60 }
61 }
62
63 // check versions of client and server if matched first
64 if (!check_server_version(&status)) {
65 return status;
66 }
67
68 connected_ = true;
69 return status;
70}
71
72Status GrpcProximaSearchClient::close() {
73 // DO NOTHING NOW
74 connected_ = false;
75 return Status();
76}
77
78#define CHECK_CONNECTED() \
79 if (!connected_) { \
80 status.code = ErrorCode_NotConnected; \
81 status.reason = "Not connected yet"; \
82 return status; \
83 }
84
85#define RETURN_STATUS(cntl, response) \
86 if (cntl.Failed()) { \
87 status.code = ErrorCode_RpcError; \
88 status.reason = cntl.ErrorText(); \
89 } else { \
90 status.code = response.code(); \
91 status.reason = response.reason(); \
92 } \
93 return status;
94
95
96Status GrpcProximaSearchClient::create_collection(
97 const CollectionConfig &config) {
98 Status status;
99
100 // Check connected
101 CHECK_CONNECTED();
102
103 // Validate request
104 status = this->validate(config);
105 if (status.code != 0) {
106 return status;
107 }
108
109 // Prepare
110 brpc::Controller cntl;
111 proto::CollectionConfig request;
112 this->convert(config, &request);
113 proto::Status response;
114
115 this->rpc_create_collection(&cntl, &request, &response);
116
117 RETURN_STATUS(cntl, response);
118}
119
120Status GrpcProximaSearchClient::drop_collection(
121 const std::string &collection_name) {
122 Status status;
123
124 // Check connected
125 CHECK_CONNECTED();
126
127 // Validate request
128 if (collection_name.empty()) {
129 status.code = ErrorCode_ValidateError;
130 status.reason = "Collection name can't be empty";
131 return status;
132 }
133
134 // Prepare
135 brpc::Controller cntl;
136 proto::Status response;
137 proto::CollectionName request;
138 request.set_collection_name(collection_name);
139
140 this->rpc_drop_collection(&cntl, &request, &response);
141
142 // Send rpc request
143 RETURN_STATUS(cntl, response);
144}
145
146Status GrpcProximaSearchClient::describe_collection(
147 const std::string &collection_name, CollectionInfo *collection_info) {
148 Status status;
149
150 // Check connected
151 CHECK_CONNECTED();
152
153 // Validate request
154 if (collection_name.empty()) {
155 status.code = ErrorCode_ValidateError;
156 status.reason = "Collection name can't be empty";
157 return status;
158 }
159
160 // Prepare
161 brpc::Controller cntl;
162 proto::DescribeCollectionResponse response;
163 proto::CollectionName request;
164 request.set_collection_name(collection_name);
165
166 // Send rpc request
167 this->rpc_describe_collection(&cntl, &request, &response);
168
169 // Transform response
170 if (!cntl.Failed() && response.status().code() == 0) {
171 this->convert(response.collection(), collection_info);
172 }
173
174 RETURN_STATUS(cntl, response.status());
175}
176
177Status GrpcProximaSearchClient::stats_collection(
178 const std::string &collection_name, CollectionStats *stats) {
179 Status status;
180
181 // Check connected
182 CHECK_CONNECTED();
183
184 // Validate request
185 if (collection_name.empty()) {
186 status.code = ErrorCode_ValidateError;
187 status.reason = "Collection name can't be empty";
188 return status;
189 }
190
191 // Prepare
192 brpc::Controller cntl;
193 proto::StatsCollectionResponse response;
194 proto::CollectionName request;
195 request.set_collection_name(collection_name);
196
197 this->rpc_stats_collection(&cntl, &request, &response);
198
199 // Transform response
200 if (!cntl.Failed() && response.status().code() == 0) {
201 this->convert(response.collection_stats(), stats);
202 }
203
204 RETURN_STATUS(cntl, response.status());
205}
206
207Status GrpcProximaSearchClient::list_collections(
208 std::vector<CollectionInfo> *collections) {
209 Status status;
210
211 // Check connected
212 CHECK_CONNECTED();
213
214 // Prepare
215 brpc::Controller cntl;
216 proto::ListCollectionsResponse response;
217 proto::ListCondition request;
218
219 // Send rpc request
220 this->rpc_list_collections(&cntl, &request, &response);
221
222 // Transform response
223 if (!cntl.Failed() && response.status().code() == 0) {
224 for (int i = 0; i < response.collections_size(); i++) {
225 CollectionInfo ci;
226 this->convert(response.collections(i), &ci);
227 collections->emplace_back(ci);
228 }
229 }
230
231 RETURN_STATUS(cntl, response.status());
232}
233
234Status GrpcProximaSearchClient::write(const WriteRequest &write_request) {
235 Status status;
236
237 // Check connected
238 CHECK_CONNECTED();
239
240 // Validate
241 auto &pb_req = (const PbWriteRequest &)write_request;
242 status = this->validate(pb_req);
243 if (status.code != 0) {
244 return status;
245 }
246
247 // Prepare
248 brpc::Controller cntl;
249 proto::Status response;
250
251 this->rpc_write(&cntl, pb_req.data(), &response);
252
253 RETURN_STATUS(cntl, response);
254}
255
256Status GrpcProximaSearchClient::query(const QueryRequest &query_request,
257 QueryResponse *query_response) {
258 Status status;
259
260 // Check connected
261 CHECK_CONNECTED();
262
263 // Validate
264 auto &pb_req = (const PbQueryRequest &)query_request;
265 auto *pb_resp = (PbQueryResponse *)query_response;
266
267 status = this->validate(pb_req);
268 if (status.code != 0) {
269 return status;
270 }
271
272 // Prepare
273 brpc::Controller cntl;
274
275 this->rpc_query(&cntl, pb_req.data(), pb_resp->data());
276
277 RETURN_STATUS(cntl, pb_resp->data()->status());
278}
279
280Status GrpcProximaSearchClient::get_document_by_key(
281 const GetDocumentRequest &get_request, GetDocumentResponse *get_response) {
282 Status status;
283
284 // Check connected
285 CHECK_CONNECTED();
286
287 // Validate
288 auto &pb_req = (const PbGetDocumentRequest &)get_request;
289 auto *pb_resp = (PbGetDocumentResponse *)get_response;
290
291 status = this->validate(pb_req);
292 if (status.code != 0) {
293 return status;
294 }
295
296 // Prepare
297 brpc::Controller cntl;
298
299 this->rpc_get_document_by_key(&cntl, pb_req.data(), pb_resp->data());
300
301 RETURN_STATUS(cntl, pb_resp->data()->status());
302}
303
304bool GrpcProximaSearchClient::check_server_version(Status *status) {
305 proto::ProximaService_Stub stub(&client_channel_);
306 brpc::Controller cntl;
307 proto::GetVersionRequest request;
308 proto::GetVersionResponse response;
309
310 stub.get_version(&cntl, &request, &response, nullptr);
311 if (cntl.Failed()) {
312 status->code = ErrorCode_RpcError;
313 status->reason = cntl.ErrorText();
314 return false;
315 }
316
317 if (response.status().code() != 0) {
318 status->code = response.status().code();
319 status->reason = response.status().reason();
320 }
321
322 std::string server_version = response.version();
323 std::string client_version = Version::String();
324 if (server_version == client_version) {
325 return true;
326 }
327
328 // Temporarily we just use first two seq number of version string to compare
329 // For exp: version[0.1.2] match version[0.1.3] with "0.1"
330 std::vector<std::string> server_sub_seqs;
331 ailego::StringHelper::Split(server_version, '.', &server_sub_seqs);
332 std::vector<std::string> client_sub_seqs;
333 ailego::StringHelper::Split(client_version, '.', &client_sub_seqs);
334
335 int compare_count = 2;
336 for (int i = 0; i < compare_count; i++) {
337 if (client_sub_seqs[i] != server_sub_seqs[i]) {
338 status->code = ErrorCode_MismatchedVersion;
339 status->reason = std::string()
340 .append("client version:")
341 .append(Version::String())
342 .append(" not match server version:")
343 .append(server_version);
344 return false;
345 }
346 }
347
348 return true;
349}
350
351void GrpcProximaSearchClient::convert(const CollectionConfig &config,
352 proto::CollectionConfig *pb_request) {
353 pb_request->set_collection_name(config.collection_name);
354 pb_request->set_max_docs_per_segment(config.max_docs_per_segment);
355
356 for (auto &it : config.forward_columns) {
357 pb_request->add_forward_column_names(it);
358 }
359
360 for (auto &it : config.index_columns) {
361 auto *param = pb_request->add_index_column_params();
362 param->set_column_name(it.column_name);
363 param->set_index_type((proto::IndexType)it.index_type);
364 param->set_data_type((proto::DataType)it.data_type);
365 param->set_dimension(it.dimension);
366
367 for (auto &kv : it.extra_params) {
368 auto *extra_param = param->add_extra_params();
369 extra_param->set_key(kv.key);
370 extra_param->set_value(kv.value);
371 }
372 }
373
374 auto &input_repo = config.database_repository;
375 if (!input_repo.repository_name.empty()) {
376 auto *repo_config = pb_request->mutable_repository_config();
377 repo_config->set_repository_type(
378 proto::CollectionConfig::RepositoryConfig::RT_DATABASE);
379 repo_config->set_repository_name(input_repo.repository_name);
380 repo_config->mutable_database()->set_connection_uri(
381 input_repo.connection_uri);
382 repo_config->mutable_database()->set_table_name(input_repo.table_name);
383 repo_config->mutable_database()->set_user(input_repo.user);
384 repo_config->mutable_database()->set_password(input_repo.password);
385 }
386}
387
388void GrpcProximaSearchClient::convert(const proto::CollectionInfo &pb_response,
389 CollectionInfo *collection_info) {
390 collection_info->collection_name = pb_response.config().collection_name();
391 collection_info->collection_status =
392 (CollectionInfo::CollectionStatus)pb_response.status();
393 collection_info->collection_uuid = pb_response.uuid();
394 collection_info->latest_lsn = pb_response.latest_lsn_context().lsn();
395 collection_info->latest_lsn_context =
396 pb_response.latest_lsn_context().context();
397 collection_info->magic_number = pb_response.magic_number();
398 collection_info->max_docs_per_segment =
399 pb_response.config().max_docs_per_segment();
400
401 // copy forward columns
402 for (int i = 0; i < pb_response.config().forward_column_names_size(); i++) {
403 collection_info->forward_columns.emplace_back(
404 pb_response.config().forward_column_names(i));
405 }
406
407 // copy index columns
408 for (int i = 0; i < pb_response.config().index_column_params_size(); i++) {
409 auto &rp = pb_response.config().index_column_params(i);
410 IndexColumnParam index_param;
411 index_param.column_name = rp.column_name();
412 index_param.index_type = (IndexType)rp.index_type();
413 index_param.data_type = (DataType)rp.data_type();
414 index_param.dimension = rp.dimension();
415 for (int j = 0; j < rp.extra_params_size(); j++) {
416 KVPair pair;
417 pair.key = rp.extra_params(j).key();
418 pair.value = rp.extra_params(j).value();
419 index_param.extra_params.emplace_back(pair);
420 }
421 collection_info->index_columns.emplace_back(index_param);
422 }
423
424 // copy database repository
425 if (pb_response.config().has_repository_config() &&
426 pb_response.config().repository_config().repository_type() ==
427 proto::CollectionConfig::RepositoryConfig::RT_DATABASE) {
428 auto &rc = pb_response.config().repository_config();
429 collection_info->database_repository.repository_name = rc.repository_name();
430 collection_info->database_repository.connection_uri =
431 rc.database().connection_uri();
432 collection_info->database_repository.table_name =
433 rc.database().table_name();
434 collection_info->database_repository.user = rc.database().user();
435 collection_info->database_repository.password = rc.database().password();
436 }
437}
438
439void GrpcProximaSearchClient::convert(const proto::CollectionStats &pb_response,
440 CollectionStats *collection_stats) {
441 collection_stats->collection_name = pb_response.collection_name();
442 collection_stats->total_doc_count = pb_response.total_doc_count();
443 collection_stats->total_segment_count = pb_response.total_segment_count();
444 collection_stats->total_index_file_count =
445 pb_response.total_index_file_count();
446 collection_stats->total_index_file_size = pb_response.total_index_file_size();
447
448 for (int i = 0; i < pb_response.segment_stats_size(); i++) {
449 auto &ss = pb_response.segment_stats(i);
450 CollectionStats::SegmentStats segment_stats;
451 segment_stats.segment_id = ss.segment_id();
452 segment_stats.segment_state = (CollectionStats::SegmentState)ss.state();
453 segment_stats.doc_count = ss.doc_count();
454 segment_stats.index_file_count = ss.index_file_count();
455 segment_stats.index_file_size = ss.index_file_size();
456 segment_stats.min_doc_id = ss.min_doc_id();
457 segment_stats.max_doc_id = ss.max_doc_id();
458 segment_stats.min_primary_key = ss.min_primary_key();
459 segment_stats.max_primary_key = ss.max_primary_key();
460 segment_stats.min_timestamp = ss.min_timestamp();
461 segment_stats.max_timestamp = ss.max_timestamp();
462 segment_stats.min_lsn = ss.min_lsn();
463 segment_stats.max_lsn = ss.max_lsn();
464 collection_stats->segment_stats.emplace_back(segment_stats);
465 }
466}
467
468Status GrpcProximaSearchClient::validate(const CollectionConfig &config) {
469 Status status;
470 if (config.collection_name.empty()) {
471 status.code = ErrorCode_ValidateError;
472 status.reason = "Collection name can't be empty";
473 return status;
474 }
475
476 if (config.index_columns.size() == 0) {
477 status.code = ErrorCode_ValidateError;
478 status.reason = "Index columns can't be empty";
479 return status;
480 }
481
482 for (auto &index_column : config.index_columns) {
483 if (index_column.column_name.empty()) {
484 status.code = ErrorCode_ValidateError;
485 status.reason = "Column name can't be empty";
486 return status;
487 }
488
489 if (index_column.dimension == 0U) {
490 status.code = ErrorCode_ValidateError;
491 status.reason = "Dimension can't be 0";
492 return status;
493 }
494
495 if (index_column.data_type == DataType::UNDEFINED) {
496 status.code = ErrorCode_ValidateError;
497 status.reason = "Data type can't be undefined";
498 return status;
499 }
500 }
501 return status;
502}
503
504Status GrpcProximaSearchClient::validate(const PbWriteRequest &request) {
505 Status status;
506 auto *wreq = request.data();
507 if (!wreq) {
508 status.code = ErrorCode_ValidateError;
509 status.reason = "Invalid write request";
510 return status;
511 }
512
513 if (wreq->collection_name().empty()) {
514 status.code = ErrorCode_ValidateError;
515 status.reason = "Collection name can't be empty";
516 return status;
517 }
518
519 if (wreq->rows_size() == 0) {
520 status.code = ErrorCode_ValidateError;
521 status.reason = "Rows can't be empty";
522 return status;
523 }
524
525 for (int i = 0; i < wreq->rows_size(); i++) {
526 auto &row = wreq->rows(i);
527 if (row.operation_type() == proto::OperationType::OP_INSERT ||
528 row.operation_type() == proto::OperationType::OP_UPDATE) {
529 if (row.index_column_values().values_size() !=
530 wreq->row_meta().index_column_metas_size()) {
531 status.code = ErrorCode_ValidateError;
532 status.reason = "Index columns not match values";
533 return status;
534 }
535
536 if (row.forward_column_values().values_size() !=
537 wreq->row_meta().forward_column_names_size()) {
538 status.code = ErrorCode_ValidateError;
539 status.reason = "Forward columns not match values";
540 return status;
541 }
542 }
543 }
544
545 return status;
546}
547
548Status GrpcProximaSearchClient::validate(const PbQueryRequest &request) {
549 Status status;
550 auto *qreq = request.data();
551 if (!qreq) {
552 status.code = ErrorCode_ValidateError;
553 status.reason = "Invalid query request.";
554 return status;
555 }
556
557 if (qreq->collection_name().empty()) {
558 status.code = ErrorCode_ValidateError;
559 status.reason = "Collection name can't be empty";
560 return status;
561 }
562
563 if (qreq->knn_param().column_name().empty()) {
564 status.code = ErrorCode_ValidateError;
565 status.reason = "Knn param column name can't be empty";
566 return status;
567 }
568
569 if (qreq->knn_param().topk() == 0U) {
570 status.code = ErrorCode_ValidateError;
571 status.reason = "Knn param topk can't be 0";
572 return status;
573 }
574
575 if (qreq->knn_param().features().empty() &&
576 qreq->knn_param().matrix().empty()) {
577 status.code = ErrorCode_ValidateError;
578 status.reason =
579 "Knn param features and matrix can't be empty at the same time";
580 return status;
581 }
582
583 if (qreq->knn_param().batch_count() == 0U) {
584 status.code = ErrorCode_ValidateError;
585 status.reason = "Knn param batch count can't be 0";
586 return status;
587 }
588
589 if (qreq->knn_param().dimension() == 0U) {
590 status.code = ErrorCode_ValidateError;
591 status.reason = "Knn param dimension can't be 0";
592 return status;
593 }
594
595 if (qreq->knn_param().data_type() == proto::DataType::DT_UNDEFINED) {
596 status.code = ErrorCode_ValidateError;
597 status.reason = "Knn param data type can't be undefined";
598 return status;
599 }
600
601 return status;
602}
603
604Status GrpcProximaSearchClient::validate(const PbGetDocumentRequest &request) {
605 Status status;
606 auto *gdreq = request.data();
607 if (!gdreq) {
608 status.code = ErrorCode_ValidateError;
609 status.reason = "Invalid get document request";
610 return status;
611 }
612
613 if (gdreq->collection_name().empty()) {
614 status.code = ErrorCode_ValidateError;
615 status.reason = "Collection name can't be empty";
616 return status;
617 }
618
619 return status;
620}
621
622void GrpcProximaSearchClient::rpc_create_collection(
623 brpc::Controller *cntl, const proto::CollectionConfig *request,
624 proto::Status *response) {
625 proto::ProximaService_Stub stub(&client_channel_);
626 stub.create_collection(cntl, request, response, nullptr);
627}
628
629void GrpcProximaSearchClient::rpc_drop_collection(
630 brpc::Controller *cntl, const proto::CollectionName *request,
631 proto::Status *response) {
632 proto::ProximaService_Stub stub(&client_channel_);
633 stub.drop_collection(cntl, request, response, nullptr);
634}
635
636void GrpcProximaSearchClient::rpc_describe_collection(
637 brpc::Controller *cntl, const proto::CollectionName *request,
638 proto::DescribeCollectionResponse *response) {
639 proto::ProximaService_Stub stub(&client_channel_);
640 stub.describe_collection(cntl, request, response, nullptr);
641}
642
643void GrpcProximaSearchClient::rpc_stats_collection(
644 brpc::Controller *cntl, const proto::CollectionName *request,
645 proto::StatsCollectionResponse *response) {
646 proto::ProximaService_Stub stub(&client_channel_);
647 stub.stats_collection(cntl, request, response, nullptr);
648}
649
650void GrpcProximaSearchClient::rpc_list_collections(
651 brpc::Controller *cntl, const proto::ListCondition *request,
652 proto::ListCollectionsResponse *response) {
653 proto::ProximaService_Stub stub(&client_channel_);
654 stub.list_collections(cntl, request, response, nullptr);
655}
656
657void GrpcProximaSearchClient::rpc_write(brpc::Controller *cntl,
658 const proto::WriteRequest *request,
659 proto::Status *response) {
660 proto::ProximaService_Stub stub(&client_channel_);
661 stub.write(cntl, request, response, nullptr);
662}
663
664void GrpcProximaSearchClient::rpc_query(brpc::Controller *cntl,
665 const proto::QueryRequest *request,
666 proto::QueryResponse *response) {
667 proto::ProximaService_Stub stub(&client_channel_);
668 stub.query(cntl, request, response, nullptr);
669}
670
671void GrpcProximaSearchClient::rpc_get_document_by_key(
672 brpc::Controller *cntl, const proto::GetDocumentRequest *request,
673 proto::GetDocumentResponse *response) {
674 proto::ProximaService_Stub stub(&client_channel_);
675 stub.get_document_by_key(cntl, request, response, nullptr);
676}
677
678
679#undef CHECK_CONNECTED
680#undef RETURN_STATUS
681
682} // namespace be
683} // end namespace proxima
684