1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Hechong.xyf
17 * \date Oct 2020
18 * \brief Interface of AiLego Utility Thread Queue
19 */
20
21#ifndef __AILEGO_PARALLEL_THREAD_QUEUE_H__
22#define __AILEGO_PARALLEL_THREAD_QUEUE_H__
23
24#include <atomic>
25#include <condition_variable>
26#include <mutex>
27#include <queue>
28#include <thread>
29#include <utility>
30#include <vector>
31#include <ailego/hash/jump_hash.h>
32#include <ailego/pattern/closure.h>
33
34namespace ailego {
35
36/*! Thread Queue (One Thread One Queue)
37 */
38class ThreadQueue {
39 public:
40 /*! Thread Worker (One Thread One Worker)
41 */
42 class ThreadWorker {
43 public:
44 //! Constructor
45 ThreadWorker(ThreadQueue *owner) : owner_(owner) {}
46
47 //! Destructor
48 ~ThreadWorker(void) {
49 // Join the current thread
50 if (thread_.joinable()) {
51 thread_.join();
52 }
53 }
54
55 //! Push a task to the queue
56 template <typename T>
57 void enqueue(T &&handle) {
58 std::lock_guard<std::mutex> lock(mutex_);
59 queue_.emplace(std::forward<T>(handle));
60 }
61
62 //! Push a task to the queue
63 template <typename T>
64 void enqueue_and_wake(T &&handle) {
65 std::lock_guard<std::mutex> lock(mutex_);
66 queue_.emplace(std::forward<T>(handle));
67 cond_.notify_one();
68 }
69
70 //! Execute a function as a task
71 template <typename... TArgs>
72 void execute(TArgs &&...args) {
73 this->enqueue_and_wake(Closure::New(std::forward<TArgs>(args)...));
74 }
75
76 //! Wake the thread
77 void wake(void) {
78 std::lock_guard<std::mutex> lock(mutex_);
79 cond_.notify_one();
80 }
81
82 //! Notify thread stopped
83 void stop(void) {
84 // Set stop flag as ture, then wake the thread
85 stopping_ = true;
86 std::lock_guard<std::mutex> lock(mutex_);
87 cond_.notify_one();
88 }
89
90 protected:
91 //! Thread worker callback
92 void worker(void) {
93 owner_->mark_worker_started();
94
95 ClosureHandler task;
96 while (this->picking(&task)) {
97 // Run the task
98 if (task) {
99 task->run();
100 task = nullptr;
101 }
102 }
103 owner_->mark_worker_stopped();
104 }
105
106 //! Pick a task from queue
107 bool picking(ClosureHandler *task) {
108 std::unique_lock<std::mutex> latch(mutex_);
109 cond_.wait(latch, [this]() { return (queue_.size() > 0 || stopping_); });
110 if (stopping_) {
111 return false;
112 }
113
114 *task = std::move(queue_.front());
115 queue_.pop();
116 return true;
117 }
118
119 private:
120 //! Disable them
121 ThreadWorker(void) = delete;
122 ThreadWorker(ThreadWorker &&) = delete;
123 ThreadWorker(const ThreadWorker &) = delete;
124 ThreadWorker &operator=(const ThreadWorker &) = delete;
125
126 //! Members
127 ThreadQueue *owner_{nullptr};
128 std::queue<ClosureHandler> queue_{};
129 std::atomic_bool stopping_{false};
130 std::mutex mutex_{};
131 std::condition_variable cond_{};
132 std::thread thread_{&ThreadWorker::worker, this};
133 };
134
135 //! Constructor
136 ThreadQueue(void)
137 : ThreadQueue{std::max(std::thread::hardware_concurrency(), 1u)} {}
138
139 //! Constructor
140 explicit ThreadQueue(uint32_t size) {
141 for (uint32_t i = 0u; i < size; ++i) {
142 threads_.emplace_back(new ThreadWorker(this));
143 }
144 }
145
146 //! Destructor
147 ~ThreadQueue(void) {
148 this->stop();
149 // Cleanup threads
150 for (auto it = threads_.begin(); it != threads_.end(); ++it) {
151 delete *it;
152 }
153 }
154
155 //! operator []
156 ThreadWorker &operator[](size_t i) {
157 return *(threads_[i]);
158 }
159
160 //! Stop the thread
161 void stop(void) {
162 // Stop all workers
163 for (auto it = threads_.begin(); it != threads_.end(); ++it) {
164 (*it)->stop();
165 }
166 }
167
168 //! Wake all worker threads
169 void wake(void) {
170 for (auto it = threads_.begin(); it != threads_.end(); ++it) {
171 (*it)->wake();
172 }
173 }
174
175 //! Wait until all threads stopped processing
176 void wait_stop(void) {
177 std::unique_lock<std::mutex> lock(wait_mutex_);
178 stopped_cond_.wait(lock, [this]() { return this->is_stopped(); });
179 }
180
181 //! Check if the pool is stopped
182 bool is_stopped(void) const {
183 return (worker_count_ == 0);
184 }
185
186 //! Retrieve count of worker in queue
187 size_t worker_count(void) const {
188 return worker_count_.load(std::memory_order_relaxed);
189 }
190
191 //! Retrieve thread count in queue
192 size_t count(void) const {
193 return threads_.size();
194 }
195
196 //! Push a task to the queue
197 template <typename T>
198 void enqueue(uint64_t key, T &&handle) {
199 threads_[JumpHash(key, static_cast<int32_t>(threads_.size()))]->enqueue(
200 std::forward<T>(handle));
201 }
202
203 //! Push a task to the queue
204 template <typename T>
205 void enqueue_and_wake(uint64_t key, T &&handle) {
206 threads_[JumpHash(key, static_cast<int32_t>(threads_.size()))]
207 ->enqueue_and_wake(std::forward<T>(handle));
208 }
209
210 //! Execute a function as a task in pool
211 template <typename... TArgs>
212 void execute(uint64_t key, TArgs &&...args) {
213 this->enqueue_and_wake(key, Closure::New(std::forward<TArgs>(args)...));
214 }
215
216 protected:
217 //! Mark a worker started
218 void mark_worker_started(void) {
219 ++worker_count_;
220 }
221
222 //! Mark a worker stopped
223 void mark_worker_stopped(void) {
224 // Decrease count of workers
225 std::lock_guard<std::mutex> lock(wait_mutex_);
226 if (--worker_count_ == 0) {
227 stopped_cond_.notify_all();
228 }
229 }
230
231 private:
232 //! Disable them
233 ThreadQueue(const ThreadQueue &) = delete;
234 ThreadQueue(ThreadQueue &&) = delete;
235 ThreadQueue &operator=(const ThreadQueue &) = delete;
236
237 //! Members
238 std::atomic_uint worker_count_{0};
239 std::mutex wait_mutex_{};
240 std::condition_variable stopped_cond_{};
241 std::vector<ThreadWorker *> threads_{};
242};
243
244} // namespace ailego
245
246#endif // __AILEGO_PARALLEL_THREAD_QUEUE_H__
247