1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * \author guonix
17 * \date Nov 2020
18 * \brief
19 */
20
21#include "meta_service.h"
22#include <mutex>
23#include <ailego/encoding/uri.h>
24#include <ailego/parallel/lock.h>
25#include "common/error_code.h"
26#include "meta_cache.h"
27#include "meta_impl.h"
28#include "meta_service_builder.h"
29#include "meta_store_factory.h"
30
31namespace proxima {
32namespace be {
33namespace meta {
34
35/*! MetaServiceImpl
36 */
37class MetaServiceImpl : public MetaService {
38 public:
39 //! Constructor
40 //! @param uri: Identifier with database
41 explicit MetaServiceImpl(MetaStorePtr store, MetaCachePtr cache);
42
43 //! Disable default constructor and copy constructor
44 MetaServiceImpl() = delete;
45 MetaServiceImpl(const MetaServiceImpl &) = delete;
46
47 // Destructor
48 ~MetaServiceImpl() override;
49
50 public:
51 // Interface inherent from Service
52 int init_impl() override;
53
54 int cleanup_impl() override;
55
56 int start_impl() override;
57
58 int stop_impl() override;
59
60 public:
61 //! Reload meta service
62 int reload() override;
63
64 //! Create collection and columns
65 int create_collection(const CollectionBase &param,
66 CollectionMetaPtr *meta) override;
67
68 //! Update collection and columns, increase revision and copy a new collection
69 int update_collection(const CollectionBase &param,
70 CollectionMetaPtr *meta) override;
71
72 //! Enable collection
73 int enable_collection(const std::string &collection, uint32_t revision,
74 bool enable) override;
75
76 //! Update the status of current used collection
77 int update_status(const std::string &collection_name,
78 CollectionStatus status) override;
79
80 //! Suspend reading requests of collection
81 int suspend_collection_read(const std::string &collection_name) override;
82
83 //! Resume reading requests of collection
84 int resume_collection_read(const std::string &collection_name) override;
85
86 //! Suspend writing requests of collection
87 int suspend_collection_write(const std::string &collection_name) override;
88
89 //! Resume writing requests of collection
90 int resume_collection_write(const std::string &collection_name) override;
91
92 //! Drop collection
93 int drop_collection(const std::string &name) override;
94
95 //! Retrieve latest version of collection
96 CollectionMetaPtr get_current_collection(
97 const std::string &name) const override;
98
99 //! Retrieve latest version of collections
100 int get_latest_collections(CollectionMetaPtrList *collections) const override;
101
102 //! Retrieve all of collections
103 int get_collections(CollectionMetaPtrList *collections) const override;
104
105 //! Retrieve collections with specific repository
106 int get_collections_by_repo(
107 const std::string &repository,
108 CollectionMetaPtrList *collections) const override;
109
110 //! Retrieve collections with specific collection name
111 int get_collections(const std::string &name,
112 CollectionMetaPtrList *collections) const override;
113
114 //! Retrieve collection
115 CollectionMetaPtr get_collection(const std::string &name,
116 uint64_t revision) const override;
117
118 //! Check collection exists
119 bool exist_collection(const std::string &collection) const override;
120
121 private:
122 //! Retrieve collection
123 const CollectionImplPtr &inner_get_collection(const std::string &name,
124 uint64_t revision) const;
125
126 //! Update current collection
127 int update_current_used_collection(
128 const std::string collection_name,
129 std::function<int(CollectionImplPtr)> handler);
130
131 //! Cleanup cache without lock
132 inline void cleanup_cache() {
133 cache_->clear();
134 }
135
136 //! Load meta store to cache with out lock
137 int load_meta_cache();
138
139 //! load collections from meta store
140 int load_collections();
141
142 //! load columns from meta store
143 int load_columns();
144
145 //! load repositories from meta store
146 int load_repositories();
147
148 //! load meta from meta store
149 int load_meta_store();
150
151 //! store collection fo meta store
152 int store_collection(const CollectionImplPtr &collection);
153
154 private:
155 //! Mutex for cache access
156 mutable ailego::SharedMutex mutex_{};
157
158 //! Meta Store
159 MetaStorePtr store_{nullptr};
160
161 //! Meta Cache
162 MetaCachePtr cache_{nullptr};
163}; // end of MetaServiceImpl
164
165
166MetaServicePtr MetaServiceBuilder::Create(const std::string &uri_str) {
167 ailego::Uri uri(uri_str);
168 if (!uri.is_valid()) {
169 LOG_ERROR("Failed to parse uri, initialize MetaServiceImpl failed. uri[%s]",
170 uri_str.c_str());
171 return nullptr;
172 }
173
174 MetaStorePtr store =
175 MetaStoreFactory::Instance().create(uri.scheme().c_str(), &uri);
176 if (!store) {
177 LOG_ERROR(
178 "MetaServiceImpl init failed. reason[can't get meta store instance]");
179 return nullptr;
180 }
181
182 return std::make_shared<MetaServiceImpl>(store,
183 std::make_shared<MetaCache>());
184}
185
186int MetaServiceImpl::load_collections() {
187 CollectionImplPtr collection{nullptr};
188 // Load collection to cache
189 int code =
190 store_->list_collections([this, &collection]() -> CollectionObject * {
191 if (collection) { // Append collection to cache
192 collection->transform();
193 cache_->append_collection(collection);
194 collection.reset();
195 }
196 collection = std::make_shared<CollectionImpl>();
197 return collection.get();
198 });
199 if (code == 0 && collection) { // Append last collection
200 collection->transform();
201 cache_->append_collection(collection);
202 }
203 if (code != 0) { // Failed
204 LOG_ERROR("Failed to load collection from meta store. code[%d]", code);
205 return PROXIMA_BE_ERROR_CODE(RuntimeError);
206 }
207 return code;
208}
209
210int MetaServiceImpl::load_columns() {
211 ColumnImplPtr column{nullptr};
212 // Load columns into cache
213 int code = store_->list_columns([this, &column]() -> ColumnObject * {
214 if (column) { // Append column to cache
215 column->transform();
216 cache_->append_column(column);
217 column.reset();
218 }
219 column = std::make_shared<ColumnImpl>();
220 return column.get();
221 });
222 if (code == 0 && column) { // Append last column to cache
223 column->transform();
224 cache_->append_column(column);
225 }
226 if (code != 0) {
227 LOG_ERROR("Failed to load columns from meta store. code[%d]", code);
228 return PROXIMA_BE_ERROR_CODE(RuntimeError);
229 }
230 return code;
231}
232
233int MetaServiceImpl::load_repositories() {
234 DatabaseRepositoryImplPtr repository{nullptr};
235 // Load columns into cache
236 int code = store_->list_repositories(
237 [this, &repository]() -> DatabaseRepositoryObject * {
238 if (repository) { // Attach to collection
239 cache_->append_repository(repository);
240 }
241 repository = std::make_shared<DatabaseRepositoryImpl>();
242 return repository.get();
243 });
244 if (code == 0 && repository) { // Append last repository to cache
245 cache_->append_repository(repository);
246 }
247 if (code != 0) {
248 LOG_ERROR("Failed to load repository from meta store. code[%d]", code);
249 return PROXIMA_BE_ERROR_CODE(RuntimeError);
250 }
251 return code;
252}
253
254int MetaServiceImpl::load_meta_store() {
255 int code = load_collections();
256 if (code == 0) {
257 code = load_columns();
258 if (code == 0) {
259 code = load_repositories();
260 }
261 }
262 if (code != 0) {
263 cache_->clear();
264 }
265 return code;
266}
267
268int MetaServiceImpl::store_collection(const CollectionImplPtr &collection) {
269 int code = store_->create_collection(*collection);
270 if (code != 0) {
271 return code;
272 }
273
274 if (collection->repository()) {
275 code = store_->create_repository(*collection->repository());
276 if (code != 0) {
277 store_->delete_repositories_by_uuid(collection->uuid());
278 store_->delete_collection_by_uuid(collection->uuid());
279 return code;
280 }
281 }
282
283 for (auto &column : collection->columns()) {
284 code = store_->create_column(*column);
285 if (code != 0) {
286 store_->delete_columns_by_uuid(collection->uuid());
287 store_->delete_repositories_by_uuid(collection->uuid());
288 store_->delete_collection_by_uuid(collection->uuid());
289 break;
290 }
291 }
292 return code;
293}
294
//! Declare a write lock on MUTEX and a std::lock_guard named GUARD holding it
//! until the end of the enclosing scope.
#define META_WRITE_LOCK_GUARD(MUTEX, GUARD) \
  ailego::WriteLock GUARD##_lock(MUTEX);    \
  std::lock_guard<ailego::WriteLock> GUARD(GUARD##_lock)

//! Declare a read lock on MUTEX and a std::lock_guard named GUARD holding it
//! until the end of the enclosing scope.
#define META_READ_LOCK_GUARD(MUTEX, GUARD) \
  ailego::ReadLock GUARD##_lock(MUTEX);    \
  std::lock_guard<ailego::ReadLock> GUARD(GUARD##_lock)
302
303
304MetaServiceImpl::MetaServiceImpl(MetaStorePtr store, MetaCachePtr cache)
305 : store_(std::move(store)), cache_(std::move(cache)) {}
306
307MetaServiceImpl::~MetaServiceImpl() = default;
308
309// Interface inherent from Service
310int MetaServiceImpl::init_impl() {
311 META_WRITE_LOCK_GUARD(mutex_, guard);
312 return load_meta_cache();
313}
314
315int MetaServiceImpl::cleanup_impl() {
316 META_WRITE_LOCK_GUARD(mutex_, guard);
317 cleanup_cache();
318 return 0;
319}
320
321int MetaServiceImpl::start_impl() {
322 return 0;
323}
324
325int MetaServiceImpl::stop_impl() {
326 return 0;
327}
328
329int MetaServiceImpl::reload() {
330 LOG_INFO("Reload meta service.");
331 META_WRITE_LOCK_GUARD(mutex_, guard);
332
333 LOG_DEBUG("Cleanup meta cache.");
334 cleanup_cache();
335 LOG_DEBUG("Reload meta cache.");
336 int code = load_meta_cache();
337 if (code == 0) {
338 LOG_INFO("Reload meta service successes.");
339 } else {
340 LOG_INFO("Reload meta service failed. code[%d] what[%s]", code,
341 ErrorCode::What(code));
342 }
343 return code;
344}
345
346int MetaServiceImpl::create_collection(const CollectionBase &param,
347 CollectionMetaPtr *out) {
348 META_WRITE_LOCK_GUARD(mutex_, guard);
349
350 if (cache_->exist_collection(param.name())) {
351 return PROXIMA_BE_ERROR_CODE(DuplicateCollection);
352 }
353
354 CollectionMetaPtr meta = std::make_shared<CollectionMeta>(param);
355 int code = meta->validate();
356 if (code != 0) {
357 LOG_ERROR("Meta was invalid. code[%d] what[%s]", code,
358 ErrorCode::What(code));
359 return code;
360 }
361 CollectionImplPtr collection = std::make_shared<CollectionImpl>(meta);
362 collection->transform();
363
364 code = store_collection(collection);
365 if (code == 0) {
366 cache_->append_collection(collection);
367 } else {
368 LOG_ERROR("Failed to store collection. code[%d]", code);
369 }
370
371 if (code == 0 && out != nullptr) {
372 *out = meta;
373 }
374 return code;
375}
376
377//! Update collection and columns, increase revision and copy a new collection
378int MetaServiceImpl::update_collection(const CollectionBase &param,
379 CollectionMetaPtr *out) {
380 META_WRITE_LOCK_GUARD(mutex_, guard);
381
382 CollectionImplPtr latest = cache_->get_latest_collection(param.name());
383 if (!latest) {
384 return PROXIMA_BE_ERROR_CODE(InexistentCollection);
385 }
386
387 // Create new meta from latest revision
388 CollectionMetaPtr meta = std::make_shared<CollectionMeta>(*latest->meta());
389 int code = meta->merge_update_param(param);
390 if (code != 0) {
391 LOG_ERROR("Readonly field updated. code[%d] what[%s]", code,
392 ErrorCode::What(code));
393 return code;
394 }
395 code = meta->validate();
396 if (code != 0) {
397 LOG_ERROR("Update collection failed. code[%d] what[%s]", code,
398 ErrorCode::What(code));
399 return code;
400 }
401 // Increase revision of collection
402 meta->increase_revision();
403 meta->set_current(false);
404
405 // Copy to Collection Impl
406 CollectionImplPtr next = std::make_shared<CollectionImpl>(meta);
407 next->transform();
408
409 // Store collection and append to cache
410 code = store_collection(next);
411 if (code != 0) {
412 LOG_ERROR("Failed to update collection. code[%d]", code);
413 } else {
414 cache_->append_collection(next);
415 }
416
417 // Write back of results
418 if (code == 0 && out != nullptr) {
419 *out = meta;
420 }
421
422 return code;
423}
424
425//! Enable collection
426int MetaServiceImpl::enable_collection(const std::string &name,
427 uint32_t revision, bool) {
428 META_WRITE_LOCK_GUARD(mutex_, guard);
429
430 int code = PROXIMA_BE_ERROR_CODE(InexistentCollection);
431 CollectionImplPtr current = cache_->get_collection(name);
432 auto &next = inner_get_collection(name, revision);
433 if (!current || !next) {
434 LOG_ERROR("Can't get collection by name or by revision.");
435 } else {
436 if (current != next) { // Not same
437 current->meta()->set_readable(false);
438 current->meta()->set_writable(false);
439 current->meta()->set_current(false);
440 store_->update_collection(*current);
441 }
442
443 next->meta()->set_status(CollectionStatus::SERVING);
444 next->meta()->set_current(true);
445 code = store_->update_collection(*next);
446 }
447 return code;
448}
449
450//! Update the status of current used collection
451int MetaServiceImpl::update_status(const std::string &name,
452 CollectionStatus stat) {
453 META_WRITE_LOCK_GUARD(mutex_, guard);
454
455 return update_current_used_collection(
456 name, [&stat](CollectionImplPtr current) -> int {
457 current->meta()->set_status(stat);
458 return 0;
459 });
460}
461
462//! Suspend reading requests of collection
463int MetaServiceImpl::suspend_collection_read(
464 const std::string &collection_name) {
465 META_WRITE_LOCK_GUARD(mutex_, guard);
466
467 return update_current_used_collection(collection_name,
468 [](CollectionImplPtr current) -> int {
469 current->meta()->set_readable(false);
470 return 0;
471 });
472}
473
474//! Resume reading requests of collection
475int MetaServiceImpl::resume_collection_read(
476 const std::string &collection_name) {
477 META_WRITE_LOCK_GUARD(mutex_, guard);
478
479 return update_current_used_collection(collection_name,
480 [](CollectionImplPtr current) -> int {
481 current->meta()->set_readable(true);
482 return 0;
483 });
484}
485
486//! Suspend writing requests of collection
487int MetaServiceImpl::suspend_collection_write(
488 const std::string &collection_name) {
489 META_WRITE_LOCK_GUARD(mutex_, guard);
490
491 return update_current_used_collection(collection_name,
492 [](CollectionImplPtr current) -> int {
493 current->meta()->set_writable(false);
494 return 0;
495 });
496}
497
498//! Resume writing requests of collection
499int MetaServiceImpl::resume_collection_write(
500 const std::string &collection_name) {
501 META_WRITE_LOCK_GUARD(mutex_, guard);
502
503 return update_current_used_collection(collection_name,
504 [](CollectionImplPtr current) -> int {
505 current->meta()->set_writable(true);
506 return 0;
507 });
508}
509
510//! Drop collection
511int MetaServiceImpl::drop_collection(const std::string &name) {
512 META_WRITE_LOCK_GUARD(mutex_, guard);
513
514 CollectionImplPtr current = cache_->get_collection(name);
515 if (current) {
516 cache_->delete_collection(name);
517 int code = store_->delete_collection(name);
518 if (code == 0) {
519 code = store_->delete_columns_by_uid(current->uid());
520 if (code == 0) {
521 code = store_->delete_repositories_by_uid(current->uid());
522 }
523 }
524 return code;
525 }
526 return 0;
527}
528
529//! Retrieve latest version of collection
530CollectionMetaPtr MetaServiceImpl::get_current_collection(
531 const std::string &name) const {
532 META_READ_LOCK_GUARD(mutex_, guard);
533
534 auto current = cache_->get_collection(name);
535 return current ? current->meta() : nullptr;
536}
537
538//! Retrieve latest version of collections
539int MetaServiceImpl::get_latest_collections(
540 CollectionMetaPtrList *collections) const {
541 if (!collections) {
542 return PROXIMA_BE_ERROR_CODE(InvalidArgument);
543 }
544
545 META_READ_LOCK_GUARD(mutex_, guard);
546
547 cache_->get_collections(collections);
548 return 0;
549}
550
551//! Retrieve all of collections
552int MetaServiceImpl::get_collections(CollectionMetaPtrList *collections) const {
553 META_READ_LOCK_GUARD(mutex_, guard);
554
555 cache_->get_collections(MetaCache::PassAllFilter, collections);
556 return 0;
557}
558
559//! Retrieve collections with specific repository
560int MetaServiceImpl::get_collections_by_repo(
561 const std::string &repo, CollectionMetaPtrList *collections) const {
562 META_READ_LOCK_GUARD(mutex_, guard);
563
564 cache_->get_collections_by_repo(repo, collections);
565 return 0;
566}
567
568//! Retrieve collections with specific collection name
569int MetaServiceImpl::get_collections(const std::string &name,
570 CollectionMetaPtrList *collections) const {
571 META_READ_LOCK_GUARD(mutex_, guard);
572
573 cache_->get_collections(name, collections);
574 return collections->empty() ? PROXIMA_BE_ERROR_CODE(InexistentCollection) : 0;
575}
576
577//! Retrieve collection
578CollectionMetaPtr MetaServiceImpl::get_collection(const std::string &name,
579 uint64_t revision) const {
580 META_READ_LOCK_GUARD(mutex_, guard);
581
582 auto &impl = inner_get_collection(name, revision);
583 return impl ? impl->meta() : nullptr;
584}
585
586//! Check collection exists
587bool MetaServiceImpl::exist_collection(const std::string &collection) const {
588 META_READ_LOCK_GUARD(mutex_, guard);
589
590 return cache_->exist_collection(collection);
591}
592
593const CollectionImplPtr &MetaServiceImpl::inner_get_collection(
594 const std::string &name, uint64_t revision) const {
595 return cache_->get_collection(
596 name, [&revision](const CollectionImplPtr &collection) -> bool {
597 return collection->revision() == revision;
598 });
599}
600
601int MetaServiceImpl::update_current_used_collection(
602 const std::string collection_name,
603 std::function<int(CollectionImplPtr)> handler) {
604 int code = PROXIMA_BE_ERROR_CODE(InexistentCollection);
605 CollectionImplPtr current = cache_->get_collection(collection_name);
606 if (current) {
607 code = handler(current);
608 if (code == 0) {
609 store_->update_collection(*current);
610 } else {
611 LOG_ERROR("Update collection failed. collection[%s]",
612 current->name().c_str());
613 }
614 }
615 return code;
616}
617
618int MetaServiceImpl::load_meta_cache() {
619 // Load all the collections from meta store
620 int code = load_meta_store();
621 if (code != 0) {
622 cleanup_impl();
623 LOG_ERROR("Failed to load collection into cache.");
624 code = PROXIMA_BE_ERROR_CODE(RuntimeError);
625 }
626 return code;
627}
628
629#undef META_WRITE_LOCK_GUARD
630#undef META_READ_LOCK_GUARD
631
632} // namespace meta
633} // namespace be
634} // namespace proxima
635