1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
 * \brief Write requests are first inserted into the memory segment,
 *        which is then dumped to a persistent segment once it is full.
20 | */ |
21 | |
22 | #pragma once |
23 | |
24 | #include <ailego/parallel/lock.h> |
25 | #include "common/auto_counter.h" |
26 | #include "meta/meta.h" |
27 | #include "segment.h" |
28 | #include "../collection_dataset.h" |
29 | #include "../column/column_indexer.h" |
30 | #include "../column/forward_indexer.h" |
31 | #include "../concurrent_hash_map.h" |
32 | #include "../delete_store.h" |
33 | #include "../id_map.h" |
34 | |
35 | namespace proxima { |
36 | namespace be { |
37 | namespace index { |
38 | |
//! Forward declaration, so that the pointer alias below can be named
//! before the class is defined (the class's own static factories use it).
class MemorySegment;
//! Shared-ownership handle to a MemorySegment.
using MemorySegmentPtr = std::shared_ptr<MemorySegment>;
41 | |
/*
 * A MemorySegment represents a block of index data held in memory that
 * supports streaming inserts and searches at the same time. A document
 * limit can be set on it, and it is dumped to persistent storage when
 * full.
 */
48 | class MemorySegment : public Segment { |
49 | public: |
50 | PROXIMA_DISALLOW_COPY_AND_ASSIGN(MemorySegment); |
51 | |
52 | //! Constructor |
53 | MemorySegment(const std::string &coll_name, const std::string &coll_path, |
54 | const SegmentMeta &seg_meta, |
55 | const meta::CollectionMeta *schema_ptr, |
56 | const DeleteStore *delete_store_ptr, const IDMap *id_map_ptr, |
57 | uint32_t concurrency_val) |
58 | : schema_(schema_ptr), |
59 | delete_store_(delete_store_ptr), |
60 | id_map_(id_map_ptr), |
61 | concurrency_(concurrency_val) { |
62 | this->set_collection_name(coll_name); |
63 | this->set_collection_path(coll_path); |
64 | this->set_segment_meta(seg_meta); |
65 | } |
66 | |
67 | |
68 | //! Destructor |
69 | ~MemorySegment() override; |
70 | |
71 | //! Create an instance and return shared ptr |
72 | static MemorySegmentPtr Create(const std::string &collection_name, |
73 | const std::string &collection_path, |
74 | const SegmentMeta &segment_meta, |
75 | const meta::CollectionMeta *schema, |
76 | const DeleteStore *delete_store, |
77 | const IDMap *id_map, uint32_t concurrency); |
78 | |
79 | //! Create and open an instance |
80 | static int CreateAndOpen(const std::string &collection_name, |
81 | const std::string &collection_path, |
82 | const SegmentMeta &segment_meta, |
83 | const meta::CollectionMeta *schema, |
84 | const DeleteStore *delete_store, const IDMap *id_map, |
85 | uint32_t concurrency, |
86 | const ReadOptions &read_options, |
87 | MemorySegmentPtr *memory_segment); |
88 | |
89 | public: |
90 | //! Open and initialize memory segment |
91 | int open(const ReadOptions &read_options); |
92 | |
93 | //! Close and cleanup memory segment |
94 | int close(); |
95 | |
96 | //! Flush memory to persist storage |
97 | int flush(); |
98 | |
99 | //! Dump to another index type to persist storage |
100 | int dump(); |
101 | |
102 | //! Close and remove internal files |
103 | int close_and_remove_files(); |
104 | |
105 | public: |
106 | //! Insert a record & alloc a doc_id |
107 | int insert(const Record &record, idx_t *doc_id); |
108 | |
109 | //! Remove a record |
110 | int remove(idx_t doc_id); |
111 | |
112 | //! Optimize memory usage |
113 | int optimize(ThreadPoolPtr pool); |
114 | |
115 | #if 0 |
116 | //! Update a record |
117 | int update(idx_t doc_id, const Record &record); |
118 | #endif |
119 | |
120 | //! Knn similar search |
121 | int knn_search(const std::string &column_name, const std::string &query, |
122 | const QueryParams &query_params, |
123 | QueryResultList *results) override; |
124 | |
125 | //! Knn similar search with batch mode |
126 | int knn_search(const std::string &column_name, const std::string &query, |
127 | const QueryParams &query_params, uint32_t batch_count, |
128 | std::vector<QueryResultList> *results) override; |
129 | |
130 | //! Just search forward by doc primary key |
131 | int kv_search(uint64_t primary_key, QueryResult *result) override; |
132 | |
133 | public: |
134 | //! Remove a column |
135 | int remove_column(const std::string &column_name) override; |
136 | |
137 | //! Add a column |
138 | int add_column(const meta::ColumnMetaPtr &column_meta) override; |
139 | |
140 | public: |
141 | //! Set segment state |
142 | void update_state(SegmentState new_state) { |
143 | segment_meta_.state = new_state; |
144 | } |
145 | |
146 | //! Return forward count |
147 | size_t doc_count() const override { |
148 | return segment_meta_.doc_count; |
149 | } |
150 | |
151 | public: |
152 | //! Get forward reader |
153 | ForwardReaderPtr get_forward_reader() const override { |
154 | return forward_indexer_; |
155 | } |
156 | |
157 | //! Get column reader |
158 | ColumnReaderPtr get_column_reader( |
159 | const std::string &column_name) const override { |
160 | if (!column_indexers_.has(column_name)) { |
161 | return ColumnReaderPtr(); |
162 | } |
163 | return column_indexers_.get(column_name); |
164 | } |
165 | |
166 | private: |
167 | int open_forward_indexer(const ReadOptions &read_options); |
168 | |
169 | int open_column_indexers(const ReadOptions &read_options); |
170 | |
171 | int dump_forward_indexer(const IndexDumperPtr &dumper); |
172 | |
173 | int dump_column_indexers(const IndexDumperPtr &dumper); |
174 | |
175 | void update_stats(const Record &record, idx_t doc_id); |
176 | |
177 | size_t get_index_file_count(); |
178 | |
179 | size_t get_index_file_size(); |
180 | |
181 | private: |
182 | static constexpr uint32_t MAX_WAIT_RETRY_COUNT = 60U; |
183 | |
184 | private: |
185 | const meta::CollectionMeta *schema_{nullptr}; |
186 | const DeleteStore *delete_store_{nullptr}; |
187 | const IDMap *id_map_{nullptr}; |
188 | uint32_t concurrency_{0U}; |
189 | |
190 | ForwardIndexerPtr forward_indexer_{}; |
191 | ConcurrentHashMap<std::string, ColumnIndexerPtr> column_indexers_{}; |
192 | |
193 | std::mutex mutex_{}; |
194 | std::atomic<uint64_t> active_insert_count_{0U}; |
195 | std::atomic<uint64_t> active_search_count_{0U}; |
196 | bool opened_{false}; |
197 | }; |
198 | |
199 | |
200 | } // end namespace index |
201 | } // namespace be |
202 | } // end namespace proxima |
203 | |