1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Dianzhang.Chen |
17 | * \date Jan 2021 |
18 | * \brief Definition of mock index agent server |
19 | */ |
20 | |
21 | #include <random> |
22 | #include <unordered_set> |
23 | #include <brpc/channel.h> |
24 | #include <brpc/controller.h> |
25 | #include <brpc/server.h> |
26 | #include <gtest/gtest.h> |
27 | #include "proto/proxima_be.pb.h" |
28 | #include "repository/repository_common/error_code.h" |
29 | #include "repository/repository_common/logger.h" |
30 | #include "repository/repository_common/version.h" |
31 | |
32 | using namespace proxima::be; |
33 | using namespace ::testing; |
34 | |
35 | ///////////////////////////////////// |
36 | class MockGeneralProximaServiceImpl |
37 | : public ::proxima::be::proto::ProximaService { |
38 | public: |
39 | void create_collection(google::protobuf::RpcController *controller, |
40 | const ::proxima::be::proto::CollectionConfig *request, |
41 | ::proxima::be::proto::Status *response, |
42 | ::google::protobuf::Closure *done) { |
43 | brpc::ClosureGuard done_guard(done); |
44 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
45 | } |
46 | |
47 | void drop_collection(google::protobuf::RpcController *controller, |
48 | const ::proxima::be::proto::CollectionName *request, |
49 | ::proxima::be::proto::Status *response, |
50 | ::google::protobuf::Closure *done) { |
51 | brpc::ClosureGuard done_guard(done); |
52 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
53 | } |
54 | |
55 | void describe_collection( |
56 | google::protobuf::RpcController *controller, |
57 | const ::proxima::be::proto::CollectionName *request, |
58 | ::proxima::be::proto::DescribeCollectionResponse *response, |
59 | ::google::protobuf::Closure *done) { |
60 | brpc::ClosureGuard done_guard(done); |
61 | std::unique_lock<std::mutex> ul(server_lock_); |
62 | auto *info = response->mutable_collection(); |
63 | auto *config = info->mutable_config(); |
64 | config->set_collection_name(request->collection_name()); |
65 | auto lsn_context = info->mutable_latest_lsn_context(); |
66 | if (!mock_context_store_.empty()) { |
67 | auto last_lsn_context = mock_context_store_.back(); |
68 | lsn_context->set_lsn(last_lsn_context.lsn()); |
69 | lsn_context->set_context(last_lsn_context.context()); |
70 | } else { |
71 | lsn_context->set_lsn(0); |
72 | lsn_context->set_context("" ); |
73 | } |
74 | response->mutable_status()->set_code( |
75 | ::proxima::be::repository::ErrorCode_Success.value()); |
76 | } |
77 | |
78 | void list_collections(google::protobuf::RpcController *controller, |
79 | const ::proxima::be::proto::ListCondition *request, |
80 | ::proxima::be::proto::ListCollectionsResponse *response, |
81 | ::google::protobuf::Closure *done) { |
82 | brpc::ClosureGuard done_guard(done); |
83 | response->mutable_status()->set_code( |
84 | ::proxima::be::repository::ErrorCode_Success.value()); |
85 | } |
86 | |
87 | void stats_collection(google::protobuf::RpcController *controller, |
88 | const ::proxima::be::proto::CollectionName *request, |
89 | ::proxima::be::proto::StatsCollectionResponse *response, |
90 | ::google::protobuf::Closure *done) { |
91 | brpc::ClosureGuard done_guard(done); |
92 | response->mutable_status()->set_code( |
93 | ::proxima::be::repository::ErrorCode_Success.value()); |
94 | } |
95 | |
96 | void write(google::protobuf::RpcController *controller, |
97 | const ::proxima::be::proto::WriteRequest *request, |
98 | ::proxima::be::proto::Status *response, |
99 | ::google::protobuf::Closure *done) { |
100 | brpc::ClosureGuard done_guard(done); |
101 | std::unique_lock<std::mutex> ul(server_lock_); |
102 | LOG_INFO("Mock General ProximaService received request[%s]" , |
103 | request->ShortDebugString().c_str()); |
104 | auto row_size = request->rows_size(); |
105 | for (int i = 0; i < row_size; i++) { |
106 | auto current_row_context = request->rows(i).lsn_context(); |
107 | mock_context_store_.push_back(current_row_context); |
108 | } |
109 | std::string request_string = "" ; |
110 | request->SerializeToString(&request_string); |
111 | request_strings_.push_back(request_string); |
112 | server_called_++; |
113 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
114 | } |
115 | |
116 | void query(google::protobuf::RpcController *controller, |
117 | const ::proxima::be::proto::QueryRequest *request, |
118 | ::proxima::be::proto::QueryResponse *response, |
119 | ::google::protobuf::Closure *done) { |
120 | brpc::ClosureGuard done_guard(done); |
121 | response->mutable_status()->set_code( |
122 | ::proxima::be::repository::ErrorCode_Success.value()); |
123 | } |
124 | |
125 | void get_version(::google::protobuf::RpcController * /* controller */, |
126 | const proto::GetVersionRequest * /* request */, |
127 | proto::GetVersionResponse *response, |
128 | ::google::protobuf::Closure *done) { |
129 | brpc::ClosureGuard done_guard(done); |
130 | response->set_version(repository::Version::String()); |
131 | response->mutable_status()->set_code(0); |
132 | response->mutable_status()->set_reason(repository::ErrorCode::What(0)); |
133 | } |
134 | |
135 | std::string get_request_string(int idx) { |
136 | std::unique_lock<std::mutex> ul(server_lock_); |
137 | if ((int)server_called_ <= idx) { |
138 | return "" ; |
139 | } |
140 | std::string ret = request_strings_[idx]; |
141 | return ret; |
142 | } |
143 | |
144 | size_t get_server_called_count() { |
145 | std::unique_lock<std::mutex> ul(server_lock_); |
146 | return server_called_; |
147 | } |
148 | |
149 | private: |
150 | std::vector<std::string> request_strings_; |
151 | std::vector<proto::LsnContext> mock_context_store_; |
152 | size_t server_called_{0}; |
153 | std::mutex server_lock_; |
154 | }; |
155 | ///////////////////////////////////// |
156 | // todo: don't use :: |
157 | class MockRandomProximaServiceImpl |
158 | : public ::proxima::be::proto::ProximaService { |
159 | public: |
160 | void create_collection(google::protobuf::RpcController *controller, |
161 | const ::proxima::be::proto::CollectionConfig *request, |
162 | ::proxima::be::proto::Status *response, |
163 | ::google::protobuf::Closure *done) { |
164 | brpc::ClosureGuard done_guard(done); |
165 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
166 | } |
167 | |
168 | void drop_collection(google::protobuf::RpcController *controller, |
169 | const ::proxima::be::proto::CollectionName *request, |
170 | ::proxima::be::proto::Status *response, |
171 | ::google::protobuf::Closure *done) { |
172 | brpc::ClosureGuard done_guard(done); |
173 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
174 | } |
175 | |
176 | void describe_collection( |
177 | google::protobuf::RpcController *controller, |
178 | const ::proxima::be::proto::CollectionName *request, |
179 | ::proxima::be::proto::DescribeCollectionResponse *response, |
180 | ::google::protobuf::Closure *done) { |
181 | brpc::ClosureGuard done_guard(done); |
182 | std::unique_lock<std::mutex> ul(server_lock_); |
183 | auto next = (expect_ >> 1); |
184 | auto *info = response->mutable_collection(); |
185 | auto *config = info->mutable_config(); |
186 | config->set_collection_name(request->collection_name()); |
187 | auto lsn_context = info->mutable_latest_lsn_context(); |
188 | lsn_context->set_lsn(next); |
189 | for (auto it = mock_context_store_.rbegin(); |
190 | it != mock_context_store_.rend(); it++) { |
191 | if (it->lsn() == next) { |
192 | lsn_context->set_context(it->context()); |
193 | } |
194 | } |
195 | expect_ = next + 1; |
196 | LOG_INFO("expect_: [%zu]" , (size_t)expect_); |
197 | response->mutable_status()->set_code( |
198 | ::proxima::be::repository::ErrorCode_Success.value()); |
199 | } |
200 | |
201 | void list_collections(google::protobuf::RpcController *controller, |
202 | const ::proxima::be::proto::ListCondition *request, |
203 | ::proxima::be::proto::ListCollectionsResponse *response, |
204 | ::google::protobuf::Closure *done) { |
205 | brpc::ClosureGuard done_guard(done); |
206 | response->mutable_status()->set_code( |
207 | ::proxima::be::repository::ErrorCode_Success.value()); |
208 | } |
209 | |
210 | void stats_collection(google::protobuf::RpcController *controller, |
211 | const ::proxima::be::proto::CollectionName *request, |
212 | ::proxima::be::proto::StatsCollectionResponse *response, |
213 | ::google::protobuf::Closure *done) { |
214 | brpc::ClosureGuard done_guard(done); |
215 | response->mutable_status()->set_code( |
216 | ::proxima::be::repository::ErrorCode_Success.value()); |
217 | } |
218 | |
219 | void write(google::protobuf::RpcController *controller, |
220 | const ::proxima::be::proto::WriteRequest *request, |
221 | ::proxima::be::proto::Status *response, |
222 | ::google::protobuf::Closure *done) { |
223 | brpc::ClosureGuard done_guard(done); |
224 | std::unique_lock<std::mutex> ul(server_lock_); |
225 | LOG_INFO("Mock General ProximaService received request[%s]" , |
226 | request->ShortDebugString().c_str()); |
227 | auto row = request->rows(); |
228 | uint64_t last_lsn = 0; |
229 | for (auto it = row.begin(); it != row.end(); it++) { |
230 | if (it == row.begin()) { |
231 | first_lsn_ = it->lsn_context().lsn(); |
232 | EXPECT_EQ(it->lsn_context().lsn(), expect_); |
233 | } |
234 | auto current_row_context = it->lsn_context(); |
235 | mock_context_store_.push_back(current_row_context); |
236 | last_lsn = current_row_context.lsn(); |
237 | records_count_++; |
238 | } |
239 | server_called_ = true; |
240 | expect_ = last_lsn + 1; |
241 | |
242 | std::mt19937 gen((std::random_device())()); |
243 | auto temp = (std::uniform_int_distribution<size_t>(0, 10))(gen); |
244 | int result = 0; |
245 | if (temp < 7) { |
246 | result = ::proxima::be::repository::ErrorCode_Success.value(); |
247 | } else if (temp < 9) { |
248 | result = ::proxima::be::repository::ErrorCode_ExceedRateLimit.value(); |
249 | expect_ = first_lsn_; |
250 | } else { |
251 | // todo<cdz>: Add ErrorCode_MismatchedSchema when support update |
252 | // result = |
253 | // ::proxima::be::repository::ErrorCode_MismatchedSchema.value(); |
254 | result = ::proxima::be::repository::ErrorCode_Success.value(); |
255 | } |
256 | LOG_INFO("expect_: [%zu]" , (size_t)expect_); |
257 | response->set_code(result); |
258 | } |
259 | |
260 | void query(google::protobuf::RpcController *controller, |
261 | const ::proxima::be::proto::QueryRequest *request, |
262 | ::proxima::be::proto::QueryResponse *response, |
263 | ::google::protobuf::Closure *done) { |
264 | brpc::ClosureGuard done_guard(done); |
265 | response->mutable_status()->set_code( |
266 | ::proxima::be::repository::ErrorCode_Success.value()); |
267 | } |
268 | |
269 | bool is_server_called() { |
270 | std::unique_lock<std::mutex> ul(server_lock_); |
271 | return server_called_; |
272 | } |
273 | |
274 | |
275 | uint64_t get_records_count() { |
276 | std::unique_lock<std::mutex> ul(server_lock_); |
277 | return records_count_; |
278 | } |
279 | |
280 | |
281 | void get_version(::google::protobuf::RpcController * /* controller */, |
282 | const proto::GetVersionRequest * /* request */, |
283 | proto::GetVersionResponse *response, |
284 | ::google::protobuf::Closure *done) { |
285 | brpc::ClosureGuard done_guard(done); |
286 | response->set_version(repository::Version::String()); |
287 | response->mutable_status()->set_code(0); |
288 | response->mutable_status()->set_reason(repository::ErrorCode::What(0)); |
289 | } |
290 | |
291 | |
292 | private: |
293 | uint64_t expect_{1}; |
294 | uint64_t first_lsn_{1}; |
295 | bool server_called_{false}; |
296 | uint64_t records_count_{0}; |
297 | std::vector<proto::LsnContext> mock_context_store_; |
298 | std::mutex server_lock_; |
299 | }; |
300 | |
301 | // Use for manager test |
302 | class MockProximaServiceImpl : public ::proxima::be::proto::ProximaService { |
303 | public: |
304 | void create_collection(google::protobuf::RpcController *controller, |
305 | const ::proxima::be::proto::CollectionConfig *request, |
306 | ::proxima::be::proto::Status *response, |
307 | ::google::protobuf::Closure *done) { |
308 | brpc::ClosureGuard done_guard(done); |
309 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
310 | } |
311 | |
312 | void drop_collection(google::protobuf::RpcController *controller, |
313 | const ::proxima::be::proto::CollectionName *request, |
314 | ::proxima::be::proto::Status *response, |
315 | ::google::protobuf::Closure *done) { |
316 | brpc::ClosureGuard done_guard(done); |
317 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
318 | } |
319 | |
320 | void describe_collection( |
321 | google::protobuf::RpcController *controller, |
322 | const ::proxima::be::proto::CollectionName *request, |
323 | ::proxima::be::proto::DescribeCollectionResponse *response, |
324 | ::google::protobuf::Closure *done) { |
325 | brpc::ClosureGuard done_guard(done); |
326 | |
327 | std::unique_lock<std::mutex> ul(server_lock_); |
328 | created_collection_.insert(request->collection_name()); |
329 | auto *info = response->mutable_collection(); |
330 | auto *config = info->mutable_config(); |
331 | config->set_collection_name(request->collection_name()); |
332 | response->mutable_status()->set_code( |
333 | ::proxima::be::repository::ErrorCode_Success.value()); |
334 | } |
335 | |
336 | void list_collections(google::protobuf::RpcController *controller, |
337 | const ::proxima::be::proto::ListCondition *request, |
338 | ::proxima::be::proto::ListCollectionsResponse *response, |
339 | ::google::protobuf::Closure *done) { |
340 | brpc::ClosureGuard done_guard(done); |
341 | |
342 | for (size_t i = 0; i < collections_name_.size(); i++) { |
343 | auto *current_collection_info = response->add_collections(); |
344 | // current_config->set_collection_name(collections_name[i]); |
345 | current_collection_info->set_uuid(collections_uuid_[i]); |
346 | current_collection_info->set_status( |
347 | proxima::be::proto::CollectionInfo::CS_SERVING); |
348 | auto *current_config = current_collection_info->mutable_config(); |
349 | current_config->set_collection_name(collections_name_[i]); |
350 | // current_config->set_schema_revision(1); |
351 | } |
352 | response->mutable_status()->set_code( |
353 | ::proxima::be::repository::ErrorCode_Success.value()); |
354 | } |
355 | |
356 | void stats_collection(google::protobuf::RpcController *controller, |
357 | const ::proxima::be::proto::CollectionName *request, |
358 | ::proxima::be::proto::StatsCollectionResponse *response, |
359 | ::google::protobuf::Closure *done) { |
360 | brpc::ClosureGuard done_guard(done); |
361 | response->mutable_status()->set_code( |
362 | ::proxima::be::repository::ErrorCode_Success.value()); |
363 | } |
364 | |
365 | void write(google::protobuf::RpcController *controller, |
366 | const ::proxima::be::proto::WriteRequest *request, |
367 | ::proxima::be::proto::Status *response, |
368 | ::google::protobuf::Closure *done) { |
369 | brpc::ClosureGuard done_guard(done); |
370 | response->set_code(::proxima::be::repository::ErrorCode_Success.value()); |
371 | } |
372 | |
373 | void query(google::protobuf::RpcController *controller, |
374 | const ::proxima::be::proto::QueryRequest *request, |
375 | ::proxima::be::proto::QueryResponse *response, |
376 | ::google::protobuf::Closure *done) { |
377 | brpc::ClosureGuard done_guard(done); |
378 | response->mutable_status()->set_code( |
379 | ::proxima::be::repository::ErrorCode_Success.value()); |
380 | } |
381 | |
382 | void get_version(::google::protobuf::RpcController * /* controller */, |
383 | const proto::GetVersionRequest * /* request */, |
384 | proto::GetVersionResponse *response, |
385 | ::google::protobuf::Closure *done) { |
386 | brpc::ClosureGuard done_guard(done); |
387 | response->set_version(repository::Version::String()); |
388 | response->mutable_status()->set_code(0); |
389 | response->mutable_status()->set_reason(repository::ErrorCode::What(0)); |
390 | } |
391 | |
392 | std::unordered_set<std::string> get_created_collections() { |
393 | std::unique_lock<std::mutex> ul(server_lock_); |
394 | return created_collection_; |
395 | } |
396 | |
397 | std::vector<std::string> get_collections_name() { |
398 | return collections_name_; |
399 | } |
400 | |
401 | private: |
402 | std::vector<std::string> collections_name_{"collection1" , "collection2" , |
403 | "collection3" }; |
404 | std::vector<std::string> collections_uuid_{ |
405 | "collection1-uuid" , "collection2-uuid" , "collection3-uuid" }; |
406 | std::unordered_set<std::string> created_collection_{}; |
407 | std::mutex server_lock_; |
408 | }; |
409 | |