1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
 * \brief VectorColumnIndexer processes vector column data in a streaming
 *        fashion; it is one implementation of ColumnIndexer.
20 */
21
22#pragma once
23
24#include <condition_variable>
25#include <memory>
26#include <queue>
27#include "common/error_code.h"
28#include "common/macro_define.h"
29#include "common/types.h"
30#include "meta/meta.h"
31#include "column_indexer.h"
32#include "context_pool.h"
33#include "index_helper.h"
34#include "../snapshot.h"
35#include "../typedef.h"
36
37namespace proxima {
38namespace be {
39namespace index {
40
/*
 * VectorColumnIndexer processes vector column data in a streaming fashion
 * and provides vector search interfaces.
 */
45class VectorColumnIndexer : public ColumnIndexer {
46 public:
47 enum class EngineTypes : uint32_t {
48 PROXIMA_HNSW_STREAMER = 0,
49 PROXIMA_OSWG_STREAMER = 1
50 };
51
52 public:
53 PROXIMA_DISALLOW_COPY_AND_ASSIGN(VectorColumnIndexer);
54
55 //! Constructor
56 VectorColumnIndexer(const std::string &coll_name,
57 const std::string &coll_path, SegmentID seg_id,
58 const std::string &col_name) {
59 this->set_collection_name(coll_name);
60 this->set_collection_path(coll_path);
61 this->set_segment_id(seg_id);
62 this->set_column_name(col_name);
63 }
64
65 //! Destructor
66 ~VectorColumnIndexer();
67
68 public:
69 //! Open persist storage
70 int open(const meta::ColumnMeta &column_meta,
71 const ReadOptions &read_options) override;
72
73 //! Flush memory to persist storage
74 int flush() override;
75
76 //! Close persist storage
77 int close() override;
78
79 //! Dump index to persist storage
80 int dump(IndexDumperPtr dumper) override;
81
82 public:
83 //! Insert vector
84 int insert(idx_t doc_id, const ColumnData &column_data) override;
85
86#if 0
87 //! Update column data by doc_id
88 int update(idx_t doc_id, const ColumnData &column_data) override;
89#endif
90
91 //! Remove column data by doc_id
92 int remove(idx_t doc_id) override;
93
94 //! Optimize index structure
95 int optimize(ThreadPoolPtr pool) override;
96
97 //! Search similar results with query
98 int search(const std::string &query, const QueryParams &query_params,
99 FilterFunction filter, IndexDocumentList *result_list) override;
100
101 //! Batch search similar results with query
102 int search(const std::string &query, const QueryParams &query_params,
103 uint32_t batch_count, FilterFunction filter,
104 std::vector<IndexDocumentList> *batch_result_list) override;
105
106 public:
107 //! Return index path
108 std::string index_file_path() const override {
109 if (snapshot_) {
110 return snapshot_->file_path();
111 } else {
112 return "";
113 }
114 }
115
116 //! Return doc count
117 size_t doc_count() const override {
118 if (proxima_streamer_) {
119 return proxima_streamer_->stats().added_count();
120 } else {
121 return 0U;
122 }
123 }
124
125 private:
126 bool check_column_meta(const meta::ColumnMeta &column_meta);
127
128 int open_proxima_streamer();
129
130 std::string get_engine_name() {
131 if (engine_type_ == EngineTypes::PROXIMA_OSWG_STREAMER) {
132 return "OswgStreamer";
133 } else {
134 return "HnswStreamer";
135 }
136 }
137
138 private:
139 SnapshotPtr snapshot_{};
140 IndexParams proxima_params_{};
141 IndexStreamerPtr proxima_streamer_{};
142 IndexMeta proxima_meta_{};
143 ContextPool context_pool_{};
144
145 EngineTypes engine_type_{EngineTypes::PROXIMA_OSWG_STREAMER};
146
147 QuantizeTypes quantize_type_{QuantizeTypes::UNDEFINED};
148 IndexReformerPtr reformer_{};
149 IndexMeasurePtr measure_{};
150
151 bool opened_{false};
152};
153
154} // end namespace index
155} // namespace be
156} // end namespace proxima
157