1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "index/collection.h"
18#include <gtest/gtest.h>
19
20using namespace proxima::be;
21using namespace proxima::be::index;
22
23class CollectionTest : public testing::Test {
24 protected:
25 void SetUp() {
26 char cmd_buf[100];
27 snprintf(cmd_buf, 100, "rm -rf ./teachers/");
28 system(cmd_buf);
29
30 FillSchema();
31 }
32
33 void TearDown() {}
34
35 void FillSchema() {
36 schema_ = std::make_shared<meta::CollectionMeta>();
37 meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>();
38 column_meta->set_name("face");
39 column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX);
40 column_meta->set_data_type(DataTypes::VECTOR_FP32);
41 column_meta->set_dimension(16);
42 column_meta->mutable_parameters()->set("metric_type", "SquaredEuclidean");
43 schema_->append(column_meta);
44 schema_->set_name("teachers");
45 schema_->set_revision(0);
46 // Set default value to 0
47 schema_->set_max_docs_per_segment(0);
48 }
49
50 protected:
51 meta::CollectionMetaPtr schema_{};
52};
53
54TEST_F(CollectionTest, TestGeneral) {
55 index::ThreadPool thread_pool(10, false);
56 CollectionPtr collection =
57 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
58 ASSERT_NE(collection, nullptr);
59 ReadOptions read_options;
60 read_options.use_mmap = true;
61 read_options.create_new = true;
62 int ret = collection->open(read_options);
63 ASSERT_EQ(ret, 0);
64
65 for (size_t i = 0; i < 1000; i++) {
66 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
67 CollectionDataset::RowData *new_row = add_records->add_row_data();
68 new_row->primary_key = i;
69 new_row->operation_type = OperationTypes::INSERT;
70 new_row->lsn = i;
71 new_row->forward_data = "hello";
72
73 CollectionDataset::ColumnData new_column;
74 new_column.column_name = "face";
75 new_column.data_type = DataTypes::VECTOR_FP32;
76 new_column.dimension = 16;
77
78 std::vector<float> fvec(16U);
79 for (size_t j = 0; j < 16U; j++) {
80 fvec[j] = i * 1.0f;
81 }
82 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
83 new_column.data = vector;
84
85 new_row->column_datas.emplace_back(new_column);
86 ret = collection->write_records(*add_records);
87 ASSERT_EQ(ret, 0);
88
89 uint64_t lsn;
90 std::string lsn_context;
91 ret = collection->get_latest_lsn(&lsn, &lsn_context);
92 ASSERT_EQ(ret, 0);
93 ASSERT_EQ(lsn, i);
94 }
95
96 // test search query
97 std::vector<SegmentPtr> segments;
98 ret = collection->get_segments(&segments);
99 ASSERT_EQ(ret, 0);
100 ASSERT_EQ(segments.size(), 1);
101
102 ASSERT_EQ(segments[0]->collection_name(), "teachers");
103 ASSERT_EQ(segments[0]->segment_id(), 0);
104 ASSERT_EQ(segments[0]->doc_count(), 1000);
105
106 for (size_t i = 0; i < 1000; i++) {
107 std::vector<float> fvec(16U);
108 for (size_t j = 0; j < 16U; j++) {
109 fvec[j] = i * 1.0f;
110 }
111 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
112 QueryParams query_params;
113 query_params.topk = 10;
114 query_params.data_type = DataTypes::VECTOR_FP32;
115 query_params.dimension = 16;
116
117 QueryResultList result_list;
118 ret = segments[0]->knn_search("face", query, query_params, &result_list);
119 ASSERT_EQ(ret, 0);
120 ASSERT_EQ(result_list.size(), 10);
121 ASSERT_EQ(result_list[0].primary_key, i);
122 ASSERT_EQ(result_list[0].score, 0.0f);
123 ASSERT_EQ(result_list[0].lsn, i);
124 }
125}
126
127TEST_F(CollectionTest, TestDumpSegment) {
128 index::ThreadPool thread_pool(10, false);
129 CollectionPtr collection =
130 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
131 ASSERT_NE(collection, nullptr);
132
133 ReadOptions read_options;
134 read_options.use_mmap = true;
135 read_options.create_new = true;
136 int ret = collection->open(read_options);
137 ASSERT_EQ(ret, 0);
138
139 // add segment dump document limit
140 schema_->set_max_docs_per_segment(900);
141
142 for (size_t i = 0; i < 2000; i++) {
143 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
144 CollectionDataset::RowData *new_row = add_records->add_row_data();
145 new_row->primary_key = i;
146 new_row->operation_type = OperationTypes::INSERT;
147 new_row->lsn = i;
148 new_row->forward_data = "hello";
149
150 CollectionDataset::ColumnData new_column;
151 new_column.column_name = "face";
152 new_column.data_type = DataTypes::VECTOR_FP32;
153 new_column.dimension = 16;
154
155 std::vector<float> fvec(16U);
156 for (size_t j = 0; j < 16U; j++) {
157 fvec[j] = i * 1.0f;
158 }
159 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
160 new_column.data = vector;
161
162 new_row->column_datas.emplace_back(new_column);
163 ret = collection->write_records(*add_records);
164 ASSERT_EQ(ret, 0);
165
166 if (i > 0 && i % 900 == 0) {
167 sleep(2);
168 }
169 }
170
171 CollectionStats stats;
172 ret = collection->get_stats(&stats);
173 ASSERT_EQ(ret, 0);
174 ASSERT_EQ(stats.total_doc_count, 2000);
175 ASSERT_EQ(stats.delete_doc_count, 0);
176 ASSERT_EQ(stats.total_segment_count, 3);
177 ASSERT_EQ(stats.total_index_file_count, 8);
178 ASSERT_GT(stats.total_index_file_size, 2000U);
179 ASSERT_EQ(stats.segment_stats.size(), 3);
180
181 ASSERT_EQ(stats.segment_stats[0].segment_id, 0);
182 ASSERT_EQ(stats.segment_stats[0].state, SegmentState::PERSIST);
183 ASSERT_EQ(stats.segment_stats[0].doc_count, 900);
184 ASSERT_EQ(stats.segment_stats[0].min_doc_id, 0);
185 ASSERT_EQ(stats.segment_stats[0].max_doc_id, 899);
186 ASSERT_EQ(stats.segment_stats[0].min_primary_key, 0);
187 ASSERT_EQ(stats.segment_stats[0].max_primary_key, 899);
188 ASSERT_EQ(stats.segment_stats[0].min_lsn, 0);
189 ASSERT_EQ(stats.segment_stats[0].max_lsn, 899);
190 ASSERT_EQ(stats.segment_stats[0].index_file_count, 1);
191 ASSERT_GT(stats.segment_stats[0].index_file_size, 0);
192
193 ASSERT_EQ(stats.segment_stats[1].segment_id, 1);
194 ASSERT_EQ(stats.segment_stats[1].state, SegmentState::PERSIST);
195 ASSERT_EQ(stats.segment_stats[1].doc_count, 900);
196 ASSERT_EQ(stats.segment_stats[1].min_doc_id, 1899);
197 ASSERT_EQ(stats.segment_stats[1].max_doc_id, 2798);
198 ASSERT_EQ(stats.segment_stats[1].min_primary_key, 900);
199 ASSERT_EQ(stats.segment_stats[1].max_primary_key, 1799);
200 ASSERT_EQ(stats.segment_stats[1].min_lsn, 900);
201 ASSERT_EQ(stats.segment_stats[1].max_lsn, 1799);
202 ASSERT_EQ(stats.segment_stats[1].index_file_count, 1);
203 ASSERT_GT(stats.segment_stats[1].index_file_size, 0);
204
205 ASSERT_EQ(stats.segment_stats[2].segment_id, 2);
206 ASSERT_EQ(stats.segment_stats[2].state, SegmentState::WRITING);
207 ASSERT_EQ(stats.segment_stats[2].doc_count, 200);
208 ASSERT_EQ(stats.segment_stats[2].min_doc_id, 3798);
209 ASSERT_EQ(stats.segment_stats[2].max_doc_id, 3997);
210 ASSERT_EQ(stats.segment_stats[2].min_primary_key, 1800);
211 ASSERT_EQ(stats.segment_stats[2].max_primary_key, 1999);
212 ASSERT_EQ(stats.segment_stats[2].min_lsn, 1800);
213 ASSERT_EQ(stats.segment_stats[2].max_lsn, 1999);
214 ASSERT_EQ(stats.segment_stats[2].index_file_count, 2);
215 ASSERT_GT(stats.segment_stats[2].index_file_size, 0);
216
217 std::vector<SegmentPtr> segments;
218 ret = collection->get_segments(&segments);
219 ASSERT_EQ(ret, 0);
220 ASSERT_EQ(segments.size(), 3);
221
222 for (size_t i = 0; i < 2000; i++) {
223 std::vector<float> fvec(16U);
224 for (size_t j = 0; j < 16U; j++) {
225 fvec[j] = i * 1.0f;
226 }
227 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
228 QueryParams query_params;
229 query_params.topk = 10;
230 query_params.data_type = DataTypes::VECTOR_FP32;
231 query_params.dimension = 16;
232
233 QueryResultList all_result;
234 for (size_t j = 0; j < segments.size(); j++) {
235 QueryResultList result_list;
236 ret = segments[j]->knn_search("face", query, query_params, &result_list);
237 ASSERT_EQ(ret, 0);
238 all_result.insert(all_result.end(), result_list.begin(),
239 result_list.end());
240 }
241 std::sort(all_result.begin(), all_result.end());
242 ASSERT_EQ(all_result[0].primary_key, i);
243 ASSERT_EQ(all_result[0].score, 0.0f);
244 ASSERT_EQ(all_result[0].lsn, i);
245 }
246}
247
248TEST_F(CollectionTest, TestDeleteRecord) {
249 index::ThreadPool thread_pool(10, false);
250 CollectionPtr collection =
251 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
252 ASSERT_NE(collection, nullptr);
253
254 ReadOptions read_options;
255 read_options.use_mmap = true;
256 read_options.create_new = true;
257 int ret = collection->open(read_options);
258 ASSERT_EQ(ret, 0);
259
260 // insert 1000 records
261 for (size_t i = 0; i < 1000; i++) {
262 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
263 CollectionDataset::RowData *new_row = add_records->add_row_data();
264 new_row->primary_key = i;
265 new_row->operation_type = OperationTypes::INSERT;
266 new_row->lsn = i;
267 new_row->forward_data = "hello";
268
269 CollectionDataset::ColumnData new_column;
270 new_column.column_name = "face";
271 new_column.data_type = DataTypes::VECTOR_FP32;
272 new_column.dimension = 16;
273
274 std::vector<float> fvec(16U);
275 for (size_t j = 0; j < 16U; j++) {
276 fvec[j] = i * 1.0f;
277 }
278
279 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
280 new_column.data = vector;
281
282 new_row->column_datas.emplace_back(new_column);
283 ret = collection->write_records(*add_records);
284 ASSERT_EQ(ret, 0);
285 }
286
287 std::vector<SegmentPtr> segments;
288 ret = collection->get_segments(&segments);
289 ASSERT_EQ(ret, 0);
290 ASSERT_EQ(segments.size(), 1);
291
292 // search front 500 records
293 for (size_t i = 0; i < 500; i++) {
294 std::vector<float> fvec(16U);
295 for (size_t j = 0; j < 16U; j++) {
296 fvec[j] = i * 1.0f;
297 }
298 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
299 QueryParams query_params;
300 query_params.topk = 10;
301 query_params.data_type = DataTypes::VECTOR_FP32;
302 query_params.dimension = 16;
303
304 QueryResultList result_list;
305 ret = segments[0]->knn_search("face", query, query_params, &result_list);
306 ASSERT_EQ(ret, 0);
307
308 ASSERT_EQ(result_list[0].primary_key, i);
309 ASSERT_EQ(result_list[0].score, 0.0f);
310 ASSERT_EQ(result_list[0].lsn, i);
311 }
312
313 // delete front 500 records
314 for (size_t i = 0; i < 500; i++) {
315 CollectionDatasetPtr del_records = std::make_shared<CollectionDataset>(1);
316 CollectionDataset::RowData *new_row = del_records->add_row_data();
317 new_row->primary_key = i;
318 new_row->operation_type = OperationTypes::DELETE;
319
320 ret = collection->write_records(*del_records);
321 ASSERT_EQ(ret, 0);
322 }
323
324 // search front 500 records again
325 for (size_t i = 0; i < 500; i++) {
326 std::vector<float> fvec(16U);
327 for (size_t j = 0; j < 16U; j++) {
328 fvec[j] = i * 1.0f;
329 }
330 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
331 QueryParams query_params;
332 query_params.topk = 10;
333 query_params.data_type = DataTypes::VECTOR_FP32;
334 query_params.dimension = 16;
335
336 QueryResultList result_list;
337 ret = segments[0]->knn_search("face", query, query_params, &result_list);
338 ASSERT_EQ(ret, 0);
339
340 ASSERT_NE(result_list[0].primary_key, i);
341 ASSERT_NE(result_list[0].score, 0.0f);
342 ASSERT_NE(result_list[0].lsn, i);
343 }
344}
345
346
347TEST_F(CollectionTest, TestUpdateRecord) {
348 index::ThreadPool thread_pool(10, false);
349 CollectionPtr collection =
350 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
351 ASSERT_NE(collection, nullptr);
352
353 ReadOptions read_options;
354 read_options.use_mmap = true;
355 read_options.create_new = true;
356 int ret = collection->open(read_options);
357 ASSERT_EQ(ret, 0);
358
359 // insert 1000 records
360 for (size_t i = 0; i < 1000; i++) {
361 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
362 CollectionDataset::RowData *new_row = add_records->add_row_data();
363 new_row->primary_key = i;
364 new_row->operation_type = OperationTypes::INSERT;
365 new_row->lsn = i;
366 new_row->lsn_check = true;
367 new_row->forward_data = "hello";
368
369 CollectionDataset::ColumnData new_column;
370 new_column.column_name = "face";
371 new_column.data_type = DataTypes::VECTOR_FP32;
372 new_column.dimension = 16;
373
374 std::vector<float> fvec(16U);
375 for (size_t j = 0; j < 16U; j++) {
376 fvec[j] = i * 1.0f;
377 }
378
379 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
380 new_column.data = vector;
381
382 new_row->column_datas.emplace_back(new_column);
383 ret = collection->write_records(*add_records);
384 ASSERT_EQ(ret, 0);
385 }
386
387 uint64_t lsn;
388 std::string lsn_context;
389 ret = collection->get_latest_lsn(&lsn, &lsn_context);
390 ASSERT_EQ(ret, 0);
391 ASSERT_EQ(lsn, 999);
392
393 // update 1000 wrong records
394 for (size_t i = 0; i < 1000; i++) {
395 CollectionDatasetPtr update_records =
396 std::make_shared<CollectionDataset>(1);
397
398 CollectionDataset::RowData *new_row = update_records->add_row_data();
399 new_row->primary_key = i;
400 new_row->operation_type = OperationTypes::UPDATE;
401 new_row->lsn = i;
402 new_row->lsn_check = true;
403 new_row->forward_data = "hello_update";
404
405 CollectionDataset::ColumnData new_column;
406 new_column.column_name = "face";
407 new_column.data_type = DataTypes::VECTOR_FP32;
408 new_column.dimension = 16;
409
410 std::vector<float> fvec(16U);
411 for (size_t j = 0; j < 16U; j++) {
412 fvec[j] = i * 1.0f;
413 }
414
415 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
416 new_column.data = vector;
417
418 new_row->column_datas.emplace_back(new_column);
419 ret = collection->write_records(*update_records);
420 ASSERT_NE(ret, 0);
421 }
422
423 // update 1000 right records
424 for (size_t i = 0; i < 1000; i++) {
425 CollectionDatasetPtr update_records =
426 std::make_shared<CollectionDataset>(1);
427
428 CollectionDataset::RowData *new_row = update_records->add_row_data();
429 new_row->primary_key = i;
430 new_row->operation_type = OperationTypes::UPDATE;
431 new_row->lsn = i + 1;
432 new_row->lsn_check = true;
433 new_row->forward_data = "hello_update";
434
435 CollectionDataset::ColumnData new_column;
436 new_column.column_name = "face";
437 new_column.data_type = DataTypes::VECTOR_FP32;
438 new_column.dimension = 16;
439
440 std::vector<float> fvec(16U);
441 for (size_t j = 0; j < 16U; j++) {
442 fvec[j] = i * 1.0f;
443 }
444
445 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
446 new_column.data = vector;
447
448 new_row->column_datas.emplace_back(new_column);
449 ret = collection->write_records(*update_records);
450 ASSERT_EQ(ret, 0);
451 }
452
453 ret = collection->get_latest_lsn(&lsn, &lsn_context);
454 ASSERT_EQ(ret, 0);
455 ASSERT_EQ(lsn, 1000);
456
457 // search 1000 records
458 std::vector<SegmentPtr> segments;
459 ret = collection->get_segments(&segments);
460 ASSERT_EQ(ret, 0);
461 ASSERT_EQ(segments.size(), 1);
462
463 for (size_t i = 0; i < 1000; i++) {
464 std::vector<float> fvec(16U);
465 for (size_t j = 0; j < 16U; j++) {
466 fvec[j] = i * 1.0f;
467 }
468 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
469 QueryParams query_params;
470 query_params.topk = 10;
471 query_params.data_type = DataTypes::VECTOR_FP32;
472 query_params.dimension = 16;
473
474 QueryResultList result_list;
475 ret = segments[0]->knn_search("face", query, query_params, &result_list);
476 ASSERT_EQ(ret, 0);
477
478 ASSERT_EQ(result_list[0].primary_key, i);
479 ASSERT_EQ(result_list[0].score, 0.0f);
480 ASSERT_EQ(result_list[0].lsn, i + 1);
481 ASSERT_EQ(result_list[0].forward_data, "hello_update");
482 }
483}
484
485void do_insert_record(Collection *collection, size_t number) {
486 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
487 CollectionDataset::RowData *new_row = add_records->add_row_data();
488 new_row->primary_key = number;
489 new_row->operation_type = OperationTypes::INSERT;
490 new_row->lsn = number;
491 new_row->forward_data = "hello";
492
493 CollectionDataset::ColumnData new_column;
494 new_column.column_name = "face";
495 new_column.data_type = DataTypes::VECTOR_FP32;
496 new_column.dimension = 16;
497
498 std::vector<float> fvec(16U);
499 for (size_t j = 0; j < 16U; j++) {
500 fvec[j] = number * 1.0f;
501 }
502
503 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
504 new_column.data = vector;
505
506 new_row->column_datas.emplace_back(new_column);
507 int ret = collection->write_records(*add_records);
508 ASSERT_EQ(ret, 0);
509}
510
511void do_search_record(Collection *collection, size_t number,
512 bool expect_found) {
513 std::vector<SegmentPtr> segments;
514 int ret = collection->get_segments(&segments);
515 ASSERT_EQ(ret, 0);
516 ASSERT_EQ(segments.size(), 1);
517
518 std::vector<float> fvec(16U);
519 for (size_t j = 0; j < 16U; j++) {
520 fvec[j] = number * 1.0f;
521 }
522 std::string query((char *)fvec.data(), fvec.size() * sizeof(float));
523 QueryParams query_params;
524 query_params.topk = 10;
525 query_params.data_type = DataTypes::VECTOR_FP32;
526 query_params.dimension = 16;
527 query_params.query_id = number;
528
529 QueryResultList result_list;
530 ret = segments[0]->knn_search("face", query, query_params, &result_list);
531 ASSERT_EQ(ret, 0);
532
533 if (expect_found) {
534 ASSERT_EQ(result_list[0].primary_key, number);
535 ASSERT_EQ(result_list[0].score, 0.0f);
536 ASSERT_EQ(result_list[0].lsn, number);
537 ASSERT_EQ(result_list[0].forward_data, "hello");
538 } else {
539 ASSERT_NE(result_list[0].primary_key, number);
540 ASSERT_NE(result_list[0].score, 0.0f);
541 }
542}
543
544void do_delete_record(Collection *collection, size_t number) {
545 CollectionDatasetPtr delete_record = std::make_shared<CollectionDataset>(1);
546 CollectionDataset::RowData *new_row = delete_record->add_row_data();
547 new_row->primary_key = number;
548 new_row->operation_type = OperationTypes::DELETE;
549 new_row->lsn = number + 1;
550 int ret = collection->write_records(*delete_record);
551 ASSERT_EQ(ret, 0);
552}
553
554void do_update_record(Collection *collection, size_t number) {
555 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
556 CollectionDataset::RowData *new_row = add_records->add_row_data();
557 new_row->primary_key = number;
558 new_row->operation_type = OperationTypes::UPDATE;
559 new_row->lsn = number + 1;
560 new_row->forward_data = "hello_update";
561
562 CollectionDataset::ColumnData new_column;
563 new_column.column_name = "face";
564 new_column.data_type = DataTypes::VECTOR_FP32;
565 new_column.dimension = 16;
566
567 std::vector<float> fvec(16U);
568 for (size_t j = 0; j < 16U; j++) {
569 fvec[j] = number * 1.0f;
570 }
571
572 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
573 new_column.data = vector;
574
575 new_row->column_datas.emplace_back(new_column);
576 int ret = collection->write_records(*add_records);
577 ASSERT_EQ(ret, 0);
578}
579
580void do_hybrid_ops(Collection *collection, size_t number) {
581 do_insert_record(collection, number);
582 do_search_record(collection, number, true);
583
584 do_delete_record(collection, number);
585 do_search_record(collection, number, false);
586}
587
588TEST_F(CollectionTest, TestMultiThread) {
589 index::ThreadPool thread_pool(10, false);
590 CollectionPtr collection =
591 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
592 ASSERT_NE(collection, nullptr);
593
594 ReadOptions read_options;
595 read_options.use_mmap = true;
596 read_options.create_new = true;
597 int ret = collection->open(read_options);
598 ASSERT_EQ(ret, 0);
599
600 // test multithread insert records
601 auto group = thread_pool.make_group();
602 for (size_t i = 0; i < 1000; i++) {
603 group->submit(ailego::Closure::New(&do_insert_record, collection.get(), i));
604 }
605 group->wait_finish();
606
607 // test multithread search records
608 for (size_t i = 0; i < 1000; i++) {
609 group->submit(
610 ailego::Closure::New(&do_search_record, collection.get(), i, true));
611 }
612 group->wait_finish();
613
614 // test multithread delete records
615 for (size_t i = 0; i < 500; i++) {
616 group->submit(ailego::Closure::New(&do_delete_record, collection.get(), i));
617 }
618 group->wait_finish();
619
620 // test multithread update records
621 for (size_t i = 500; i < 1000; i++) {
622 group->submit(ailego::Closure::New(&do_update_record, collection.get(), i));
623 }
624 group->wait_finish();
625
626 // test multithread hybrid operations
627 for (size_t i = 1000; i < 2000; i++) {
628 group->submit(ailego::Closure::New(&do_hybrid_ops, collection.get(), i));
629 }
630 group->wait_finish();
631}
632
633TEST_F(CollectionTest, TestUpdateSchema) {
634 index::ThreadPool thread_pool(10, false);
635 CollectionPtr collection =
636 Collection::Create(schema_->name(), "./", schema_, 10, &thread_pool);
637 ASSERT_NE(collection, nullptr);
638
639 ReadOptions read_options;
640 read_options.use_mmap = true;
641 read_options.create_new = true;
642 int ret = collection->open(read_options);
643 ASSERT_EQ(ret, 0);
644
645 schema_->set_max_docs_per_segment(900);
646
647 // create 2 persist segment and 1 memory segment
648 for (size_t i = 0; i < 2000; i++) {
649 CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1);
650 CollectionDataset::RowData *new_row = add_records->add_row_data();
651 new_row->primary_key = i;
652 new_row->operation_type = OperationTypes::INSERT;
653 new_row->lsn = i;
654 new_row->forward_data = "hello";
655
656 CollectionDataset::ColumnData new_column;
657 new_column.column_name = "face";
658 new_column.data_type = DataTypes::VECTOR_FP32;
659 new_column.dimension = 16;
660
661 std::vector<float> fvec(16U);
662 for (size_t j = 0; j < 16U; j++) {
663 fvec[j] = i * 1.0f;
664 }
665 std::string vector((char *)fvec.data(), fvec.size() * sizeof(float));
666 new_column.data = vector;
667
668 new_row->column_datas.emplace_back(new_column);
669 ret = collection->write_records(*add_records);
670 ASSERT_EQ(ret, 0);
671 }
672 sleep(3);
673
674 auto new_schema = std::make_shared<meta::CollectionMeta>();
675 meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>();
676 column_meta->set_name("face1");
677 column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX);
678 column_meta->set_data_type(DataTypes::VECTOR_FP32);
679 column_meta->set_dimension(16);
680 column_meta->mutable_parameters()->set("metric_type", "SquaredEuclidean");
681 new_schema->append(column_meta);
682 new_schema->set_name("teachers");
683 new_schema->set_revision(0);
684
685 ret = collection->update_schema(new_schema);
686 ASSERT_NE(ret, 0);
687
688 new_schema->set_revision(1);
689 ret = collection->update_schema(new_schema);
690 ASSERT_EQ(ret, 0);
691}
692