1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
 * \brief Collection provides add/delete/modify operations on a single
 * collection
20 */
21
22#pragma once
23
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "common/macro_define.h"
#include "meta/meta.h"
#include "segment/memory_segment.h"
#include "segment/persist_segment_manager.h"
#include "collection_dataset.h"
#include "collection_stats.h"
#include "delete_store.h"
#include "id_map.h"
#include "lsn_store.h"
#include "version_manager.h"
34
35namespace proxima {
36namespace be {
37namespace index {
38
// Declared ahead of its definition so the handle alias below can name it.
class Collection;

//! Shared-ownership handle through which collections are passed around.
using CollectionPtr = ::std::shared_ptr<Collection>;
41
42/*
43 * Collection is responsible for data storage, building index,
44 * and searching index. It use traditional LSM structure for
45 * data processing and storing. Data will insert into a memory
46 * segment, when full it will dump to a persist segment.
47 */
48class Collection {
49 public:
50 PROXIMA_DISALLOW_COPY_AND_ASSIGN(Collection);
51
52 //! Constructor
53 Collection(const std::string &collection_name, const std::string &prefix_path,
54 meta::CollectionMetaPtr schema, uint32_t concurrency,
55 ThreadPool *thread_pool);
56
57 //! Destructor
58 ~Collection();
59
60 //! Create an instance
61 static CollectionPtr Create(const std::string &collection_name,
62 const std::string &prefix_path,
63 meta::CollectionMetaPtr schema,
64 uint32_t concurrency, ThreadPool *thread_pool);
65
66 //! Crate an instance and open
67 static int CreateAndOpen(const std::string &collection_name,
68 const std::string &prefix_path,
69 meta::CollectionMetaPtr schema, uint32_t concurrency,
70 ThreadPool *thread_pool,
71 const ReadOptions &read_options,
72 CollectionPtr *collection);
73
74 public:
75 //! Open and initialize collection
76 int open(const ReadOptions &read_options);
77
78 //! Close collection
79 int close();
80
81 //! Close collection and cleanup files
82 int close_and_cleanup();
83
84 //! Flush collection's memory to persist storage
85 int flush();
86
87 //! Dump collection's memory segment to persist segment
88 int dump();
89
90 //! Optimize collection memory usage
91 int optimize(ThreadPoolPtr pool);
92
93 public:
94 //! Batch write records
95 int write_records(const CollectionDataset &records);
96
97 //! Get latest lsn context of record
98 int get_latest_lsn(uint64_t *lsn, std::string *lsn_context);
99
100 //! Get all segments
101 int get_segments(std::vector<SegmentPtr> *segments);
102
103 //! Get statistics
104 int get_stats(CollectionStats *stats);
105
106 public:
107 //! Update schema
108 int update_schema(meta::CollectionMetaPtr new_schema);
109
110 public:
111 //! Get collection name
112 const std::string &collection_name() const {
113 return collection_name_;
114 }
115
116 //! Get collection disk path
117 const std::string &dir_path() const {
118 return dir_path_;
119 }
120
121 //! Get collection schema
122 const meta::CollectionMetaPtr &schema() {
123 return schema_;
124 }
125
126 private:
127 int recover_from_snapshot(const ReadOptions &read_options);
128
129 int remove_files();
130
131 int open_memory_segment(const SegmentMeta &segment_meta,
132 const ReadOptions &read_options,
133 MemorySegmentPtr *new_segment);
134
135 int load_persist_segment(const SegmentMeta &segment_meta,
136 const ReadOptions &read_options,
137 PersistSegmentPtr *new_segment);
138
139 int drive_dump_segment();
140
141 int do_dump_segment();
142
143 void diff_schema(const meta::CollectionMeta &new_schema,
144 const meta::CollectionMeta &current_schema,
145 std::vector<meta::ColumnMetaPtr> *add_columns,
146 std::vector<meta::ColumnMetaPtr> *delete_columns);
147
148 int insert_record(const Record &record);
149
150 int delete_record(uint64_t primary_key);
151
152 int update_record(const Record &record);
153
154 bool has_record(uint64_t primary_key);
155
156 int search_record(uint64_t primary_key, Record *record);
157
158 private:
159 static constexpr uint32_t DOC_ID_INCREASE_COUNT = 1000;
160
161 private:
162 std::string collection_name_{};
163 std::string prefix_path_{};
164 std::string dir_path_{};
165 meta::CollectionMetaPtr schema_{};
166 uint32_t concurrency_{0U};
167 ThreadPool *thread_pool_{nullptr};
168
169 IDMapPtr id_map_{};
170 DeleteStorePtr delete_store_{};
171 LsnStorePtr lsn_store_{};
172 MemorySegmentPtr writing_segment_{};
173 MemorySegmentPtr dumping_segment_{};
174 VersionManagerPtr version_manager_{};
175 PersistSegmentManagerPtr persist_segment_mgr_{};
176
177 std::mutex schema_mutex_{};
178 std::atomic<bool> is_dumping_{false};
179 std::atomic<bool> is_flushing_{false};
180 std::atomic<bool> is_optimizing_{false};
181
182 bool opened_{false};
183};
184
185
186} // end namespace index
187} // namespace be
188} // end namespace proxima
189