1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of persist segment
19 */
20
21#include "persist_segment.h"
22#include <ailego/utility/time_helper.h>
23#include "common/auto_counter.h"
24#include "common/error_code.h"
25#include "../file_helper.h"
26
27namespace proxima {
28namespace be {
29namespace index {
30
31PersistSegmentPtr PersistSegment::Create(const std::string &collection_name,
32 const std::string &collection_path,
33 const SegmentMeta &segment_meta,
34 const meta::CollectionMeta *schema,
35 const DeleteStore *delete_store,
36 const IDMap *id_map,
37 uint32_t concurrency) {
38 return std::make_shared<PersistSegment>(collection_name, collection_path,
39 segment_meta, schema, delete_store,
40 id_map, concurrency);
41}
42
43int PersistSegment::CreateAndLoad(
44 const std::string &collection_name, const std::string &collection_path,
45 const SegmentMeta &segment_meta, const meta::CollectionMeta *schema,
46 const DeleteStore *delete_store, const IDMap *id_map, uint32_t concurrency,
47 const ReadOptions &read_options, PersistSegmentPtr *persist_segment) {
48 (*persist_segment) = Create(collection_name, collection_path, segment_meta,
49 schema, delete_store, id_map, concurrency);
50
51 return (*persist_segment)->load(read_options);
52}
53
54PersistSegment::~PersistSegment() {
55 if (loaded_) {
56 unload();
57 }
58}
59
60int PersistSegment::load(const ReadOptions &read_options) {
61 CHECK_STATUS(loaded_, false);
62
63 // Load forward searcher
64 int ret = load_forward_reader(read_options);
65 CHECK_RETURN_WITH_SLOG(ret, 0, "Load forward searcher failed.");
66
67 // Load column searchers
68 ret = load_column_readers(read_options);
69 CHECK_RETURN_WITH_SLOG(ret, 0, "Load column searchers failed.");
70
71 SLOG_DEBUG("Load persist segment success.");
72 loaded_ = true;
73
74 return 0;
75}
76
77int PersistSegment::unload() {
78 CHECK_STATUS(loaded_, true);
79
80 // try to ensure active search requests finished
81 uint32_t retry = 0;
82 while (retry < MAX_WAIT_RETRY_COUNT && active_search_count_ > 0) {
83 LOG_INFO(
84 "Try to wait active request finished. active_search_count[%zu] "
85 "retry[%d]",
86 (size_t)active_search_count_.load(), retry);
87 std::this_thread::sleep_for(std::chrono::seconds(1));
88 retry++;
89 }
90
91 forward_reader_->close();
92 for (auto &it : column_readers_) {
93 // Check if column searcher is empty searcher
94 // because sometimes will add empty column in
95 // persist segment.
96 if (it.second != nullptr) {
97 it.second->close();
98 }
99 }
100 column_readers_.clear();
101
102 loaded_ = false;
103 SLOG_DEBUG("Unloaded persist segment.");
104
105 return 0;
106}
107
108int PersistSegment::knn_search(const std::string &column_name,
109 const std::string &query,
110 const QueryParams &query_params,
111 QueryResultList *results) {
112 CHECK_STATUS(loaded_, true);
113 std::vector<QueryResultList> batch_results;
114 int ret =
115 this->knn_search(column_name, query, query_params, 1, &batch_results);
116 CHECK_RETURN(ret, 0);
117
118 (*results) = batch_results[0];
119 return 0;
120}
121
122int PersistSegment::knn_search(const std::string &column_name,
123 const std::string &query,
124 const QueryParams &query_params,
125 uint32_t batch_count,
126 std::vector<QueryResultList> *batch_results) {
127 CHECK_STATUS(loaded_, true);
128
129 AutoCounter as(active_search_count_);
130
131 ailego::ElapsedTime timer;
132 uint64_t query_id = query_params.query_id;
133
134 if (!column_readers_.has(column_name)) {
135 SLOG_ERROR("Column not exist. query_id[%zu] column_name[%s]",
136 (size_t)query_id, column_name.c_str());
137 return ErrorCode_InexistentColumn;
138 }
139
140 auto &column_reader = column_readers_.get(column_name);
141 // check if column searcher is empty searcher
142 // it means this column added later by update schema
143 // so we just return empty results
144 if (!column_reader) {
145 SLOG_INFO(
146 "Empty column searcher return empty results. query_id[%zu] "
147 "batch_count[%u] topk[%u] "
148 "res_num[0] cost[%zums] column[%s]",
149 (size_t)query_id, batch_count, query_params.topk,
150 (size_t)timer.milli_seconds(), column_name.c_str());
151 return 0;
152 }
153
154 // search columns
155 std::vector<IndexDocumentList> batch_search_results;
156 FilterFunction filter = nullptr;
157 if (delete_store_ && delete_store_->count() > 0) {
158 filter = [this](idx_t doc_id) { return delete_store_->has(doc_id); };
159 }
160
161 int ret = column_reader->search(query, query_params, batch_count, filter,
162 &batch_search_results);
163 CHECK_RETURN_WITH_SLOG(
164 ret, 0, "Column searcher search failed. query_id[%zu] column[%s]",
165 (size_t)query_id, column_name.c_str());
166
167 // fill results
168 uint32_t res_num = 0U;
169 for (size_t i = 0; i < batch_search_results.size(); i++) {
170 auto &search_results = batch_search_results[i];
171 QueryResultList output_result_list;
172 for (size_t j = 0; j < search_results.size(); j++) {
173 idx_t doc_id = search_results[j].key();
174 ForwardData fwd_data;
175 ret = forward_reader_->seek(doc_id, &fwd_data);
176 if (ret != 0) {
177 SLOG_WARN("Forward not exist. query_id[%zu] doc_id[%zu] column[%s] ",
178 (size_t)query_id, (size_t)doc_id, column_name.c_str());
179 continue;
180 }
181 QueryResult res;
182 res.primary_key = fwd_data.header.primary_key;
183 res.score = search_results[j].score();
184 res.revision = fwd_data.header.revision;
185 res.forward_data = std::move(fwd_data.data);
186 res.lsn = fwd_data.header.lsn;
187 output_result_list.emplace_back(res);
188 }
189 res_num += search_results.size();
190 batch_results->emplace_back(output_result_list);
191 }
192
193 SLOG_DEBUG(
194 "Knn search query success. query_id[%zu] "
195 "batch_count[%u] topk[%u] res_num[%u] cost[%zuus] column[%s]",
196 (size_t)query_id, batch_count, query_params.topk, res_num,
197 (size_t)timer.micro_seconds(), column_name.c_str());
198
199 return 0;
200}
201
202int PersistSegment::kv_search(uint64_t primary_key, QueryResult *result) {
203 CHECK_STATUS(loaded_, true);
204
205 idx_t doc_id = id_map_->get_mapping_id(primary_key);
206 bool found = false;
207 result->primary_key = INVALID_KEY;
208
209 if (!delete_store_->has(doc_id)) {
210 if (doc_id >= segment_meta_.min_doc_id &&
211 doc_id <= segment_meta_.max_doc_id) {
212 ForwardData fwd_data;
213 int ret = forward_reader_->seek(doc_id, &fwd_data);
214 if (ret == 0 && fwd_data.header.primary_key != INVALID_KEY) {
215 result->primary_key = fwd_data.header.primary_key;
216 result->revision = fwd_data.header.revision;
217 result->forward_data = std::move(fwd_data.data);
218 result->lsn = fwd_data.header.lsn;
219 found = true;
220 }
221 }
222 }
223
224 SLOG_DEBUG("Kv search query success. key[%zu] found[%d]", (size_t)primary_key,
225 found);
226 return 0;
227}
228
229int PersistSegment::remove_column(const std::string &column_name) {
230 CHECK_STATUS(loaded_, true);
231 if (!column_readers_.has(column_name)) {
232 SLOG_WARN("Column not exist, remove failed. column[%s]",
233 column_name.c_str());
234 return 0;
235 }
236
237 column_readers_.get(column_name)->close();
238 column_readers_.erase(column_name);
239
240 SLOG_INFO("Remove column done. column[%s]", column_name.c_str());
241 return 0;
242}
243
244int PersistSegment::add_column(const meta::ColumnMetaPtr &column_meta) {
245 CHECK_STATUS(loaded_, true);
246 std::string column_name = column_meta->name();
247 if (column_readers_.has(column_meta->name())) {
248 SLOG_WARN("Column already exist, remove failed. column[%s]",
249 column_name.c_str());
250 return 0;
251 }
252
253 // occupy a empty column, it will skip in query process
254 column_readers_.emplace(column_name, ColumnReaderPtr());
255
256 SLOG_INFO("Add column success. column[%s]", column_name.c_str());
257 return 0;
258}
259
260int PersistSegment::load_forward_reader(const ReadOptions &read_options) {
261 forward_reader_ = ForwardReader::Create(collection_name_, collection_path_,
262 segment_meta_.segment_id);
263 if (!forward_reader_) {
264 SLOG_ERROR("Forward reader create failed.");
265 return ErrorCode_RuntimeError;
266 }
267
268 forward_reader_->set_start_doc_id(segment_meta_.min_doc_id);
269 int ret = forward_reader_->open(read_options);
270 CHECK_RETURN_WITH_SLOG(ret, 0, "Open forward reader failed.");
271
272 SLOG_DEBUG("Opened forward reader.");
273 return 0;
274}
275
276int PersistSegment::load_column_readers(const ReadOptions &read_options) {
277 for (auto &column_meta : schema_->index_columns()) {
278 std::string column_name = column_meta->name();
279
280 ColumnReaderPtr new_column_reader = ColumnReader::Create(
281 collection_name_, collection_path_, segment_meta_.segment_id,
282 column_name, column_meta->index_type());
283 if (!new_column_reader) {
284 SLOG_ERROR("Create column reader failed. index_type[%d] column[%s]",
285 column_meta->index_type(), column_name.c_str());
286 return ErrorCode_RuntimeError;
287 }
288
289 new_column_reader->set_concurrency(concurrency_);
290 int ret = new_column_reader->open(*column_meta.get(), read_options);
291 CHECK_RETURN_WITH_SLOG(
292 ret, 0, "Open column reader failed. index_type[%d] column[%s]",
293 column_meta->index_type(), column_name.c_str());
294
295 column_readers_.emplace(column_name, new_column_reader);
296 }
297
298 return 0;
299}
300
301
302} // end namespace index
303} // namespace be
304} // end namespace proxima
305