1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Hechong.xyf |
17 | * \date Jan 2018 |
18 | * \brief Interface of AiLego Utility Thread Pool |
19 | */ |
20 | |
21 | #ifndef __AILEGO_PARALLEL_THREAD_POOL_H__ |
22 | #define __AILEGO_PARALLEL_THREAD_POOL_H__ |
23 | |
24 | #include <atomic> |
25 | #include <condition_variable> |
26 | #include <mutex> |
27 | #include <queue> |
28 | #include <thread> |
29 | #include <utility> |
30 | #include <vector> |
31 | #include <ailego/pattern/closure.h> |
32 | |
33 | namespace ailego { |
34 | |
35 | /*! Thread Pool |
36 | */ |
37 | class ThreadPool { |
38 | public: |
39 | /*! Thread Pool Task Group |
40 | */ |
41 | class TaskGroup : public std::enable_shared_from_this<TaskGroup> { |
42 | public: |
43 | using Pointer = std::shared_ptr<TaskGroup>; |
44 | |
45 | //! Constructor |
46 | TaskGroup(ThreadPool *pool) : pool_(pool) {} |
47 | |
48 | //! Push a task to the queue |
49 | void enqueue(const ClosureHandler &handle) { |
50 | pool_->enqueue(handle, this->shared_from_this(), nullptr); |
51 | } |
52 | |
53 | //! Push a task to the queue |
54 | void enqueue(ClosureHandler &&handle) { |
55 | pool_->enqueue(std::move(handle), this->shared_from_this(), nullptr); |
56 | } |
57 | |
58 | //! Submit a task to the queue |
59 | void submit(ClosureHandler &&handle) { |
60 | return enqueue_and_wake(std::move(handle)); |
61 | } |
62 | |
63 | //! Push a task to the queue |
64 | void enqueue_and_wake(const ClosureHandler &handle) { |
65 | pool_->enqueue_and_wake(handle, this->shared_from_this(), nullptr); |
66 | } |
67 | |
68 | //! Push a task to the queue |
69 | void enqueue_and_wake(ClosureHandler &&handle) { |
70 | pool_->enqueue_and_wake(std::move(handle), this->shared_from_this(), |
71 | nullptr); |
72 | } |
73 | |
74 | //! Execute a function as a task in pool |
75 | template <typename... TArgs> |
76 | void execute_and_wait(TArgs &&... args) { |
77 | ThreadPool::TaskControl ctrl; |
78 | pool_->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...), |
79 | this->shared_from_this(), &ctrl); |
80 | ctrl.wait(); |
81 | } |
82 | |
83 | //! Execute a function as a task in pool |
84 | template <typename... TArgs> |
85 | void execute(TArgs &&... args) { |
86 | this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...)); |
87 | } |
88 | |
89 | //! Wait until all tasks in group finished |
90 | void wait_finish(void) { |
91 | std::unique_lock<std::mutex> lock(mutex_); |
92 | cond_.wait(lock, [this]() { return this->is_finished(); }); |
93 | } |
94 | |
95 | //! Check if the group is finished |
96 | bool is_finished(void) const { |
97 | return (active_count_ == 0 && pending_count_ == 0); |
98 | } |
99 | |
100 | //! Retrieve count of pending tasks in group |
101 | size_t pending_count(void) const { |
102 | return pending_count_.load(std::memory_order_relaxed); |
103 | } |
104 | |
105 | //! Retrieve count of active tasks in group |
106 | size_t active_count(void) const { |
107 | return active_count_.load(std::memory_order_relaxed); |
108 | } |
109 | |
110 | protected: |
111 | friend class ThreadPool; |
112 | |
113 | //! Mark a task enqueued |
114 | void mark_task_enqueued(void) { |
115 | ++pending_count_; |
116 | } |
117 | |
118 | //! Mark a task actived |
119 | void mark_task_actived(void) { |
120 | std::lock_guard<std::mutex> lock(mutex_); |
121 | ++active_count_; |
122 | --pending_count_; |
123 | } |
124 | |
125 | //! Notify a task finished |
126 | void notify(void) { |
127 | std::lock_guard<std::mutex> lock(mutex_); |
128 | if (--active_count_ == 0 && pending_count_ == 0) { |
129 | cond_.notify_all(); |
130 | } |
131 | } |
132 | |
133 | private: |
134 | //! Members |
135 | ThreadPool *pool_{nullptr}; |
136 | std::atomic_uint active_count_{0}; |
137 | std::atomic_uint pending_count_{0}; |
138 | std::mutex mutex_{}; |
139 | std::condition_variable cond_{}; |
140 | }; |
141 | |
142 | //! Constructor |
143 | explicit ThreadPool(uint32_t size, bool binding); |
144 | |
145 | //! Constructor |
146 | explicit ThreadPool(bool binding) |
147 | : ThreadPool{std::max(std::thread::hardware_concurrency(), 1u), binding} { |
148 | } |
149 | |
150 | //! Constructor |
151 | ThreadPool(void) : ThreadPool{false} {} |
152 | |
153 | //! Destructor |
154 | ~ThreadPool(void) { |
155 | this->stop(); |
156 | |
157 | // Join all threads |
158 | for (auto it = pool_.begin(); it != pool_.end(); ++it) { |
159 | if (it->joinable()) { |
160 | it->join(); |
161 | } |
162 | } |
163 | } |
164 | |
165 | //! Retrieve thread count in pool |
166 | size_t count(void) const { |
167 | return pool_.size(); |
168 | } |
169 | |
170 | //! Stop all threads |
171 | void stop(void) { |
172 | // Set stop flag as ture, then wake all threads |
173 | stopping_ = true; |
174 | std::lock_guard<std::mutex> lock(queue_mutex_); |
175 | work_cond_.notify_all(); |
176 | } |
177 | |
178 | //! Push a task to the queue |
179 | void enqueue(const ClosureHandler &handle) { |
180 | this->enqueue(handle, nullptr); |
181 | } |
182 | |
183 | //! Push a task to the queue |
184 | void enqueue(ClosureHandler &&handle) { |
185 | this->enqueue(std::move(handle), nullptr); |
186 | } |
187 | |
188 | //! Push a task to the queue |
189 | void enqueue_and_wake(const ClosureHandler &handle) { |
190 | this->enqueue_and_wake(handle, nullptr); |
191 | } |
192 | |
193 | //! Push a task to the queue |
194 | void enqueue_and_wake(ClosureHandler &&handle) { |
195 | this->enqueue_and_wake(std::move(handle), nullptr); |
196 | } |
197 | |
198 | //! Execute a function as a task in pool |
199 | template <typename... TArgs> |
200 | void execute_and_wait(TArgs &&... args) { |
201 | ThreadPool::TaskControl ctrl; |
202 | this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...), &ctrl); |
203 | ctrl.wait(); |
204 | } |
205 | |
206 | //! Execute a function as a task in pool |
207 | template <typename... TArgs> |
208 | void execute(TArgs &&... args) { |
209 | this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...)); |
210 | } |
211 | |
212 | //! Wake any one thread |
213 | void wake_any(void) { |
214 | std::lock_guard<std::mutex> lock(queue_mutex_); |
215 | work_cond_.notify_one(); |
216 | } |
217 | |
218 | //! Wake all threads |
219 | void wake_all(void) { |
220 | std::lock_guard<std::mutex> lock(queue_mutex_); |
221 | work_cond_.notify_all(); |
222 | } |
223 | |
224 | //! Wait until all threads finished processing |
225 | void wait_finish(void) { |
226 | std::unique_lock<std::mutex> lock(wait_mutex_); |
227 | finished_cond_.wait(lock, [this]() { return this->is_finished(); }); |
228 | } |
229 | |
230 | //! Wait until all threads stopped processing |
231 | void wait_stop(void) { |
232 | std::unique_lock<std::mutex> lock(wait_mutex_); |
233 | stopped_cond_.wait(lock, [this]() { return this->is_stopped(); }); |
234 | } |
235 | |
236 | //! Make a task group |
237 | TaskGroup::Pointer make_group(void) { |
238 | return std::make_shared<TaskGroup>(this); |
239 | } |
240 | |
241 | //! Check if the pool is finished |
242 | bool is_finished(void) const { |
243 | return (active_count_ == 0 && pending_count_ == 0); |
244 | } |
245 | |
246 | //! Check if the pool is stopped |
247 | bool is_stopped(void) const { |
248 | return (worker_count_ == 0); |
249 | } |
250 | |
251 | //! Retrieve count of worker in pool |
252 | size_t worker_count(void) const { |
253 | return worker_count_.load(std::memory_order_relaxed); |
254 | } |
255 | |
256 | //! Retrieve count of pending tasks in pool |
257 | size_t pending_count(void) const { |
258 | return pending_count_.load(std::memory_order_relaxed); |
259 | } |
260 | |
261 | //! Retrieve count of active tasks in pool |
262 | size_t active_count(void) const { |
263 | return active_count_.load(std::memory_order_relaxed); |
264 | } |
265 | |
266 | //! Get the thread index via thread id |
267 | int indexof(const std::thread::id &thread_id) const { |
268 | for (size_t i = 0; i < pool_.size(); ++i) { |
269 | if (pool_[i].get_id() == thread_id) { |
270 | return static_cast<int>(i); |
271 | } |
272 | } |
273 | return -1; |
274 | } |
275 | |
276 | //! Get the current work thread index |
277 | int indexof_this(void) const { |
278 | return this->indexof(std::this_thread::get_id()); |
279 | } |
280 | |
281 | //! Bind threads to processors |
282 | void bind(void); |
283 | |
284 | //! Unbind threads of processors |
285 | void unbind(void); |
286 | |
287 | protected: |
288 | //! Thread task control |
289 | class TaskControl { |
290 | public: |
291 | //! Notify task finished |
292 | void notify(void) { |
293 | finished_ = true; |
294 | std::lock_guard<std::mutex> lock(mutex_); |
295 | cond_.notify_one(); |
296 | } |
297 | |
298 | //! Wait until task finished |
299 | void wait(void) { |
300 | std::unique_lock<std::mutex> lock(mutex_); |
301 | cond_.wait(lock, [this]() { return finished_.load(); }); |
302 | } |
303 | |
304 | private: |
305 | std::atomic_bool finished_{false}; |
306 | std::mutex mutex_{}; |
307 | std::condition_variable cond_{}; |
308 | }; |
309 | |
310 | //! Thread task |
311 | struct Task { |
312 | // Constructor |
313 | Task(const ClosureHandler &h, TaskControl *c) : handle(h), control(c) {} |
314 | |
315 | // Constructor |
316 | Task(ClosureHandler &&h, TaskControl *c) |
317 | : handle(std::move(h)), control(c) {} |
318 | |
319 | // Constructor |
320 | Task(const ClosureHandler &h, TaskGroup::Pointer &&g, TaskControl *c) |
321 | : handle(h), group(std::move(g)), control(c) {} |
322 | |
323 | // Constructor |
324 | Task(ClosureHandler &&h, TaskGroup::Pointer &&g, TaskControl *c) |
325 | : handle(std::move(h)), group(std::move(g)), control(c) {} |
326 | |
327 | // Constructor |
328 | Task(void) {} |
329 | |
330 | //! Members |
331 | ClosureHandler handle{}; |
332 | TaskGroup::Pointer group{nullptr}; |
333 | TaskControl *control{nullptr}; |
334 | }; |
335 | |
336 | //! Thread worker callback |
337 | void worker(void); |
338 | |
339 | //! Pick a task from queue |
340 | bool picking(Task *task); |
341 | |
342 | //! Push a task to the queue |
343 | template <typename T> |
344 | void enqueue(T &&handle, TaskControl *ctrl) { |
345 | if (handle) { |
346 | std::lock_guard<std::mutex> lock(queue_mutex_); |
347 | ++pending_count_; |
348 | queue_.emplace(std::forward<T>(handle), ctrl); |
349 | } |
350 | } |
351 | |
352 | //! Push a task to the queue with group |
353 | template <typename T> |
354 | void enqueue(T &&handle, TaskGroup::Pointer &&group, TaskControl *ctrl) { |
355 | if (handle) { |
356 | std::lock_guard<std::mutex> lock(queue_mutex_); |
357 | ++pending_count_; |
358 | group->mark_task_enqueued(); |
359 | queue_.emplace(std::forward<T>(handle), std::move(group), ctrl); |
360 | } |
361 | } |
362 | |
363 | //! Push a task to the queue |
364 | template <typename T> |
365 | void enqueue_and_wake(T &&handle, TaskControl *ctrl) { |
366 | if (handle) { |
367 | std::lock_guard<std::mutex> lock(queue_mutex_); |
368 | ++pending_count_; |
369 | queue_.emplace(std::forward<T>(handle), ctrl); |
370 | work_cond_.notify_one(); |
371 | } |
372 | } |
373 | |
374 | //! Push a task to the queue with group |
375 | template <typename T> |
376 | void enqueue_and_wake(T &&handle, TaskGroup::Pointer &&group, |
377 | TaskControl *ctrl) { |
378 | if (handle) { |
379 | std::lock_guard<std::mutex> lock(queue_mutex_); |
380 | ++pending_count_; |
381 | group->mark_task_enqueued(); |
382 | queue_.emplace(std::forward<T>(handle), std::move(group), ctrl); |
383 | work_cond_.notify_one(); |
384 | } |
385 | } |
386 | |
387 | private: |
388 | //! Disable them |
389 | ThreadPool(const ThreadPool &) = delete; |
390 | ThreadPool(ThreadPool &&) = delete; |
391 | ThreadPool &operator=(const ThreadPool &) = delete; |
392 | |
393 | //! Members |
394 | std::queue<Task> queue_{}; |
395 | std::atomic_bool stopping_{false}; |
396 | std::atomic_uint worker_count_{0}; |
397 | std::atomic_uint active_count_{0}; |
398 | std::atomic_uint pending_count_{0}; |
399 | std::mutex queue_mutex_{}; |
400 | std::mutex wait_mutex_{}; |
401 | std::condition_variable work_cond_{}; |
402 | std::condition_variable finished_cond_{}; |
403 | std::condition_variable stopped_cond_{}; |
404 | std::vector<std::thread> pool_{}; |
405 | }; |
406 | |
407 | } // namespace ailego |
408 | |
409 | #endif // __AILEGO_PARALLEL_THREAD_POOL_H__ |
410 | |