1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author guonix |
17 | * \date Nov 2020 |
 * \brief MetaService implementation backed by a MetaStore and an in-memory MetaCache
19 | */ |
20 | |
21 | #include "meta_service.h" |
22 | #include <mutex> |
23 | #include <ailego/encoding/uri.h> |
24 | #include <ailego/parallel/lock.h> |
25 | #include "common/error_code.h" |
26 | #include "meta_cache.h" |
27 | #include "meta_impl.h" |
28 | #include "meta_service_builder.h" |
29 | #include "meta_store_factory.h" |
30 | |
31 | namespace proxima { |
32 | namespace be { |
33 | namespace meta { |
34 | |
35 | /*! MetaServiceImpl |
36 | */ |
37 | class MetaServiceImpl : public MetaService { |
38 | public: |
39 | //! Constructor |
40 | //! @param uri: Identifier with database |
41 | explicit MetaServiceImpl(MetaStorePtr store, MetaCachePtr cache); |
42 | |
43 | //! Disable default constructor and copy constructor |
44 | MetaServiceImpl() = delete; |
45 | MetaServiceImpl(const MetaServiceImpl &) = delete; |
46 | |
47 | // Destructor |
48 | ~MetaServiceImpl() override; |
49 | |
50 | public: |
51 | // Interface inherent from Service |
52 | int init_impl() override; |
53 | |
54 | int cleanup_impl() override; |
55 | |
56 | int start_impl() override; |
57 | |
58 | int stop_impl() override; |
59 | |
60 | public: |
61 | //! Reload meta service |
62 | int reload() override; |
63 | |
64 | //! Create collection and columns |
65 | int create_collection(const CollectionBase ¶m, |
66 | CollectionMetaPtr *meta) override; |
67 | |
68 | //! Update collection and columns, increase revision and copy a new collection |
69 | int update_collection(const CollectionBase ¶m, |
70 | CollectionMetaPtr *meta) override; |
71 | |
72 | //! Enable collection |
73 | int enable_collection(const std::string &collection, uint32_t revision, |
74 | bool enable) override; |
75 | |
76 | //! Update the status of current used collection |
77 | int update_status(const std::string &collection_name, |
78 | CollectionStatus status) override; |
79 | |
80 | //! Suspend reading requests of collection |
81 | int suspend_collection_read(const std::string &collection_name) override; |
82 | |
83 | //! Resume reading requests of collection |
84 | int resume_collection_read(const std::string &collection_name) override; |
85 | |
86 | //! Suspend writing requests of collection |
87 | int suspend_collection_write(const std::string &collection_name) override; |
88 | |
89 | //! Resume writing requests of collection |
90 | int resume_collection_write(const std::string &collection_name) override; |
91 | |
92 | //! Drop collection |
93 | int drop_collection(const std::string &name) override; |
94 | |
95 | //! Retrieve latest version of collection |
96 | CollectionMetaPtr get_current_collection( |
97 | const std::string &name) const override; |
98 | |
99 | //! Retrieve latest version of collections |
100 | int get_latest_collections(CollectionMetaPtrList *collections) const override; |
101 | |
102 | //! Retrieve all of collections |
103 | int get_collections(CollectionMetaPtrList *collections) const override; |
104 | |
105 | //! Retrieve collections with specific repository |
106 | int get_collections_by_repo( |
107 | const std::string &repository, |
108 | CollectionMetaPtrList *collections) const override; |
109 | |
110 | //! Retrieve collections with specific collection name |
111 | int get_collections(const std::string &name, |
112 | CollectionMetaPtrList *collections) const override; |
113 | |
114 | //! Retrieve collection |
115 | CollectionMetaPtr get_collection(const std::string &name, |
116 | uint64_t revision) const override; |
117 | |
118 | //! Check collection exists |
119 | bool exist_collection(const std::string &collection) const override; |
120 | |
121 | private: |
122 | //! Retrieve collection |
123 | const CollectionImplPtr &inner_get_collection(const std::string &name, |
124 | uint64_t revision) const; |
125 | |
126 | //! Update current collection |
127 | int update_current_used_collection( |
128 | const std::string collection_name, |
129 | std::function<int(CollectionImplPtr)> handler); |
130 | |
131 | //! Cleanup cache without lock |
132 | inline void cleanup_cache() { |
133 | cache_->clear(); |
134 | } |
135 | |
136 | //! Load meta store to cache with out lock |
137 | int load_meta_cache(); |
138 | |
139 | //! load collections from meta store |
140 | int load_collections(); |
141 | |
142 | //! load columns from meta store |
143 | int load_columns(); |
144 | |
145 | //! load repositories from meta store |
146 | int load_repositories(); |
147 | |
148 | //! load meta from meta store |
149 | int load_meta_store(); |
150 | |
151 | //! store collection fo meta store |
152 | int store_collection(const CollectionImplPtr &collection); |
153 | |
154 | private: |
155 | //! Mutex for cache access |
156 | mutable ailego::SharedMutex mutex_{}; |
157 | |
158 | //! Meta Store |
159 | MetaStorePtr store_{nullptr}; |
160 | |
161 | //! Meta Cache |
162 | MetaCachePtr cache_{nullptr}; |
163 | }; // end of MetaServiceImpl |
164 | |
165 | |
166 | MetaServicePtr MetaServiceBuilder::Create(const std::string &uri_str) { |
167 | ailego::Uri uri(uri_str); |
168 | if (!uri.is_valid()) { |
169 | LOG_ERROR("Failed to parse uri, initialize MetaServiceImpl failed. uri[%s]" , |
170 | uri_str.c_str()); |
171 | return nullptr; |
172 | } |
173 | |
174 | MetaStorePtr store = |
175 | MetaStoreFactory::Instance().create(uri.scheme().c_str(), &uri); |
176 | if (!store) { |
177 | LOG_ERROR( |
178 | "MetaServiceImpl init failed. reason[can't get meta store instance]" ); |
179 | return nullptr; |
180 | } |
181 | |
182 | return std::make_shared<MetaServiceImpl>(store, |
183 | std::make_shared<MetaCache>()); |
184 | } |
185 | |
186 | int MetaServiceImpl::load_collections() { |
187 | CollectionImplPtr collection{nullptr}; |
188 | // Load collection to cache |
189 | int code = |
190 | store_->list_collections([this, &collection]() -> CollectionObject * { |
191 | if (collection) { // Append collection to cache |
192 | collection->transform(); |
193 | cache_->append_collection(collection); |
194 | collection.reset(); |
195 | } |
196 | collection = std::make_shared<CollectionImpl>(); |
197 | return collection.get(); |
198 | }); |
199 | if (code == 0 && collection) { // Append last collection |
200 | collection->transform(); |
201 | cache_->append_collection(collection); |
202 | } |
203 | if (code != 0) { // Failed |
204 | LOG_ERROR("Failed to load collection from meta store. code[%d]" , code); |
205 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
206 | } |
207 | return code; |
208 | } |
209 | |
210 | int MetaServiceImpl::load_columns() { |
211 | ColumnImplPtr column{nullptr}; |
212 | // Load columns into cache |
213 | int code = store_->list_columns([this, &column]() -> ColumnObject * { |
214 | if (column) { // Append column to cache |
215 | column->transform(); |
216 | cache_->append_column(column); |
217 | column.reset(); |
218 | } |
219 | column = std::make_shared<ColumnImpl>(); |
220 | return column.get(); |
221 | }); |
222 | if (code == 0 && column) { // Append last column to cache |
223 | column->transform(); |
224 | cache_->append_column(column); |
225 | } |
226 | if (code != 0) { |
227 | LOG_ERROR("Failed to load columns from meta store. code[%d]" , code); |
228 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
229 | } |
230 | return code; |
231 | } |
232 | |
233 | int MetaServiceImpl::load_repositories() { |
234 | DatabaseRepositoryImplPtr repository{nullptr}; |
235 | // Load columns into cache |
236 | int code = store_->list_repositories( |
237 | [this, &repository]() -> DatabaseRepositoryObject * { |
238 | if (repository) { // Attach to collection |
239 | cache_->append_repository(repository); |
240 | } |
241 | repository = std::make_shared<DatabaseRepositoryImpl>(); |
242 | return repository.get(); |
243 | }); |
244 | if (code == 0 && repository) { // Append last repository to cache |
245 | cache_->append_repository(repository); |
246 | } |
247 | if (code != 0) { |
248 | LOG_ERROR("Failed to load repository from meta store. code[%d]" , code); |
249 | return PROXIMA_BE_ERROR_CODE(RuntimeError); |
250 | } |
251 | return code; |
252 | } |
253 | |
254 | int MetaServiceImpl::load_meta_store() { |
255 | int code = load_collections(); |
256 | if (code == 0) { |
257 | code = load_columns(); |
258 | if (code == 0) { |
259 | code = load_repositories(); |
260 | } |
261 | } |
262 | if (code != 0) { |
263 | cache_->clear(); |
264 | } |
265 | return code; |
266 | } |
267 | |
268 | int MetaServiceImpl::store_collection(const CollectionImplPtr &collection) { |
269 | int code = store_->create_collection(*collection); |
270 | if (code != 0) { |
271 | return code; |
272 | } |
273 | |
274 | if (collection->repository()) { |
275 | code = store_->create_repository(*collection->repository()); |
276 | if (code != 0) { |
277 | store_->delete_repositories_by_uuid(collection->uuid()); |
278 | store_->delete_collection_by_uuid(collection->uuid()); |
279 | return code; |
280 | } |
281 | } |
282 | |
283 | for (auto &column : collection->columns()) { |
284 | code = store_->create_column(*column); |
285 | if (code != 0) { |
286 | store_->delete_columns_by_uuid(collection->uuid()); |
287 | store_->delete_repositories_by_uuid(collection->uuid()); |
288 | store_->delete_collection_by_uuid(collection->uuid()); |
289 | break; |
290 | } |
291 | } |
292 | return code; |
293 | } |
294 | |
//! Declares an ailego::WriteLock named `write_lock` on MUTEX and a
//! std::lock_guard named GUARD over it; the guard is released at the end of
//! the enclosing scope. Because `write_lock` is a fixed name, the macro can
//! be expanded at most once per scope.
#define META_WRITE_LOCK_GUARD(MUTEX, GUARD) \
  ailego::WriteLock write_lock(MUTEX); \
  std::lock_guard<ailego::WriteLock> GUARD(write_lock)

//! Declares an ailego::ReadLock named `read_lock` on MUTEX and a
//! std::lock_guard named GUARD over it; the guard is released at the end of
//! the enclosing scope. Because `read_lock` is a fixed name, the macro can
//! be expanded at most once per scope.
#define META_READ_LOCK_GUARD(MUTEX, GUARD) \
  ailego::ReadLock read_lock(MUTEX); \
  std::lock_guard<ailego::ReadLock> GUARD(read_lock)
302 | |
303 | |
304 | MetaServiceImpl::MetaServiceImpl(MetaStorePtr store, MetaCachePtr cache) |
305 | : store_(std::move(store)), cache_(std::move(cache)) {} |
306 | |
307 | MetaServiceImpl::~MetaServiceImpl() = default; |
308 | |
// Interface inherited from Service
310 | int MetaServiceImpl::init_impl() { |
311 | META_WRITE_LOCK_GUARD(mutex_, guard); |
312 | return load_meta_cache(); |
313 | } |
314 | |
315 | int MetaServiceImpl::cleanup_impl() { |
316 | META_WRITE_LOCK_GUARD(mutex_, guard); |
317 | cleanup_cache(); |
318 | return 0; |
319 | } |
320 | |
321 | int MetaServiceImpl::start_impl() { |
322 | return 0; |
323 | } |
324 | |
325 | int MetaServiceImpl::stop_impl() { |
326 | return 0; |
327 | } |
328 | |
329 | int MetaServiceImpl::reload() { |
330 | LOG_INFO("Reload meta service." ); |
331 | META_WRITE_LOCK_GUARD(mutex_, guard); |
332 | |
333 | LOG_DEBUG("Cleanup meta cache." ); |
334 | cleanup_cache(); |
335 | LOG_DEBUG("Reload meta cache." ); |
336 | int code = load_meta_cache(); |
337 | if (code == 0) { |
338 | LOG_INFO("Reload meta service successes." ); |
339 | } else { |
340 | LOG_INFO("Reload meta service failed. code[%d] what[%s]" , code, |
341 | ErrorCode::What(code)); |
342 | } |
343 | return code; |
344 | } |
345 | |
346 | int MetaServiceImpl::create_collection(const CollectionBase ¶m, |
347 | CollectionMetaPtr *out) { |
348 | META_WRITE_LOCK_GUARD(mutex_, guard); |
349 | |
350 | if (cache_->exist_collection(param.name())) { |
351 | return PROXIMA_BE_ERROR_CODE(DuplicateCollection); |
352 | } |
353 | |
354 | CollectionMetaPtr meta = std::make_shared<CollectionMeta>(param); |
355 | int code = meta->validate(); |
356 | if (code != 0) { |
357 | LOG_ERROR("Meta was invalid. code[%d] what[%s]" , code, |
358 | ErrorCode::What(code)); |
359 | return code; |
360 | } |
361 | CollectionImplPtr collection = std::make_shared<CollectionImpl>(meta); |
362 | collection->transform(); |
363 | |
364 | code = store_collection(collection); |
365 | if (code == 0) { |
366 | cache_->append_collection(collection); |
367 | } else { |
368 | LOG_ERROR("Failed to store collection. code[%d]" , code); |
369 | } |
370 | |
371 | if (code == 0 && out != nullptr) { |
372 | *out = meta; |
373 | } |
374 | return code; |
375 | } |
376 | |
377 | //! Update collection and columns, increase revision and copy a new collection |
378 | int MetaServiceImpl::update_collection(const CollectionBase ¶m, |
379 | CollectionMetaPtr *out) { |
380 | META_WRITE_LOCK_GUARD(mutex_, guard); |
381 | |
382 | CollectionImplPtr latest = cache_->get_latest_collection(param.name()); |
383 | if (!latest) { |
384 | return PROXIMA_BE_ERROR_CODE(InexistentCollection); |
385 | } |
386 | |
387 | // Create new meta from latest revision |
388 | CollectionMetaPtr meta = std::make_shared<CollectionMeta>(*latest->meta()); |
389 | int code = meta->merge_update_param(param); |
390 | if (code != 0) { |
391 | LOG_ERROR("Readonly field updated. code[%d] what[%s]" , code, |
392 | ErrorCode::What(code)); |
393 | return code; |
394 | } |
395 | code = meta->validate(); |
396 | if (code != 0) { |
397 | LOG_ERROR("Update collection failed. code[%d] what[%s]" , code, |
398 | ErrorCode::What(code)); |
399 | return code; |
400 | } |
401 | // Increase revision of collection |
402 | meta->increase_revision(); |
403 | meta->set_current(false); |
404 | |
405 | // Copy to Collection Impl |
406 | CollectionImplPtr next = std::make_shared<CollectionImpl>(meta); |
407 | next->transform(); |
408 | |
409 | // Store collection and append to cache |
410 | code = store_collection(next); |
411 | if (code != 0) { |
412 | LOG_ERROR("Failed to update collection. code[%d]" , code); |
413 | } else { |
414 | cache_->append_collection(next); |
415 | } |
416 | |
417 | // Write back of results |
418 | if (code == 0 && out != nullptr) { |
419 | *out = meta; |
420 | } |
421 | |
422 | return code; |
423 | } |
424 | |
425 | //! Enable collection |
426 | int MetaServiceImpl::enable_collection(const std::string &name, |
427 | uint32_t revision, bool) { |
428 | META_WRITE_LOCK_GUARD(mutex_, guard); |
429 | |
430 | int code = PROXIMA_BE_ERROR_CODE(InexistentCollection); |
431 | CollectionImplPtr current = cache_->get_collection(name); |
432 | auto &next = inner_get_collection(name, revision); |
433 | if (!current || !next) { |
434 | LOG_ERROR("Can't get collection by name or by revision." ); |
435 | } else { |
436 | if (current != next) { // Not same |
437 | current->meta()->set_readable(false); |
438 | current->meta()->set_writable(false); |
439 | current->meta()->set_current(false); |
440 | store_->update_collection(*current); |
441 | } |
442 | |
443 | next->meta()->set_status(CollectionStatus::SERVING); |
444 | next->meta()->set_current(true); |
445 | code = store_->update_collection(*next); |
446 | } |
447 | return code; |
448 | } |
449 | |
450 | //! Update the status of current used collection |
451 | int MetaServiceImpl::update_status(const std::string &name, |
452 | CollectionStatus stat) { |
453 | META_WRITE_LOCK_GUARD(mutex_, guard); |
454 | |
455 | return update_current_used_collection( |
456 | name, [&stat](CollectionImplPtr current) -> int { |
457 | current->meta()->set_status(stat); |
458 | return 0; |
459 | }); |
460 | } |
461 | |
462 | //! Suspend reading requests of collection |
463 | int MetaServiceImpl::suspend_collection_read( |
464 | const std::string &collection_name) { |
465 | META_WRITE_LOCK_GUARD(mutex_, guard); |
466 | |
467 | return update_current_used_collection(collection_name, |
468 | [](CollectionImplPtr current) -> int { |
469 | current->meta()->set_readable(false); |
470 | return 0; |
471 | }); |
472 | } |
473 | |
474 | //! Resume reading requests of collection |
475 | int MetaServiceImpl::resume_collection_read( |
476 | const std::string &collection_name) { |
477 | META_WRITE_LOCK_GUARD(mutex_, guard); |
478 | |
479 | return update_current_used_collection(collection_name, |
480 | [](CollectionImplPtr current) -> int { |
481 | current->meta()->set_readable(true); |
482 | return 0; |
483 | }); |
484 | } |
485 | |
486 | //! Suspend writing requests of collection |
487 | int MetaServiceImpl::suspend_collection_write( |
488 | const std::string &collection_name) { |
489 | META_WRITE_LOCK_GUARD(mutex_, guard); |
490 | |
491 | return update_current_used_collection(collection_name, |
492 | [](CollectionImplPtr current) -> int { |
493 | current->meta()->set_writable(false); |
494 | return 0; |
495 | }); |
496 | } |
497 | |
498 | //! Resume writing requests of collection |
499 | int MetaServiceImpl::resume_collection_write( |
500 | const std::string &collection_name) { |
501 | META_WRITE_LOCK_GUARD(mutex_, guard); |
502 | |
503 | return update_current_used_collection(collection_name, |
504 | [](CollectionImplPtr current) -> int { |
505 | current->meta()->set_writable(true); |
506 | return 0; |
507 | }); |
508 | } |
509 | |
510 | //! Drop collection |
511 | int MetaServiceImpl::drop_collection(const std::string &name) { |
512 | META_WRITE_LOCK_GUARD(mutex_, guard); |
513 | |
514 | CollectionImplPtr current = cache_->get_collection(name); |
515 | if (current) { |
516 | cache_->delete_collection(name); |
517 | int code = store_->delete_collection(name); |
518 | if (code == 0) { |
519 | code = store_->delete_columns_by_uid(current->uid()); |
520 | if (code == 0) { |
521 | code = store_->delete_repositories_by_uid(current->uid()); |
522 | } |
523 | } |
524 | return code; |
525 | } |
526 | return 0; |
527 | } |
528 | |
529 | //! Retrieve latest version of collection |
530 | CollectionMetaPtr MetaServiceImpl::get_current_collection( |
531 | const std::string &name) const { |
532 | META_READ_LOCK_GUARD(mutex_, guard); |
533 | |
534 | auto current = cache_->get_collection(name); |
535 | return current ? current->meta() : nullptr; |
536 | } |
537 | |
538 | //! Retrieve latest version of collections |
539 | int MetaServiceImpl::get_latest_collections( |
540 | CollectionMetaPtrList *collections) const { |
541 | if (!collections) { |
542 | return PROXIMA_BE_ERROR_CODE(InvalidArgument); |
543 | } |
544 | |
545 | META_READ_LOCK_GUARD(mutex_, guard); |
546 | |
547 | cache_->get_collections(collections); |
548 | return 0; |
549 | } |
550 | |
551 | //! Retrieve all of collections |
552 | int MetaServiceImpl::get_collections(CollectionMetaPtrList *collections) const { |
553 | META_READ_LOCK_GUARD(mutex_, guard); |
554 | |
555 | cache_->get_collections(MetaCache::PassAllFilter, collections); |
556 | return 0; |
557 | } |
558 | |
559 | //! Retrieve collections with specific repository |
560 | int MetaServiceImpl::get_collections_by_repo( |
561 | const std::string &repo, CollectionMetaPtrList *collections) const { |
562 | META_READ_LOCK_GUARD(mutex_, guard); |
563 | |
564 | cache_->get_collections_by_repo(repo, collections); |
565 | return 0; |
566 | } |
567 | |
568 | //! Retrieve collections with specific collection name |
569 | int MetaServiceImpl::get_collections(const std::string &name, |
570 | CollectionMetaPtrList *collections) const { |
571 | META_READ_LOCK_GUARD(mutex_, guard); |
572 | |
573 | cache_->get_collections(name, collections); |
574 | return collections->empty() ? PROXIMA_BE_ERROR_CODE(InexistentCollection) : 0; |
575 | } |
576 | |
577 | //! Retrieve collection |
578 | CollectionMetaPtr MetaServiceImpl::get_collection(const std::string &name, |
579 | uint64_t revision) const { |
580 | META_READ_LOCK_GUARD(mutex_, guard); |
581 | |
582 | auto &impl = inner_get_collection(name, revision); |
583 | return impl ? impl->meta() : nullptr; |
584 | } |
585 | |
586 | //! Check collection exists |
587 | bool MetaServiceImpl::exist_collection(const std::string &collection) const { |
588 | META_READ_LOCK_GUARD(mutex_, guard); |
589 | |
590 | return cache_->exist_collection(collection); |
591 | } |
592 | |
593 | const CollectionImplPtr &MetaServiceImpl::inner_get_collection( |
594 | const std::string &name, uint64_t revision) const { |
595 | return cache_->get_collection( |
596 | name, [&revision](const CollectionImplPtr &collection) -> bool { |
597 | return collection->revision() == revision; |
598 | }); |
599 | } |
600 | |
601 | int MetaServiceImpl::update_current_used_collection( |
602 | const std::string collection_name, |
603 | std::function<int(CollectionImplPtr)> handler) { |
604 | int code = PROXIMA_BE_ERROR_CODE(InexistentCollection); |
605 | CollectionImplPtr current = cache_->get_collection(collection_name); |
606 | if (current) { |
607 | code = handler(current); |
608 | if (code == 0) { |
609 | store_->update_collection(*current); |
610 | } else { |
611 | LOG_ERROR("Update collection failed. collection[%s]" , |
612 | current->name().c_str()); |
613 | } |
614 | } |
615 | return code; |
616 | } |
617 | |
618 | int MetaServiceImpl::load_meta_cache() { |
619 | // Load all the collections from meta store |
620 | int code = load_meta_store(); |
621 | if (code != 0) { |
622 | cleanup_impl(); |
623 | LOG_ERROR("Failed to load collection into cache." ); |
624 | code = PROXIMA_BE_ERROR_CODE(RuntimeError); |
625 | } |
626 | return code; |
627 | } |
628 | |
629 | #undef META_WRITE_LOCK_GUARD |
630 | #undef META_READ_LOCK_GUARD |
631 | |
632 | } // namespace meta |
633 | } // namespace be |
634 | } // namespace proxima |
635 | |