1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Hechong.xyf, daibing.db
17 * \date Jun 2021
18 * \brief Interface of AiTheta Index Threads
19 */
20
21#ifndef __AITHETA2_INDEX_THREADS_H__
22#define __AITHETA2_INDEX_THREADS_H__
23
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include <ailego/parallel/thread_pool.h>
#include <ailego/pattern/closure.h>
33
34namespace aitheta2 {
35
36/*! Index Threads
37 * Index ThreadPool maintains multiple threads resources to execute the tasks
38 * concurrently
39 */
40class IndexThreads {
41 public:
42 using Pointer = std::shared_ptr<IndexThreads>;
43
44 /*! Threads Task Group
45 * Manage of a group of sub-tasks which can be seen as a big task,
46 * so we can wait all sub-tasks finished, or get the status of them
47 */
48 class TaskGroup {
49 public:
50 using Pointer = std::shared_ptr<TaskGroup>;
51
52 //! Destructor
53 virtual ~TaskGroup(void) {}
54
55 //! Submit a task to be executed asynchronous
56 virtual void submit(ailego::ClosureHandler &&task) = 0;
57
58 //! Check if the group is finished
59 virtual bool is_finished(void) const = 0;
60
61 //! Wait until all tasks in group finished
62 virtual void wait_finish(void) = 0;
63 };
64
65 //! Destructor
66 virtual ~IndexThreads(void) {}
67
68 //! Retrieve thread count in pool
69 virtual size_t count(void) const = 0;
70
71 //! Stop all threads
72 virtual void stop(void) = 0;
73
74 //! Submit a task to be executed asynchronous
75 virtual void submit(ailego::ClosureHandler &&task) = 0;
76
77 //! Make a task group
78 virtual TaskGroup::Pointer make_group(void) = 0;
79
80 //! Get the current work thread index
81 virtual int indexof_this(void) const = 0;
82};
83
84/*! Single Queue Index Threads
85 */
86class SingleQueueIndexThreads : public IndexThreads {
87 public:
88 /*! Single Queue Index Threads Task Group
89 */
90 class SingleQueueTaskGroup : public TaskGroup {
91 public:
92 using Pointer = std::shared_ptr<SingleQueueTaskGroup>;
93
94 //! Constructor
95 explicit SingleQueueTaskGroup(
96 ailego::ThreadPool::TaskGroup::Pointer task_group)
97 : task_group_(std::move(task_group)) {}
98
99 //! Submit a task to be executed asynchronous
100 void submit(ailego::ClosureHandler &&task) override {
101 while (task_group_->pending_count() >= kMaxQueueSize) {
102 std::this_thread::sleep_for(std::chrono::milliseconds(1));
103 }
104 task_group_->enqueue_and_wake(std::move(task));
105 }
106
107 //! Check if the group is finished
108 bool is_finished(void) const override {
109 return task_group_->is_finished();
110 }
111
112 //! Wait until all tasks in group finished
113 void wait_finish(void) override {
114 return task_group_->wait_finish();
115 }
116
117 private:
118 //! Members
119 ailego::ThreadPool::TaskGroup::Pointer task_group_{};
120 };
121
122 //! Constructor
123 SingleQueueIndexThreads(uint32_t size, bool binding)
124 : pool_(
125 size > 0 ? size : std::max(std::thread::hardware_concurrency(), 1u),
126 binding) {}
127
128 //! Constructor
129 explicit SingleQueueIndexThreads(bool binding)
130 : SingleQueueIndexThreads(0, binding) {}
131
132 //! Constructor
133 SingleQueueIndexThreads(void) : SingleQueueIndexThreads{false} {}
134
135 //! Destructor
136 virtual ~SingleQueueIndexThreads(void) {}
137
138 //! Retrieve thread count in pool
139 size_t count(void) const override {
140 return pool_.count();
141 }
142
143 //! Stop all threads
144 void stop(void) override {
145 pool_.stop();
146 }
147
148 //! Submit a task to be executed asynchronous
149 void submit(ailego::ClosureHandler &&task) override {
150 while (pool_.pending_count() >= kMaxQueueSize) {
151 std::this_thread::sleep_for(std::chrono::milliseconds(1));
152 }
153 pool_.enqueue_and_wake(std::move(task));
154 }
155
156 //! Make a task group
157 TaskGroup::Pointer make_group(void) override {
158 return std::make_shared<SingleQueueTaskGroup>(pool_.make_group());
159 }
160
161 //! Get the current work thread index
162 int indexof_this(void) const override {
163 return pool_.indexof_this();
164 }
165
166 private:
167 static constexpr size_t kMaxQueueSize = 4096u;
168
169 //! Disable them
170 SingleQueueIndexThreads(const SingleQueueIndexThreads &) = delete;
171 SingleQueueIndexThreads(SingleQueueIndexThreads &&) = delete;
172 SingleQueueIndexThreads &operator=(const SingleQueueIndexThreads &) = delete;
173
174 //! Members
175 ailego::ThreadPool pool_{};
176};
177
178} // namespace aitheta2
179
180#endif // __AITHETA2_INDEX_THREADS_H__
181