1 | // Copyright 2019 The Marl Authors. |
2 | // |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | // you may not use this file except in compliance with the License. |
5 | // You may obtain a copy of the License at |
6 | // |
7 | // https://www.apache.org/licenses/LICENSE-2.0 |
8 | // |
9 | // Unless required by applicable law or agreed to in writing, software |
10 | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | // See the License for the specific language governing permissions and |
13 | // limitations under the License. |
14 | |
15 | #include "marl_test.h" |
16 | |
17 | #include "marl/containers.h" |
18 | #include "marl/defer.h" |
19 | #include "marl/event.h" |
20 | #include "marl/waitgroup.h" |
21 | |
22 | #include <atomic> |
23 | |
24 | TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) { |
25 | auto scheduler = std::unique_ptr<marl::Scheduler>( |
26 | new marl::Scheduler(marl::Scheduler::Config())); |
27 | } |
28 | |
29 | TEST_F(WithoutBoundScheduler, SchedulerBindGetUnbind) { |
30 | auto scheduler = std::unique_ptr<marl::Scheduler>( |
31 | new marl::Scheduler(marl::Scheduler::Config())); |
32 | scheduler->bind(); |
33 | auto got = marl::Scheduler::get(); |
34 | ASSERT_EQ(scheduler.get(), got); |
35 | scheduler->unbind(); |
36 | got = marl::Scheduler::get(); |
37 | ASSERT_EQ(got, nullptr); |
38 | } |
39 | |
40 | TEST_F(WithoutBoundScheduler, CheckConfig) { |
41 | marl::Scheduler::Config cfg; |
42 | cfg.setAllocator(allocator).setWorkerThreadCount(10); |
43 | |
44 | auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg)); |
45 | |
46 | auto gotCfg = scheduler->config(); |
47 | ASSERT_EQ(gotCfg.allocator, allocator); |
48 | ASSERT_EQ(gotCfg.workerThread.count, 10); |
49 | } |
50 | |
51 | TEST_P(WithBoundScheduler, DestructWithPendingTasks) { |
52 | std::atomic<int> counter = {0}; |
53 | for (int i = 0; i < 1000; i++) { |
54 | marl::schedule([&] { counter++; }); |
55 | } |
56 | |
57 | auto scheduler = marl::Scheduler::get(); |
58 | scheduler->unbind(); |
59 | delete scheduler; |
60 | |
61 | // All scheduled tasks should be completed before the scheduler is destructed. |
62 | ASSERT_EQ(counter.load(), 1000); |
63 | |
64 | // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy. |
65 | (new marl::Scheduler(marl::Scheduler::Config()))->bind(); |
66 | } |
67 | |
68 | TEST_P(WithBoundScheduler, DestructWithPendingFibers) { |
69 | std::atomic<int> counter = {0}; |
70 | |
71 | marl::WaitGroup wg(1); |
72 | for (int i = 0; i < 1000; i++) { |
73 | marl::schedule([&] { |
74 | wg.wait(); |
75 | counter++; |
76 | }); |
77 | } |
78 | |
79 | // Schedule a task to unblock all the tasks scheduled above. |
80 | // We assume that some of these tasks will not finish before the scheduler |
81 | // destruction logic kicks in. |
82 | marl::schedule([=] { |
83 | wg.done(); // Ready, steady, go... |
84 | }); |
85 | |
86 | auto scheduler = marl::Scheduler::get(); |
87 | scheduler->unbind(); |
88 | delete scheduler; |
89 | |
90 | // All scheduled tasks should be completed before the scheduler is destructed. |
91 | ASSERT_EQ(counter.load(), 1000); |
92 | |
93 | // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy. |
94 | (new marl::Scheduler(marl::Scheduler::Config()))->bind(); |
95 | } |
96 | |
97 | TEST_P(WithBoundScheduler, ScheduleWithArgs) { |
98 | std::string got; |
99 | marl::WaitGroup wg(1); |
100 | marl::schedule( |
101 | [wg, &got](std::string s, int i, bool b) { |
102 | got = "s: '" + s + "', i: " + std::to_string(i) + |
103 | ", b: " + (b ? "true" : "false" ); |
104 | wg.done(); |
105 | }, |
106 | "a string" , 42, true); |
107 | wg.wait(); |
108 | ASSERT_EQ(got, "s: 'a string', i: 42, b: true" ); |
109 | } |
110 | |
111 | TEST_P(WithBoundScheduler, FibersResumeOnSameThread) { |
112 | marl::WaitGroup fence(1); |
113 | marl::WaitGroup wg(1000); |
114 | for (int i = 0; i < 1000; i++) { |
115 | marl::schedule([=] { |
116 | auto threadID = std::this_thread::get_id(); |
117 | fence.wait(); |
118 | ASSERT_EQ(threadID, std::this_thread::get_id()); |
119 | wg.done(); |
120 | }); |
121 | } |
122 | // just to try and get some tasks to yield. |
123 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
124 | fence.done(); |
125 | wg.wait(); |
126 | } |
127 | |
128 | TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) { |
129 | auto scheduler = marl::Scheduler::get(); |
130 | |
131 | // on 32-bit OSs, excessive numbers of threads can run out of address space. |
132 | constexpr auto num_threads = sizeof(void*) > 4 ? 1000 : 100; |
133 | |
134 | marl::WaitGroup fence(1); |
135 | marl::WaitGroup wg(num_threads); |
136 | |
137 | marl::containers::vector<std::thread, 32> threads; |
138 | for (int i = 0; i < num_threads; i++) { |
139 | threads.emplace_back(std::thread([=] { |
140 | scheduler->bind(); |
141 | defer(scheduler->unbind()); |
142 | |
143 | auto threadID = std::this_thread::get_id(); |
144 | fence.wait(); |
145 | ASSERT_EQ(threadID, std::this_thread::get_id()); |
146 | wg.done(); |
147 | })); |
148 | } |
149 | // just to try and get some tasks to yield. |
150 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
151 | fence.done(); |
152 | wg.wait(); |
153 | |
154 | for (auto& thread : threads) { |
155 | thread.join(); |
156 | } |
157 | } |
158 | |
159 | TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) { |
160 | marl::Scheduler::Config cfg; |
161 | cfg.setWorkerThreadCount(8); |
162 | |
163 | auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg)); |
164 | scheduler->bind(); |
165 | defer(scheduler->unbind()); |
166 | |
167 | std::mutex mutex; |
168 | marl::containers::unordered_set<std::thread::id> threads(allocator); |
169 | marl::WaitGroup wg; |
170 | for (int i = 0; i < 10000; i++) { |
171 | wg.add(1); |
172 | marl::schedule([&mutex, &threads, wg] { |
173 | defer(wg.done()); |
174 | std::unique_lock<std::mutex> lock(mutex); |
175 | threads.emplace(std::this_thread::get_id()); |
176 | }); |
177 | } |
178 | wg.wait(); |
179 | |
180 | ASSERT_LE(threads.size(), 8U); |
181 | ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U); |
182 | } |
183 | |
184 | // Test that a marl::Scheduler *with dedicated worker threads* can be used |
185 | // without first binding to the scheduling thread. |
186 | TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) { |
187 | marl::Scheduler::Config cfg; |
188 | cfg.setWorkerThreadCount(8); |
189 | auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg)); |
190 | |
191 | marl::WaitGroup wg; |
192 | for (int i = 0; i < 100; i++) { |
193 | wg.add(1); |
194 | |
195 | marl::Event event; |
196 | scheduler->enqueue(marl::Task([event, wg] { |
197 | event.wait(); // Test that tasks can wait on other tasks. |
198 | wg.done(); |
199 | })); |
200 | |
201 | scheduler->enqueue(marl::Task([event, &scheduler] { |
202 | // Despite the main thread never binding the scheduler, the scheduler |
203 | // should be automatically bound to worker threads. |
204 | ASSERT_EQ(marl::Scheduler::get(), scheduler.get()); |
205 | |
206 | event.signal(); |
207 | })); |
208 | } |
209 | |
210 | // As the scheduler has not been bound to the main thread, the wait() call |
211 | // here will block **without** fiber yielding. |
212 | wg.wait(); |
213 | } |
214 | |
215 | // Test that a marl::Scheduler *without dedicated worker threads* cannot be used |
216 | // without first binding to the scheduling thread. |
217 | TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) { |
218 | marl::Scheduler::Config cfg; |
219 | auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg)); |
220 | |
221 | #if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST |
222 | EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})), |
223 | "Did you forget to call marl::Scheduler::bind" ); |
224 | #elif !MARL_DEBUG_ENABLED |
225 | scheduler->enqueue(marl::Task([] { FAIL() << "Should not be called" ; })); |
226 | #endif |
227 | } |
228 | |