1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Haichao.chc
17 * \date Oct 2020
18 * \brief Implementation of simple forward indexer
19 */
20
21#include "simple_forward_indexer.h"
22#include "common/error_code.h"
23#include "forward_indexer.h"
24#include "../file_helper.h"
25
26namespace proxima {
27namespace be {
28namespace index {
29
30SimpleForwardIndexer::~SimpleForwardIndexer() {
31 if (opened_) {
32 this->close();
33 }
34}
35
36int SimpleForwardIndexer::open(const ReadOptions &read_options) {
37 CHECK_STATUS(opened_, false);
38
39 int ret =
40 Snapshot::CreateAndOpen(this->collection_path(), FileID::FORWARD_FILE,
41 this->segment_id(), read_options, &snapshot_);
42 CHECK_RETURN_WITH_SLOG(ret, 0, "Create snapshot failed.");
43
44 ret = open_proxima_forward();
45 CHECK_RETURN_WITH_SLOG(ret, 0, "Open proxima forward failed.");
46
47 opened_ = true;
48 return 0;
49}
50
51int SimpleForwardIndexer::close() {
52 CHECK_STATUS(opened_, true);
53
54 proxima_forward_->close();
55 int ret = snapshot_->close();
56 if (ret != 0) {
57 SLOG_WARN("Close snapshot failed.");
58 }
59
60 opened_ = false;
61 return ret;
62}
63
64int SimpleForwardIndexer::flush() {
65 CHECK_STATUS(opened_, true);
66
67 return proxima_forward_->flush(0);
68}
69
70int SimpleForwardIndexer::dump(IndexDumperPtr dumper) {
71 CHECK_STATUS(opened_, true);
72
73 return proxima_forward_->dump(dumper);
74}
75
76int SimpleForwardIndexer::insert(const ForwardData &forward_data,
77 idx_t *doc_id) {
78 CHECK_STATUS(opened_, true);
79
80 uint64_t index = 0U;
81 std::string buffer;
82 forward_data.serialize(&buffer);
83 uint64_t key = forward_data.header.primary_key;
84 int ret = proxima_forward_->append(buffer.data(), buffer.size(), &index);
85 CHECK_RETURN_WITH_SLOG(ret, 0, "Append forward failed. key[%zu] ret[%d]",
86 (size_t)key, ret);
87
88 *doc_id = this->start_doc_id() + index;
89 return 0;
90}
91
#if 0
// Disabled: this update path is compiled out by the surrounding #if 0.
// It serializes the record and overwrites the store entry at
// (doc_id - start_doc_id()).
int SimpleForwardIndexer::update(idx_t doc_id,
                                 const ForwardData &forward_data) {
  CHECK_STATUS(opened_, true);

  std::string buffer;
  forward_data.serialize(&buffer);

  uint64_t key = forward_data.header.primary_key;
  uint64_t index = doc_id - this->start_doc_id();
  int ret = proxima_forward_->update(index, buffer.data(), buffer.size());
  CHECK_RETURN_WITH_SLOG(
      ret, 0,
      "Update forward data failed. key[%zu] doc_id[%zu] index[%zu] ret[%d]",
      static_cast<size_t>(key), static_cast<size_t>(doc_id),
      static_cast<size_t>(index), ret);

  return 0;
}
#endif
110
111int SimpleForwardIndexer::remove(idx_t doc_id) {
112 CHECK_STATUS(opened_, true);
113
114 uint64_t index = doc_id - this->start_doc_id();
115 int ret = proxima_forward_->erase(index);
116 CHECK_RETURN_WITH_SLOG(
117 ret, 0, "Remove forward data failed. doc_id[%zu] index[%zu] ret[%d]",
118 (size_t)doc_id, (size_t)index, ret);
119
120 return 0;
121}
122
123int SimpleForwardIndexer::seek(idx_t doc_id, ForwardData *forward_data) {
124 CHECK_STATUS(opened_, true);
125
126 uint64_t index = doc_id - this->start_doc_id();
127 std::string buffer;
128 int ret = proxima_forward_->fetch(index, &buffer);
129 CHECK_RETURN_WITH_SLOG(
130 ret, 0, "Forward store fetch failed. doc_id[%zu] index[%zu] ret[%d]",
131 (size_t)doc_id, (size_t)index, ret);
132
133 forward_data->deserialize(buffer);
134 return 0;
135}
136
137int SimpleForwardIndexer::open_proxima_forward() {
138 proxima_forward_ = aitheta2::IndexFactory::CreateCloset("ChainCloset");
139 if (!proxima_forward_) {
140 SLOG_ERROR("Create proxima forward failed.");
141 return ErrorCode_RuntimeError;
142 }
143
144 aitheta2::IndexParams params;
145 params.set("proxima.chain.closet.slot_size", 128);
146 int ret = proxima_forward_->init(params);
147 CHECK_RETURN_WITH_SLOG(ret, 0, "Init proxima forward failed.");
148
149 ret = proxima_forward_->open(snapshot_->data());
150 CHECK_RETURN_WITH_SLOG(ret, 0, "Open proxima forward failed. ret[%d]", ret);
151
152 return 0;
153}
154
155
156} // end namespace index
157} // namespace be
158} // end namespace proxima
159