1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Jan 2021
18 * \brief Definition of mock index agent server
19 */
20
21#include <random>
22#include <unordered_set>
23#include <brpc/channel.h>
24#include <brpc/controller.h>
25#include <brpc/server.h>
26#include <gtest/gtest.h>
27#include "proto/proxima_be.pb.h"
28#include "repository/repository_common/error_code.h"
29#include "repository/repository_common/logger.h"
30#include "repository/repository_common/version.h"
31
32using namespace proxima::be;
33using namespace ::testing;
34
35/////////////////////////////////////
36class MockGeneralProximaServiceImpl
37 : public ::proxima::be::proto::ProximaService {
38 public:
39 void create_collection(google::protobuf::RpcController *controller,
40 const ::proxima::be::proto::CollectionConfig *request,
41 ::proxima::be::proto::Status *response,
42 ::google::protobuf::Closure *done) {
43 brpc::ClosureGuard done_guard(done);
44 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
45 }
46
47 void drop_collection(google::protobuf::RpcController *controller,
48 const ::proxima::be::proto::CollectionName *request,
49 ::proxima::be::proto::Status *response,
50 ::google::protobuf::Closure *done) {
51 brpc::ClosureGuard done_guard(done);
52 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
53 }
54
55 void describe_collection(
56 google::protobuf::RpcController *controller,
57 const ::proxima::be::proto::CollectionName *request,
58 ::proxima::be::proto::DescribeCollectionResponse *response,
59 ::google::protobuf::Closure *done) {
60 brpc::ClosureGuard done_guard(done);
61 std::unique_lock<std::mutex> ul(server_lock_);
62 auto *info = response->mutable_collection();
63 auto *config = info->mutable_config();
64 config->set_collection_name(request->collection_name());
65 auto lsn_context = info->mutable_latest_lsn_context();
66 if (!mock_context_store_.empty()) {
67 auto last_lsn_context = mock_context_store_.back();
68 lsn_context->set_lsn(last_lsn_context.lsn());
69 lsn_context->set_context(last_lsn_context.context());
70 } else {
71 lsn_context->set_lsn(0);
72 lsn_context->set_context("");
73 }
74 response->mutable_status()->set_code(
75 ::proxima::be::repository::ErrorCode_Success.value());
76 }
77
78 void list_collections(google::protobuf::RpcController *controller,
79 const ::proxima::be::proto::ListCondition *request,
80 ::proxima::be::proto::ListCollectionsResponse *response,
81 ::google::protobuf::Closure *done) {
82 brpc::ClosureGuard done_guard(done);
83 response->mutable_status()->set_code(
84 ::proxima::be::repository::ErrorCode_Success.value());
85 }
86
87 void stats_collection(google::protobuf::RpcController *controller,
88 const ::proxima::be::proto::CollectionName *request,
89 ::proxima::be::proto::StatsCollectionResponse *response,
90 ::google::protobuf::Closure *done) {
91 brpc::ClosureGuard done_guard(done);
92 response->mutable_status()->set_code(
93 ::proxima::be::repository::ErrorCode_Success.value());
94 }
95
96 void write(google::protobuf::RpcController *controller,
97 const ::proxima::be::proto::WriteRequest *request,
98 ::proxima::be::proto::Status *response,
99 ::google::protobuf::Closure *done) {
100 brpc::ClosureGuard done_guard(done);
101 std::unique_lock<std::mutex> ul(server_lock_);
102 LOG_INFO("Mock General ProximaService received request[%s]",
103 request->ShortDebugString().c_str());
104 auto row_size = request->rows_size();
105 for (int i = 0; i < row_size; i++) {
106 auto current_row_context = request->rows(i).lsn_context();
107 mock_context_store_.push_back(current_row_context);
108 }
109 std::string request_string = "";
110 request->SerializeToString(&request_string);
111 request_strings_.push_back(request_string);
112 server_called_++;
113 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
114 }
115
116 void query(google::protobuf::RpcController *controller,
117 const ::proxima::be::proto::QueryRequest *request,
118 ::proxima::be::proto::QueryResponse *response,
119 ::google::protobuf::Closure *done) {
120 brpc::ClosureGuard done_guard(done);
121 response->mutable_status()->set_code(
122 ::proxima::be::repository::ErrorCode_Success.value());
123 }
124
125 void get_version(::google::protobuf::RpcController * /* controller */,
126 const proto::GetVersionRequest * /* request */,
127 proto::GetVersionResponse *response,
128 ::google::protobuf::Closure *done) {
129 brpc::ClosureGuard done_guard(done);
130 response->set_version(repository::Version::String());
131 response->mutable_status()->set_code(0);
132 response->mutable_status()->set_reason(repository::ErrorCode::What(0));
133 }
134
135 std::string get_request_string(int idx) {
136 std::unique_lock<std::mutex> ul(server_lock_);
137 if ((int)server_called_ <= idx) {
138 return "";
139 }
140 std::string ret = request_strings_[idx];
141 return ret;
142 }
143
144 size_t get_server_called_count() {
145 std::unique_lock<std::mutex> ul(server_lock_);
146 return server_called_;
147 }
148
149 private:
150 std::vector<std::string> request_strings_;
151 std::vector<proto::LsnContext> mock_context_store_;
152 size_t server_called_{0};
153 std::mutex server_lock_;
154};
155/////////////////////////////////////
156// todo: don't use ::
157class MockRandomProximaServiceImpl
158 : public ::proxima::be::proto::ProximaService {
159 public:
160 void create_collection(google::protobuf::RpcController *controller,
161 const ::proxima::be::proto::CollectionConfig *request,
162 ::proxima::be::proto::Status *response,
163 ::google::protobuf::Closure *done) {
164 brpc::ClosureGuard done_guard(done);
165 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
166 }
167
168 void drop_collection(google::protobuf::RpcController *controller,
169 const ::proxima::be::proto::CollectionName *request,
170 ::proxima::be::proto::Status *response,
171 ::google::protobuf::Closure *done) {
172 brpc::ClosureGuard done_guard(done);
173 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
174 }
175
176 void describe_collection(
177 google::protobuf::RpcController *controller,
178 const ::proxima::be::proto::CollectionName *request,
179 ::proxima::be::proto::DescribeCollectionResponse *response,
180 ::google::protobuf::Closure *done) {
181 brpc::ClosureGuard done_guard(done);
182 std::unique_lock<std::mutex> ul(server_lock_);
183 auto next = (expect_ >> 1);
184 auto *info = response->mutable_collection();
185 auto *config = info->mutable_config();
186 config->set_collection_name(request->collection_name());
187 auto lsn_context = info->mutable_latest_lsn_context();
188 lsn_context->set_lsn(next);
189 for (auto it = mock_context_store_.rbegin();
190 it != mock_context_store_.rend(); it++) {
191 if (it->lsn() == next) {
192 lsn_context->set_context(it->context());
193 }
194 }
195 expect_ = next + 1;
196 LOG_INFO("expect_: [%zu]", (size_t)expect_);
197 response->mutable_status()->set_code(
198 ::proxima::be::repository::ErrorCode_Success.value());
199 }
200
201 void list_collections(google::protobuf::RpcController *controller,
202 const ::proxima::be::proto::ListCondition *request,
203 ::proxima::be::proto::ListCollectionsResponse *response,
204 ::google::protobuf::Closure *done) {
205 brpc::ClosureGuard done_guard(done);
206 response->mutable_status()->set_code(
207 ::proxima::be::repository::ErrorCode_Success.value());
208 }
209
210 void stats_collection(google::protobuf::RpcController *controller,
211 const ::proxima::be::proto::CollectionName *request,
212 ::proxima::be::proto::StatsCollectionResponse *response,
213 ::google::protobuf::Closure *done) {
214 brpc::ClosureGuard done_guard(done);
215 response->mutable_status()->set_code(
216 ::proxima::be::repository::ErrorCode_Success.value());
217 }
218
219 void write(google::protobuf::RpcController *controller,
220 const ::proxima::be::proto::WriteRequest *request,
221 ::proxima::be::proto::Status *response,
222 ::google::protobuf::Closure *done) {
223 brpc::ClosureGuard done_guard(done);
224 std::unique_lock<std::mutex> ul(server_lock_);
225 LOG_INFO("Mock General ProximaService received request[%s]",
226 request->ShortDebugString().c_str());
227 auto row = request->rows();
228 uint64_t last_lsn = 0;
229 for (auto it = row.begin(); it != row.end(); it++) {
230 if (it == row.begin()) {
231 first_lsn_ = it->lsn_context().lsn();
232 EXPECT_EQ(it->lsn_context().lsn(), expect_);
233 }
234 auto current_row_context = it->lsn_context();
235 mock_context_store_.push_back(current_row_context);
236 last_lsn = current_row_context.lsn();
237 records_count_++;
238 }
239 server_called_ = true;
240 expect_ = last_lsn + 1;
241
242 std::mt19937 gen((std::random_device())());
243 auto temp = (std::uniform_int_distribution<size_t>(0, 10))(gen);
244 int result = 0;
245 if (temp < 7) {
246 result = ::proxima::be::repository::ErrorCode_Success.value();
247 } else if (temp < 9) {
248 result = ::proxima::be::repository::ErrorCode_ExceedRateLimit.value();
249 expect_ = first_lsn_;
250 } else {
251 // todo<cdz>: Add ErrorCode_MismatchedSchema when support update
252 // result =
253 // ::proxima::be::repository::ErrorCode_MismatchedSchema.value();
254 result = ::proxima::be::repository::ErrorCode_Success.value();
255 }
256 LOG_INFO("expect_: [%zu]", (size_t)expect_);
257 response->set_code(result);
258 }
259
260 void query(google::protobuf::RpcController *controller,
261 const ::proxima::be::proto::QueryRequest *request,
262 ::proxima::be::proto::QueryResponse *response,
263 ::google::protobuf::Closure *done) {
264 brpc::ClosureGuard done_guard(done);
265 response->mutable_status()->set_code(
266 ::proxima::be::repository::ErrorCode_Success.value());
267 }
268
269 bool is_server_called() {
270 std::unique_lock<std::mutex> ul(server_lock_);
271 return server_called_;
272 }
273
274
275 uint64_t get_records_count() {
276 std::unique_lock<std::mutex> ul(server_lock_);
277 return records_count_;
278 }
279
280
281 void get_version(::google::protobuf::RpcController * /* controller */,
282 const proto::GetVersionRequest * /* request */,
283 proto::GetVersionResponse *response,
284 ::google::protobuf::Closure *done) {
285 brpc::ClosureGuard done_guard(done);
286 response->set_version(repository::Version::String());
287 response->mutable_status()->set_code(0);
288 response->mutable_status()->set_reason(repository::ErrorCode::What(0));
289 }
290
291
292 private:
293 uint64_t expect_{1};
294 uint64_t first_lsn_{1};
295 bool server_called_{false};
296 uint64_t records_count_{0};
297 std::vector<proto::LsnContext> mock_context_store_;
298 std::mutex server_lock_;
299};
300
// Used by the manager tests
302class MockProximaServiceImpl : public ::proxima::be::proto::ProximaService {
303 public:
304 void create_collection(google::protobuf::RpcController *controller,
305 const ::proxima::be::proto::CollectionConfig *request,
306 ::proxima::be::proto::Status *response,
307 ::google::protobuf::Closure *done) {
308 brpc::ClosureGuard done_guard(done);
309 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
310 }
311
312 void drop_collection(google::protobuf::RpcController *controller,
313 const ::proxima::be::proto::CollectionName *request,
314 ::proxima::be::proto::Status *response,
315 ::google::protobuf::Closure *done) {
316 brpc::ClosureGuard done_guard(done);
317 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
318 }
319
320 void describe_collection(
321 google::protobuf::RpcController *controller,
322 const ::proxima::be::proto::CollectionName *request,
323 ::proxima::be::proto::DescribeCollectionResponse *response,
324 ::google::protobuf::Closure *done) {
325 brpc::ClosureGuard done_guard(done);
326
327 std::unique_lock<std::mutex> ul(server_lock_);
328 created_collection_.insert(request->collection_name());
329 auto *info = response->mutable_collection();
330 auto *config = info->mutable_config();
331 config->set_collection_name(request->collection_name());
332 response->mutable_status()->set_code(
333 ::proxima::be::repository::ErrorCode_Success.value());
334 }
335
336 void list_collections(google::protobuf::RpcController *controller,
337 const ::proxima::be::proto::ListCondition *request,
338 ::proxima::be::proto::ListCollectionsResponse *response,
339 ::google::protobuf::Closure *done) {
340 brpc::ClosureGuard done_guard(done);
341
342 for (size_t i = 0; i < collections_name_.size(); i++) {
343 auto *current_collection_info = response->add_collections();
344 // current_config->set_collection_name(collections_name[i]);
345 current_collection_info->set_uuid(collections_uuid_[i]);
346 current_collection_info->set_status(
347 proxima::be::proto::CollectionInfo::CS_SERVING);
348 auto *current_config = current_collection_info->mutable_config();
349 current_config->set_collection_name(collections_name_[i]);
350 // current_config->set_schema_revision(1);
351 }
352 response->mutable_status()->set_code(
353 ::proxima::be::repository::ErrorCode_Success.value());
354 }
355
356 void stats_collection(google::protobuf::RpcController *controller,
357 const ::proxima::be::proto::CollectionName *request,
358 ::proxima::be::proto::StatsCollectionResponse *response,
359 ::google::protobuf::Closure *done) {
360 brpc::ClosureGuard done_guard(done);
361 response->mutable_status()->set_code(
362 ::proxima::be::repository::ErrorCode_Success.value());
363 }
364
365 void write(google::protobuf::RpcController *controller,
366 const ::proxima::be::proto::WriteRequest *request,
367 ::proxima::be::proto::Status *response,
368 ::google::protobuf::Closure *done) {
369 brpc::ClosureGuard done_guard(done);
370 response->set_code(::proxima::be::repository::ErrorCode_Success.value());
371 }
372
373 void query(google::protobuf::RpcController *controller,
374 const ::proxima::be::proto::QueryRequest *request,
375 ::proxima::be::proto::QueryResponse *response,
376 ::google::protobuf::Closure *done) {
377 brpc::ClosureGuard done_guard(done);
378 response->mutable_status()->set_code(
379 ::proxima::be::repository::ErrorCode_Success.value());
380 }
381
382 void get_version(::google::protobuf::RpcController * /* controller */,
383 const proto::GetVersionRequest * /* request */,
384 proto::GetVersionResponse *response,
385 ::google::protobuf::Closure *done) {
386 brpc::ClosureGuard done_guard(done);
387 response->set_version(repository::Version::String());
388 response->mutable_status()->set_code(0);
389 response->mutable_status()->set_reason(repository::ErrorCode::What(0));
390 }
391
392 std::unordered_set<std::string> get_created_collections() {
393 std::unique_lock<std::mutex> ul(server_lock_);
394 return created_collection_;
395 }
396
397 std::vector<std::string> get_collections_name() {
398 return collections_name_;
399 }
400
401 private:
402 std::vector<std::string> collections_name_{"collection1", "collection2",
403 "collection3"};
404 std::vector<std::string> collections_uuid_{
405 "collection1-uuid", "collection2-uuid", "collection3-uuid"};
406 std::unordered_set<std::string> created_collection_{};
407 std::mutex server_lock_;
408};
409