1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Dianzhang.Chen
17 * \date Jan 2021
18 * \brief Definition of fake collection
19 */
20
21#pragma once
22
23#include <string>
24#include <brpc/channel.h>
25
26#define private public
27#define protected public
28#include "repository/repository_common/config.h"
29#undef private
30#undef protected
31
32#include "repository/binlog/mysql_handler.h"
33#include "repository/collection.h"
34#include "repository/common_types.h"
35#include "repository/repository_common/error_code.h"
36
37namespace proxima {
38namespace be {
39namespace repository {
40
41class FakeMysqlCollection;
42using FakeMysqlCollectionPtr = std::shared_ptr<FakeMysqlCollection>;
43
44/*! Mysql Collection
45 */
46class FakeMysqlCollection : public Collection {
47 public:
48 //! Constructor
49 FakeMysqlCollection(const proto::CollectionConfig &config,
50 MysqlHandlerPtr mysql_handler)
51 : config_(config), mysql_handler_(std::move(mysql_handler)) {}
52
53 //! Destructor
54 virtual ~FakeMysqlCollection() {
55 if (work_thread_.joinable()) {
56 work_thread_.join();
57 }
58 }
59
60 public:
61 //! Initialize MySQL Collection
62 int init() override;
63
64 //! Start Collection
65 void run() override;
66
67 //! Update Collection
68 void update() override;
69
70 //! Drop Collection
71 void drop() override;
72
73 //! Get collection state
74 CollectionStatus state() const override;
75
76 //! Stop collection
77 void stop() override;
78
79 //! Get collection schema revision
80 uint32_t schema_revision() const override;
81
82 //! Check if collection is finished
83 bool finished() const override;
84
85
86 protected:
87 int init_brpc();
88
89 void work_impl();
90
91 bool is_valid();
92
93 private:
94 std::atomic<CollectionStatus> state_{CollectionStatus::INIT};
95 proto::CollectionConfig config_;
96 MysqlHandlerPtr mysql_handler_;
97 brpc::Channel channel_;
98 std::shared_ptr<proto::ProximaService_Stub> stub_;
99 std::thread work_thread_;
100};
101
102void FakeMysqlCollection::run() {
103 LOG_INFO("Start Fake Mysql Collection. name[%s]",
104 config_.collection_name().c_str());
105 work_thread_ = std::thread(&FakeMysqlCollection::work_impl, this);
106}
107
108void FakeMysqlCollection::work_impl() {
109 while (is_valid()) {
110 std::this_thread::sleep_for(std::chrono::microseconds(1));
111 proto::Status response;
112 proto::WriteRequest request;
113 request.set_collection_name(config_.collection_name());
114 brpc::Controller cntl;
115 stub_->write(&cntl, &request, &response, NULL);
116 }
117}
118
119bool FakeMysqlCollection::is_valid() {
120 if (state_.load() == CollectionStatus::FINISHED) {
121 return false;
122 }
123 return true;
124}
125
126bool FakeMysqlCollection::finished() const {
127 return state() == CollectionStatus::FINISHED;
128}
129
130
131void FakeMysqlCollection::update() {
132 state_.store(CollectionStatus::UPDATING);
133 // Just fake: ++schema_revision when update collection
134 // real logic should update from brpc
135 // auto current_schema_revision = config_.schema_revision();
136 // config_.set_schema_revision(++current_schema_revision);
137}
138
139void FakeMysqlCollection::drop() {
140 state_.store(CollectionStatus::FINISHED);
141}
142
143int FakeMysqlCollection::init() {
144 LOG_INFO("Init Fake Mysql Collection. name[%s]",
145 config_.collection_name().c_str());
146 int ret = init_brpc();
147 if (ret != 0) {
148 return ret;
149 }
150 proto::CollectionName request;
151 proto::DescribeCollectionResponse response;
152 brpc::Controller cntl;
153 request.set_collection_name(config_.collection_name());
154 stub_->describe_collection(&cntl, &request, &response, NULL);
155 return 0;
156}
157
158int FakeMysqlCollection::init_brpc() {
159 brpc::ChannelOptions options;
160 auto index_uri =
161 proxima::be::repository::Config::Instance().get_index_agent_uri();
162 int ret = channel_.Init(index_uri.c_str(), "", &options);
163 if (ret != 0) {
164 LOG_ERROR("Failed to initialize channel");
165 return ErrorCode_InitChannel;
166 }
167 stub_.reset(new (std::nothrow) proto::ProximaService_Stub(&channel_));
168 return 0;
169}
170
171CollectionStatus FakeMysqlCollection::state() const {
172 CollectionStatus state = state_.load();
173 return state;
174}
175
176void FakeMysqlCollection::stop() {
177 LOG_INFO("Stop Fake Mysql Collection. name[%s]",
178 config_.collection_name().c_str());
179 state_.store(CollectionStatus::FINISHED);
180 if (work_thread_.joinable()) {
181 work_thread_.join();
182 }
183}
184
185uint32_t FakeMysqlCollection::schema_revision() const {
186 return 0;
187}
188
189} // end namespace repository
190} // namespace be
191} // end namespace proxima
192