1 | /** |
---|---|
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | * |
16 | * \author guonix |
17 | * \date Nov 2020 |
18 | * \brief |
19 | */ |
20 | |
21 | #include "scheduler.h" |
22 | #include <atomic> |
23 | #include <thread> |
24 | #include "common/error_code.h" |
25 | #include "common/logger.h" |
26 | #include "bthread_queue.h" |
27 | |
28 | namespace proxima { |
29 | namespace be { |
30 | namespace query { |
31 | |
32 | //! Alias for TaskQueueVector |
33 | using TaskQueueVector = std::vector<TaskQueuePtr>; |
34 | |
35 | /*! |
36 | * Selector |
37 | */ |
38 | struct Selector { |
39 | //! Pick a number between [0:limit] |
40 | static uint32_t pick(uint32_t /* limit */) { |
41 | return 0; |
42 | } |
43 | }; |
44 | |
45 | //! Round Robin Selector |
46 | struct RoundRobinSelector { |
47 | //! Pick a number between [0:limit] |
48 | uint32_t pick(uint32_t limit) { |
49 | return count++ % limit; |
50 | } |
51 | |
52 | private: |
53 | std::atomic<uint64_t> count{0}; |
54 | }; |
55 | |
56 | template <typename Selector> |
57 | class SchedulerImpl : public Scheduler { |
58 | public: |
59 | //! Constructor |
60 | SchedulerImpl() : concurrency_(0) {} |
61 | |
62 | //! Destructor |
63 | ~SchedulerImpl() override = default; |
64 | |
65 | public: |
66 | //! Dispatch task to execution queue |
67 | int schedule(TaskPtr task) override { |
68 | if (!queues_.empty()) { |
69 | uint32_t pos = selector_.pick(concurrency_); |
70 | LOG_DEBUG("Selector return[%u], task[%s]", pos, task->name().c_str()); |
71 | return queues_[pos]->put(task); |
72 | } |
73 | return PROXIMA_BE_ERROR_CODE(UnreadyQueue); |
74 | } |
75 | |
76 | //! Retrieve concurrency field |
77 | uint32_t concurrency() const override { |
78 | return concurrency_; |
79 | } |
80 | |
81 | //! Set concurrency field |
82 | uint32_t concurrency(uint32_t concurrent) override { |
83 | recycle_queue(); |
84 | concurrency_ = resize(concurrent); |
85 | return concurrency_; |
86 | } |
87 | |
88 | private: |
89 | //! Resize queue buckets |
90 | uint32_t resize(uint32_t size) { |
91 | while (size--) { |
92 | TaskQueuePtr queue = TaskQueuePtr(new BThreadQueue()); |
93 | if (queue->start() == 0) { |
94 | LOG_DEBUG("Success to start task execution queue"); |
95 | queues_.emplace_back(queue); |
96 | } |
97 | } |
98 | return static_cast<uint32_t>(queues_.size()); |
99 | } |
100 | |
101 | void recycle_queue() { |
102 | for (auto &queue : queues_) { |
103 | queue->stop(); |
104 | queue->join(); |
105 | } |
106 | queues_.clear(); |
107 | } |
108 | |
109 | private: |
110 | //! Concurrency |
111 | uint32_t concurrency_{0}; |
112 | //! Execution queue buckets |
113 | TaskQueueVector queues_{}; |
114 | //! Selector, Load balance role |
115 | Selector selector_{}; |
116 | }; |
117 | |
118 | //! Retrieve default scheduler reference |
119 | SchedulerPtr Scheduler::Default() { |
120 | static SchedulerPtr kScheduler = |
121 | SchedulerPtr(new SchedulerImpl<RoundRobinSelector>()); |
122 | return kScheduler; |
123 | } |
124 | |
125 | //! Retrieve hardware concurrency |
126 | uint32_t Scheduler::HostConcurrency() { |
127 | return std::thread::hardware_concurrency(); |
128 | } |
129 | |
130 | } // namespace query |
131 | } // namespace be |
132 | } // namespace proxima |
133 |