1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Hechong.xyf |
17 | * \date Oct 2020 |
18 | * \brief Interface of AiLego Utility Thread Queue |
19 | */ |
20 | |
21 | #ifndef __AILEGO_PARALLEL_THREAD_QUEUE_H__ |
22 | #define __AILEGO_PARALLEL_THREAD_QUEUE_H__ |
23 | |
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include <ailego/hash/jump_hash.h>
#include <ailego/pattern/closure.h>
33 | |
34 | namespace ailego { |
35 | |
36 | /*! Thread Queue (One Thread One Queue) |
37 | */ |
38 | class ThreadQueue { |
39 | public: |
40 | /*! Thread Worker (One Thread One Worker) |
41 | */ |
42 | class ThreadWorker { |
43 | public: |
44 | //! Constructor |
45 | ThreadWorker(ThreadQueue *owner) : owner_(owner) {} |
46 | |
47 | //! Destructor |
48 | ~ThreadWorker(void) { |
49 | // Join the current thread |
50 | if (thread_.joinable()) { |
51 | thread_.join(); |
52 | } |
53 | } |
54 | |
55 | //! Push a task to the queue |
56 | template <typename T> |
57 | void enqueue(T &&handle) { |
58 | std::lock_guard<std::mutex> lock(mutex_); |
59 | queue_.emplace(std::forward<T>(handle)); |
60 | } |
61 | |
62 | //! Push a task to the queue |
63 | template <typename T> |
64 | void enqueue_and_wake(T &&handle) { |
65 | std::lock_guard<std::mutex> lock(mutex_); |
66 | queue_.emplace(std::forward<T>(handle)); |
67 | cond_.notify_one(); |
68 | } |
69 | |
70 | //! Execute a function as a task |
71 | template <typename... TArgs> |
72 | void execute(TArgs &&...args) { |
73 | this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...)); |
74 | } |
75 | |
76 | //! Wake the thread |
77 | void wake(void) { |
78 | std::lock_guard<std::mutex> lock(mutex_); |
79 | cond_.notify_one(); |
80 | } |
81 | |
82 | //! Notify thread stopped |
83 | void stop(void) { |
84 | // Set stop flag as ture, then wake the thread |
85 | stopping_ = true; |
86 | std::lock_guard<std::mutex> lock(mutex_); |
87 | cond_.notify_one(); |
88 | } |
89 | |
90 | protected: |
91 | //! Thread worker callback |
92 | void worker(void) { |
93 | owner_->mark_worker_started(); |
94 | |
95 | ClosureHandler task; |
96 | while (this->picking(&task)) { |
97 | // Run the task |
98 | if (task) { |
99 | task->run(); |
100 | task = nullptr; |
101 | } |
102 | } |
103 | owner_->mark_worker_stopped(); |
104 | } |
105 | |
106 | //! Pick a task from queue |
107 | bool picking(ClosureHandler *task) { |
108 | std::unique_lock<std::mutex> latch(mutex_); |
109 | cond_.wait(latch, [this]() { return (queue_.size() > 0 || stopping_); }); |
110 | if (stopping_) { |
111 | return false; |
112 | } |
113 | |
114 | *task = std::move(queue_.front()); |
115 | queue_.pop(); |
116 | return true; |
117 | } |
118 | |
119 | private: |
120 | //! Disable them |
121 | ThreadWorker(void) = delete; |
122 | ThreadWorker(ThreadWorker &&) = delete; |
123 | ThreadWorker(const ThreadWorker &) = delete; |
124 | ThreadWorker &operator=(const ThreadWorker &) = delete; |
125 | |
126 | //! Members |
127 | ThreadQueue *owner_{nullptr}; |
128 | std::queue<ClosureHandler> queue_{}; |
129 | std::atomic_bool stopping_{false}; |
130 | std::mutex mutex_{}; |
131 | std::condition_variable cond_{}; |
132 | std::thread thread_{&ThreadWorker::worker, this}; |
133 | }; |
134 | |
135 | //! Constructor |
136 | ThreadQueue(void) |
137 | : ThreadQueue{std::max(std::thread::hardware_concurrency(), 1u)} {} |
138 | |
139 | //! Constructor |
140 | explicit ThreadQueue(uint32_t size) { |
141 | for (uint32_t i = 0u; i < size; ++i) { |
142 | threads_.emplace_back(new ThreadWorker(this)); |
143 | } |
144 | } |
145 | |
146 | //! Destructor |
147 | ~ThreadQueue(void) { |
148 | this->stop(); |
149 | // Cleanup threads |
150 | for (auto it = threads_.begin(); it != threads_.end(); ++it) { |
151 | delete *it; |
152 | } |
153 | } |
154 | |
155 | //! operator [] |
156 | ThreadWorker &operator[](size_t i) { |
157 | return *(threads_[i]); |
158 | } |
159 | |
160 | //! Stop the thread |
161 | void stop(void) { |
162 | // Stop all workers |
163 | for (auto it = threads_.begin(); it != threads_.end(); ++it) { |
164 | (*it)->stop(); |
165 | } |
166 | } |
167 | |
168 | //! Wake all worker threads |
169 | void wake(void) { |
170 | for (auto it = threads_.begin(); it != threads_.end(); ++it) { |
171 | (*it)->wake(); |
172 | } |
173 | } |
174 | |
175 | //! Wait until all threads stopped processing |
176 | void wait_stop(void) { |
177 | std::unique_lock<std::mutex> lock(wait_mutex_); |
178 | stopped_cond_.wait(lock, [this]() { return this->is_stopped(); }); |
179 | } |
180 | |
181 | //! Check if the pool is stopped |
182 | bool is_stopped(void) const { |
183 | return (worker_count_ == 0); |
184 | } |
185 | |
186 | //! Retrieve count of worker in queue |
187 | size_t worker_count(void) const { |
188 | return worker_count_.load(std::memory_order_relaxed); |
189 | } |
190 | |
191 | //! Retrieve thread count in queue |
192 | size_t count(void) const { |
193 | return threads_.size(); |
194 | } |
195 | |
196 | //! Push a task to the queue |
197 | template <typename T> |
198 | void enqueue(uint64_t key, T &&handle) { |
199 | threads_[JumpHash(key, static_cast<int32_t>(threads_.size()))]->enqueue( |
200 | std::forward<T>(handle)); |
201 | } |
202 | |
203 | //! Push a task to the queue |
204 | template <typename T> |
205 | void enqueue_and_wake(uint64_t key, T &&handle) { |
206 | threads_[JumpHash(key, static_cast<int32_t>(threads_.size()))] |
207 | ->enqueue_and_wake(std::forward<T>(handle)); |
208 | } |
209 | |
210 | //! Execute a function as a task in pool |
211 | template <typename... TArgs> |
212 | void execute(uint64_t key, TArgs &&...args) { |
213 | this->enqueue_and_wake(key, Closure::New(std::forward<TArgs>(args)...)); |
214 | } |
215 | |
216 | protected: |
217 | //! Mark a worker started |
218 | void mark_worker_started(void) { |
219 | ++worker_count_; |
220 | } |
221 | |
222 | //! Mark a worker stopped |
223 | void mark_worker_stopped(void) { |
224 | // Decrease count of workers |
225 | std::lock_guard<std::mutex> lock(wait_mutex_); |
226 | if (--worker_count_ == 0) { |
227 | stopped_cond_.notify_all(); |
228 | } |
229 | } |
230 | |
231 | private: |
232 | //! Disable them |
233 | ThreadQueue(const ThreadQueue &) = delete; |
234 | ThreadQueue(ThreadQueue &&) = delete; |
235 | ThreadQueue &operator=(const ThreadQueue &) = delete; |
236 | |
237 | //! Members |
238 | std::atomic_uint worker_count_{0}; |
239 | std::mutex wait_mutex_{}; |
240 | std::condition_variable stopped_cond_{}; |
241 | std::vector<ThreadWorker *> threads_{}; |
242 | }; |
243 | |
244 | } // namespace ailego |
245 | |
246 | #endif // __AILEGO_PARALLEL_THREAD_QUEUE_H__ |
247 | |