1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
 * \brief Write requests are inserted into the memory segment first; the
 *        segment is dumped to a persistent segment once it becomes full.
20 */
21
22#pragma once
23
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <ailego/parallel/lock.h>
#include "common/auto_counter.h"
#include "meta/meta.h"
#include "segment.h"
#include "../collection_dataset.h"
#include "../column/column_indexer.h"
#include "../column/forward_indexer.h"
#include "../concurrent_hash_map.h"
#include "../delete_store.h"
#include "../id_map.h"
34
35namespace proxima {
36namespace be {
37namespace index {
38
39class MemorySegment;
40using MemorySegmentPtr = std::shared_ptr<MemorySegment>;
41
42/*
43 * A MemorySegment represents block of index data in memory, with
44 * streaming insert and search ability at the same time. You can
45 * set a docs limit to it, and it will dump to persit storage when
46 * full.
47 */
48class MemorySegment : public Segment {
49 public:
50 PROXIMA_DISALLOW_COPY_AND_ASSIGN(MemorySegment);
51
52 //! Constructor
53 MemorySegment(const std::string &coll_name, const std::string &coll_path,
54 const SegmentMeta &seg_meta,
55 const meta::CollectionMeta *schema_ptr,
56 const DeleteStore *delete_store_ptr, const IDMap *id_map_ptr,
57 uint32_t concurrency_val)
58 : schema_(schema_ptr),
59 delete_store_(delete_store_ptr),
60 id_map_(id_map_ptr),
61 concurrency_(concurrency_val) {
62 this->set_collection_name(coll_name);
63 this->set_collection_path(coll_path);
64 this->set_segment_meta(seg_meta);
65 }
66
67
68 //! Destructor
69 ~MemorySegment() override;
70
71 //! Create an instance and return shared ptr
72 static MemorySegmentPtr Create(const std::string &collection_name,
73 const std::string &collection_path,
74 const SegmentMeta &segment_meta,
75 const meta::CollectionMeta *schema,
76 const DeleteStore *delete_store,
77 const IDMap *id_map, uint32_t concurrency);
78
79 //! Create and open an instance
80 static int CreateAndOpen(const std::string &collection_name,
81 const std::string &collection_path,
82 const SegmentMeta &segment_meta,
83 const meta::CollectionMeta *schema,
84 const DeleteStore *delete_store, const IDMap *id_map,
85 uint32_t concurrency,
86 const ReadOptions &read_options,
87 MemorySegmentPtr *memory_segment);
88
89 public:
90 //! Open and initialize memory segment
91 int open(const ReadOptions &read_options);
92
93 //! Close and cleanup memory segment
94 int close();
95
96 //! Flush memory to persist storage
97 int flush();
98
99 //! Dump to another index type to persist storage
100 int dump();
101
102 //! Close and remove internal files
103 int close_and_remove_files();
104
105 public:
106 //! Insert a record & alloc a doc_id
107 int insert(const Record &record, idx_t *doc_id);
108
109 //! Remove a record
110 int remove(idx_t doc_id);
111
112 //! Optimize memory usage
113 int optimize(ThreadPoolPtr pool);
114
115#if 0
116 //! Update a record
117 int update(idx_t doc_id, const Record &record);
118#endif
119
120 //! Knn similar search
121 int knn_search(const std::string &column_name, const std::string &query,
122 const QueryParams &query_params,
123 QueryResultList *results) override;
124
125 //! Knn similar search with batch mode
126 int knn_search(const std::string &column_name, const std::string &query,
127 const QueryParams &query_params, uint32_t batch_count,
128 std::vector<QueryResultList> *results) override;
129
130 //! Just search forward by doc primary key
131 int kv_search(uint64_t primary_key, QueryResult *result) override;
132
133 public:
134 //! Remove a column
135 int remove_column(const std::string &column_name) override;
136
137 //! Add a column
138 int add_column(const meta::ColumnMetaPtr &column_meta) override;
139
140 public:
141 //! Set segment state
142 void update_state(SegmentState new_state) {
143 segment_meta_.state = new_state;
144 }
145
146 //! Return forward count
147 size_t doc_count() const override {
148 return segment_meta_.doc_count;
149 }
150
151 public:
152 //! Get forward reader
153 ForwardReaderPtr get_forward_reader() const override {
154 return forward_indexer_;
155 }
156
157 //! Get column reader
158 ColumnReaderPtr get_column_reader(
159 const std::string &column_name) const override {
160 if (!column_indexers_.has(column_name)) {
161 return ColumnReaderPtr();
162 }
163 return column_indexers_.get(column_name);
164 }
165
166 private:
167 int open_forward_indexer(const ReadOptions &read_options);
168
169 int open_column_indexers(const ReadOptions &read_options);
170
171 int dump_forward_indexer(const IndexDumperPtr &dumper);
172
173 int dump_column_indexers(const IndexDumperPtr &dumper);
174
175 void update_stats(const Record &record, idx_t doc_id);
176
177 size_t get_index_file_count();
178
179 size_t get_index_file_size();
180
181 private:
182 static constexpr uint32_t MAX_WAIT_RETRY_COUNT = 60U;
183
184 private:
185 const meta::CollectionMeta *schema_{nullptr};
186 const DeleteStore *delete_store_{nullptr};
187 const IDMap *id_map_{nullptr};
188 uint32_t concurrency_{0U};
189
190 ForwardIndexerPtr forward_indexer_{};
191 ConcurrentHashMap<std::string, ColumnIndexerPtr> column_indexers_{};
192
193 std::mutex mutex_{};
194 std::atomic<uint64_t> active_insert_count_{0U};
195 std::atomic<uint64_t> active_search_count_{0U};
196 bool opened_{false};
197};
198
199
200} // end namespace index
201} // namespace be
202} // end namespace proxima
203