1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of proxima request handler
19 */
20
#include "proxima_request_handler.h"
#include <cerrno>
#include <cstdlib>
#include <ailego/encoding/json.h>
#include <ailego/utility/string_helper.h>
#include <brpc/server.h>
25
26#ifdef __GNUC__
27#pragma GCC diagnostic push
28#pragma GCC diagnostic ignored "-Wshadow"
29#pragma GCC diagnostic ignored "-Wunused-parameter"
30#endif
31#include <google/protobuf/util/json_util.h>
32#ifdef __GNUC__
33#pragma GCC diagnostic pop
34#endif
35
36#include "agent/write_request.h"
37#include "common/error_code.h"
38#include "common/protobuf_helper.h"
39#include "metrics/metrics.h"
40#include "write_request_builder.h"
41
42
43namespace proxima {
44namespace be {
45namespace server {
46
47using metrics::ProtocolType;
48
49namespace {
50
51static void inline SetStatus(int ret, proto::Status *status) {
52 status->set_code(ret);
53 status->set_reason(ErrorCode::What(ret));
54}
55
56//! Parse collection meta from json string
57static inline int ParseRequestFromJson(const std::string &json_str,
58 google::protobuf::Message *meta) {
59 ProtobufHelper::JsonParseOptions options;
60 // ignore params which can't be automatic parse from json
61 options.ignore_unknown_fields = true;
62
63 if (!ProtobufHelper::JsonToMessage(json_str, options, meta)) {
64 LOG_ERROR("ParseRequestFromJson failed. json[%s]", json_str.c_str());
65 return ErrorCode_InvalidArgument;
66 }
67 return 0;
68}
69
70//! Serialize response to controller
71static inline void SerializeResponse(const google::protobuf::Message &response,
72 brpc::Controller *brpc_controller) {
73 brpc_controller->http_response().set_content_type("application/json");
74 std::string json_resp;
75 if (!ProtobufHelper::MessageToJson(response, &json_resp)) {
76 LOG_ERROR("Can't serialize PB response to json. message[%s]",
77 response.ShortDebugString().c_str());
78 } else {
79 brpc_controller->response_attachment().append(json_resp);
80 }
81}
82
83static inline void UnknownMethod(brpc::Controller *controller,
84 int allowed_method,
85 google::protobuf::Message *rsp,
86 proto::Status *status) {
87 SetStatus(PROXIMA_BE_ERROR_CODE(InvalidQuery), status);
88 controller->http_response().set_status_code(
89 brpc::HTTP_STATUS_METHOD_NOT_ALLOWED);
90 status->mutable_reason()->append(": invalid http method");
91 const char *allowed = nullptr;
92 switch (allowed_method) {
93 case brpc::HTTP_METHOD_POST:
94 allowed = "POST";
95 break;
96 case brpc::HTTP_METHOD_GET:
97 allowed = "GET";
98 break;
99 case brpc::HTTP_METHOD_PUT:
100 allowed = "PUT";
101 break;
102 case brpc::HTTP_METHOD_DELETE:
103 allowed = "DELETE";
104 break;
105 // default ignore
106 }
107 if (allowed) {
108 controller->http_response().SetHeader("Allowed", allowed);
109 }
110 SerializeResponse(*rsp, controller);
111}
112
113} // namespace
114
//! Leave the enclosing void handler through UnknownMethod() when the HTTP
//! method of CONTROLLER differs from METHOD.
//! Macro arguments are parenthesized so that arbitrary expressions can be
//! passed safely. The expansion is deliberately a plain `if` block rather than
//! do { } while (0): the call sites in this file invoke the macro without a
//! trailing semicolon.
#define RETURN_IF_NOT_HTTP_METHOD(CONTROLLER, METHOD, RSP, STATUS) \
  if ((CONTROLLER)->http_request().method() != (METHOD)) {         \
    UnknownMethod((CONTROLLER), (METHOD), (RSP), (STATUS));        \
    return;                                                        \
  }
120
121ProximaRequestHandler::ProximaRequestHandler(
122 const agent::IndexAgentPtr &p_index_agent,
123 const query::QueryAgentPtr &p_query_agent,
124 const admin::AdminAgentPtr &p_admin_agent)
125 : index_agent_(p_index_agent),
126 query_agent_(p_query_agent),
127 admin_agent_(p_admin_agent) {}
128
129
130void ProximaRequestHandler::create_collection(
131 ::google::protobuf::RpcController * /*controller*/,
132 const proto::CollectionConfig *request, proto::Status *response,
133 ::google::protobuf::Closure *done) {
134 brpc::ClosureGuard done_guard(done);
135 int ret = admin_agent_->create_collection(*request);
136 SetStatus(ret, response);
137}
138
139void ProximaRequestHandler::drop_collection(
140 ::google::protobuf::RpcController * /*controller*/,
141 const proto::CollectionName *request, proto::Status *response,
142 ::google::protobuf::Closure *done) {
143 brpc::ClosureGuard done_guard(done);
144 int ret = admin_agent_->drop_collection(request->collection_name());
145 SetStatus(ret, response);
146}
147
148void ProximaRequestHandler::describe_collection(
149 ::google::protobuf::RpcController * /*controller*/,
150 const proto::CollectionName *request,
151 proto::DescribeCollectionResponse *response,
152 ::google::protobuf::Closure *done) {
153 brpc::ClosureGuard done_guard(done);
154 int ret =
155 admin_agent_->describe_collection(request->collection_name(), response);
156 SetStatus(ret, response->mutable_status());
157}
158
159void ProximaRequestHandler::list_collections(
160 ::google::protobuf::RpcController * /*controller*/,
161 const proto::ListCondition *request,
162 proto::ListCollectionsResponse *response,
163 ::google::protobuf::Closure *done) {
164 brpc::ClosureGuard done_guard(done);
165 int ret = admin_agent_->list_collections(*request, response);
166 SetStatus(ret, response->mutable_status());
167}
168
169void ProximaRequestHandler::stats_collection(
170 ::google::protobuf::RpcController * /*controller*/,
171 const proto::CollectionName *request,
172 proto::StatsCollectionResponse *response,
173 ::google::protobuf::Closure *done) {
174 brpc::ClosureGuard done_guard(done);
175 int ret =
176 admin_agent_->stats_collection(request->collection_name(), response);
177 SetStatus(ret, response->mutable_status());
178}
179
180void ProximaRequestHandler::write(
181 ::google::protobuf::RpcController * /*controller*/,
182 const proto::WriteRequest *request, proto::Status *response,
183 ::google::protobuf::Closure *done) {
184 int code = 0;
185 metrics::WriteMetrics metrics{ProtocolType::kGrpc, &code};
186 metrics.update_with_write_request(*request);
187 LOG_DEBUG("%s", request->ShortDebugString().c_str());
188 brpc::ClosureGuard done_guard(done);
189 code = this->write_impl(*request, response);
190}
191
192void ProximaRequestHandler::query(::google::protobuf::RpcController *,
193 const proto::QueryRequest *request,
194 proto::QueryResponse *response,
195 ::google::protobuf::Closure *done) {
196 brpc::ClosureGuard done_guard(done);
197 ailego::ElapsedTime latency;
198 int code = 0;
199 metrics::QueryMetrics metrics{metrics::ProtocolType::kGrpc, &code};
200 metrics.update_with_query_request(*request);
201
202 code = query_agent_->search(request, response);
203 if (code != 0) {
204 LOG_ERROR("Can't handle query. code[%d] what[%s]", code,
205 ErrorCode::What(code));
206 }
207
208 response->set_latency_us(latency.micro_seconds());
209 SetStatus(code, response->mutable_status());
210}
211
212void ProximaRequestHandler::get_document_by_key(
213 ::google::protobuf::RpcController * /*controller*/,
214 const proto::GetDocumentRequest *request,
215 proto::GetDocumentResponse *response, ::google::protobuf::Closure *done) {
216 brpc::ClosureGuard done_guard(done);
217 int code = 0;
218 metrics::GetDocumentMetrics metrics{ProtocolType::kGrpc, &code};
219 code = query_agent_->search_by_key(request, response);
220 if (code != 0) {
221 LOG_ERROR("Can't handle query. code[%d] what[%s]", code,
222 ErrorCode::What(code));
223 }
224 SetStatus(code, response->mutable_status());
225}
226
227void ProximaRequestHandler::get_version(
228 ::google::protobuf::RpcController * /* controller */,
229 const proto::GetVersionRequest * /* request */,
230 proto::GetVersionResponse *response, ::google::protobuf::Closure *done) {
231 brpc::ClosureGuard done_guard(done);
232 response->set_version(version_);
233 SetStatus(0, response->mutable_status());
234}
235
236void ProximaRequestHandler::collection(
237 ::google::protobuf::RpcController *controller,
238 const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/,
239 ::google::protobuf::Closure *done) {
240 brpc::ClosureGuard done_guard(done);
241 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
242 auto method = brpc_controller->http_request().method();
243 if (method == brpc::HttpMethod::HTTP_METHOD_POST) {
244 create_collection(brpc_controller);
245 } else if (method == brpc::HttpMethod::HTTP_METHOD_GET) {
246 describe_collection(brpc_controller);
247 } else if (method == brpc::HttpMethod::HTTP_METHOD_DELETE) {
248 drop_collection(brpc_controller);
249 } else {
250 proto::Status status;
251 SetStatus(PROXIMA_BE_ERROR_CODE(InvalidQuery), &status);
252 status.set_reason(": invalid http method");
253 brpc_controller->http_response().set_status_code(
254 brpc::HTTP_STATUS_METHOD_NOT_ALLOWED);
255 SerializeResponse(status, brpc_controller);
256 }
257}
258
259void ProximaRequestHandler::stats_collection(
260 ::google::protobuf::RpcController *controller,
261 const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/,
262 ::google::protobuf::Closure *done) {
263 brpc::ClosureGuard done_guard(done);
264 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
265
266 proto::StatsCollectionResponse pb_response;
267 RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET,
268 &pb_response, pb_response.mutable_status())
269 std::string collection_name;
270 int code = parse_collection(brpc_controller, &collection_name);
271 if (code == 0) {
272 code = admin_agent_->stats_collection(collection_name, &pb_response);
273 }
274 SetStatus(code, pb_response.mutable_status());
275 SerializeResponse(pb_response, brpc_controller);
276}
277
278void ProximaRequestHandler::write(::google::protobuf::RpcController *controller,
279 const proto::HttpRequest * /*request*/,
280 proto::HttpResponse * /*response*/,
281 ::google::protobuf::Closure *done) {
282 int code = 0;
283 metrics::WriteMetrics metrics{ProtocolType::kHttp, &code};
284 brpc::ClosureGuard done_guard(done);
285
286 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
287 proto::Status status;
288 RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_POST, &status,
289 &status)
290
291 std::string collection_name;
292 code = parse_collection(brpc_controller, &collection_name);
293 if (code == 0) {
294 const std::string http_body =
295 brpc_controller->request_attachment().to_string();
296 proto::WriteRequest pb_request;
297 code = ParseRequestFromJson(http_body, &pb_request);
298 if (code == 0) {
299 pb_request.set_collection_name(collection_name);
300 metrics.update_with_write_request(pb_request);
301 code = this->write_impl(pb_request, &status);
302 } else {
303 SetStatus(code, &status);
304 }
305 }
306
307 SerializeResponse(status, brpc_controller);
308}
309
310void ProximaRequestHandler::query(::google::protobuf::RpcController *controller,
311 const proto::HttpRequest * /*request*/,
312 proto::HttpResponse * /*response*/,
313 ::google::protobuf::Closure *done) {
314 brpc::ClosureGuard done_guard(done);
315 ailego::ElapsedTime latency;
316 int code = 0;
317 metrics::QueryMetrics metrics{metrics::ProtocolType::kHttp, &code};
318 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
319
320 // Check http method
321 proto::QueryResponse pb_response;
322 RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_POST,
323 &pb_response, pb_response.mutable_status())
324
325 std::string collection_name;
326 code = parse_collection(brpc_controller, &collection_name);
327 if (code == 0) {
328 const std::string body = brpc_controller->request_attachment().to_string();
329 proto::QueryRequest pb_request;
330 code = ParseRequestFromJson(body, &pb_request);
331 if (code == 0) {
332 metrics.update_with_query_request(pb_request);
333 pb_request.set_collection_name(collection_name);
334 code = query_agent_->search(&pb_request, &pb_response);
335 if (code != 0) {
336 LOG_ERROR("Can't handle query. code[%d] what[%s]", code,
337 ErrorCode::What(code));
338 }
339 }
340 }
341
342 pb_response.set_latency_us(latency.micro_seconds());
343 SetStatus(code, pb_response.mutable_status());
344 SerializeResponse(pb_response, brpc_controller);
345}
346
347void ProximaRequestHandler::get_document_by_key(
348 ::google::protobuf::RpcController *controller,
349 const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/,
350 ::google::protobuf::Closure *done) {
351 brpc::ClosureGuard done_guard(done);
352 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
353
354 int code = 0;
355 metrics::GetDocumentMetrics metrics{ProtocolType::kHttp, &code};
356
357 // Check http method
358 proto::GetDocumentResponse pb_response;
359 RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET,
360 &pb_response, pb_response.mutable_status())
361
362 std::string collection_name;
363 code = parse_collection(brpc_controller, &collection_name);
364 if (code == 0) {
365 proto::GetDocumentRequest pb_request;
366 pb_request.set_collection_name(collection_name);
367 auto *key = brpc_controller->http_request().uri().GetQuery("key");
368 if (key) {
369 pb_request.set_primary_key(std::strtoull(key->c_str(), nullptr, 10));
370 code = query_agent_->search_by_key(&pb_request, &pb_response);
371 if (code != 0) {
372 LOG_ERROR("Can't handle query. code[%d] what[%s]", code,
373 ErrorCode::What(code));
374 }
375 } else {
376 code = PROXIMA_BE_ERROR_CODE(InvalidArgument);
377 }
378 }
379
380 SetStatus(code, pb_response.mutable_status());
381 SerializeResponse(pb_response, brpc_controller);
382}
383
384void ProximaRequestHandler::list_collections(
385 ::google::protobuf::RpcController *controller,
386 const proto::HttpRequest * /*request*/, proto::HttpResponse * /*response*/,
387 ::google::protobuf::Closure *done) {
388 brpc::ClosureGuard done_guard(done);
389 auto *brpc_controller = static_cast<brpc::Controller *>(controller);
390
391 proto::ListCollectionsResponse pb_response;
392 RETURN_IF_NOT_HTTP_METHOD(brpc_controller, brpc::HTTP_METHOD_GET,
393 &pb_response, pb_response.mutable_status())
394 proto::ListCondition pb_request;
395 auto *repo = brpc_controller->http_request().uri().GetQuery("repository");
396 if (repo) {
397 pb_request.set_repository_name(*repo);
398 }
399 int code = admin_agent_->list_collections(pb_request, &pb_response);
400 SetStatus(code, pb_response.mutable_status());
401 SerializeResponse(pb_response, brpc_controller);
402}
403
404void ProximaRequestHandler::get_version(
405 ::google::protobuf::RpcController *controller,
406 const proto::HttpRequest * /* request */,
407 proto::HttpResponse * /* response */, ::google::protobuf::Closure *done) {
408 brpc::ClosureGuard done_guard(done);
409 auto *brpc_controller = dynamic_cast<brpc::Controller *>(controller);
410
411 proto::GetVersionResponse pb_response;
412 pb_response.set_version(version_);
413 SetStatus(0, pb_response.mutable_status());
414 SerializeResponse(pb_response, brpc_controller);
415}
416
417int ProximaRequestHandler::write_impl(const proto::WriteRequest &request,
418 proto::Status *response) {
419 auto &collection_name = request.collection_name();
420 auto meta = index_agent_->get_collection_meta(collection_name);
421 auto column_order = index_agent_->get_column_order(collection_name);
422 if (!meta || !column_order) {
423 SetStatus(ErrorCode_InexistentCollection, response);
424 LOG_ERROR("Invalid collection. collection[%s]", collection_name.c_str());
425 return ErrorCode_InexistentCollection;
426 }
427
428 agent::WriteRequest write_request;
429 int code =
430 WriteRequestBuilder::build(*meta, *column_order, request, &write_request);
431 if (code != 0) {
432 SetStatus(code, response);
433 LOG_ERROR("Write request builder build failed. code[%d] collection[%s]",
434 code, collection_name.c_str());
435 return code;
436 }
437
438 code = index_agent_->write(write_request);
439 if (code != 0) {
440 LOG_ERROR("Index agent write request failed. code[%d] collection[%s]", code,
441 collection_name.c_str());
442 }
443
444 SetStatus(code, response);
445 return code;
446}
447
448void ProximaRequestHandler::create_collection(brpc::Controller *controller) {
449 proto::Status pb_response;
450 RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_POST, &pb_response,
451 &pb_response)
452 const std::string &http_body = controller->request_attachment().to_string();
453 proto::CollectionConfig pb_request;
454
455 int code = ParseRequestFromJson(http_body, &pb_request);
456 if (code == 0) {
457 if (pb_request.collection_name().empty()) {
458 pb_request.set_collection_name(
459 controller->http_request().unresolved_path());
460 }
461 code = admin_agent_->create_collection(pb_request);
462 }
463
464 SetStatus(code, &pb_response);
465 SerializeResponse(pb_response, controller);
466}
467
468void ProximaRequestHandler::describe_collection(brpc::Controller *controller) {
469 proto::DescribeCollectionResponse pb_response;
470 RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_GET, &pb_response,
471 pb_response.mutable_status())
472 const std::string &collection_name =
473 controller->http_request().unresolved_path();
474 int code = admin_agent_->describe_collection(collection_name, &pb_response);
475 SetStatus(code, pb_response.mutable_status());
476 SerializeResponse(pb_response, controller);
477}
478
479void ProximaRequestHandler::drop_collection(brpc::Controller *controller) {
480 proto::Status pb_response;
481 RETURN_IF_NOT_HTTP_METHOD(controller, brpc::HTTP_METHOD_DELETE, &pb_response,
482 &pb_response)
483 const std::string &collection_name =
484 controller->http_request().unresolved_path();
485 int code = admin_agent_->drop_collection(collection_name);
486
487 SetStatus(code, &pb_response);
488 SerializeResponse(pb_response, controller);
489}
490
491int ProximaRequestHandler::parse_collection(brpc::Controller *controller,
492 std::string *collection_name) {
493 auto &path = controller->http_request().uri().path();
494 std::vector<std::string> elements;
495 ailego::StringHelper::Split(path, "/", &elements);
496 if (elements.size() < 4) {
497 return PROXIMA_BE_ERROR_CODE(InvalidArgument);
498 }
499 collection_name->assign(elements[3]);
500 return 0;
501}
502
503} // end namespace server
504} // namespace be
505} // end namespace proxima
506