1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "index/column/forward_indexer.h"
18#include <memory>
19#include <unordered_map>
20#include <gtest/gtest.h>
21#include "index/file_helper.h"
22using namespace proxima::be;
23using namespace proxima::be::index;
24
25
26class ForwardIndexerTest : public testing::Test {
27 protected:
28 void SetUp() {
29 FileHelper::RemoveFile("./data.fwd.0");
30 }
31
32 void TearDown() {}
33};
34
35TEST_F(ForwardIndexerTest, TestGeneral) {
36 auto forward_indexer = ForwardIndexer::Create("test_collection", "./", 0);
37 ASSERT_NE(forward_indexer, nullptr);
38
39 ReadOptions read_options;
40 read_options.use_mmap = true;
41 read_options.create_new = true;
42
43 forward_indexer->set_start_doc_id(0U);
44 int ret = forward_indexer->open(read_options);
45 ASSERT_EQ(ret, 0);
46
47 // insert 1000 records
48 for (size_t i = 0; i < 1000; i++) {
49 ForwardData forward;
50 forward.header.primary_key = i;
51 forward.header.lsn = i;
52 forward.header.revision = i;
53 forward.data = "hello";
54
55 idx_t doc_id = 0U;
56 ret = forward_indexer->insert(forward, &doc_id);
57 ASSERT_EQ(ret, 0);
58 ASSERT_EQ(doc_id, i);
59 }
60
61 // seek 1000 records
62 for (size_t i = 0; i < 1000; i++) {
63 ForwardData forward;
64 ret = forward_indexer->seek(i, &forward);
65 ASSERT_EQ(ret, 0);
66 ASSERT_EQ(forward.header.primary_key, i);
67 ASSERT_EQ(forward.header.lsn, i);
68 ASSERT_EQ(forward.header.revision, i);
69 ASSERT_EQ(forward.data, "hello");
70 }
71
72 ret = forward_indexer->flush();
73 ASSERT_EQ(ret, 0);
74
75 ret = forward_indexer->close();
76 ASSERT_EQ(ret, 0);
77
78 // test reopen
79 read_options.create_new = false;
80 ret = forward_indexer->open(read_options);
81 ASSERT_EQ(ret, 0);
82
83 // test seek 1000 records again
84 for (size_t i = 0; i < 1000; i++) {
85 ForwardData forward;
86 ret = forward_indexer->seek(i, &forward);
87 ASSERT_EQ(ret, 0);
88 ASSERT_EQ(forward.header.primary_key, i);
89 ASSERT_EQ(forward.header.lsn, i);
90 ASSERT_EQ(forward.header.revision, i);
91 ASSERT_EQ(forward.data, "hello");
92 }
93
94 // Test remove
95 for (size_t i = 0; i < 1000; i++) {
96 ret = forward_indexer->remove(i);
97 ASSERT_EQ(ret, 0);
98
99 ForwardData forward;
100 ret = forward_indexer->seek(i, &forward);
101 ASSERT_NE(ret, 0);
102 }
103}
104
105std::mutex g_mutex;
106std::unordered_map<uint64_t, uint64_t> g_key_id_map;
107
108void do_insert_forward(ForwardIndexer *forward_indexer, size_t number) {
109 ForwardData forward;
110 forward.header.primary_key = number;
111 forward.header.lsn = number;
112 forward.header.revision = number;
113 forward.data = "hello";
114
115 idx_t doc_id = 0U;
116 int ret = forward_indexer->insert(forward, &doc_id);
117 ASSERT_EQ(ret, 0);
118
119 std::lock_guard<std::mutex> lock(g_mutex);
120 g_key_id_map[number] = doc_id;
121}
122
123void do_seek_forward(ForwardIndexer *forward_indexer, size_t number) {
124 std::lock_guard<std::mutex> lock(g_mutex);
125 ForwardData forward;
126 int ret = forward_indexer->seek(g_key_id_map[number], &forward);
127 ASSERT_EQ(ret, 0);
128 ASSERT_EQ(forward.header.primary_key, number);
129 ASSERT_EQ(forward.header.lsn, number);
130 ASSERT_EQ(forward.header.revision, number);
131 ASSERT_EQ(forward.data, "hello");
132}
133
134void do_hybrid_operations(ForwardIndexer *forward_indexer, size_t number) {
135 do_insert_forward(forward_indexer, number);
136 do_seek_forward(forward_indexer, number);
137}
138
139TEST_F(ForwardIndexerTest, TestMultiThread) {
140 auto forward_indexer = ForwardIndexer::Create("test_collection", "./", 0);
141 ASSERT_TRUE(forward_indexer != nullptr);
142
143 ReadOptions read_options;
144 read_options.use_mmap = true;
145 read_options.create_new = true;
146
147 forward_indexer->set_start_doc_id(0U);
148 int ret = forward_indexer->open(read_options);
149 ASSERT_EQ(ret, 0);
150
151 ailego::ThreadPool pool(3);
152 for (size_t i = 0; i < 2000; i++) {
153 pool.execute(&do_hybrid_operations, forward_indexer.get(), i);
154 }
155 pool.wait_finish();
156}
157