1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Haichao.chc |
17 | * \date Oct 2020 |
18 | * \brief Implementation of persist segment |
19 | */ |
20 | |
21 | #include "persist_segment.h" |
22 | #include <ailego/utility/time_helper.h> |
23 | #include "common/auto_counter.h" |
24 | #include "common/error_code.h" |
25 | #include "../file_helper.h" |
26 | |
27 | namespace proxima { |
28 | namespace be { |
29 | namespace index { |
30 | |
31 | PersistSegmentPtr PersistSegment::Create(const std::string &collection_name, |
32 | const std::string &collection_path, |
33 | const SegmentMeta &segment_meta, |
34 | const meta::CollectionMeta *schema, |
35 | const DeleteStore *delete_store, |
36 | const IDMap *id_map, |
37 | uint32_t concurrency) { |
38 | return std::make_shared<PersistSegment>(collection_name, collection_path, |
39 | segment_meta, schema, delete_store, |
40 | id_map, concurrency); |
41 | } |
42 | |
43 | int PersistSegment::CreateAndLoad( |
44 | const std::string &collection_name, const std::string &collection_path, |
45 | const SegmentMeta &segment_meta, const meta::CollectionMeta *schema, |
46 | const DeleteStore *delete_store, const IDMap *id_map, uint32_t concurrency, |
47 | const ReadOptions &read_options, PersistSegmentPtr *persist_segment) { |
48 | (*persist_segment) = Create(collection_name, collection_path, segment_meta, |
49 | schema, delete_store, id_map, concurrency); |
50 | |
51 | return (*persist_segment)->load(read_options); |
52 | } |
53 | |
54 | PersistSegment::~PersistSegment() { |
55 | if (loaded_) { |
56 | unload(); |
57 | } |
58 | } |
59 | |
60 | int PersistSegment::load(const ReadOptions &read_options) { |
61 | CHECK_STATUS(loaded_, false); |
62 | |
63 | // Load forward searcher |
64 | int ret = load_forward_reader(read_options); |
65 | CHECK_RETURN_WITH_SLOG(ret, 0, "Load forward searcher failed." ); |
66 | |
67 | // Load column searchers |
68 | ret = load_column_readers(read_options); |
69 | CHECK_RETURN_WITH_SLOG(ret, 0, "Load column searchers failed." ); |
70 | |
71 | SLOG_DEBUG("Load persist segment success." ); |
72 | loaded_ = true; |
73 | |
74 | return 0; |
75 | } |
76 | |
77 | int PersistSegment::unload() { |
78 | CHECK_STATUS(loaded_, true); |
79 | |
80 | // try to ensure active search requests finished |
81 | uint32_t retry = 0; |
82 | while (retry < MAX_WAIT_RETRY_COUNT && active_search_count_ > 0) { |
83 | LOG_INFO( |
84 | "Try to wait active request finished. active_search_count[%zu] " |
85 | "retry[%d]" , |
86 | (size_t)active_search_count_.load(), retry); |
87 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
88 | retry++; |
89 | } |
90 | |
91 | forward_reader_->close(); |
92 | for (auto &it : column_readers_) { |
93 | // Check if column searcher is empty searcher |
94 | // because sometimes will add empty column in |
95 | // persist segment. |
96 | if (it.second != nullptr) { |
97 | it.second->close(); |
98 | } |
99 | } |
100 | column_readers_.clear(); |
101 | |
102 | loaded_ = false; |
103 | SLOG_DEBUG("Unloaded persist segment." ); |
104 | |
105 | return 0; |
106 | } |
107 | |
108 | int PersistSegment::knn_search(const std::string &column_name, |
109 | const std::string &query, |
110 | const QueryParams &query_params, |
111 | QueryResultList *results) { |
112 | CHECK_STATUS(loaded_, true); |
113 | std::vector<QueryResultList> batch_results; |
114 | int ret = |
115 | this->knn_search(column_name, query, query_params, 1, &batch_results); |
116 | CHECK_RETURN(ret, 0); |
117 | |
118 | (*results) = batch_results[0]; |
119 | return 0; |
120 | } |
121 | |
122 | int PersistSegment::knn_search(const std::string &column_name, |
123 | const std::string &query, |
124 | const QueryParams &query_params, |
125 | uint32_t batch_count, |
126 | std::vector<QueryResultList> *batch_results) { |
127 | CHECK_STATUS(loaded_, true); |
128 | |
129 | AutoCounter as(active_search_count_); |
130 | |
131 | ailego::ElapsedTime timer; |
132 | uint64_t query_id = query_params.query_id; |
133 | |
134 | if (!column_readers_.has(column_name)) { |
135 | SLOG_ERROR("Column not exist. query_id[%zu] column_name[%s]" , |
136 | (size_t)query_id, column_name.c_str()); |
137 | return ErrorCode_InexistentColumn; |
138 | } |
139 | |
140 | auto &column_reader = column_readers_.get(column_name); |
141 | // check if column searcher is empty searcher |
142 | // it means this column added later by update schema |
143 | // so we just return empty results |
144 | if (!column_reader) { |
145 | SLOG_INFO( |
146 | "Empty column searcher return empty results. query_id[%zu] " |
147 | "batch_count[%u] topk[%u] " |
148 | "res_num[0] cost[%zums] column[%s]" , |
149 | (size_t)query_id, batch_count, query_params.topk, |
150 | (size_t)timer.milli_seconds(), column_name.c_str()); |
151 | return 0; |
152 | } |
153 | |
154 | // search columns |
155 | std::vector<IndexDocumentList> batch_search_results; |
156 | FilterFunction filter = nullptr; |
157 | if (delete_store_ && delete_store_->count() > 0) { |
158 | filter = [this](idx_t doc_id) { return delete_store_->has(doc_id); }; |
159 | } |
160 | |
161 | int ret = column_reader->search(query, query_params, batch_count, filter, |
162 | &batch_search_results); |
163 | CHECK_RETURN_WITH_SLOG( |
164 | ret, 0, "Column searcher search failed. query_id[%zu] column[%s]" , |
165 | (size_t)query_id, column_name.c_str()); |
166 | |
167 | // fill results |
168 | uint32_t res_num = 0U; |
169 | for (size_t i = 0; i < batch_search_results.size(); i++) { |
170 | auto &search_results = batch_search_results[i]; |
171 | QueryResultList output_result_list; |
172 | for (size_t j = 0; j < search_results.size(); j++) { |
173 | idx_t doc_id = search_results[j].key(); |
174 | ForwardData fwd_data; |
175 | ret = forward_reader_->seek(doc_id, &fwd_data); |
176 | if (ret != 0) { |
177 | SLOG_WARN("Forward not exist. query_id[%zu] doc_id[%zu] column[%s] " , |
178 | (size_t)query_id, (size_t)doc_id, column_name.c_str()); |
179 | continue; |
180 | } |
181 | QueryResult res; |
182 | res.primary_key = fwd_data.header.primary_key; |
183 | res.score = search_results[j].score(); |
184 | res.revision = fwd_data.header.revision; |
185 | res.forward_data = std::move(fwd_data.data); |
186 | res.lsn = fwd_data.header.lsn; |
187 | output_result_list.emplace_back(res); |
188 | } |
189 | res_num += search_results.size(); |
190 | batch_results->emplace_back(output_result_list); |
191 | } |
192 | |
193 | SLOG_DEBUG( |
194 | "Knn search query success. query_id[%zu] " |
195 | "batch_count[%u] topk[%u] res_num[%u] cost[%zuus] column[%s]" , |
196 | (size_t)query_id, batch_count, query_params.topk, res_num, |
197 | (size_t)timer.micro_seconds(), column_name.c_str()); |
198 | |
199 | return 0; |
200 | } |
201 | |
202 | int PersistSegment::kv_search(uint64_t primary_key, QueryResult *result) { |
203 | CHECK_STATUS(loaded_, true); |
204 | |
205 | idx_t doc_id = id_map_->get_mapping_id(primary_key); |
206 | bool found = false; |
207 | result->primary_key = INVALID_KEY; |
208 | |
209 | if (!delete_store_->has(doc_id)) { |
210 | if (doc_id >= segment_meta_.min_doc_id && |
211 | doc_id <= segment_meta_.max_doc_id) { |
212 | ForwardData fwd_data; |
213 | int ret = forward_reader_->seek(doc_id, &fwd_data); |
214 | if (ret == 0 && fwd_data.header.primary_key != INVALID_KEY) { |
215 | result->primary_key = fwd_data.header.primary_key; |
216 | result->revision = fwd_data.header.revision; |
217 | result->forward_data = std::move(fwd_data.data); |
218 | result->lsn = fwd_data.header.lsn; |
219 | found = true; |
220 | } |
221 | } |
222 | } |
223 | |
224 | SLOG_DEBUG("Kv search query success. key[%zu] found[%d]" , (size_t)primary_key, |
225 | found); |
226 | return 0; |
227 | } |
228 | |
229 | int PersistSegment::remove_column(const std::string &column_name) { |
230 | CHECK_STATUS(loaded_, true); |
231 | if (!column_readers_.has(column_name)) { |
232 | SLOG_WARN("Column not exist, remove failed. column[%s]" , |
233 | column_name.c_str()); |
234 | return 0; |
235 | } |
236 | |
237 | column_readers_.get(column_name)->close(); |
238 | column_readers_.erase(column_name); |
239 | |
240 | SLOG_INFO("Remove column done. column[%s]" , column_name.c_str()); |
241 | return 0; |
242 | } |
243 | |
244 | int PersistSegment::add_column(const meta::ColumnMetaPtr &column_meta) { |
245 | CHECK_STATUS(loaded_, true); |
246 | std::string column_name = column_meta->name(); |
247 | if (column_readers_.has(column_meta->name())) { |
248 | SLOG_WARN("Column already exist, remove failed. column[%s]" , |
249 | column_name.c_str()); |
250 | return 0; |
251 | } |
252 | |
253 | // occupy a empty column, it will skip in query process |
254 | column_readers_.emplace(column_name, ColumnReaderPtr()); |
255 | |
256 | SLOG_INFO("Add column success. column[%s]" , column_name.c_str()); |
257 | return 0; |
258 | } |
259 | |
260 | int PersistSegment::load_forward_reader(const ReadOptions &read_options) { |
261 | forward_reader_ = ForwardReader::Create(collection_name_, collection_path_, |
262 | segment_meta_.segment_id); |
263 | if (!forward_reader_) { |
264 | SLOG_ERROR("Forward reader create failed." ); |
265 | return ErrorCode_RuntimeError; |
266 | } |
267 | |
268 | forward_reader_->set_start_doc_id(segment_meta_.min_doc_id); |
269 | int ret = forward_reader_->open(read_options); |
270 | CHECK_RETURN_WITH_SLOG(ret, 0, "Open forward reader failed." ); |
271 | |
272 | SLOG_DEBUG("Opened forward reader." ); |
273 | return 0; |
274 | } |
275 | |
276 | int PersistSegment::load_column_readers(const ReadOptions &read_options) { |
277 | for (auto &column_meta : schema_->index_columns()) { |
278 | std::string column_name = column_meta->name(); |
279 | |
280 | ColumnReaderPtr new_column_reader = ColumnReader::Create( |
281 | collection_name_, collection_path_, segment_meta_.segment_id, |
282 | column_name, column_meta->index_type()); |
283 | if (!new_column_reader) { |
284 | SLOG_ERROR("Create column reader failed. index_type[%d] column[%s]" , |
285 | column_meta->index_type(), column_name.c_str()); |
286 | return ErrorCode_RuntimeError; |
287 | } |
288 | |
289 | new_column_reader->set_concurrency(concurrency_); |
290 | int ret = new_column_reader->open(*column_meta.get(), read_options); |
291 | CHECK_RETURN_WITH_SLOG( |
292 | ret, 0, "Open column reader failed. index_type[%d] column[%s]" , |
293 | column_meta->index_type(), column_name.c_str()); |
294 | |
295 | column_readers_.emplace(column_name, new_column_reader); |
296 | } |
297 | |
298 | return 0; |
299 | } |
300 | |
301 | |
302 | } // end namespace index |
303 | } // namespace be |
304 | } // end namespace proxima |
305 | |