1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Hechong.xyf
17 * \date Jan 2018
18 * \brief Interface of AiLego Utility Thread Pool
19 */
20
21#ifndef __AILEGO_PARALLEL_THREAD_POOL_H__
22#define __AILEGO_PARALLEL_THREAD_POOL_H__
23
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include <ailego/pattern/closure.h>
32
33namespace ailego {
34
35/*! Thread Pool
36 */
37class ThreadPool {
38 public:
39 /*! Thread Pool Task Group
40 */
41 class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
42 public:
43 using Pointer = std::shared_ptr<TaskGroup>;
44
45 //! Constructor
46 TaskGroup(ThreadPool *pool) : pool_(pool) {}
47
48 //! Push a task to the queue
49 void enqueue(const ClosureHandler &handle) {
50 pool_->enqueue(handle, this->shared_from_this(), nullptr);
51 }
52
53 //! Push a task to the queue
54 void enqueue(ClosureHandler &&handle) {
55 pool_->enqueue(std::move(handle), this->shared_from_this(), nullptr);
56 }
57
58 //! Submit a task to the queue
59 void submit(ClosureHandler &&handle) {
60 return enqueue_and_wake(std::move(handle));
61 }
62
63 //! Push a task to the queue
64 void enqueue_and_wake(const ClosureHandler &handle) {
65 pool_->enqueue_and_wake(handle, this->shared_from_this(), nullptr);
66 }
67
68 //! Push a task to the queue
69 void enqueue_and_wake(ClosureHandler &&handle) {
70 pool_->enqueue_and_wake(std::move(handle), this->shared_from_this(),
71 nullptr);
72 }
73
74 //! Execute a function as a task in pool
75 template <typename... TArgs>
76 void execute_and_wait(TArgs &&... args) {
77 ThreadPool::TaskControl ctrl;
78 pool_->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...),
79 this->shared_from_this(), &ctrl);
80 ctrl.wait();
81 }
82
83 //! Execute a function as a task in pool
84 template <typename... TArgs>
85 void execute(TArgs &&... args) {
86 this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...));
87 }
88
89 //! Wait until all tasks in group finished
90 void wait_finish(void) {
91 std::unique_lock<std::mutex> lock(mutex_);
92 cond_.wait(lock, [this]() { return this->is_finished(); });
93 }
94
95 //! Check if the group is finished
96 bool is_finished(void) const {
97 return (active_count_ == 0 && pending_count_ == 0);
98 }
99
100 //! Retrieve count of pending tasks in group
101 size_t pending_count(void) const {
102 return pending_count_.load(std::memory_order_relaxed);
103 }
104
105 //! Retrieve count of active tasks in group
106 size_t active_count(void) const {
107 return active_count_.load(std::memory_order_relaxed);
108 }
109
110 protected:
111 friend class ThreadPool;
112
113 //! Mark a task enqueued
114 void mark_task_enqueued(void) {
115 ++pending_count_;
116 }
117
118 //! Mark a task actived
119 void mark_task_actived(void) {
120 std::lock_guard<std::mutex> lock(mutex_);
121 ++active_count_;
122 --pending_count_;
123 }
124
125 //! Notify a task finished
126 void notify(void) {
127 std::lock_guard<std::mutex> lock(mutex_);
128 if (--active_count_ == 0 && pending_count_ == 0) {
129 cond_.notify_all();
130 }
131 }
132
133 private:
134 //! Members
135 ThreadPool *pool_{nullptr};
136 std::atomic_uint active_count_{0};
137 std::atomic_uint pending_count_{0};
138 std::mutex mutex_{};
139 std::condition_variable cond_{};
140 };
141
142 //! Constructor
143 explicit ThreadPool(uint32_t size, bool binding);
144
145 //! Constructor
146 explicit ThreadPool(bool binding)
147 : ThreadPool{std::max(std::thread::hardware_concurrency(), 1u), binding} {
148 }
149
150 //! Constructor
151 ThreadPool(void) : ThreadPool{false} {}
152
153 //! Destructor
154 ~ThreadPool(void) {
155 this->stop();
156
157 // Join all threads
158 for (auto it = pool_.begin(); it != pool_.end(); ++it) {
159 if (it->joinable()) {
160 it->join();
161 }
162 }
163 }
164
165 //! Retrieve thread count in pool
166 size_t count(void) const {
167 return pool_.size();
168 }
169
170 //! Stop all threads
171 void stop(void) {
172 // Set stop flag as ture, then wake all threads
173 stopping_ = true;
174 std::lock_guard<std::mutex> lock(queue_mutex_);
175 work_cond_.notify_all();
176 }
177
178 //! Push a task to the queue
179 void enqueue(const ClosureHandler &handle) {
180 this->enqueue(handle, nullptr);
181 }
182
183 //! Push a task to the queue
184 void enqueue(ClosureHandler &&handle) {
185 this->enqueue(std::move(handle), nullptr);
186 }
187
188 //! Push a task to the queue
189 void enqueue_and_wake(const ClosureHandler &handle) {
190 this->enqueue_and_wake(handle, nullptr);
191 }
192
193 //! Push a task to the queue
194 void enqueue_and_wake(ClosureHandler &&handle) {
195 this->enqueue_and_wake(std::move(handle), nullptr);
196 }
197
198 //! Execute a function as a task in pool
199 template <typename... TArgs>
200 void execute_and_wait(TArgs &&... args) {
201 ThreadPool::TaskControl ctrl;
202 this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...), &ctrl);
203 ctrl.wait();
204 }
205
206 //! Execute a function as a task in pool
207 template <typename... TArgs>
208 void execute(TArgs &&... args) {
209 this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...));
210 }
211
212 //! Wake any one thread
213 void wake_any(void) {
214 std::lock_guard<std::mutex> lock(queue_mutex_);
215 work_cond_.notify_one();
216 }
217
218 //! Wake all threads
219 void wake_all(void) {
220 std::lock_guard<std::mutex> lock(queue_mutex_);
221 work_cond_.notify_all();
222 }
223
224 //! Wait until all threads finished processing
225 void wait_finish(void) {
226 std::unique_lock<std::mutex> lock(wait_mutex_);
227 finished_cond_.wait(lock, [this]() { return this->is_finished(); });
228 }
229
230 //! Wait until all threads stopped processing
231 void wait_stop(void) {
232 std::unique_lock<std::mutex> lock(wait_mutex_);
233 stopped_cond_.wait(lock, [this]() { return this->is_stopped(); });
234 }
235
236 //! Make a task group
237 TaskGroup::Pointer make_group(void) {
238 return std::make_shared<TaskGroup>(this);
239 }
240
241 //! Check if the pool is finished
242 bool is_finished(void) const {
243 return (active_count_ == 0 && pending_count_ == 0);
244 }
245
246 //! Check if the pool is stopped
247 bool is_stopped(void) const {
248 return (worker_count_ == 0);
249 }
250
251 //! Retrieve count of worker in pool
252 size_t worker_count(void) const {
253 return worker_count_.load(std::memory_order_relaxed);
254 }
255
256 //! Retrieve count of pending tasks in pool
257 size_t pending_count(void) const {
258 return pending_count_.load(std::memory_order_relaxed);
259 }
260
261 //! Retrieve count of active tasks in pool
262 size_t active_count(void) const {
263 return active_count_.load(std::memory_order_relaxed);
264 }
265
266 //! Get the thread index via thread id
267 int indexof(const std::thread::id &thread_id) const {
268 for (size_t i = 0; i < pool_.size(); ++i) {
269 if (pool_[i].get_id() == thread_id) {
270 return static_cast<int>(i);
271 }
272 }
273 return -1;
274 }
275
276 //! Get the current work thread index
277 int indexof_this(void) const {
278 return this->indexof(std::this_thread::get_id());
279 }
280
281 //! Bind threads to processors
282 void bind(void);
283
284 //! Unbind threads of processors
285 void unbind(void);
286
287 protected:
288 //! Thread task control
289 class TaskControl {
290 public:
291 //! Notify task finished
292 void notify(void) {
293 finished_ = true;
294 std::lock_guard<std::mutex> lock(mutex_);
295 cond_.notify_one();
296 }
297
298 //! Wait until task finished
299 void wait(void) {
300 std::unique_lock<std::mutex> lock(mutex_);
301 cond_.wait(lock, [this]() { return finished_.load(); });
302 }
303
304 private:
305 std::atomic_bool finished_{false};
306 std::mutex mutex_{};
307 std::condition_variable cond_{};
308 };
309
310 //! Thread task
311 struct Task {
312 // Constructor
313 Task(const ClosureHandler &h, TaskControl *c) : handle(h), control(c) {}
314
315 // Constructor
316 Task(ClosureHandler &&h, TaskControl *c)
317 : handle(std::move(h)), control(c) {}
318
319 // Constructor
320 Task(const ClosureHandler &h, TaskGroup::Pointer &&g, TaskControl *c)
321 : handle(h), group(std::move(g)), control(c) {}
322
323 // Constructor
324 Task(ClosureHandler &&h, TaskGroup::Pointer &&g, TaskControl *c)
325 : handle(std::move(h)), group(std::move(g)), control(c) {}
326
327 // Constructor
328 Task(void) {}
329
330 //! Members
331 ClosureHandler handle{};
332 TaskGroup::Pointer group{nullptr};
333 TaskControl *control{nullptr};
334 };
335
336 //! Thread worker callback
337 void worker(void);
338
339 //! Pick a task from queue
340 bool picking(Task *task);
341
342 //! Push a task to the queue
343 template <typename T>
344 void enqueue(T &&handle, TaskControl *ctrl) {
345 if (handle) {
346 std::lock_guard<std::mutex> lock(queue_mutex_);
347 ++pending_count_;
348 queue_.emplace(std::forward<T>(handle), ctrl);
349 }
350 }
351
352 //! Push a task to the queue with group
353 template <typename T>
354 void enqueue(T &&handle, TaskGroup::Pointer &&group, TaskControl *ctrl) {
355 if (handle) {
356 std::lock_guard<std::mutex> lock(queue_mutex_);
357 ++pending_count_;
358 group->mark_task_enqueued();
359 queue_.emplace(std::forward<T>(handle), std::move(group), ctrl);
360 }
361 }
362
363 //! Push a task to the queue
364 template <typename T>
365 void enqueue_and_wake(T &&handle, TaskControl *ctrl) {
366 if (handle) {
367 std::lock_guard<std::mutex> lock(queue_mutex_);
368 ++pending_count_;
369 queue_.emplace(std::forward<T>(handle), ctrl);
370 work_cond_.notify_one();
371 }
372 }
373
374 //! Push a task to the queue with group
375 template <typename T>
376 void enqueue_and_wake(T &&handle, TaskGroup::Pointer &&group,
377 TaskControl *ctrl) {
378 if (handle) {
379 std::lock_guard<std::mutex> lock(queue_mutex_);
380 ++pending_count_;
381 group->mark_task_enqueued();
382 queue_.emplace(std::forward<T>(handle), std::move(group), ctrl);
383 work_cond_.notify_one();
384 }
385 }
386
387 private:
388 //! Disable them
389 ThreadPool(const ThreadPool &) = delete;
390 ThreadPool(ThreadPool &&) = delete;
391 ThreadPool &operator=(const ThreadPool &) = delete;
392
393 //! Members
394 std::queue<Task> queue_{};
395 std::atomic_bool stopping_{false};
396 std::atomic_uint worker_count_{0};
397 std::atomic_uint active_count_{0};
398 std::atomic_uint pending_count_{0};
399 std::mutex queue_mutex_{};
400 std::mutex wait_mutex_{};
401 std::condition_variable work_cond_{};
402 std::condition_variable finished_cond_{};
403 std::condition_variable stopped_cond_{};
404 std::vector<std::thread> pool_{};
405};
406
407} // namespace ailego
408
409#endif // __AILEGO_PARALLEL_THREAD_POOL_H__
410