1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of proxima request handler |
19 | */ |
20 | |
21 | #include "proxima_request_handler.h" |
22 | #include <ailego/encoding/json.h> |
23 | #include <ailego/utility/string_helper.h> |
24 | #include <brpc/server.h> |
25 | |
26 | #ifdef __GNUC__ |
27 | #pragma GCC diagnostic push |
28 | #pragma GCC diagnostic ignored "-Wshadow" |
29 | #pragma GCC diagnostic ignored "-Wunused-parameter" |
30 | #endif |
31 | #include <google/protobuf/util/json_util.h> |
32 | #ifdef __GNUC__ |
33 | #pragma GCC diagnostic pop |
34 | #endif |
35 | |
36 | #include "agent/write_request.h" |
37 | #include "common/error_code.h" |
38 | #include "common/protobuf_helper.h" |
39 | #include "metrics/metrics.h" |
40 | #include "write_request_builder.h" |
41 | |
42 | |
43 | namespace proxima { |
44 | namespace be { |
45 | namespace server { |
46 | |
47 | using metrics::ProtocolType; |
48 | |
49 | namespace { |
50 | |
51 | static void inline SetStatus(int ret, proto::Status *status) { |
52 | status->set_code(ret); |
53 | status->set_reason(ErrorCode::What(ret)); |
54 | } |
55 | |
56 | //! Parse collection meta from json string |
57 | static inline int ParseRequestFromJson(const std::string &json_str, |
58 | google::protobuf::Message *meta) { |
59 | ProtobufHelper::JsonParseOptions options; |
60 | // ignore params which can't be automatic parse from json |
61 | options.ignore_unknown_fields = true; |
62 | |
63 | if (!ProtobufHelper::JsonToMessage(json_str, options, meta)) { |
64 | LOG_ERROR("ParseRequestFromJson failed. json[%s]" , json_str.c_str()); |
65 | return ErrorCode_InvalidArgument; |
66 | } |
67 | return 0; |
68 | } |
69 | |
70 | //! Serialize response to controller |
71 | static inline void SerializeResponse(const google::protobuf::Message &response, |
72 | brpc::Controller *brpc_controller) { |
73 | brpc_controller->http_response().set_content_type("application/json" ); |
74 | std::string json_resp; |
75 | if (!ProtobufHelper::MessageToJson(response, &json_resp)) { |
76 | LOG_ERROR("Can't serialize PB response to json. message[%s]" , |
77 | response.ShortDebugString().c_str()); |
78 | } else { |
79 | brpc_controller->response_attachment().append(json_resp); |
80 | } |
81 | } |
82 | |
83 | static inline void UnknownMethod(brpc::Controller *controller, |
84 | int allowed_method, |
85 | google::protobuf::Message *rsp, |
86 | proto::Status *status) { |
87 | SetStatus(PROXIMA_BE_ERROR_CODE(InvalidQuery), status); |
88 | controller->http_response().set_status_code( |
89 | brpc::HTTP_STATUS_METHOD_NOT_ALLOWED); |
90 | status->mutable_reason()->append(": invalid http method" ); |
91 | const char *allowed = nullptr; |
92 | switch (allowed_method) { |
93 | case brpc::HTTP_METHOD_POST: |
94 | allowed = "POST" ; |
95 | break; |
96 | case brpc::HTTP_METHOD_GET: |
97 | allowed = "GET" ; |
98 | break; |
99 | case brpc::HTTP_METHOD_PUT: |
100 | allowed = "PUT" ; |
101 | break; |
102 | case brpc::HTTP_METHOD_DELETE: |
103 | allowed = "DELETE" ; |
104 | break; |
105 | // default ignore |
106 | } |
107 | if (allowed) { |
108 | controller->http_response().SetHeader("Allowed" , allowed); |
109 | } |
110 | SerializeResponse(*rsp, controller); |
111 | } |
112 | |
113 | } // namespace |
114 | |
115 | #define RETURN_IF_NOT_HTTP_METHOD(CONTROLLER, METHOD, RSP, STATUS) \ |
116 | if (CONTROLLER->http_request().method() != METHOD) { \ |
117 | UnknownMethod(CONTROLLER, METHOD, RSP, STATUS); \ |
118 | return; \ |
119 | } |
120 | |
121 | ProximaRequestHandler::ProximaRequestHandler( |
122 | const agent::IndexAgentPtr &p_index_agent, |
123 | const query::QueryAgentPtr &p_query_agent, |
124 | const admin::AdminAgentPtr &p_admin_agent) |
125 | : index_agent_(p_index_agent), |
126 | query_agent_(p_query_agent), |
127 | admin_agent_(p_admin_agent) {} |
128 | |
129 | |
130 | void ProximaRequestHandler::create_collection( |
131 | ::google::protobuf::RpcController * /*controller*/, |
132 | const proto::CollectionConfig *request, proto::Status *response, |
133 | ::google::protobuf::Closure *done) { |
134 | brpc::ClosureGuard done_guard(done); |
135 | int ret = admin_agent_->create_collection(*request); |
136 | SetStatus(ret, response); |
137 | } |
138 | |
139 | void ProximaRequestHandler::drop_collection( |
140 | ::google::protobuf::RpcController * /*controller*/, |
141 | const proto::CollectionName *request, proto::Status *response, |
142 | ::google::protobuf::Closure *done) { |
143 | brpc::ClosureGuard done_guard(done); |
144 | int ret = admin_agent_->drop_collection(request->collection_name()); |
145 | SetStatus(ret, response); |
146 | } |
147 | |
148 | void ProximaRequestHandler::describe_collection( |
149 | ::google::protobuf::RpcController * /*controller*/, |
150 | const proto::CollectionName *request, |
151 | proto::DescribeCollectionResponse *response, |
152 | ::google::protobuf::Closure *done) { |
153 | brpc::ClosureGuard done_guard(done); |
154 | int ret = |
155 | admin_agent_->describe_collection(request->collection_name(), response); |
156 | SetStatus(ret, response->mutable_status()); |
157 | } |
158 | |
159 | void ProximaRequestHandler::list_collections( |
160 | ::google::protobuf::RpcController * /*controller*/, |
161 | const proto::ListCondition *request, |
162 | proto::ListCollectionsResponse *response, |
163 | ::google::protobuf::Closure *done) { |
164 | brpc::ClosureGuard done_guard(done); |
165 | int ret = admin_agent_->list_collections(*request, response); |
166 | SetStatus(ret, response->mutable_status()); |
167 | } |
168 | |
169 | void ProximaRequestHandler::stats_collection( |
170 | ::google::protobuf::RpcController * /*controller*/, |
171 | const proto::CollectionName *request, |
172 | proto::StatsCollectionResponse *response, |
173 | ::google::protobuf::Closure *done) { |
174 | brpc::ClosureGuard done_guard(done); |
175 | int ret = |
176 | admin_agent_->stats_collection(request->collection_name(), response); |
177 | SetStatus(ret, response->mutable_status()); |
178 | } |
179 | |
180 | void ProximaRequestHandler::write( |
181 | ::google::protobuf::RpcController * /*controller*/, |
182 | const proto::WriteRequest *request, proto::Status *response, |
183 | ::google::protobuf::Closure *done) { |
184 | int code = 0; |
185 | metrics::WriteMetrics metrics{ProtocolType::kGrpc, &code}; |
186 | metrics.update_with_write_request(*request); |
187 | LOG_DEBUG("%s" , request->ShortDebugString().c_str()); |
188 | brpc::ClosureGuard done_guard(done); |
189 | code = this->write_impl(*request, response); |
190 | } |
191 | |
192 | void ProximaRequestHandler::query(::google::protobuf::RpcController *, |
193 | const proto::QueryRequest *request, |
194 | proto::QueryResponse *response, |
195 | ::google::protobuf::Closure *done) { |
196 | brpc::ClosureGuard done_guard(done); |
197 | ailego::ElapsedTime latency; |
198 | int code = 0; |
199 | metrics::QueryMetrics metrics{metrics::ProtocolType::kGrpc, &code}; |
200 | metrics.update_with_query_request(*request); |
201 | |
202 | code = query_agent_->search(request, response); |
203 | if (code != 0) { |
204 | LOG_ERROR("Can't handle query. code[%d] what[%s]" , code, |
205 | ErrorCode::What(code)); |
206 | } |
207 | |
208 | response->set_latency_us(latency.micro_seconds()); |
209 | SetStatus(code, response->mutable_status()); |
210 | } |
211 | |
212 | void ProximaRequestHandler::get_document_by_key( |
213 | ::google::protobuf::RpcController * /*controller*/, |
214 | const proto::GetDocumentRequest *request, |
215 | proto::GetDocumentResponse *response, ::google::protobuf::Closure *done) { |
216 | brpc::ClosureGuard done_guard(done); |
217 | int code = 0; |
218 | metrics::GetDocumentMetrics metrics{ProtocolType::kGrpc, &code}; |
219 | code = query_agent_->search_by_key(request, response); |
220 | if (code != 0) { |
221 | LOG_ERROR("Can't handle query. code[%d] what[%s]" , code, |
222 | ErrorCode::What(code)); |
223 | } |
224 | SetStatus(code, response->mutable_status()); |
225 | } |
226 | |
227 | void ProximaRequestHandler::get_version( |
228 | ::google::protobuf::RpcController * /* controller */, |
229 | const proto::GetVersionRequest * /* request */, |
230 | proto::GetVersionResponse *response, ::google::protobuf::Closure *done) { |
231 | brpc::ClosureGuard done_guard(done); |
232 | response->set_version(version_); |
233 | SetStatus(0, response->mutable_status()); |
234 | } |
235 | |
236 | void ProximaRequestHandler::collection( |
237 | ::google::protobuf::RpcController *controller, |
238 | const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/, |
239 | ::google::protobuf::Closure *done) { |
240 | brpc::ClosureGuard done_guard(done); |
241 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
242 | auto method = brpc_controller->http_request().method(); |
243 | if (method == brpc::HttpMethod::HTTP_METHOD_POST) { |
244 | create_collection(brpc_controller); |
245 | } else if (method == brpc::HttpMethod::HTTP_METHOD_GET) { |
246 | describe_collection(brpc_controller); |
247 | } else if (method == brpc::HttpMethod::HTTP_METHOD_DELETE) { |
248 | drop_collection(brpc_controller); |
249 | } else { |
250 | proto::Status status; |
251 | SetStatus(PROXIMA_BE_ERROR_CODE(InvalidQuery), &status); |
252 | status.set_reason(": invalid http method" ); |
253 | brpc_controller->http_response().set_status_code( |
254 | brpc::HTTP_STATUS_METHOD_NOT_ALLOWED); |
255 | SerializeResponse(status, brpc_controller); |
256 | } |
257 | } |
258 | |
259 | void ProximaRequestHandler::stats_collection( |
260 | ::google::protobuf::RpcController *controller, |
261 | const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/, |
262 | ::google::protobuf::Closure *done) { |
263 | brpc::ClosureGuard done_guard(done); |
264 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
265 | |
266 | proto::StatsCollectionResponse pb_response; |
267 | RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET, |
268 | &pb_response, pb_response.mutable_status()) |
269 | std::string collection_name; |
270 | int code = parse_collection(brpc_controller, &collection_name); |
271 | if (code == 0) { |
272 | code = admin_agent_->stats_collection(collection_name, &pb_response); |
273 | } |
274 | SetStatus(code, pb_response.mutable_status()); |
275 | SerializeResponse(pb_response, brpc_controller); |
276 | } |
277 | |
278 | void ProximaRequestHandler::write(::google::protobuf::RpcController *controller, |
279 | const proto::HttpRequest * /*request*/, |
280 | proto::HttpResponse * /*response*/, |
281 | ::google::protobuf::Closure *done) { |
282 | int code = 0; |
283 | metrics::WriteMetrics metrics{ProtocolType::kHttp, &code}; |
284 | brpc::ClosureGuard done_guard(done); |
285 | |
286 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
287 | proto::Status status; |
288 | RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_POST, &status, |
289 | &status) |
290 | |
291 | std::string collection_name; |
292 | code = parse_collection(brpc_controller, &collection_name); |
293 | if (code == 0) { |
294 | const std::string http_body = |
295 | brpc_controller->request_attachment().to_string(); |
296 | proto::WriteRequest pb_request; |
297 | code = ParseRequestFromJson(http_body, &pb_request); |
298 | if (code == 0) { |
299 | pb_request.set_collection_name(collection_name); |
300 | metrics.update_with_write_request(pb_request); |
301 | code = this->write_impl(pb_request, &status); |
302 | } else { |
303 | SetStatus(code, &status); |
304 | } |
305 | } |
306 | |
307 | SerializeResponse(status, brpc_controller); |
308 | } |
309 | |
310 | void ProximaRequestHandler::query(::google::protobuf::RpcController *controller, |
311 | const proto::HttpRequest * /*request*/, |
312 | proto::HttpResponse * /*response*/, |
313 | ::google::protobuf::Closure *done) { |
314 | brpc::ClosureGuard done_guard(done); |
315 | ailego::ElapsedTime latency; |
316 | int code = 0; |
317 | metrics::QueryMetrics metrics{metrics::ProtocolType::kHttp, &code}; |
318 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
319 | |
320 | // Check http method |
321 | proto::QueryResponse pb_response; |
322 | RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_POST, |
323 | &pb_response, pb_response.mutable_status()) |
324 | |
325 | std::string collection_name; |
326 | code = parse_collection(brpc_controller, &collection_name); |
327 | if (code == 0) { |
328 | const std::string body = brpc_controller->request_attachment().to_string(); |
329 | proto::QueryRequest pb_request; |
330 | code = ParseRequestFromJson(body, &pb_request); |
331 | if (code == 0) { |
332 | metrics.update_with_query_request(pb_request); |
333 | pb_request.set_collection_name(collection_name); |
334 | code = query_agent_->search(&pb_request, &pb_response); |
335 | if (code != 0) { |
336 | LOG_ERROR("Can't handle query. code[%d] what[%s]" , code, |
337 | ErrorCode::What(code)); |
338 | } |
339 | } |
340 | } |
341 | |
342 | pb_response.set_latency_us(latency.micro_seconds()); |
343 | SetStatus(code, pb_response.mutable_status()); |
344 | SerializeResponse(pb_response, brpc_controller); |
345 | } |
346 | |
347 | void ProximaRequestHandler::get_document_by_key( |
348 | ::google::protobuf::RpcController *controller, |
349 | const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/, |
350 | ::google::protobuf::Closure *done) { |
351 | brpc::ClosureGuard done_guard(done); |
352 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
353 | |
354 | int code = 0; |
355 | metrics::GetDocumentMetrics metrics{ProtocolType::kHttp, &code}; |
356 | |
357 | // Check http method |
358 | proto::GetDocumentResponse pb_response; |
359 | RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET, |
360 | &pb_response, pb_response.mutable_status()) |
361 | |
362 | std::string collection_name; |
363 | code = parse_collection(brpc_controller, &collection_name); |
364 | if (code == 0) { |
365 | proto::GetDocumentRequest pb_request; |
366 | pb_request.set_collection_name(collection_name); |
367 | auto *key = brpc_controller->http_request().uri().GetQuery("key" ); |
368 | if (key) { |
369 | pb_request.set_primary_key(std::strtoull(key->c_str(), nullptr, 10)); |
370 | code = query_agent_->search_by_key(&pb_request, &pb_response); |
371 | if (code != 0) { |
372 | LOG_ERROR("Can't handle query. code[%d] what[%s]" , code, |
373 | ErrorCode::What(code)); |
374 | } |
375 | } else { |
376 | code = PROXIMA_BE_ERROR_CODE(InvalidArgument); |
377 | } |
378 | } |
379 | |
380 | SetStatus(code, pb_response.mutable_status()); |
381 | SerializeResponse(pb_response, brpc_controller); |
382 | } |
383 | |
384 | void ProximaRequestHandler::list_collections( |
385 | ::google::protobuf::RpcController *controller, |
386 | const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/, |
387 | ::google::protobuf::Closure *done) { |
388 | brpc::ClosureGuard done_guard(done); |
389 | auto *brpc_controller = static_cast<brpc::Controller *>(controller); |
390 | |
391 | proto::ListCollectionsResponse pb_response; |
392 | RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET, |
393 | &pb_response, pb_response.mutable_status()) |
394 | proto::ListCondition pb_request; |
395 | auto *repo = brpc_controller->http_request().uri().GetQuery("repository" ); |
396 | if (repo) { |
397 | pb_request.set_repository_name(*repo); |
398 | } |
399 | int code = admin_agent_->list_collections(pb_request, &pb_response); |
400 | SetStatus(code, pb_response.mutable_status()); |
401 | SerializeResponse(pb_response, brpc_controller); |
402 | } |
403 | |
404 | void ProximaRequestHandler::get_version( |
405 | ::google::protobuf::RpcController *controller, |
406 | const proto::HttpRequest * /* request */, |
407 | proto::HttpResponse * /* response */, ::google::protobuf::Closure *done) { |
408 | brpc::ClosureGuard done_guard(done); |
409 | auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller); |
410 | |
411 | proto::GetVersionResponse pb_response; |
412 | pb_response.set_version(version_); |
413 | SetStatus(0, pb_response.mutable_status()); |
414 | SerializeResponse(pb_response, brpc_controller); |
415 | } |
416 | |
417 | int ProximaRequestHandler::write_impl(const proto::WriteRequest &request, |
418 | proto::Status *response) { |
419 | auto &collection_name = request.collection_name(); |
420 | auto meta = index_agent_->get_collection_meta(collection_name); |
421 | auto column_order = index_agent_->get_column_order(collection_name); |
422 | if (!meta || !column_order) { |
423 | SetStatus(ErrorCode_InexistentCollection, response); |
424 | LOG_ERROR("Invalid collection. collection[%s]" , collection_name.c_str()); |
425 | return ErrorCode_InexistentCollection; |
426 | } |
427 | |
428 | agent::WriteRequest write_request; |
429 | int code = |
430 | WriteRequestBuilder::build(*meta, *column_order, request, &write_request); |
431 | if (code != 0) { |
432 | SetStatus(code, response); |
433 | LOG_ERROR("Write request builder build failed. code[%d] collection[%s]" , |
434 | code, collection_name.c_str()); |
435 | return code; |
436 | } |
437 | |
438 | code = index_agent_->write(write_request); |
439 | if (code != 0) { |
440 | LOG_ERROR("Index agent write request failed. code[%d] collection[%s]" , code, |
441 | collection_name.c_str()); |
442 | } |
443 | |
444 | SetStatus(code, response); |
445 | return code; |
446 | } |
447 | |
448 | void ProximaRequestHandler::create_collection(brpc::Controller *controller) { |
449 | proto::Status pb_response; |
450 | RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_POST, &pb_response, |
451 | &pb_response) |
452 | const std::string &http_body = controller->request_attachment().to_string(); |
453 | proto::CollectionConfig pb_request; |
454 | |
455 | int code = ParseRequestFromJson(http_body, &pb_request); |
456 | if (code == 0) { |
457 | if (pb_request.collection_name().empty()) { |
458 | pb_request.set_collection_name( |
459 | controller->http_request().unresolved_path()); |
460 | } |
461 | code = admin_agent_->create_collection(pb_request); |
462 | } |
463 | |
464 | SetStatus(code, &pb_response); |
465 | SerializeResponse(pb_response, controller); |
466 | } |
467 | |
468 | void ProximaRequestHandler::describe_collection(brpc::Controller *controller) { |
469 | proto::DescribeCollectionResponse pb_response; |
470 | RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_GET, &pb_response, |
471 | pb_response.mutable_status()) |
472 | const std::string &collection_name = |
473 | controller->http_request().unresolved_path(); |
474 | int code = admin_agent_->describe_collection(collection_name, &pb_response); |
475 | SetStatus(code, pb_response.mutable_status()); |
476 | SerializeResponse(pb_response, controller); |
477 | } |
478 | |
479 | void ProximaRequestHandler::drop_collection(brpc::Controller *controller) { |
480 | proto::Status pb_response; |
481 | RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_DELETE, &pb_response, |
482 | &pb_response) |
483 | const std::string &collection_name = |
484 | controller->http_request().unresolved_path(); |
485 | int code = admin_agent_->drop_collection(collection_name); |
486 | |
487 | SetStatus(code, &pb_response); |
488 | SerializeResponse(pb_response, controller); |
489 | } |
490 | |
491 | int ProximaRequestHandler::parse_collection(brpc::Controller *controller, |
492 | std::string *collection_name) { |
493 | auto &path = controller->http_request().uri().path(); |
494 | std::vector<std::string> elements; |
495 | ailego::StringHelper::Split(path, "/" , &elements); |
496 | if (elements.size() < 4) { |
497 | return PROXIMA_BE_ERROR_CODE(InvalidArgument); |
498 | } |
499 | collection_name->assign(elements[3]); |
500 | return 0; |
501 | } |
502 | |
503 | } // end namespace server |
504 | } // namespace be |
505 | } // end namespace proxima |
506 | |