1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author hongqing.hu |
17 | * \date Dec 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | |
22 | #include <gtest/gtest.h> |
23 | #define private public |
24 | #define protected public |
25 | #include "agent/index_agent.h" |
26 | #include "index/file_helper.h" |
27 | #include "index/mock_index_service.h" // for MockIndexService |
28 | #include "meta/mock_meta_service.h" // for MockMetaService |
29 | #include "proto/common.pb.h" |
30 | |
31 | #undef protected |
32 | #undef private |
33 | |
34 | using namespace proxima::be; |
35 | using namespace proxima::be::agent; |
36 | |
37 | class IndexAgentTest : public Test { |
38 | protected: |
39 | // Sets up the test fixture. |
40 | void SetUp() override { |
41 | meta_service_ = std::make_shared<MockMetaService>(); |
42 | char cmd_buf[100]; |
43 | snprintf(cmd_buf, 100, "rm -rf ./agent_friends/" ); |
44 | system(cmd_buf); |
45 | |
46 | FillSchema(&proxy_schema_, proxy_request_, true); |
47 | FillSchema(&direct_schema_, direct_request_, false); |
48 | |
49 | collection_path_ = "./" + collection_name_; |
50 | } |
51 | |
52 | // Tears down the test fixture. |
53 | void TearDown() override { |
54 | meta_service_.reset(); |
55 | index::FileHelper::RemoveDirectory(collection_path_); |
56 | } |
57 | |
58 | void FillSchema(meta::CollectionMetaPtr *schema, WriteRequest &request, |
59 | bool with_repo = true) { |
60 | *schema = std::make_shared<meta::CollectionMeta>(); |
61 | auto *forward_columns = (*schema)->mutable_forward_columns(); |
62 | forward_columns->emplace_back("age" ); |
63 | meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>(); |
64 | column_meta->set_name("face" ); |
65 | column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX); |
66 | column_meta->set_data_type(DataTypes::VECTOR_FP32); |
67 | column_meta->set_dimension(16); |
68 | column_meta->mutable_parameters()->set("metric_type" , "SquaredEuclidean" ); |
69 | (*schema)->append(column_meta); |
70 | (*schema)->set_name(collection_name_); |
71 | if (with_repo) { |
72 | meta::RepositoryBasePtr repo = |
73 | std::make_shared<meta::DatabaseRepositoryMeta>(); |
74 | repo->set_name(collection_name_); |
75 | (*schema)->set_repository(repo); |
76 | } |
77 | request.set_collection_name(collection_name_); |
78 | index::CollectionDatasetPtr dataset = |
79 | std::make_shared<index::CollectionDataset>(0); |
80 | auto *row_data = dataset->add_row_data(); |
81 | row_data->primary_key = 123456; |
82 | row_data->operation_type = OperationTypes::INSERT; |
83 | |
84 | if (with_repo) { |
85 | row_data->lsn_check = true; |
86 | row_data->lsn = 1; |
87 | row_data->lsn_context = "binlog:123" ; |
88 | request.set_request_type(WriteRequest::RequestType::PROXY); |
89 | } else { |
90 | row_data->lsn_check = false; |
91 | request.set_request_type(WriteRequest::RequestType::DIRECT); |
92 | } |
93 | |
94 | row_data->column_datas.resize(1); |
95 | row_data->column_datas[0].column_name = "face" ; |
96 | row_data->column_datas[0].data_type = DataTypes::VECTOR_FP32; |
97 | row_data->column_datas[0].dimension = 16; |
98 | std::vector<float> vectors = {1, 2, 3, 4, 5, 6, 7, 8, |
99 | 9, 10, 11, 12, 13, 14, 15, 16}; |
100 | row_data->column_datas[0].data.resize(vectors.size() * sizeof(float)); |
101 | memcpy((void *)&(row_data->column_datas[0].data[0]), (void *)vectors.data(), |
102 | vectors.size() * sizeof(float)); |
103 | proto::GenericValueList forward_list; |
104 | forward_list.add_values()->set_int32_value(32); |
105 | forward_list.SerializeToString(&(row_data->forward_data)); |
106 | |
107 | request.add_collection_dataset(dataset); |
108 | } |
109 | |
110 | protected: |
111 | std::string collection_name_{"agent_friends" }; |
112 | std::string collection_path_{}; |
113 | meta::CollectionMetaPtr proxy_schema_{}; |
114 | meta::CollectionMetaPtr direct_schema_{}; |
115 | WriteRequest proxy_request_{}; |
116 | WriteRequest direct_request_{}; |
117 | MockMetaServicePtr meta_service_{nullptr}; |
118 | MockIndexServicePtr index_service_{nullptr}; |
119 | }; |
120 | |
121 | TEST_F(IndexAgentTest, TestGeneral) { |
122 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
123 | int ret = agent->init(); |
124 | ASSERT_EQ(ret, 0); |
125 | |
126 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
127 | .WillOnce(testing::Return(0)); |
128 | ret = agent->start(); |
129 | ASSERT_EQ(ret, 0); |
130 | |
131 | meta::CollectionMetaPtr schema = proxy_schema_; |
132 | // create collection |
133 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
134 | .Times(1) |
135 | .WillOnce( |
136 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
137 | return schema; |
138 | })) |
139 | .RetiresOnSaturation(); |
140 | ret = agent->create_collection(collection_name_); |
141 | ASSERT_EQ(ret, 0); |
142 | |
143 | // process |
144 | proxy_request_.set_magic_number(agent->agent_timestamp_); |
145 | // create collection |
146 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
147 | .Times(2) |
148 | .WillOnce( |
149 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
150 | return schema; |
151 | })) |
152 | .WillOnce( |
153 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
154 | return schema; |
155 | })) |
156 | .RetiresOnSaturation(); |
157 | ret = agent->write(proxy_request_); |
158 | ASSERT_EQ(ret, 0); |
159 | |
160 | sleep(1); |
161 | |
162 | // get collection stats |
163 | index::CollectionStats stats; |
164 | ret = agent->get_collection_stats(collection_name_, &stats); |
165 | ASSERT_EQ(ret, 0); |
166 | |
167 | // is collection suspend |
168 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
169 | .WillOnce( |
170 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
171 | return schema; |
172 | })) |
173 | .RetiresOnSaturation(); |
174 | ASSERT_EQ(agent->is_collection_suspend(collection_name_), false); |
175 | |
176 | // update collection |
177 | meta::CollectionMetaPtr new_schema; |
178 | WriteRequest tmp_request; |
179 | FillSchema(&new_schema, tmp_request, true); |
180 | new_schema->set_revision(2); |
181 | EXPECT_CALL(*meta_service_, get_collection(_, _)) |
182 | .WillOnce(Invoke([&new_schema](const std::string &, uint32_t revision) |
183 | -> meta::CollectionMetaPtr { return new_schema; })) |
184 | .RetiresOnSaturation(); |
185 | |
186 | ret = agent->update_collection(collection_name_, 2); |
187 | ASSERT_EQ(ret, 0); |
188 | |
189 | sleep(1); |
190 | |
191 | // drop collection |
192 | ret = agent->drop_collection(collection_name_); |
193 | ASSERT_EQ(ret, 0); |
194 | |
195 | ret = agent->stop(); |
196 | ASSERT_EQ(ret, 0); |
197 | ret = agent->cleanup(); |
198 | ASSERT_EQ(ret, 0); |
199 | } |
200 | |
201 | TEST_F(IndexAgentTest, TestCreateCollectionWithMetaServiceFailed) { |
202 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
203 | int ret = agent->init(); |
204 | ASSERT_EQ(ret, 0); |
205 | |
206 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
207 | .WillOnce(testing::Return(0)); |
208 | ret = agent->start(); |
209 | ASSERT_EQ(ret, 0); |
210 | |
211 | // create collection |
212 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
213 | .WillOnce(Invoke([](const std::string &) -> meta::CollectionMetaPtr { |
214 | return nullptr; |
215 | })) |
216 | .RetiresOnSaturation(); |
217 | ret = agent->create_collection(collection_name_); |
218 | ASSERT_EQ(ret, ErrorCode_InexistentCollection); |
219 | |
220 | ret = agent->stop(); |
221 | ASSERT_EQ(ret, 0); |
222 | ret = agent->cleanup(); |
223 | ASSERT_EQ(ret, 0); |
224 | } |
225 | |
226 | TEST_F(IndexAgentTest, TestCreateCollectionWithIndexServiceFailed) { |
227 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
228 | int ret = agent->init(); |
229 | ASSERT_EQ(ret, 0); |
230 | |
231 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
232 | .WillOnce(testing::Return(0)); |
233 | ret = agent->start(); |
234 | ASSERT_EQ(ret, 0); |
235 | |
236 | index::FileHelper::CreateDirectory(collection_path_); |
237 | |
238 | agent->get_service()->index_directory_ = "./" ; |
239 | meta::CollectionMetaPtr schema = std::make_shared<meta::CollectionMeta>(); |
240 | // create collection |
241 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
242 | .WillOnce( |
243 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
244 | return schema; |
245 | })) |
246 | .RetiresOnSaturation(); |
247 | ret = agent->create_collection(collection_name_); |
248 | ASSERT_EQ(ret, ErrorCode_DuplicateCollection); |
249 | |
250 | ret = agent->stop(); |
251 | ASSERT_EQ(ret, 0); |
252 | ret = agent->cleanup(); |
253 | ASSERT_EQ(ret, 0); |
254 | } |
255 | |
256 | TEST_F(IndexAgentTest, TestUpdateCollectionFailed) { |
257 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
258 | int ret = agent->init(); |
259 | ASSERT_EQ(ret, 0); |
260 | |
261 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
262 | .WillOnce(testing::Return(0)); |
263 | ret = agent->start(); |
264 | ASSERT_EQ(ret, 0); |
265 | |
266 | // get counter failed |
267 | ret = agent->update_collection(collection_name_, 100); |
268 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
269 | |
270 | // create collection |
271 | meta::CollectionMetaPtr schema = std::make_shared<meta::CollectionMeta>(); |
272 | // create collection |
273 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
274 | .WillOnce( |
275 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
276 | return schema; |
277 | })) |
278 | .RetiresOnSaturation(); |
279 | ret = agent->create_collection(collection_name_); |
280 | ASSERT_EQ(ret, 0); |
281 | |
282 | // get collection meta failed |
283 | EXPECT_CALL(*meta_service_, get_collection(_, _)) |
284 | .WillOnce( |
285 | Invoke([](const std::string &, uint32_t) -> meta::CollectionMetaPtr { |
286 | return nullptr; |
287 | })) |
288 | .RetiresOnSaturation(); |
289 | ret = agent->update_collection(collection_name_, 100); |
290 | ASSERT_EQ(ret, ErrorCode_InexistentCollection); |
291 | |
292 | |
293 | // index service update collection failed |
294 | meta::CollectionMetaPtr new_schema = std::make_shared<meta::CollectionMeta>(); |
295 | EXPECT_CALL(*meta_service_, get_collection(_, _)) |
296 | .WillOnce(Invoke([&new_schema](const std::string &, uint32_t revision) |
297 | -> meta::CollectionMetaPtr { return new_schema; })) |
298 | .RetiresOnSaturation(); |
299 | ret = agent->update_collection(collection_name_, 100); |
300 | ASSERT_EQ(ret, ErrorCode_MismatchedSchema); |
301 | |
302 | ret = agent->stop(); |
303 | ASSERT_EQ(ret, 0); |
304 | ret = agent->cleanup(); |
305 | ASSERT_EQ(ret, 0); |
306 | } |
307 | |
308 | TEST_F(IndexAgentTest, TestDropCollectionFailed) { |
309 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
310 | int ret = agent->init(); |
311 | ASSERT_EQ(ret, 0); |
312 | |
313 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
314 | .WillOnce(testing::Return(0)); |
315 | ret = agent->start(); |
316 | ASSERT_EQ(ret, 0); |
317 | |
318 | // get counter failed |
319 | ret = agent->drop_collection(collection_name_); |
320 | ASSERT_EQ(ret, ErrorCode_InexistentCollection); |
321 | |
322 | ret = agent->stop(); |
323 | ASSERT_EQ(ret, 0); |
324 | ret = agent->cleanup(); |
325 | ASSERT_EQ(ret, 0); |
326 | } |
327 | |
328 | TEST_F(IndexAgentTest, TestGetCollectionStatsFailed) { |
329 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
330 | int ret = agent->init(); |
331 | ASSERT_EQ(ret, 0); |
332 | |
333 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
334 | .WillOnce(testing::Return(0)); |
335 | ret = agent->start(); |
336 | ASSERT_EQ(ret, 0); |
337 | |
338 | // get collection stats |
339 | index::CollectionStats stats; |
340 | ret = agent->get_collection_stats(collection_name_, &stats); |
341 | ASSERT_EQ(ret, ErrorCode_InexistentCollection); |
342 | |
343 | ret = agent->stop(); |
344 | ASSERT_EQ(ret, 0); |
345 | ret = agent->cleanup(); |
346 | ASSERT_EQ(ret, 0); |
347 | } |
348 | |
349 | TEST_F(IndexAgentTest, TestIsCollectionSuspendFailed) { |
350 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
351 | int ret = agent->init(); |
352 | ASSERT_EQ(ret, 0); |
353 | |
354 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
355 | .WillOnce(testing::Return(0)); |
356 | ret = agent->start(); |
357 | ASSERT_EQ(ret, 0); |
358 | |
359 | // is collection suspend |
360 | meta::CollectionMetaPtr schema; |
361 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
362 | .WillOnce( |
363 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
364 | return schema; |
365 | })) |
366 | .RetiresOnSaturation(); |
367 | bool result = agent->is_collection_suspend(collection_name_); |
368 | ASSERT_EQ(result, false); |
369 | |
370 | ret = agent->stop(); |
371 | ASSERT_EQ(ret, 0); |
372 | ret = agent->cleanup(); |
373 | ASSERT_EQ(ret, 0); |
374 | } |
375 | |
376 | TEST_F(IndexAgentTest, TestInitFailed) { |
377 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
378 | agent->meta_service_ = nullptr; |
379 | |
380 | // meta service nullptr |
381 | int ret = agent->init(); |
382 | ASSERT_EQ(ret, ErrorCode_RuntimeError); |
383 | } |
384 | |
385 | TEST_F(IndexAgentTest, TestStartFailed) { |
386 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
387 | int ret = agent->init(); |
388 | ASSERT_EQ(ret, 0); |
389 | |
390 | // load index service failed |
391 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
392 | .WillOnce(testing::Return(1)); |
393 | ret = agent->start(); |
394 | ASSERT_EQ(ret, 1); |
395 | } |
396 | |
397 | TEST_F(IndexAgentTest, TestLoadIndexServiceFailed) { |
398 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
399 | int ret = agent->init(); |
400 | ASSERT_EQ(ret, 0); |
401 | |
402 | ret = agent->index_service_->start(); |
403 | ASSERT_EQ(ret, 0); |
404 | |
405 | // get latest collections failed |
406 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
407 | .WillOnce(testing::Return(1)); |
408 | ret = agent->load_index_service(); |
409 | ASSERT_EQ(ret, 1); |
410 | |
411 | meta::CollectionMetaPtr schema = proxy_schema_; |
412 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
413 | .WillOnce(Invoke([&schema](meta::CollectionMetaPtrList *schemas) -> int { |
414 | schemas->emplace_back(schema); |
415 | return 0; |
416 | })) |
417 | .RetiresOnSaturation(); |
418 | |
419 | ret = agent->load_index_service(); |
420 | ASSERT_EQ(ret, ErrorCode_InvalidIndexDataFormat); |
421 | } |
422 | |
423 | TEST_F(IndexAgentTest, TestWriteSuccessWithProxy) { |
424 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
425 | int ret = agent->init(); |
426 | ASSERT_EQ(ret, 0); |
427 | |
428 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
429 | .WillOnce(testing::Return(0)); |
430 | ret = agent->start(); |
431 | ASSERT_EQ(ret, 0); |
432 | |
433 | meta::CollectionMetaPtr schema = proxy_schema_; |
434 | // create collection |
435 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
436 | .Times(1) |
437 | .WillOnce( |
438 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
439 | return schema; |
440 | })) |
441 | .RetiresOnSaturation(); |
442 | ret = agent->create_collection(collection_name_); |
443 | ASSERT_EQ(ret, 0); |
444 | |
445 | // process |
446 | proxy_request_.set_magic_number(agent->agent_timestamp_); |
447 | // create collection |
448 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
449 | .Times(2) |
450 | .WillOnce( |
451 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
452 | return schema; |
453 | })) |
454 | .WillOnce( |
455 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
456 | return schema; |
457 | })) |
458 | .RetiresOnSaturation(); |
459 | ret = agent->write(proxy_request_); |
460 | ASSERT_EQ(ret, 0); |
461 | |
462 | sleep(1); |
463 | |
464 | // drop collection |
465 | ret = agent->drop_collection(collection_name_); |
466 | ASSERT_EQ(ret, 0); |
467 | |
468 | ret = agent->stop(); |
469 | ASSERT_EQ(ret, 0); |
470 | ret = agent->cleanup(); |
471 | ASSERT_EQ(ret, 0); |
472 | } |
473 | |
474 | TEST_F(IndexAgentTest, TestWriteSuccessWithDirect) { |
475 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
476 | int ret = agent->init(); |
477 | ASSERT_EQ(ret, 0); |
478 | |
479 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
480 | .WillOnce(testing::Return(0)); |
481 | ret = agent->start(); |
482 | ASSERT_EQ(ret, 0); |
483 | |
484 | meta::CollectionMetaPtr schema = direct_schema_; |
485 | // create collection |
486 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
487 | .Times(1) |
488 | .WillOnce( |
489 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
490 | return schema; |
491 | })) |
492 | .RetiresOnSaturation(); |
493 | ret = agent->create_collection(collection_name_); |
494 | ASSERT_EQ(ret, 0); |
495 | |
496 | // process |
497 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
498 | .Times(2) |
499 | .WillOnce( |
500 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
501 | return schema; |
502 | })) |
503 | .WillOnce( |
504 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
505 | return schema; |
506 | })) |
507 | .RetiresOnSaturation(); |
508 | ret = agent->write(direct_request_); |
509 | ASSERT_EQ(ret, 0); |
510 | |
511 | sleep(1); |
512 | |
513 | // drop collection |
514 | ret = agent->drop_collection(collection_name_); |
515 | ASSERT_EQ(ret, 0); |
516 | |
517 | ret = agent->stop(); |
518 | ASSERT_EQ(ret, 0); |
519 | ret = agent->cleanup(); |
520 | ASSERT_EQ(ret, 0); |
521 | } |
522 | |
523 | TEST_F(IndexAgentTest, TestWriteSuccessWithDirectRepeatedWrite) { |
524 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
525 | int ret = agent->init(); |
526 | ASSERT_EQ(ret, 0); |
527 | |
528 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
529 | .WillOnce(testing::Return(0)); |
530 | ret = agent->start(); |
531 | ASSERT_EQ(ret, 0); |
532 | |
533 | meta::CollectionMetaPtr schema = direct_schema_; |
534 | // create collection |
535 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
536 | .Times(1) |
537 | .WillOnce( |
538 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
539 | return schema; |
540 | })) |
541 | .RetiresOnSaturation(); |
542 | ret = agent->create_collection(collection_name_); |
543 | ASSERT_EQ(ret, 0); |
544 | |
545 | // process |
546 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
547 | .Times(2) |
548 | .WillOnce( |
549 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
550 | return schema; |
551 | })) |
552 | .WillOnce( |
553 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
554 | return schema; |
555 | })) |
556 | .RetiresOnSaturation(); |
557 | ret = agent->write(direct_request_); |
558 | ASSERT_EQ(ret, 0); |
559 | |
560 | sleep(1); |
561 | |
562 | // process |
563 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
564 | .Times(2) |
565 | .WillOnce( |
566 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
567 | return schema; |
568 | })) |
569 | .WillOnce( |
570 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
571 | return schema; |
572 | })) |
573 | .RetiresOnSaturation(); |
574 | ret = agent->write(direct_request_); |
575 | ASSERT_EQ(ret, ErrorCode_DuplicateKey); |
576 | |
577 | sleep(1); |
578 | |
579 | // drop collection |
580 | ret = agent->drop_collection(collection_name_); |
581 | ASSERT_EQ(ret, 0); |
582 | |
583 | ret = agent->stop(); |
584 | ASSERT_EQ(ret, 0); |
585 | ret = agent->cleanup(); |
586 | ASSERT_EQ(ret, 0); |
587 | } |
588 | |
589 | TEST_F(IndexAgentTest, TestWriteWithEmptyRequest) { |
590 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
591 | int ret = agent->init(); |
592 | ASSERT_EQ(ret, 0); |
593 | |
594 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
595 | .WillOnce(testing::Return(0)); |
596 | ret = agent->start(); |
597 | ASSERT_EQ(ret, 0); |
598 | |
599 | WriteRequest request; |
600 | ret = agent->write(request); |
601 | ASSERT_EQ(ret, 0); |
602 | } |
603 | |
604 | TEST_F(IndexAgentTest, TestWriteFailedWithCollectionSuspend) { |
605 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
606 | int ret = agent->init(); |
607 | ASSERT_EQ(ret, 0); |
608 | |
609 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
610 | .WillOnce(testing::Return(0)); |
611 | ret = agent->start(); |
612 | ASSERT_EQ(ret, 0); |
613 | |
614 | meta::CollectionMetaPtr schema = proxy_schema_; |
615 | // create collection |
616 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
617 | .Times(1) |
618 | .WillOnce( |
619 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
620 | return schema; |
621 | })) |
622 | .RetiresOnSaturation(); |
623 | ret = agent->create_collection(collection_name_); |
624 | ASSERT_EQ(ret, 0); |
625 | |
626 | // process |
627 | proxy_request_.set_magic_number(agent->agent_timestamp_); |
628 | schema->set_writable(false); |
629 | // create collection |
630 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
631 | .Times(1) |
632 | .WillOnce( |
633 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
634 | return schema; |
635 | })) |
636 | .RetiresOnSaturation(); |
637 | ret = agent->write(proxy_request_); |
638 | ASSERT_EQ(ret, ErrorCode_SuspendedCollection); |
639 | |
640 | // drop collection |
641 | ret = agent->drop_collection(collection_name_); |
642 | ASSERT_EQ(ret, 0); |
643 | |
644 | ret = agent->stop(); |
645 | ASSERT_EQ(ret, 0); |
646 | ret = agent->cleanup(); |
647 | ASSERT_EQ(ret, 0); |
648 | } |
649 | |
650 | TEST_F(IndexAgentTest, TestWriteFailedWithMagicNumber) { |
651 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
652 | int ret = agent->init(); |
653 | ASSERT_EQ(ret, 0); |
654 | |
655 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
656 | .WillOnce(testing::Return(0)); |
657 | ret = agent->start(); |
658 | ASSERT_EQ(ret, 0); |
659 | |
660 | meta::CollectionMetaPtr schema = proxy_schema_; |
661 | // create collection |
662 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
663 | .Times(1) |
664 | .WillOnce( |
665 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
666 | return schema; |
667 | })) |
668 | .RetiresOnSaturation(); |
669 | ret = agent->create_collection(collection_name_); |
670 | ASSERT_EQ(ret, 0); |
671 | |
672 | // process |
673 | EXPECT_CALL(*meta_service_, get_current_collection(_)) |
674 | .Times(1) |
675 | .WillOnce( |
676 | Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr { |
677 | return schema; |
678 | })) |
679 | .RetiresOnSaturation(); |
680 | ret = agent->write(proxy_request_); |
681 | ASSERT_EQ(ret, ErrorCode_MismatchedMagicNumber); |
682 | |
683 | sleep(1); |
684 | |
685 | // drop collection |
686 | ret = agent->drop_collection(collection_name_); |
687 | ASSERT_EQ(ret, 0); |
688 | |
689 | ret = agent->stop(); |
690 | ASSERT_EQ(ret, 0); |
691 | ret = agent->cleanup(); |
692 | ASSERT_EQ(ret, 0); |
693 | } |
694 | |
695 | TEST_F(IndexAgentTest, TestWriteDatasetFailed) { |
696 | IndexAgentPtr agent = IndexAgent::Create(meta_service_); |
697 | int ret = agent->init(); |
698 | ASSERT_EQ(ret, 0); |
699 | |
700 | // load index service failed |
701 | EXPECT_CALL(*meta_service_, get_latest_collections(_)) |
702 | .WillOnce(testing::Return(0)); |
703 | ret = agent->start(); |
704 | ASSERT_EQ(ret, 0); |
705 | |
706 | index::CollectionDatasetPtr record = std::make_shared<CollectionDataset>(0); |
707 | CollectionCounterPtr counter = std::make_shared<CollectionCounter>(); |
708 | agent->write_dataset("invalid" , record, counter.get()); |
709 | |
710 | ret = agent->stop(); |
711 | ASSERT_EQ(ret, 0); |
712 | ret = agent->cleanup(); |
713 | ASSERT_EQ(ret, 0); |
714 | } |
715 | |