1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Dec 2020
18 * \brief
19 */
20
21#include "query_agent.h"
22#include "common/config.h"
23#include "common/profiler.h"
24#include "query_service_builder.h"
25
26namespace proxima {
27namespace be {
28namespace query {
29
30/*!
31 * QueryAgent implementation
32 */
33class QueryAgentImpl : public QueryAgent {
34 public:
35 //! Constructor
36 explicit QueryAgentImpl(QueryServicePtr query_service)
37 : query_service_(std::move(query_service)), stopped_(false) {}
38
39 //! Destructor
40 ~QueryAgentImpl() override = default;
41
42 public:
43 //! Get Query Service Instance
44 QueryServicePtr get_service() const override {
45 return query_service_;
46 }
47
48 public:
49 //! Query Service
50 int search(const proto::QueryRequest *query,
51 proto::QueryResponse *response) override {
52 int error_code = PROXIMA_BE_ERROR_CODE(StoppedService);
53 if (!is_running()) {
54 LOG_WARN("QueryAgent stopped, invoke start and try again.");
55 return error_code;
56 }
57 ProfilerPtr profiler = std::make_shared<Profiler>(query->debug_mode());
58 profiler->start();
59 error_code = query_service_->search(query, response, profiler);
60 profiler->stop();
61 if (profiler->enabled()) {
62 response->set_debug_info(profiler->as_json_string());
63 }
64 return error_code;
65 }
66
67 //! Query Service
68 int search_by_key(const proto::GetDocumentRequest *query,
69 proto::GetDocumentResponse *response) override {
70 if (!is_running()) {
71 LOG_WARN("QueryAgent stopped, invoke start and try again.");
72 return PROXIMA_BE_ERROR_CODE(StoppedService);
73 }
74 ProfilerPtr profiler = std::make_shared<Profiler>(query->debug_mode());
75 profiler->start();
76 int error_code = query_service_->search_by_key(query, response, profiler);
77 profiler->stop();
78 if (profiler->enabled()) {
79 response->set_debug_info(profiler->as_json_string());
80 }
81 return error_code;
82 }
83
84 public:
85 //! Init Query Agent
86 int init() override {
87 LOG_INFO("QueryAgent initialize complete.");
88 return 0;
89 }
90
91 //! Cleanup Query Agent
92 int cleanup() override {
93 int error_code = query_service_->cleanup();
94 if (error_code == 0) {
95 LOG_INFO("QueryAgent cleanup complete.");
96 } else {
97 LOG_ERROR("QueryAgent cleanup failed. code[%d], what[%s]", error_code,
98 ErrorCode::What(error_code));
99 }
100 return error_code;
101 }
102
103 //! Start background service
104 int start() override {
105 stopped_.store(false);
106 LOG_INFO("QueryAgent start complete.");
107 return 0;
108 }
109
110 //! Stop background service
111 int stop() override {
112 LOG_INFO("QueryAgent stopped.");
113 stopped_.store(true);
114 return 0;
115 }
116
117 //! Check QueryAgent is running
118 int is_running() override {
119 return !stopped_;
120 }
121
122 private:
123 //! Handler for query service
124 QueryServicePtr query_service_{nullptr};
125
126 //! Stopped flag
127 std::atomic_bool stopped_{false};
128};
129
130//! Create one QueryAgent instance
131//! @param: concurrency: the buckets of execution queue, equal 0, means
132// using hardware concurrency
133QueryAgentPtr QueryAgent::Create(index::IndexServicePtr index_service,
134 meta::MetaServicePtr meta_service,
135 uint32_t concurrency) {
136 return std::make_shared<QueryAgentImpl>(QueryServiceBuilder::Create(
137 std::move(index_service), std::move(meta_service), concurrency));
138}
139
140} // namespace query
141} // namespace be
142} // namespace proxima
143