1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include "index/collection.h" |
18 | #include <gtest/gtest.h> |
19 | |
20 | using namespace proxima::be; |
21 | using namespace proxima::be::index; |
22 | |
23 | class CollectionTest : public testing::Test { |
24 | protected: |
25 | void SetUp() { |
26 | char cmd_buf[100]; |
27 | snprintf(cmd_buf, 100, "rm -rf ./teachers/" ); |
28 | system(cmd_buf); |
29 | |
30 | FillSchema(); |
31 | } |
32 | |
33 | void TearDown() {} |
34 | |
35 | void FillSchema() { |
36 | schema_ = std::make_shared<meta::CollectionMeta>(); |
37 | meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>(); |
38 | column_meta->set_name("face" ); |
39 | column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX); |
40 | column_meta->set_data_type(DataTypes::VECTOR_FP32); |
41 | column_meta->set_dimension(16); |
42 | column_meta->mutable_parameters()->set("metric_type" , "SquaredEuclidean" ); |
43 | schema_->append(column_meta); |
44 | schema_->set_name("teachers" ); |
45 | schema_->set_revision(0); |
46 | // Set default value to 0 |
47 | schema_->set_max_docs_per_segment(0); |
48 | } |
49 | |
50 | protected: |
51 | meta::CollectionMetaPtr schema_{}; |
52 | }; |
53 | |
54 | TEST_F(CollectionTest, TestGeneral) { |
55 | index::ThreadPool thread_pool(10, false); |
56 | CollectionPtr collection = |
57 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
58 | ASSERT_NE(collection, nullptr); |
59 | ReadOptions read_options; |
60 | read_options.use_mmap = true; |
61 | read_options.create_new = true; |
62 | int ret = collection->open(read_options); |
63 | ASSERT_EQ(ret, 0); |
64 | |
65 | for (size_t i = 0; i < 1000; i++) { |
66 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
67 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
68 | new_row->primary_key = i; |
69 | new_row->operation_type = OperationTypes::INSERT; |
70 | new_row->lsn = i; |
71 | new_row->forward_data = "hello" ; |
72 | |
73 | CollectionDataset::ColumnData new_column; |
74 | new_column.column_name = "face" ; |
75 | new_column.data_type = DataTypes::VECTOR_FP32; |
76 | new_column.dimension = 16; |
77 | |
78 | std::vector<float> fvec(16U); |
79 | for (size_t j = 0; j < 16U; j++) { |
80 | fvec[j] = i * 1.0f; |
81 | } |
82 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
83 | new_column.data = vector; |
84 | |
85 | new_row->column_datas.emplace_back(new_column); |
86 | ret = collection->write_records(*add_records); |
87 | ASSERT_EQ(ret, 0); |
88 | |
89 | uint64_t lsn; |
90 | std::string lsn_context; |
91 | ret = collection->get_latest_lsn(&lsn, &lsn_context); |
92 | ASSERT_EQ(ret, 0); |
93 | ASSERT_EQ(lsn, i); |
94 | } |
95 | |
96 | // test search query |
97 | std::vector<SegmentPtr> segments; |
98 | ret = collection->get_segments(&segments); |
99 | ASSERT_EQ(ret, 0); |
100 | ASSERT_EQ(segments.size(), 1); |
101 | |
102 | ASSERT_EQ(segments[0]->collection_name(), "teachers" ); |
103 | ASSERT_EQ(segments[0]->segment_id(), 0); |
104 | ASSERT_EQ(segments[0]->doc_count(), 1000); |
105 | |
106 | for (size_t i = 0; i < 1000; i++) { |
107 | std::vector<float> fvec(16U); |
108 | for (size_t j = 0; j < 16U; j++) { |
109 | fvec[j] = i * 1.0f; |
110 | } |
111 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
112 | QueryParams query_params; |
113 | query_params.topk = 10; |
114 | query_params.data_type = DataTypes::VECTOR_FP32; |
115 | query_params.dimension = 16; |
116 | |
117 | QueryResultList result_list; |
118 | ret = segments[0]->knn_search("face" , query, query_params, &result_list); |
119 | ASSERT_EQ(ret, 0); |
120 | ASSERT_EQ(result_list.size(), 10); |
121 | ASSERT_EQ(result_list[0].primary_key, i); |
122 | ASSERT_EQ(result_list[0].score, 0.0f); |
123 | ASSERT_EQ(result_list[0].lsn, i); |
124 | } |
125 | } |
126 | |
127 | TEST_F(CollectionTest, TestDumpSegment) { |
128 | index::ThreadPool thread_pool(10, false); |
129 | CollectionPtr collection = |
130 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
131 | ASSERT_NE(collection, nullptr); |
132 | |
133 | ReadOptions read_options; |
134 | read_options.use_mmap = true; |
135 | read_options.create_new = true; |
136 | int ret = collection->open(read_options); |
137 | ASSERT_EQ(ret, 0); |
138 | |
139 | // add segment dump document limit |
140 | schema_->set_max_docs_per_segment(900); |
141 | |
142 | for (size_t i = 0; i < 2000; i++) { |
143 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
144 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
145 | new_row->primary_key = i; |
146 | new_row->operation_type = OperationTypes::INSERT; |
147 | new_row->lsn = i; |
148 | new_row->forward_data = "hello" ; |
149 | |
150 | CollectionDataset::ColumnData new_column; |
151 | new_column.column_name = "face" ; |
152 | new_column.data_type = DataTypes::VECTOR_FP32; |
153 | new_column.dimension = 16; |
154 | |
155 | std::vector<float> fvec(16U); |
156 | for (size_t j = 0; j < 16U; j++) { |
157 | fvec[j] = i * 1.0f; |
158 | } |
159 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
160 | new_column.data = vector; |
161 | |
162 | new_row->column_datas.emplace_back(new_column); |
163 | ret = collection->write_records(*add_records); |
164 | ASSERT_EQ(ret, 0); |
165 | |
166 | if (i > 0 && i % 900 == 0) { |
167 | sleep(2); |
168 | } |
169 | } |
170 | |
171 | CollectionStats stats; |
172 | ret = collection->get_stats(&stats); |
173 | ASSERT_EQ(ret, 0); |
174 | ASSERT_EQ(stats.total_doc_count, 2000); |
175 | ASSERT_EQ(stats.delete_doc_count, 0); |
176 | ASSERT_EQ(stats.total_segment_count, 3); |
177 | ASSERT_EQ(stats.total_index_file_count, 8); |
178 | ASSERT_GT(stats.total_index_file_size, 2000U); |
179 | ASSERT_EQ(stats.segment_stats.size(), 3); |
180 | |
181 | ASSERT_EQ(stats.segment_stats[0].segment_id, 0); |
182 | ASSERT_EQ(stats.segment_stats[0].state, SegmentState::PERSIST); |
183 | ASSERT_EQ(stats.segment_stats[0].doc_count, 900); |
184 | ASSERT_EQ(stats.segment_stats[0].min_doc_id, 0); |
185 | ASSERT_EQ(stats.segment_stats[0].max_doc_id, 899); |
186 | ASSERT_EQ(stats.segment_stats[0].min_primary_key, 0); |
187 | ASSERT_EQ(stats.segment_stats[0].max_primary_key, 899); |
188 | ASSERT_EQ(stats.segment_stats[0].min_lsn, 0); |
189 | ASSERT_EQ(stats.segment_stats[0].max_lsn, 899); |
190 | ASSERT_EQ(stats.segment_stats[0].index_file_count, 1); |
191 | ASSERT_GT(stats.segment_stats[0].index_file_size, 0); |
192 | |
193 | ASSERT_EQ(stats.segment_stats[1].segment_id, 1); |
194 | ASSERT_EQ(stats.segment_stats[1].state, SegmentState::PERSIST); |
195 | ASSERT_EQ(stats.segment_stats[1].doc_count, 900); |
196 | ASSERT_EQ(stats.segment_stats[1].min_doc_id, 1899); |
197 | ASSERT_EQ(stats.segment_stats[1].max_doc_id, 2798); |
198 | ASSERT_EQ(stats.segment_stats[1].min_primary_key, 900); |
199 | ASSERT_EQ(stats.segment_stats[1].max_primary_key, 1799); |
200 | ASSERT_EQ(stats.segment_stats[1].min_lsn, 900); |
201 | ASSERT_EQ(stats.segment_stats[1].max_lsn, 1799); |
202 | ASSERT_EQ(stats.segment_stats[1].index_file_count, 1); |
203 | ASSERT_GT(stats.segment_stats[1].index_file_size, 0); |
204 | |
205 | ASSERT_EQ(stats.segment_stats[2].segment_id, 2); |
206 | ASSERT_EQ(stats.segment_stats[2].state, SegmentState::WRITING); |
207 | ASSERT_EQ(stats.segment_stats[2].doc_count, 200); |
208 | ASSERT_EQ(stats.segment_stats[2].min_doc_id, 3798); |
209 | ASSERT_EQ(stats.segment_stats[2].max_doc_id, 3997); |
210 | ASSERT_EQ(stats.segment_stats[2].min_primary_key, 1800); |
211 | ASSERT_EQ(stats.segment_stats[2].max_primary_key, 1999); |
212 | ASSERT_EQ(stats.segment_stats[2].min_lsn, 1800); |
213 | ASSERT_EQ(stats.segment_stats[2].max_lsn, 1999); |
214 | ASSERT_EQ(stats.segment_stats[2].index_file_count, 2); |
215 | ASSERT_GT(stats.segment_stats[2].index_file_size, 0); |
216 | |
217 | std::vector<SegmentPtr> segments; |
218 | ret = collection->get_segments(&segments); |
219 | ASSERT_EQ(ret, 0); |
220 | ASSERT_EQ(segments.size(), 3); |
221 | |
222 | for (size_t i = 0; i < 2000; i++) { |
223 | std::vector<float> fvec(16U); |
224 | for (size_t j = 0; j < 16U; j++) { |
225 | fvec[j] = i * 1.0f; |
226 | } |
227 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
228 | QueryParams query_params; |
229 | query_params.topk = 10; |
230 | query_params.data_type = DataTypes::VECTOR_FP32; |
231 | query_params.dimension = 16; |
232 | |
233 | QueryResultList all_result; |
234 | for (size_t j = 0; j < segments.size(); j++) { |
235 | QueryResultList result_list; |
236 | ret = segments[j]->knn_search("face" , query, query_params, &result_list); |
237 | ASSERT_EQ(ret, 0); |
238 | all_result.insert(all_result.end(), result_list.begin(), |
239 | result_list.end()); |
240 | } |
241 | std::sort(all_result.begin(), all_result.end()); |
242 | ASSERT_EQ(all_result[0].primary_key, i); |
243 | ASSERT_EQ(all_result[0].score, 0.0f); |
244 | ASSERT_EQ(all_result[0].lsn, i); |
245 | } |
246 | } |
247 | |
248 | TEST_F(CollectionTest, TestDeleteRecord) { |
249 | index::ThreadPool thread_pool(10, false); |
250 | CollectionPtr collection = |
251 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
252 | ASSERT_NE(collection, nullptr); |
253 | |
254 | ReadOptions read_options; |
255 | read_options.use_mmap = true; |
256 | read_options.create_new = true; |
257 | int ret = collection->open(read_options); |
258 | ASSERT_EQ(ret, 0); |
259 | |
260 | // insert 1000 records |
261 | for (size_t i = 0; i < 1000; i++) { |
262 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
263 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
264 | new_row->primary_key = i; |
265 | new_row->operation_type = OperationTypes::INSERT; |
266 | new_row->lsn = i; |
267 | new_row->forward_data = "hello" ; |
268 | |
269 | CollectionDataset::ColumnData new_column; |
270 | new_column.column_name = "face" ; |
271 | new_column.data_type = DataTypes::VECTOR_FP32; |
272 | new_column.dimension = 16; |
273 | |
274 | std::vector<float> fvec(16U); |
275 | for (size_t j = 0; j < 16U; j++) { |
276 | fvec[j] = i * 1.0f; |
277 | } |
278 | |
279 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
280 | new_column.data = vector; |
281 | |
282 | new_row->column_datas.emplace_back(new_column); |
283 | ret = collection->write_records(*add_records); |
284 | ASSERT_EQ(ret, 0); |
285 | } |
286 | |
287 | std::vector<SegmentPtr> segments; |
288 | ret = collection->get_segments(&segments); |
289 | ASSERT_EQ(ret, 0); |
290 | ASSERT_EQ(segments.size(), 1); |
291 | |
292 | // search front 500 records |
293 | for (size_t i = 0; i < 500; i++) { |
294 | std::vector<float> fvec(16U); |
295 | for (size_t j = 0; j < 16U; j++) { |
296 | fvec[j] = i * 1.0f; |
297 | } |
298 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
299 | QueryParams query_params; |
300 | query_params.topk = 10; |
301 | query_params.data_type = DataTypes::VECTOR_FP32; |
302 | query_params.dimension = 16; |
303 | |
304 | QueryResultList result_list; |
305 | ret = segments[0]->knn_search("face" , query, query_params, &result_list); |
306 | ASSERT_EQ(ret, 0); |
307 | |
308 | ASSERT_EQ(result_list[0].primary_key, i); |
309 | ASSERT_EQ(result_list[0].score, 0.0f); |
310 | ASSERT_EQ(result_list[0].lsn, i); |
311 | } |
312 | |
313 | // delete front 500 records |
314 | for (size_t i = 0; i < 500; i++) { |
315 | CollectionDatasetPtr del_records = std::make_shared<CollectionDataset>(1); |
316 | CollectionDataset::RowData *new_row = del_records->add_row_data(); |
317 | new_row->primary_key = i; |
318 | new_row->operation_type = OperationTypes::DELETE; |
319 | |
320 | ret = collection->write_records(*del_records); |
321 | ASSERT_EQ(ret, 0); |
322 | } |
323 | |
324 | // search front 500 records again |
325 | for (size_t i = 0; i < 500; i++) { |
326 | std::vector<float> fvec(16U); |
327 | for (size_t j = 0; j < 16U; j++) { |
328 | fvec[j] = i * 1.0f; |
329 | } |
330 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
331 | QueryParams query_params; |
332 | query_params.topk = 10; |
333 | query_params.data_type = DataTypes::VECTOR_FP32; |
334 | query_params.dimension = 16; |
335 | |
336 | QueryResultList result_list; |
337 | ret = segments[0]->knn_search("face" , query, query_params, &result_list); |
338 | ASSERT_EQ(ret, 0); |
339 | |
340 | ASSERT_NE(result_list[0].primary_key, i); |
341 | ASSERT_NE(result_list[0].score, 0.0f); |
342 | ASSERT_NE(result_list[0].lsn, i); |
343 | } |
344 | } |
345 | |
346 | |
347 | TEST_F(CollectionTest, TestUpdateRecord) { |
348 | index::ThreadPool thread_pool(10, false); |
349 | CollectionPtr collection = |
350 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
351 | ASSERT_NE(collection, nullptr); |
352 | |
353 | ReadOptions read_options; |
354 | read_options.use_mmap = true; |
355 | read_options.create_new = true; |
356 | int ret = collection->open(read_options); |
357 | ASSERT_EQ(ret, 0); |
358 | |
359 | // insert 1000 records |
360 | for (size_t i = 0; i < 1000; i++) { |
361 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
362 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
363 | new_row->primary_key = i; |
364 | new_row->operation_type = OperationTypes::INSERT; |
365 | new_row->lsn = i; |
366 | new_row->lsn_check = true; |
367 | new_row->forward_data = "hello" ; |
368 | |
369 | CollectionDataset::ColumnData new_column; |
370 | new_column.column_name = "face" ; |
371 | new_column.data_type = DataTypes::VECTOR_FP32; |
372 | new_column.dimension = 16; |
373 | |
374 | std::vector<float> fvec(16U); |
375 | for (size_t j = 0; j < 16U; j++) { |
376 | fvec[j] = i * 1.0f; |
377 | } |
378 | |
379 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
380 | new_column.data = vector; |
381 | |
382 | new_row->column_datas.emplace_back(new_column); |
383 | ret = collection->write_records(*add_records); |
384 | ASSERT_EQ(ret, 0); |
385 | } |
386 | |
387 | uint64_t lsn; |
388 | std::string lsn_context; |
389 | ret = collection->get_latest_lsn(&lsn, &lsn_context); |
390 | ASSERT_EQ(ret, 0); |
391 | ASSERT_EQ(lsn, 999); |
392 | |
393 | // update 1000 wrong records |
394 | for (size_t i = 0; i < 1000; i++) { |
395 | CollectionDatasetPtr update_records = |
396 | std::make_shared<CollectionDataset>(1); |
397 | |
398 | CollectionDataset::RowData *new_row = update_records->add_row_data(); |
399 | new_row->primary_key = i; |
400 | new_row->operation_type = OperationTypes::UPDATE; |
401 | new_row->lsn = i; |
402 | new_row->lsn_check = true; |
403 | new_row->forward_data = "hello_update" ; |
404 | |
405 | CollectionDataset::ColumnData new_column; |
406 | new_column.column_name = "face" ; |
407 | new_column.data_type = DataTypes::VECTOR_FP32; |
408 | new_column.dimension = 16; |
409 | |
410 | std::vector<float> fvec(16U); |
411 | for (size_t j = 0; j < 16U; j++) { |
412 | fvec[j] = i * 1.0f; |
413 | } |
414 | |
415 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
416 | new_column.data = vector; |
417 | |
418 | new_row->column_datas.emplace_back(new_column); |
419 | ret = collection->write_records(*update_records); |
420 | ASSERT_NE(ret, 0); |
421 | } |
422 | |
423 | // update 1000 right records |
424 | for (size_t i = 0; i < 1000; i++) { |
425 | CollectionDatasetPtr update_records = |
426 | std::make_shared<CollectionDataset>(1); |
427 | |
428 | CollectionDataset::RowData *new_row = update_records->add_row_data(); |
429 | new_row->primary_key = i; |
430 | new_row->operation_type = OperationTypes::UPDATE; |
431 | new_row->lsn = i + 1; |
432 | new_row->lsn_check = true; |
433 | new_row->forward_data = "hello_update" ; |
434 | |
435 | CollectionDataset::ColumnData new_column; |
436 | new_column.column_name = "face" ; |
437 | new_column.data_type = DataTypes::VECTOR_FP32; |
438 | new_column.dimension = 16; |
439 | |
440 | std::vector<float> fvec(16U); |
441 | for (size_t j = 0; j < 16U; j++) { |
442 | fvec[j] = i * 1.0f; |
443 | } |
444 | |
445 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
446 | new_column.data = vector; |
447 | |
448 | new_row->column_datas.emplace_back(new_column); |
449 | ret = collection->write_records(*update_records); |
450 | ASSERT_EQ(ret, 0); |
451 | } |
452 | |
453 | ret = collection->get_latest_lsn(&lsn, &lsn_context); |
454 | ASSERT_EQ(ret, 0); |
455 | ASSERT_EQ(lsn, 1000); |
456 | |
457 | // search 1000 records |
458 | std::vector<SegmentPtr> segments; |
459 | ret = collection->get_segments(&segments); |
460 | ASSERT_EQ(ret, 0); |
461 | ASSERT_EQ(segments.size(), 1); |
462 | |
463 | for (size_t i = 0; i < 1000; i++) { |
464 | std::vector<float> fvec(16U); |
465 | for (size_t j = 0; j < 16U; j++) { |
466 | fvec[j] = i * 1.0f; |
467 | } |
468 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
469 | QueryParams query_params; |
470 | query_params.topk = 10; |
471 | query_params.data_type = DataTypes::VECTOR_FP32; |
472 | query_params.dimension = 16; |
473 | |
474 | QueryResultList result_list; |
475 | ret = segments[0]->knn_search("face" , query, query_params, &result_list); |
476 | ASSERT_EQ(ret, 0); |
477 | |
478 | ASSERT_EQ(result_list[0].primary_key, i); |
479 | ASSERT_EQ(result_list[0].score, 0.0f); |
480 | ASSERT_EQ(result_list[0].lsn, i + 1); |
481 | ASSERT_EQ(result_list[0].forward_data, "hello_update" ); |
482 | } |
483 | } |
484 | |
485 | void do_insert_record(Collection *collection, size_t number) { |
486 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
487 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
488 | new_row->primary_key = number; |
489 | new_row->operation_type = OperationTypes::INSERT; |
490 | new_row->lsn = number; |
491 | new_row->forward_data = "hello" ; |
492 | |
493 | CollectionDataset::ColumnData new_column; |
494 | new_column.column_name = "face" ; |
495 | new_column.data_type = DataTypes::VECTOR_FP32; |
496 | new_column.dimension = 16; |
497 | |
498 | std::vector<float> fvec(16U); |
499 | for (size_t j = 0; j < 16U; j++) { |
500 | fvec[j] = number * 1.0f; |
501 | } |
502 | |
503 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
504 | new_column.data = vector; |
505 | |
506 | new_row->column_datas.emplace_back(new_column); |
507 | int ret = collection->write_records(*add_records); |
508 | ASSERT_EQ(ret, 0); |
509 | } |
510 | |
511 | void do_search_record(Collection *collection, size_t number, |
512 | bool expect_found) { |
513 | std::vector<SegmentPtr> segments; |
514 | int ret = collection->get_segments(&segments); |
515 | ASSERT_EQ(ret, 0); |
516 | ASSERT_EQ(segments.size(), 1); |
517 | |
518 | std::vector<float> fvec(16U); |
519 | for (size_t j = 0; j < 16U; j++) { |
520 | fvec[j] = number * 1.0f; |
521 | } |
522 | std::string query((char *)fvec.data(), fvec.size() * sizeof(float)); |
523 | QueryParams query_params; |
524 | query_params.topk = 10; |
525 | query_params.data_type = DataTypes::VECTOR_FP32; |
526 | query_params.dimension = 16; |
527 | query_params.query_id = number; |
528 | |
529 | QueryResultList result_list; |
530 | ret = segments[0]->knn_search("face" , query, query_params, &result_list); |
531 | ASSERT_EQ(ret, 0); |
532 | |
533 | if (expect_found) { |
534 | ASSERT_EQ(result_list[0].primary_key, number); |
535 | ASSERT_EQ(result_list[0].score, 0.0f); |
536 | ASSERT_EQ(result_list[0].lsn, number); |
537 | ASSERT_EQ(result_list[0].forward_data, "hello" ); |
538 | } else { |
539 | ASSERT_NE(result_list[0].primary_key, number); |
540 | ASSERT_NE(result_list[0].score, 0.0f); |
541 | } |
542 | } |
543 | |
544 | void do_delete_record(Collection *collection, size_t number) { |
545 | CollectionDatasetPtr delete_record = std::make_shared<CollectionDataset>(1); |
546 | CollectionDataset::RowData *new_row = delete_record->add_row_data(); |
547 | new_row->primary_key = number; |
548 | new_row->operation_type = OperationTypes::DELETE; |
549 | new_row->lsn = number + 1; |
550 | int ret = collection->write_records(*delete_record); |
551 | ASSERT_EQ(ret, 0); |
552 | } |
553 | |
554 | void do_update_record(Collection *collection, size_t number) { |
555 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
556 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
557 | new_row->primary_key = number; |
558 | new_row->operation_type = OperationTypes::UPDATE; |
559 | new_row->lsn = number + 1; |
560 | new_row->forward_data = "hello_update" ; |
561 | |
562 | CollectionDataset::ColumnData new_column; |
563 | new_column.column_name = "face" ; |
564 | new_column.data_type = DataTypes::VECTOR_FP32; |
565 | new_column.dimension = 16; |
566 | |
567 | std::vector<float> fvec(16U); |
568 | for (size_t j = 0; j < 16U; j++) { |
569 | fvec[j] = number * 1.0f; |
570 | } |
571 | |
572 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
573 | new_column.data = vector; |
574 | |
575 | new_row->column_datas.emplace_back(new_column); |
576 | int ret = collection->write_records(*add_records); |
577 | ASSERT_EQ(ret, 0); |
578 | } |
579 | |
580 | void do_hybrid_ops(Collection *collection, size_t number) { |
581 | do_insert_record(collection, number); |
582 | do_search_record(collection, number, true); |
583 | |
584 | do_delete_record(collection, number); |
585 | do_search_record(collection, number, false); |
586 | } |
587 | |
588 | TEST_F(CollectionTest, TestMultiThread) { |
589 | index::ThreadPool thread_pool(10, false); |
590 | CollectionPtr collection = |
591 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
592 | ASSERT_NE(collection, nullptr); |
593 | |
594 | ReadOptions read_options; |
595 | read_options.use_mmap = true; |
596 | read_options.create_new = true; |
597 | int ret = collection->open(read_options); |
598 | ASSERT_EQ(ret, 0); |
599 | |
600 | // test multithread insert records |
601 | auto group = thread_pool.make_group(); |
602 | for (size_t i = 0; i < 1000; i++) { |
603 | group->submit(ailego::Closure::New(&do_insert_record, collection.get(), i)); |
604 | } |
605 | group->wait_finish(); |
606 | |
607 | // test multithread search records |
608 | for (size_t i = 0; i < 1000; i++) { |
609 | group->submit( |
610 | ailego::Closure::New(&do_search_record, collection.get(), i, true)); |
611 | } |
612 | group->wait_finish(); |
613 | |
614 | // test multithread delete records |
615 | for (size_t i = 0; i < 500; i++) { |
616 | group->submit(ailego::Closure::New(&do_delete_record, collection.get(), i)); |
617 | } |
618 | group->wait_finish(); |
619 | |
620 | // test multithread update records |
621 | for (size_t i = 500; i < 1000; i++) { |
622 | group->submit(ailego::Closure::New(&do_update_record, collection.get(), i)); |
623 | } |
624 | group->wait_finish(); |
625 | |
626 | // test multithread hybrid operations |
627 | for (size_t i = 1000; i < 2000; i++) { |
628 | group->submit(ailego::Closure::New(&do_hybrid_ops, collection.get(), i)); |
629 | } |
630 | group->wait_finish(); |
631 | } |
632 | |
633 | TEST_F(CollectionTest, TestUpdateSchema) { |
634 | index::ThreadPool thread_pool(10, false); |
635 | CollectionPtr collection = |
636 | Collection::Create(schema_->name(), "./" , schema_, 10, &thread_pool); |
637 | ASSERT_NE(collection, nullptr); |
638 | |
639 | ReadOptions read_options; |
640 | read_options.use_mmap = true; |
641 | read_options.create_new = true; |
642 | int ret = collection->open(read_options); |
643 | ASSERT_EQ(ret, 0); |
644 | |
645 | schema_->set_max_docs_per_segment(900); |
646 | |
647 | // create 2 persist segment and 1 memory segment |
648 | for (size_t i = 0; i < 2000; i++) { |
649 | CollectionDatasetPtr add_records = std::make_shared<CollectionDataset>(1); |
650 | CollectionDataset::RowData *new_row = add_records->add_row_data(); |
651 | new_row->primary_key = i; |
652 | new_row->operation_type = OperationTypes::INSERT; |
653 | new_row->lsn = i; |
654 | new_row->forward_data = "hello" ; |
655 | |
656 | CollectionDataset::ColumnData new_column; |
657 | new_column.column_name = "face" ; |
658 | new_column.data_type = DataTypes::VECTOR_FP32; |
659 | new_column.dimension = 16; |
660 | |
661 | std::vector<float> fvec(16U); |
662 | for (size_t j = 0; j < 16U; j++) { |
663 | fvec[j] = i * 1.0f; |
664 | } |
665 | std::string vector((char *)fvec.data(), fvec.size() * sizeof(float)); |
666 | new_column.data = vector; |
667 | |
668 | new_row->column_datas.emplace_back(new_column); |
669 | ret = collection->write_records(*add_records); |
670 | ASSERT_EQ(ret, 0); |
671 | } |
672 | sleep(3); |
673 | |
674 | auto new_schema = std::make_shared<meta::CollectionMeta>(); |
675 | meta::ColumnMetaPtr column_meta = std::make_shared<meta::ColumnMeta>(); |
676 | column_meta->set_name("face1" ); |
677 | column_meta->set_index_type(IndexTypes::PROXIMA_GRAPH_INDEX); |
678 | column_meta->set_data_type(DataTypes::VECTOR_FP32); |
679 | column_meta->set_dimension(16); |
680 | column_meta->mutable_parameters()->set("metric_type" , "SquaredEuclidean" ); |
681 | new_schema->append(column_meta); |
682 | new_schema->set_name("teachers" ); |
683 | new_schema->set_revision(0); |
684 | |
685 | ret = collection->update_schema(new_schema); |
686 | ASSERT_NE(ret, 0); |
687 | |
688 | new_schema->set_revision(1); |
689 | ret = collection->update_schema(new_schema); |
690 | ASSERT_EQ(ret, 0); |
691 | } |
692 | |