1// Copyright 2019 The Marl Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "marl_test.h"
16
17#include "marl/containers.h"
18#include "marl/defer.h"
19#include "marl/event.h"
20#include "marl/waitgroup.h"
21
22#include <atomic>
23
24TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
25 auto scheduler = std::unique_ptr<marl::Scheduler>(
26 new marl::Scheduler(marl::Scheduler::Config()));
27}
28
29TEST_F(WithoutBoundScheduler, SchedulerBindGetUnbind) {
30 auto scheduler = std::unique_ptr<marl::Scheduler>(
31 new marl::Scheduler(marl::Scheduler::Config()));
32 scheduler->bind();
33 auto got = marl::Scheduler::get();
34 ASSERT_EQ(scheduler.get(), got);
35 scheduler->unbind();
36 got = marl::Scheduler::get();
37 ASSERT_EQ(got, nullptr);
38}
39
40TEST_F(WithoutBoundScheduler, CheckConfig) {
41 marl::Scheduler::Config cfg;
42 cfg.setAllocator(allocator).setWorkerThreadCount(10);
43
44 auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
45
46 auto gotCfg = scheduler->config();
47 ASSERT_EQ(gotCfg.allocator, allocator);
48 ASSERT_EQ(gotCfg.workerThread.count, 10);
49}
50
51TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
52 std::atomic<int> counter = {0};
53 for (int i = 0; i < 1000; i++) {
54 marl::schedule([&] { counter++; });
55 }
56
57 auto scheduler = marl::Scheduler::get();
58 scheduler->unbind();
59 delete scheduler;
60
61 // All scheduled tasks should be completed before the scheduler is destructed.
62 ASSERT_EQ(counter.load(), 1000);
63
64 // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
65 (new marl::Scheduler(marl::Scheduler::Config()))->bind();
66}
67
68TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
69 std::atomic<int> counter = {0};
70
71 marl::WaitGroup wg(1);
72 for (int i = 0; i < 1000; i++) {
73 marl::schedule([&] {
74 wg.wait();
75 counter++;
76 });
77 }
78
79 // Schedule a task to unblock all the tasks scheduled above.
80 // We assume that some of these tasks will not finish before the scheduler
81 // destruction logic kicks in.
82 marl::schedule([=] {
83 wg.done(); // Ready, steady, go...
84 });
85
86 auto scheduler = marl::Scheduler::get();
87 scheduler->unbind();
88 delete scheduler;
89
90 // All scheduled tasks should be completed before the scheduler is destructed.
91 ASSERT_EQ(counter.load(), 1000);
92
93 // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
94 (new marl::Scheduler(marl::Scheduler::Config()))->bind();
95}
96
97TEST_P(WithBoundScheduler, ScheduleWithArgs) {
98 std::string got;
99 marl::WaitGroup wg(1);
100 marl::schedule(
101 [wg, &got](std::string s, int i, bool b) {
102 got = "s: '" + s + "', i: " + std::to_string(i) +
103 ", b: " + (b ? "true" : "false");
104 wg.done();
105 },
106 "a string", 42, true);
107 wg.wait();
108 ASSERT_EQ(got, "s: 'a string', i: 42, b: true");
109}
110
111TEST_P(WithBoundScheduler, FibersResumeOnSameThread) {
112 marl::WaitGroup fence(1);
113 marl::WaitGroup wg(1000);
114 for (int i = 0; i < 1000; i++) {
115 marl::schedule([=] {
116 auto threadID = std::this_thread::get_id();
117 fence.wait();
118 ASSERT_EQ(threadID, std::this_thread::get_id());
119 wg.done();
120 });
121 }
122 // just to try and get some tasks to yield.
123 std::this_thread::sleep_for(std::chrono::milliseconds(10));
124 fence.done();
125 wg.wait();
126}
127
128TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
129 auto scheduler = marl::Scheduler::get();
130
131 // on 32-bit OSs, excessive numbers of threads can run out of address space.
132 constexpr auto num_threads = sizeof(void*) > 4 ? 1000 : 100;
133
134 marl::WaitGroup fence(1);
135 marl::WaitGroup wg(num_threads);
136
137 marl::containers::vector<std::thread, 32> threads;
138 for (int i = 0; i < num_threads; i++) {
139 threads.emplace_back(std::thread([=] {
140 scheduler->bind();
141 defer(scheduler->unbind());
142
143 auto threadID = std::this_thread::get_id();
144 fence.wait();
145 ASSERT_EQ(threadID, std::this_thread::get_id());
146 wg.done();
147 }));
148 }
149 // just to try and get some tasks to yield.
150 std::this_thread::sleep_for(std::chrono::milliseconds(10));
151 fence.done();
152 wg.wait();
153
154 for (auto& thread : threads) {
155 thread.join();
156 }
157}
158
159TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
160 marl::Scheduler::Config cfg;
161 cfg.setWorkerThreadCount(8);
162
163 auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
164 scheduler->bind();
165 defer(scheduler->unbind());
166
167 std::mutex mutex;
168 marl::containers::unordered_set<std::thread::id> threads(allocator);
169 marl::WaitGroup wg;
170 for (int i = 0; i < 10000; i++) {
171 wg.add(1);
172 marl::schedule([&mutex, &threads, wg] {
173 defer(wg.done());
174 std::unique_lock<std::mutex> lock(mutex);
175 threads.emplace(std::this_thread::get_id());
176 });
177 }
178 wg.wait();
179
180 ASSERT_LE(threads.size(), 8U);
181 ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
182}
183
184// Test that a marl::Scheduler *with dedicated worker threads* can be used
185// without first binding to the scheduling thread.
186TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
187 marl::Scheduler::Config cfg;
188 cfg.setWorkerThreadCount(8);
189 auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
190
191 marl::WaitGroup wg;
192 for (int i = 0; i < 100; i++) {
193 wg.add(1);
194
195 marl::Event event;
196 scheduler->enqueue(marl::Task([event, wg] {
197 event.wait(); // Test that tasks can wait on other tasks.
198 wg.done();
199 }));
200
201 scheduler->enqueue(marl::Task([event, &scheduler] {
202 // Despite the main thread never binding the scheduler, the scheduler
203 // should be automatically bound to worker threads.
204 ASSERT_EQ(marl::Scheduler::get(), scheduler.get());
205
206 event.signal();
207 }));
208 }
209
210 // As the scheduler has not been bound to the main thread, the wait() call
211 // here will block **without** fiber yielding.
212 wg.wait();
213}
214
215// Test that a marl::Scheduler *without dedicated worker threads* cannot be used
216// without first binding to the scheduling thread.
217TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
218 marl::Scheduler::Config cfg;
219 auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
220
221#if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
222 EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
223 "Did you forget to call marl::Scheduler::bind");
224#elif !MARL_DEBUG_ENABLED
225 scheduler->enqueue(marl::Task([] { FAIL() << "Should not be called"; }));
226#endif
227}
228