1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation with http protocol
19 */
20
21#include "http_client.h"
22#include <iostream>
23#include <ailego/utility/string_helper.h>
24#include "common/protobuf_helper.h"
25#include "version.h"
26
27namespace proxima {
28namespace be {
29
30Status HttpProximaSearchClient::connect(const ChannelOptions &options) {
31 Status status;
32
33 brpc::ChannelOptions brpc_options;
34 brpc_options.protocol = "http";
35 brpc_options.timeout_ms = options.timeout_ms;
36 brpc_options.max_retry = options.max_retry;
37
38 http_host_ = std::string("http://").append(options.host);
39 int ret = client_channel_.Init(http_host_.c_str(), "", &brpc_options);
40 if (ret != 0) {
41 status.code = ErrorCode_InitChannel;
42 status.reason = "Init client channel failed";
43 return status;
44 }
45
46 // check versions of client and server if matched first
47 status = check_server_version();
48 if (status.code != 0) {
49 return status;
50 }
51
52 connected_ = true;
53 return status;
54}
55
56Status HttpProximaSearchClient::close() {
57 // DO NOTHING NOW
58 connected_ = false;
59 return Status();
60}
61
62void HttpProximaSearchClient::rpc_create_collection(
63 brpc::Controller *cntl, const proto::CollectionConfig *request,
64 proto::Status *response) {
65 std::string url;
66 url.append(http_host_)
67 .append("/v1/collection/")
68 .append(request->collection_name());
69
70 std::string json_body;
71 ProtobufHelper::MessageToJson(*request, &json_body);
72
73 cntl->http_request().uri() = url;
74 cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
75 cntl->request_attachment().append(json_body);
76 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
77
78 if (!cntl->Failed()) {
79 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
80 response);
81 }
82}
83
84void HttpProximaSearchClient::rpc_drop_collection(
85 brpc::Controller *cntl, const proto::CollectionName *request,
86 proto::Status *response) {
87 std::string url;
88 url.append(http_host_)
89 .append("/v1/collection/")
90 .append(request->collection_name());
91 cntl->http_request().uri() = url;
92 cntl->http_request().set_method(brpc::HTTP_METHOD_DELETE);
93 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
94
95 if (!cntl->Failed()) {
96 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
97 response);
98 }
99}
100
101void HttpProximaSearchClient::rpc_describe_collection(
102 brpc::Controller *cntl, const proto::CollectionName *request,
103 proto::DescribeCollectionResponse *response) {
104 std::string url;
105 url.append(http_host_)
106 .append("/v1/collection/")
107 .append(request->collection_name());
108 cntl->http_request().uri() = url;
109 cntl->http_request().set_method(brpc::HTTP_METHOD_GET);
110 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
111
112 if (!cntl->Failed()) {
113 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
114 response);
115 }
116}
117
118
119void HttpProximaSearchClient::rpc_stats_collection(
120 brpc::Controller *cntl, const proto::CollectionName *request,
121 proto::StatsCollectionResponse *response) {
122 std::string url;
123 url.append(http_host_)
124 .append("/v1/collection/")
125 .append(request->collection_name())
126 .append("/stats");
127 cntl->http_request().uri() = url;
128 cntl->http_request().set_method(brpc::HTTP_METHOD_GET);
129 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
130
131 if (!cntl->Failed()) {
132 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
133 response);
134 }
135}
136
137void HttpProximaSearchClient::rpc_list_collections(
138 brpc::Controller *cntl, const proto::ListCondition * /* request */,
139 proto::ListCollectionsResponse *response) {
140 std::string url;
141 url.append(http_host_).append("/v1/collections");
142 cntl->http_request().uri() = url;
143 cntl->http_request().set_method(brpc::HTTP_METHOD_GET);
144 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
145
146 if (!cntl->Failed()) {
147 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
148 response);
149 }
150}
151
152void HttpProximaSearchClient::rpc_write(brpc::Controller *cntl,
153 const proto::WriteRequest *request,
154 proto::Status *response) {
155 std::string url;
156 url.append(http_host_)
157 .append("/v1/collection/")
158 .append(request->collection_name())
159 .append("/index");
160
161 std::string json_body;
162 ProtobufHelper::MessageToJson(*request, &json_body);
163
164 cntl->http_request().uri() = url;
165 cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
166 cntl->request_attachment().append(json_body);
167 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
168
169 if (!cntl->Failed()) {
170 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
171 response);
172 }
173}
174
175void HttpProximaSearchClient::rpc_query(brpc::Controller *cntl,
176 const proto::QueryRequest *request,
177 proto::QueryResponse *response) {
178 std::string url;
179 url.append(http_host_)
180 .append("/v1/collection/")
181 .append(request->collection_name())
182 .append("/query");
183
184 std::string json_body;
185 ProtobufHelper::MessageToJson(*request, &json_body);
186
187 cntl->http_request().uri() = url;
188 cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
189 cntl->request_attachment().append(json_body);
190 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
191
192 if (!cntl->Failed()) {
193 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
194 response);
195 }
196}
197
198void HttpProximaSearchClient::rpc_get_document_by_key(
199 brpc::Controller *cntl, const proto::GetDocumentRequest *request,
200 proto::GetDocumentResponse *response) {
201 std::string url;
202 url.append(http_host_)
203 .append("/v1/collection/")
204 .append(request->collection_name())
205 .append("/doc?key=")
206 .append(std::to_string(request->primary_key()));
207
208 cntl->http_request().uri() = url;
209 cntl->http_request().set_method(brpc::HTTP_METHOD_GET);
210 client_channel_.CallMethod(nullptr, cntl, nullptr, nullptr, nullptr);
211
212 if (!cntl->Failed()) {
213 ProtobufHelper::JsonToMessage(cntl->response_attachment().to_string(),
214 response);
215 }
216}
217
218Status HttpProximaSearchClient::check_server_version() {
219 Status status;
220 std::string url;
221 url.append(http_host_).append("/service_version");
222
223 brpc::Controller cntl;
224 cntl.http_request().uri() = url;
225 cntl.http_request().set_method(brpc::HTTP_METHOD_GET);
226 client_channel_.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
227 if (cntl.Failed()) {
228 status.code = ErrorCode_RpcError;
229 status.reason = cntl.ErrorText();
230 return status;
231 }
232
233 proto::GetVersionResponse resp;
234 ProtobufHelper::JsonToMessage(cntl.response_attachment().to_string(), &resp);
235 std::string server_version = resp.version();
236 std::string client_version = Version::String();
237 if (server_version == client_version) {
238 return status;
239 }
240
241 // Temporarily we just use first two seq number of version string to compare
242 // For exp: version[0.1.2] match version[0.1.3] with "0.1"
243 std::vector<std::string> server_sub_seqs;
244 ailego::StringHelper::Split(server_version, '.', &server_sub_seqs);
245 std::vector<std::string> client_sub_seqs;
246 ailego::StringHelper::Split(client_version, '.', &client_sub_seqs);
247
248 int compare_count = 2;
249 for (int i = 0; i < compare_count; i++) {
250 if (client_sub_seqs[i] != server_sub_seqs[i]) {
251 status.code = ErrorCode_MismatchedVersion;
252 status.reason = std::string()
253 .append("client version:")
254 .append(Version::String())
255 .append(" not match server version:")
256 .append(server_version);
257 return status;
258 }
259 }
260
261 return status;
262}
263
264} // namespace be
265} // namespace proxima
266