1 | /** |
2 | * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | |
16 | * \author Hechong.xyf, daibing.db |
17 | * \date Jun 2021 |
18 | * \brief Interface of AiTheta Index Threads |
19 | */ |
20 | |
21 | #ifndef __AITHETA2_INDEX_THREADS_H__ |
22 | #define __AITHETA2_INDEX_THREADS_H__ |
23 | |
24 | #include <atomic> |
25 | #include <condition_variable> |
26 | #include <mutex> |
27 | #include <queue> |
28 | #include <thread> |
29 | #include <utility> |
30 | #include <vector> |
31 | #include <ailego/parallel/thread_pool.h> |
32 | #include <ailego/pattern/closure.h> |
33 | |
34 | namespace aitheta2 { |
35 | |
36 | /*! Index Threads |
37 | * Index ThreadPool maintains multiple threads resources to execute the tasks |
38 | * concurrently |
39 | */ |
40 | class IndexThreads { |
41 | public: |
42 | using Pointer = std::shared_ptr<IndexThreads>; |
43 | |
44 | /*! Threads Task Group |
45 | * Manage of a group of sub-tasks which can be seen as a big task, |
46 | * so we can wait all sub-tasks finished, or get the status of them |
47 | */ |
48 | class TaskGroup { |
49 | public: |
50 | using Pointer = std::shared_ptr<TaskGroup>; |
51 | |
52 | //! Destructor |
53 | virtual ~TaskGroup(void) {} |
54 | |
55 | //! Submit a task to be executed asynchronous |
56 | virtual void submit(ailego::ClosureHandler &&task) = 0; |
57 | |
58 | //! Check if the group is finished |
59 | virtual bool is_finished(void) const = 0; |
60 | |
61 | //! Wait until all tasks in group finished |
62 | virtual void wait_finish(void) = 0; |
63 | }; |
64 | |
65 | //! Destructor |
66 | virtual ~IndexThreads(void) {} |
67 | |
68 | //! Retrieve thread count in pool |
69 | virtual size_t count(void) const = 0; |
70 | |
71 | //! Stop all threads |
72 | virtual void stop(void) = 0; |
73 | |
74 | //! Submit a task to be executed asynchronous |
75 | virtual void submit(ailego::ClosureHandler &&task) = 0; |
76 | |
77 | //! Make a task group |
78 | virtual TaskGroup::Pointer make_group(void) = 0; |
79 | |
80 | //! Get the current work thread index |
81 | virtual int indexof_this(void) const = 0; |
82 | }; |
83 | |
84 | /*! Single Queue Index Threads |
85 | */ |
86 | class SingleQueueIndexThreads : public IndexThreads { |
87 | public: |
88 | /*! Single Queue Index Threads Task Group |
89 | */ |
90 | class SingleQueueTaskGroup : public TaskGroup { |
91 | public: |
92 | using Pointer = std::shared_ptr<SingleQueueTaskGroup>; |
93 | |
94 | //! Constructor |
95 | explicit SingleQueueTaskGroup( |
96 | ailego::ThreadPool::TaskGroup::Pointer task_group) |
97 | : task_group_(std::move(task_group)) {} |
98 | |
99 | //! Submit a task to be executed asynchronous |
100 | void submit(ailego::ClosureHandler &&task) override { |
101 | while (task_group_->pending_count() >= kMaxQueueSize) { |
102 | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
103 | } |
104 | task_group_->enqueue_and_wake(std::move(task)); |
105 | } |
106 | |
107 | //! Check if the group is finished |
108 | bool is_finished(void) const override { |
109 | return task_group_->is_finished(); |
110 | } |
111 | |
112 | //! Wait until all tasks in group finished |
113 | void wait_finish(void) override { |
114 | return task_group_->wait_finish(); |
115 | } |
116 | |
117 | private: |
118 | //! Members |
119 | ailego::ThreadPool::TaskGroup::Pointer task_group_{}; |
120 | }; |
121 | |
122 | //! Constructor |
123 | SingleQueueIndexThreads(uint32_t size, bool binding) |
124 | : pool_( |
125 | size > 0 ? size : std::max(std::thread::hardware_concurrency(), 1u), |
126 | binding) {} |
127 | |
128 | //! Constructor |
129 | explicit SingleQueueIndexThreads(bool binding) |
130 | : SingleQueueIndexThreads(0, binding) {} |
131 | |
132 | //! Constructor |
133 | SingleQueueIndexThreads(void) : SingleQueueIndexThreads{false} {} |
134 | |
135 | //! Destructor |
136 | virtual ~SingleQueueIndexThreads(void) {} |
137 | |
138 | //! Retrieve thread count in pool |
139 | size_t count(void) const override { |
140 | return pool_.count(); |
141 | } |
142 | |
143 | //! Stop all threads |
144 | void stop(void) override { |
145 | pool_.stop(); |
146 | } |
147 | |
148 | //! Submit a task to be executed asynchronous |
149 | void submit(ailego::ClosureHandler &&task) override { |
150 | while (pool_.pending_count() >= kMaxQueueSize) { |
151 | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
152 | } |
153 | pool_.enqueue_and_wake(std::move(task)); |
154 | } |
155 | |
156 | //! Make a task group |
157 | TaskGroup::Pointer make_group(void) override { |
158 | return std::make_shared<SingleQueueTaskGroup>(pool_.make_group()); |
159 | } |
160 | |
161 | //! Get the current work thread index |
162 | int indexof_this(void) const override { |
163 | return pool_.indexof_this(); |
164 | } |
165 | |
166 | private: |
167 | static constexpr size_t kMaxQueueSize = 4096u; |
168 | |
169 | //! Disable them |
170 | SingleQueueIndexThreads(const SingleQueueIndexThreads &) = delete; |
171 | SingleQueueIndexThreads(SingleQueueIndexThreads &&) = delete; |
172 | SingleQueueIndexThreads &operator=(const SingleQueueIndexThreads &) = delete; |
173 | |
174 | //! Members |
175 | ailego::ThreadPool pool_{}; |
176 | }; |
177 | |
178 | } // namespace aitheta2 |
179 | |
180 | #endif // __AITHETA2_INDEX_THREADS_H__ |
181 | |