1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author hongqing.hu
17 * \date Dec 2020
 * \brief Unit tests for proxima::be::agent::IndexAgent
19 */
20
21
22#include <gtest/gtest.h>
23#define private public
24#define protected public
25#include "agent/index_agent.h"
26#include "index/file_helper.h"
27#include "index/mock_index_service.h" // for MockIndexService
28#include "meta/mock_meta_service.h" // for MockMetaService
29#include "proto/common.pb.h"
30
31#undef protected
32#undef private
33
34using namespace proxima::be;
35using namespace proxima::be::agent;
36
37class IndexAgentTest : public Test {
38 protected:
39 // Sets up the test fixture.
40 void SetUp() override {
41 meta_service_ = std::make_shared<MockMetaService>();
42 char cmd_buf[100];
43 snprintf(cmd_buf, 100, "rm -rf ./agent_friends/");
44 system(cmd_buf);
45
46 FillSchema(&proxy_schema_, proxy_request_, true);
47 FillSchema(&direct_schema_, direct_request_, false);
48
49 collection_path_ = "./" + collection_name_;
50 }
51
52 // Tears down the test fixture.
53 void TearDown() override {
54 meta_service_.reset();
55 index::FileHelper::RemoveDirectory(collection_path_);
56 }
57
58 void FillSchema(meta::CollectionMetaPtr *schema, WriteRequest &request,
59 bool with_repo = true) {
60 *schema = std::make_shared<meta::CollectionMeta>();
61 auto *forward_columns = (*schema)->mutable_forward_columns();
62 forward_columns->emplace_back("age");
63 meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>();
64 column_meta->set_name("face");
65 column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX);
66 column_meta->set_data_type(DataTypes::VECTOR_FP32);
67 column_meta->set_dimension(16);
68 column_meta->mutable_parameters()->set("metric_type", "SquaredEuclidean");
69 (*schema)->append(column_meta);
70 (*schema)->set_name(collection_name_);
71 if (with_repo) {
72 meta::RepositoryBasePtr repo =
73 std::make_shared<meta::DatabaseRepositoryMeta>();
74 repo->set_name(collection_name_);
75 (*schema)->set_repository(repo);
76 }
77 request.set_collection_name(collection_name_);
78 index::CollectionDatasetPtr dataset =
79 std::make_shared<index::CollectionDataset>(0);
80 auto *row_data = dataset->add_row_data();
81 row_data->primary_key = 123456;
82 row_data->operation_type = OperationTypes::INSERT;
83
84 if (with_repo) {
85 row_data->lsn_check = true;
86 row_data->lsn = 1;
87 row_data->lsn_context = "binlog:123";
88 request.set_request_type(WriteRequest::RequestType::PROXY);
89 } else {
90 row_data->lsn_check = false;
91 request.set_request_type(WriteRequest::RequestType::DIRECT);
92 }
93
94 row_data->column_datas.resize(1);
95 row_data->column_datas[0].column_name = "face";
96 row_data->column_datas[0].data_type = DataTypes::VECTOR_FP32;
97 row_data->column_datas[0].dimension = 16;
98 std::vector<float> vectors = {1, 2, 3, 4, 5, 6, 7, 8,
99 9, 10, 11, 12, 13, 14, 15, 16};
100 row_data->column_datas[0].data.resize(vectors.size() * sizeof(float));
101 memcpy((void *)&(row_data->column_datas[0].data[0]), (void *)vectors.data(),
102 vectors.size() * sizeof(float));
103 proto::GenericValueList forward_list;
104 forward_list.add_values()->set_int32_value(32);
105 forward_list.SerializeToString(&(row_data->forward_data));
106
107 request.add_collection_dataset(dataset);
108 }
109
110 protected:
111 std::string collection_name_{"agent_friends"};
112 std::string collection_path_{};
113 meta::CollectionMetaPtr proxy_schema_{};
114 meta::CollectionMetaPtr direct_schema_{};
115 WriteRequest proxy_request_{};
116 WriteRequest direct_request_{};
117 MockMetaServicePtr meta_service_{nullptr};
118 MockIndexServicePtr index_service_{nullptr};
119};
120
121TEST_F(IndexAgentTest, TestGeneral) {
122 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
123 int ret = agent->init();
124 ASSERT_EQ(ret, 0);
125
126 EXPECT_CALL(*meta_service_, get_latest_collections(_))
127 .WillOnce(testing::Return(0));
128 ret = agent->start();
129 ASSERT_EQ(ret, 0);
130
131 meta::CollectionMetaPtr schema = proxy_schema_;
132 // create collection
133 EXPECT_CALL(*meta_service_, get_current_collection(_))
134 .Times(1)
135 .WillOnce(
136 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
137 return schema;
138 }))
139 .RetiresOnSaturation();
140 ret = agent->create_collection(collection_name_);
141 ASSERT_EQ(ret, 0);
142
143 // process
144 proxy_request_.set_magic_number(agent->agent_timestamp_);
145 // create collection
146 EXPECT_CALL(*meta_service_, get_current_collection(_))
147 .Times(2)
148 .WillOnce(
149 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
150 return schema;
151 }))
152 .WillOnce(
153 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
154 return schema;
155 }))
156 .RetiresOnSaturation();
157 ret = agent->write(proxy_request_);
158 ASSERT_EQ(ret, 0);
159
160 sleep(1);
161
162 // get collection stats
163 index::CollectionStats stats;
164 ret = agent->get_collection_stats(collection_name_, &stats);
165 ASSERT_EQ(ret, 0);
166
167 // is collection suspend
168 EXPECT_CALL(*meta_service_, get_current_collection(_))
169 .WillOnce(
170 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
171 return schema;
172 }))
173 .RetiresOnSaturation();
174 ASSERT_EQ(agent->is_collection_suspend(collection_name_), false);
175
176 // update collection
177 meta::CollectionMetaPtr new_schema;
178 WriteRequest tmp_request;
179 FillSchema(&new_schema, tmp_request, true);
180 new_schema->set_revision(2);
181 EXPECT_CALL(*meta_service_, get_collection(_, _))
182 .WillOnce(Invoke([&new_schema](const std::string &, uint32_t revision)
183 -> meta::CollectionMetaPtr { return new_schema; }))
184 .RetiresOnSaturation();
185
186 ret = agent->update_collection(collection_name_, 2);
187 ASSERT_EQ(ret, 0);
188
189 sleep(1);
190
191 // drop collection
192 ret = agent->drop_collection(collection_name_);
193 ASSERT_EQ(ret, 0);
194
195 ret = agent->stop();
196 ASSERT_EQ(ret, 0);
197 ret = agent->cleanup();
198 ASSERT_EQ(ret, 0);
199}
200
201TEST_F(IndexAgentTest, TestCreateCollectionWithMetaServiceFailed) {
202 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
203 int ret = agent->init();
204 ASSERT_EQ(ret, 0);
205
206 EXPECT_CALL(*meta_service_, get_latest_collections(_))
207 .WillOnce(testing::Return(0));
208 ret = agent->start();
209 ASSERT_EQ(ret, 0);
210
211 // create collection
212 EXPECT_CALL(*meta_service_, get_current_collection(_))
213 .WillOnce(Invoke([](const std::string &) -> meta::CollectionMetaPtr {
214 return nullptr;
215 }))
216 .RetiresOnSaturation();
217 ret = agent->create_collection(collection_name_);
218 ASSERT_EQ(ret, ErrorCode_InexistentCollection);
219
220 ret = agent->stop();
221 ASSERT_EQ(ret, 0);
222 ret = agent->cleanup();
223 ASSERT_EQ(ret, 0);
224}
225
226TEST_F(IndexAgentTest, TestCreateCollectionWithIndexServiceFailed) {
227 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
228 int ret = agent->init();
229 ASSERT_EQ(ret, 0);
230
231 EXPECT_CALL(*meta_service_, get_latest_collections(_))
232 .WillOnce(testing::Return(0));
233 ret = agent->start();
234 ASSERT_EQ(ret, 0);
235
236 index::FileHelper::CreateDirectory(collection_path_);
237
238 agent->get_service()->index_directory_ = "./";
239 meta::CollectionMetaPtr schema = std::make_shared<meta::CollectionMeta>();
240 // create collection
241 EXPECT_CALL(*meta_service_, get_current_collection(_))
242 .WillOnce(
243 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
244 return schema;
245 }))
246 .RetiresOnSaturation();
247 ret = agent->create_collection(collection_name_);
248 ASSERT_EQ(ret, ErrorCode_DuplicateCollection);
249
250 ret = agent->stop();
251 ASSERT_EQ(ret, 0);
252 ret = agent->cleanup();
253 ASSERT_EQ(ret, 0);
254}
255
256TEST_F(IndexAgentTest, TestUpdateCollectionFailed) {
257 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
258 int ret = agent->init();
259 ASSERT_EQ(ret, 0);
260
261 EXPECT_CALL(*meta_service_, get_latest_collections(_))
262 .WillOnce(testing::Return(0));
263 ret = agent->start();
264 ASSERT_EQ(ret, 0);
265
266 // get counter failed
267 ret = agent->update_collection(collection_name_, 100);
268 ASSERT_EQ(ret, ErrorCode_RuntimeError);
269
270 // create collection
271 meta::CollectionMetaPtr schema = std::make_shared<meta::CollectionMeta>();
272 // create collection
273 EXPECT_CALL(*meta_service_, get_current_collection(_))
274 .WillOnce(
275 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
276 return schema;
277 }))
278 .RetiresOnSaturation();
279 ret = agent->create_collection(collection_name_);
280 ASSERT_EQ(ret, 0);
281
282 // get collection meta failed
283 EXPECT_CALL(*meta_service_, get_collection(_, _))
284 .WillOnce(
285 Invoke([](const std::string &, uint32_t) -> meta::CollectionMetaPtr {
286 return nullptr;
287 }))
288 .RetiresOnSaturation();
289 ret = agent->update_collection(collection_name_, 100);
290 ASSERT_EQ(ret, ErrorCode_InexistentCollection);
291
292
293 // index service update collection failed
294 meta::CollectionMetaPtr new_schema = std::make_shared<meta::CollectionMeta>();
295 EXPECT_CALL(*meta_service_, get_collection(_, _))
296 .WillOnce(Invoke([&new_schema](const std::string &, uint32_t revision)
297 -> meta::CollectionMetaPtr { return new_schema; }))
298 .RetiresOnSaturation();
299 ret = agent->update_collection(collection_name_, 100);
300 ASSERT_EQ(ret, ErrorCode_MismatchedSchema);
301
302 ret = agent->stop();
303 ASSERT_EQ(ret, 0);
304 ret = agent->cleanup();
305 ASSERT_EQ(ret, 0);
306}
307
308TEST_F(IndexAgentTest, TestDropCollectionFailed) {
309 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
310 int ret = agent->init();
311 ASSERT_EQ(ret, 0);
312
313 EXPECT_CALL(*meta_service_, get_latest_collections(_))
314 .WillOnce(testing::Return(0));
315 ret = agent->start();
316 ASSERT_EQ(ret, 0);
317
318 // get counter failed
319 ret = agent->drop_collection(collection_name_);
320 ASSERT_EQ(ret, ErrorCode_InexistentCollection);
321
322 ret = agent->stop();
323 ASSERT_EQ(ret, 0);
324 ret = agent->cleanup();
325 ASSERT_EQ(ret, 0);
326}
327
328TEST_F(IndexAgentTest, TestGetCollectionStatsFailed) {
329 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
330 int ret = agent->init();
331 ASSERT_EQ(ret, 0);
332
333 EXPECT_CALL(*meta_service_, get_latest_collections(_))
334 .WillOnce(testing::Return(0));
335 ret = agent->start();
336 ASSERT_EQ(ret, 0);
337
338 // get collection stats
339 index::CollectionStats stats;
340 ret = agent->get_collection_stats(collection_name_, &stats);
341 ASSERT_EQ(ret, ErrorCode_InexistentCollection);
342
343 ret = agent->stop();
344 ASSERT_EQ(ret, 0);
345 ret = agent->cleanup();
346 ASSERT_EQ(ret, 0);
347}
348
349TEST_F(IndexAgentTest, TestIsCollectionSuspendFailed) {
350 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
351 int ret = agent->init();
352 ASSERT_EQ(ret, 0);
353
354 EXPECT_CALL(*meta_service_, get_latest_collections(_))
355 .WillOnce(testing::Return(0));
356 ret = agent->start();
357 ASSERT_EQ(ret, 0);
358
359 // is collection suspend
360 meta::CollectionMetaPtr schema;
361 EXPECT_CALL(*meta_service_, get_current_collection(_))
362 .WillOnce(
363 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
364 return schema;
365 }))
366 .RetiresOnSaturation();
367 bool result = agent->is_collection_suspend(collection_name_);
368 ASSERT_EQ(result, false);
369
370 ret = agent->stop();
371 ASSERT_EQ(ret, 0);
372 ret = agent->cleanup();
373 ASSERT_EQ(ret, 0);
374}
375
376TEST_F(IndexAgentTest, TestInitFailed) {
377 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
378 agent->meta_service_ = nullptr;
379
380 // meta service nullptr
381 int ret = agent->init();
382 ASSERT_EQ(ret, ErrorCode_RuntimeError);
383}
384
385TEST_F(IndexAgentTest, TestStartFailed) {
386 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
387 int ret = agent->init();
388 ASSERT_EQ(ret, 0);
389
390 // load index service failed
391 EXPECT_CALL(*meta_service_, get_latest_collections(_))
392 .WillOnce(testing::Return(1));
393 ret = agent->start();
394 ASSERT_EQ(ret, 1);
395}
396
397TEST_F(IndexAgentTest, TestLoadIndexServiceFailed) {
398 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
399 int ret = agent->init();
400 ASSERT_EQ(ret, 0);
401
402 ret = agent->index_service_->start();
403 ASSERT_EQ(ret, 0);
404
405 // get latest collections failed
406 EXPECT_CALL(*meta_service_, get_latest_collections(_))
407 .WillOnce(testing::Return(1));
408 ret = agent->load_index_service();
409 ASSERT_EQ(ret, 1);
410
411 meta::CollectionMetaPtr schema = proxy_schema_;
412 EXPECT_CALL(*meta_service_, get_latest_collections(_))
413 .WillOnce(Invoke([&schema](meta::CollectionMetaPtrList *schemas) -> int {
414 schemas->emplace_back(schema);
415 return 0;
416 }))
417 .RetiresOnSaturation();
418
419 ret = agent->load_index_service();
420 ASSERT_EQ(ret, ErrorCode_InvalidIndexDataFormat);
421}
422
423TEST_F(IndexAgentTest, TestWriteSuccessWithProxy) {
424 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
425 int ret = agent->init();
426 ASSERT_EQ(ret, 0);
427
428 EXPECT_CALL(*meta_service_, get_latest_collections(_))
429 .WillOnce(testing::Return(0));
430 ret = agent->start();
431 ASSERT_EQ(ret, 0);
432
433 meta::CollectionMetaPtr schema = proxy_schema_;
434 // create collection
435 EXPECT_CALL(*meta_service_, get_current_collection(_))
436 .Times(1)
437 .WillOnce(
438 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
439 return schema;
440 }))
441 .RetiresOnSaturation();
442 ret = agent->create_collection(collection_name_);
443 ASSERT_EQ(ret, 0);
444
445 // process
446 proxy_request_.set_magic_number(agent->agent_timestamp_);
447 // create collection
448 EXPECT_CALL(*meta_service_, get_current_collection(_))
449 .Times(2)
450 .WillOnce(
451 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
452 return schema;
453 }))
454 .WillOnce(
455 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
456 return schema;
457 }))
458 .RetiresOnSaturation();
459 ret = agent->write(proxy_request_);
460 ASSERT_EQ(ret, 0);
461
462 sleep(1);
463
464 // drop collection
465 ret = agent->drop_collection(collection_name_);
466 ASSERT_EQ(ret, 0);
467
468 ret = agent->stop();
469 ASSERT_EQ(ret, 0);
470 ret = agent->cleanup();
471 ASSERT_EQ(ret, 0);
472}
473
474TEST_F(IndexAgentTest, TestWriteSuccessWithDirect) {
475 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
476 int ret = agent->init();
477 ASSERT_EQ(ret, 0);
478
479 EXPECT_CALL(*meta_service_, get_latest_collections(_))
480 .WillOnce(testing::Return(0));
481 ret = agent->start();
482 ASSERT_EQ(ret, 0);
483
484 meta::CollectionMetaPtr schema = direct_schema_;
485 // create collection
486 EXPECT_CALL(*meta_service_, get_current_collection(_))
487 .Times(1)
488 .WillOnce(
489 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
490 return schema;
491 }))
492 .RetiresOnSaturation();
493 ret = agent->create_collection(collection_name_);
494 ASSERT_EQ(ret, 0);
495
496 // process
497 EXPECT_CALL(*meta_service_, get_current_collection(_))
498 .Times(2)
499 .WillOnce(
500 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
501 return schema;
502 }))
503 .WillOnce(
504 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
505 return schema;
506 }))
507 .RetiresOnSaturation();
508 ret = agent->write(direct_request_);
509 ASSERT_EQ(ret, 0);
510
511 sleep(1);
512
513 // drop collection
514 ret = agent->drop_collection(collection_name_);
515 ASSERT_EQ(ret, 0);
516
517 ret = agent->stop();
518 ASSERT_EQ(ret, 0);
519 ret = agent->cleanup();
520 ASSERT_EQ(ret, 0);
521}
522
523TEST_F(IndexAgentTest, TestWriteSuccessWithDirectRepeatedWrite) {
524 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
525 int ret = agent->init();
526 ASSERT_EQ(ret, 0);
527
528 EXPECT_CALL(*meta_service_, get_latest_collections(_))
529 .WillOnce(testing::Return(0));
530 ret = agent->start();
531 ASSERT_EQ(ret, 0);
532
533 meta::CollectionMetaPtr schema = direct_schema_;
534 // create collection
535 EXPECT_CALL(*meta_service_, get_current_collection(_))
536 .Times(1)
537 .WillOnce(
538 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
539 return schema;
540 }))
541 .RetiresOnSaturation();
542 ret = agent->create_collection(collection_name_);
543 ASSERT_EQ(ret, 0);
544
545 // process
546 EXPECT_CALL(*meta_service_, get_current_collection(_))
547 .Times(2)
548 .WillOnce(
549 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
550 return schema;
551 }))
552 .WillOnce(
553 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
554 return schema;
555 }))
556 .RetiresOnSaturation();
557 ret = agent->write(direct_request_);
558 ASSERT_EQ(ret, 0);
559
560 sleep(1);
561
562 // process
563 EXPECT_CALL(*meta_service_, get_current_collection(_))
564 .Times(2)
565 .WillOnce(
566 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
567 return schema;
568 }))
569 .WillOnce(
570 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
571 return schema;
572 }))
573 .RetiresOnSaturation();
574 ret = agent->write(direct_request_);
575 ASSERT_EQ(ret, ErrorCode_DuplicateKey);
576
577 sleep(1);
578
579 // drop collection
580 ret = agent->drop_collection(collection_name_);
581 ASSERT_EQ(ret, 0);
582
583 ret = agent->stop();
584 ASSERT_EQ(ret, 0);
585 ret = agent->cleanup();
586 ASSERT_EQ(ret, 0);
587}
588
589TEST_F(IndexAgentTest, TestWriteWithEmptyRequest) {
590 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
591 int ret = agent->init();
592 ASSERT_EQ(ret, 0);
593
594 EXPECT_CALL(*meta_service_, get_latest_collections(_))
595 .WillOnce(testing::Return(0));
596 ret = agent->start();
597 ASSERT_EQ(ret, 0);
598
599 WriteRequest request;
600 ret = agent->write(request);
601 ASSERT_EQ(ret, 0);
602}
603
604TEST_F(IndexAgentTest, TestWriteFailedWithCollectionSuspend) {
605 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
606 int ret = agent->init();
607 ASSERT_EQ(ret, 0);
608
609 EXPECT_CALL(*meta_service_, get_latest_collections(_))
610 .WillOnce(testing::Return(0));
611 ret = agent->start();
612 ASSERT_EQ(ret, 0);
613
614 meta::CollectionMetaPtr schema = proxy_schema_;
615 // create collection
616 EXPECT_CALL(*meta_service_, get_current_collection(_))
617 .Times(1)
618 .WillOnce(
619 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
620 return schema;
621 }))
622 .RetiresOnSaturation();
623 ret = agent->create_collection(collection_name_);
624 ASSERT_EQ(ret, 0);
625
626 // process
627 proxy_request_.set_magic_number(agent->agent_timestamp_);
628 schema->set_writable(false);
629 // create collection
630 EXPECT_CALL(*meta_service_, get_current_collection(_))
631 .Times(1)
632 .WillOnce(
633 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
634 return schema;
635 }))
636 .RetiresOnSaturation();
637 ret = agent->write(proxy_request_);
638 ASSERT_EQ(ret, ErrorCode_SuspendedCollection);
639
640 // drop collection
641 ret = agent->drop_collection(collection_name_);
642 ASSERT_EQ(ret, 0);
643
644 ret = agent->stop();
645 ASSERT_EQ(ret, 0);
646 ret = agent->cleanup();
647 ASSERT_EQ(ret, 0);
648}
649
650TEST_F(IndexAgentTest, TestWriteFailedWithMagicNumber) {
651 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
652 int ret = agent->init();
653 ASSERT_EQ(ret, 0);
654
655 EXPECT_CALL(*meta_service_, get_latest_collections(_))
656 .WillOnce(testing::Return(0));
657 ret = agent->start();
658 ASSERT_EQ(ret, 0);
659
660 meta::CollectionMetaPtr schema = proxy_schema_;
661 // create collection
662 EXPECT_CALL(*meta_service_, get_current_collection(_))
663 .Times(1)
664 .WillOnce(
665 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
666 return schema;
667 }))
668 .RetiresOnSaturation();
669 ret = agent->create_collection(collection_name_);
670 ASSERT_EQ(ret, 0);
671
672 // process
673 EXPECT_CALL(*meta_service_, get_current_collection(_))
674 .Times(1)
675 .WillOnce(
676 Invoke([&schema](const std::string &) -> meta::CollectionMetaPtr {
677 return schema;
678 }))
679 .RetiresOnSaturation();
680 ret = agent->write(proxy_request_);
681 ASSERT_EQ(ret, ErrorCode_MismatchedMagicNumber);
682
683 sleep(1);
684
685 // drop collection
686 ret = agent->drop_collection(collection_name_);
687 ASSERT_EQ(ret, 0);
688
689 ret = agent->stop();
690 ASSERT_EQ(ret, 0);
691 ret = agent->cleanup();
692 ASSERT_EQ(ret, 0);
693}
694
695TEST_F(IndexAgentTest, TestWriteDatasetFailed) {
696 IndexAgentPtr agent = IndexAgent::Create(meta_service_);
697 int ret = agent->init();
698 ASSERT_EQ(ret, 0);
699
700 // load index service failed
701 EXPECT_CALL(*meta_service_, get_latest_collections(_))
702 .WillOnce(testing::Return(0));
703 ret = agent->start();
704 ASSERT_EQ(ret, 0);
705
706 index::CollectionDatasetPtr record = std::make_shared<CollectionDataset>(0);
707 CollectionCounterPtr counter = std::make_shared<CollectionCounter>();
708 agent->write_dataset("invalid", record, counter.get());
709
710 ret = agent->stop();
711 ASSERT_EQ(ret, 0);
712 ret = agent->cleanup();
713 ASSERT_EQ(ret, 0);
714}
715