1 | // Copyright 2019 The Marl Authors. |
2 | // |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | // you may not use this file except in compliance with the License. |
5 | // You may obtain a copy of the License at |
6 | // |
7 | // https://www.apache.org/licenses/LICENSE-2.0 |
8 | // |
9 | // Unless required by applicable law or agreed to in writing, software |
10 | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | // See the License for the specific language governing permissions and |
13 | // limitations under the License. |
14 | |
15 | #include "marl/thread.h" |
16 | |
17 | #include "marl/debug.h" |
18 | #include "marl/defer.h" |
19 | #include "marl/trace.h" |
20 | |
21 | #include <algorithm> // std::sort |
22 | |
23 | #include <cstdarg> |
24 | #include <cstdio> |
25 | |
26 | #if defined(_WIN32) |
27 | #define WIN32_LEAN_AND_MEAN 1 |
28 | #include <windows.h> |
29 | #include <array> |
30 | #include <cstdlib> // mbstowcs |
31 | #include <limits> // std::numeric_limits |
32 | #include <vector> |
33 | #undef max |
34 | #elif defined(__APPLE__) |
35 | #include <mach/thread_act.h> |
36 | #include <pthread.h> |
37 | #include <unistd.h> |
38 | #include <thread> |
39 | #elif defined(__FreeBSD__) |
40 | #include <pthread.h> |
41 | #include <pthread_np.h> |
42 | #include <unistd.h> |
43 | #include <thread> |
44 | #else |
45 | #include <pthread.h> |
46 | #include <unistd.h> |
47 | #include <thread> |
48 | #endif |
49 | |
namespace {

// Hash functor for marl::Thread::Core, used by the unordered_set built in
// Thread::Affinity::add() / remove().
// NOTE(review): this always reads the pthread view of the Core union, even on
// Windows — presumably the union members alias so this still yields a usable
// hash; confirm against the Core definition in marl/thread.h.
struct CoreHasher {
  inline uint64_t operator()(const marl::Thread::Core& core) const {
    return core.pthread.index;
  }
};

}  // anonymous namespace
59 | |
60 | namespace marl { |
61 | |
#if defined(_WIN32)
// Upper bounds on the core / processor-group indices, derived from the width
// of the Thread::Core::windows fields that store them.
static constexpr size_t MaxCoreCount =
    std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL;
static constexpr size_t MaxGroupCount =
    std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL;
// A KAFFINITY bitmask holds one bit per core within a group; every bit
// position must be representable by Thread::Core::windows.index.
static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount,
              "Thread::Core::windows.index is too small" );

namespace {
// Asserts that a Win32 API call returned TRUE, reporting GetLastError() on
// failure. The expression is still evaluated when assertions are disabled.
#define CHECK_WIN32(expr) \
  do { \
    auto res = expr; \
    (void)res; \
    MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
                (int)GetLastError()); \
  } while (false)

// Describes a single Windows processor group.
struct ProcessorGroup {
  unsigned int count;  // number of logical processors in this group.
  KAFFINITY affinity;  // affinity mask.
};

// Fixed-capacity list of all processor groups on this machine.
struct ProcessorGroups {
  std::array<ProcessorGroup, MaxGroupCount> groups;
  size_t count;  // number of valid entries in groups.
};

// Returns the processor groups of this machine, queried once on first call
// (thread-safe via C++11 magic-static initialization).
const ProcessorGroups& getProcessorGroups() {
  static ProcessorGroups groups = [] {
    ProcessorGroups out = {};
    // NOTE(review): a fixed 32-entry buffer is assumed sufficient; if
    // GetLogicalProcessorInformationEx() needs more, CHECK_WIN32 fires.
    SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
    DWORD size = sizeof(info);
    CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
    // NOTE(review): SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX is documented as
    // variable-length; dividing the written byte count by sizeof assumes
    // fixed-size entries — verify this holds for RelationGroup queries.
    DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
    for (DWORD i = 0; i < count; i++) {
      if (info[i].Relationship == RelationGroup) {
        auto groupCount = info[i].Group.ActiveGroupCount;
        for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
          auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
          out.groups[out.count++] = ProcessorGroup{
              groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
          MARL_ASSERT(out.count <= MaxGroupCount, "Group index overflow" );
        }
      }
    }
    return out;
  }();
  return groups;
}
}  // namespace
#endif  // defined(_WIN32)
113 | |
114 | //////////////////////////////////////////////////////////////////////////////// |
115 | // Thread::Affinty |
116 | //////////////////////////////////////////////////////////////////////////////// |
117 | |
// Constructs an empty affinity (no cores).
Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {}
// Move constructor: steals the core list (and its allocator) from `other`.
Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {}
// Copy constructor: duplicates `other`'s cores using the given allocator.
Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator)
    : cores(other.cores, allocator) {}

// Constructs an affinity from an explicit list of cores.
Thread::Affinity::Affinity(std::initializer_list<Core> list,
                           Allocator* allocator)
    : cores(allocator) {
  // Reserve up-front so the loop performs at most one allocation.
  cores.reserve(list.size());
  for (auto core : list) {
    cores.push_back(core);
  }
}

// Constructs an affinity by copying an existing core vector.
Thread::Affinity::Affinity(const containers::vector<Core, 32>& coreList,
                           Allocator* allocator)
    : cores(coreList, allocator) {}
135 | |
136 | Thread::Affinity Thread::Affinity::all( |
137 | Allocator* allocator /* = Allocator::Default */) { |
138 | Thread::Affinity affinity(allocator); |
139 | |
140 | #if defined(_WIN32) |
141 | const auto& groups = getProcessorGroups(); |
142 | for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) { |
143 | const auto& group = groups.groups[groupIdx]; |
144 | Core core; |
145 | core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx); |
146 | for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) { |
147 | if ((group.affinity >> coreIdx) & 1) { |
148 | core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx); |
149 | affinity.cores.emplace_back(std::move(core)); |
150 | } |
151 | } |
152 | } |
153 | #elif defined(__linux__) && !defined(__ANDROID__) |
154 | auto thread = pthread_self(); |
155 | cpu_set_t cpuset; |
156 | CPU_ZERO(&cpuset); |
157 | if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) { |
158 | int count = CPU_COUNT(&cpuset); |
159 | for (int i = 0; i < count; i++) { |
160 | Core core; |
161 | core.pthread.index = static_cast<uint16_t>(i); |
162 | affinity.cores.emplace_back(std::move(core)); |
163 | } |
164 | } |
165 | #elif defined(__FreeBSD__) |
166 | auto thread = pthread_self(); |
167 | cpuset_t cpuset; |
168 | CPU_ZERO(&cpuset); |
169 | if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) { |
170 | int count = CPU_COUNT(&cpuset); |
171 | for (int i = 0; i < count; i++) { |
172 | Core core; |
173 | core.pthread.index = static_cast<uint16_t>(i); |
174 | affinity.cores.emplace_back(std::move(core)); |
175 | } |
176 | } |
177 | #else |
178 | static_assert(!supported, |
179 | "marl::Thread::Affinity::supported is true, but " |
180 | "Thread::Affinity::all() is not implemented for this platform" ); |
181 | #endif |
182 | |
183 | return affinity; |
184 | } |
185 | |
186 | std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf( |
187 | Affinity&& affinity, |
188 | Allocator* allocator /* = Allocator::Default */) { |
189 | struct Policy : public Thread::Affinity::Policy { |
190 | Affinity affinity; |
191 | Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} |
192 | |
193 | Affinity get(uint32_t threadId, Allocator* allocator) const override { |
194 | #if defined(_WIN32) |
195 | auto count = affinity.count(); |
196 | if (count == 0) { |
197 | return Affinity(affinity, allocator); |
198 | } |
199 | auto group = affinity[threadId % affinity.count()].windows.group; |
200 | Affinity out(allocator); |
201 | out.cores.reserve(count); |
202 | for (auto core : affinity.cores) { |
203 | if (core.windows.group == group) { |
204 | out.cores.push_back(core); |
205 | } |
206 | } |
207 | return out; |
208 | #else |
209 | return Affinity(affinity, allocator); |
210 | #endif |
211 | } |
212 | }; |
213 | |
214 | return allocator->make_shared<Policy>(std::move(affinity)); |
215 | } |
216 | |
217 | std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf( |
218 | Affinity&& affinity, |
219 | Allocator* allocator /* = Allocator::Default */) { |
220 | struct Policy : public Thread::Affinity::Policy { |
221 | Affinity affinity; |
222 | Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} |
223 | |
224 | Affinity get(uint32_t threadId, Allocator* allocator) const override { |
225 | auto count = affinity.count(); |
226 | if (count == 0) { |
227 | return Affinity(affinity, allocator); |
228 | } |
229 | return Affinity({affinity[threadId % affinity.count()]}, allocator); |
230 | } |
231 | }; |
232 | |
233 | return allocator->make_shared<Policy>(std::move(affinity)); |
234 | } |
235 | |
// Returns the number of cores in this affinity.
size_t Thread::Affinity::count() const {
  return cores.size();
}

// Returns (by value) the core at `index`. No bounds checking is performed
// here beyond whatever the underlying container asserts.
Thread::Core Thread::Affinity::operator[](size_t index) const {
  return cores[index];
}
243 | |
244 | Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) { |
245 | containers::unordered_set<Core, CoreHasher> set(cores.allocator); |
246 | for (auto core : cores) { |
247 | set.emplace(core); |
248 | } |
249 | for (auto core : other.cores) { |
250 | if (set.count(core) == 0) { |
251 | cores.push_back(core); |
252 | } |
253 | } |
254 | std::sort(cores.begin(), cores.end()); |
255 | return *this; |
256 | } |
257 | |
258 | Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) { |
259 | containers::unordered_set<Core, CoreHasher> set(cores.allocator); |
260 | for (auto core : other.cores) { |
261 | set.emplace(core); |
262 | } |
263 | for (size_t i = 0; i < cores.size(); i++) { |
264 | if (set.count(cores[i]) != 0) { |
265 | cores[i] = cores.back(); |
266 | cores.resize(cores.size() - 1); |
267 | } |
268 | } |
269 | std::sort(cores.begin(), cores.end()); |
270 | return *this; |
271 | } |
272 | |
273 | #if defined(_WIN32) |
274 | |
// Win32 thread state: owns the entry function and the thread HANDLE.
class Thread::Impl {
 public:
  Impl(Func&& func) : func(std::move(func)) {}
  // Thread entry point passed to CreateRemoteThreadEx(); `self` is the Impl*.
  static DWORD WINAPI run(void* self) {
    reinterpret_cast<Impl*>(self)->func();
    return 0;
  }

  Func func;
  HANDLE handle;  // set by Thread::Thread() after CreateRemoteThreadEx().
};
286 | |
// Creates and starts a thread running `func`, optionally bound to the cores
// of `affinity` via a PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY attribute.
Thread::Thread(Affinity&& affinity, Func&& func) {
  // First call queries the buffer size needed for a 1-entry attribute list;
  // it is expected to fail, but must fill in `size`.
  SIZE_T size = 0;
  InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
  MARL_ASSERT(size > 0,
              "InitializeProcThreadAttributeList() did not give a size" );

  std::vector<uint8_t> buffer(size);
  LPPROC_THREAD_ATTRIBUTE_LIST attributes =
      reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(buffer.data());
  CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
  defer(DeleteProcThreadAttributeList(attributes));

  GROUP_AFFINITY groupAffinity = {};

  auto count = affinity.count();
  if (count > 0) {
    // Build the affinity bitmask. A Win32 thread can only be created within
    // a single processor group, so every core must share core 0's group.
    groupAffinity.Group = affinity[0].windows.group;
    for (size_t i = 0; i < count; i++) {
      auto core = affinity[i];
      MARL_ASSERT(groupAffinity.Group == core.windows.group,
                  "Cannot create thread that uses multiple affinity groups" );
      groupAffinity.Mask |= (1ULL << core.windows.index);
    }
    CHECK_WIN32(UpdateProcThreadAttribute(
        attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
        sizeof(groupAffinity), nullptr, nullptr));
  }

  // CreateRemoteThreadEx() (targeting the current process) is used because
  // plain CreateThread() does not accept a PROC_THREAD_ATTRIBUTE_LIST.
  impl = new Impl(std::move(func));
  impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
                                      &Impl::run, impl, 0, attributes, nullptr);
}
319 | |
Thread::~Thread() {
  // Note: unlike the pthread implementation below, destroying an un-joined
  // Thread is permitted here — the handle is closed without waiting on the
  // thread.
  if (impl) {
    CloseHandle(impl->handle);
    delete impl;
  }
}
326 | |
// Blocks until the thread's entry function returns. The handle and impl are
// released by the destructor, not here, so join() may be followed by ~Thread.
void Thread::join() {
  MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread" );
  WaitForSingleObject(impl->handle, INFINITE);
}
331 | |
// Names the calling thread using the printf-style format string, for
// debuggers and profilers. Silently a no-op if SetThreadDescription() is
// unavailable (it only exists on Windows 10 1607+, hence the dynamic lookup).
void Thread::setName(const char* fmt, ...) {
  static auto setThreadDescription =
      reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
          GetModuleHandle("kernelbase.dll" ), "SetThreadDescription" ));
  if (setThreadDescription == nullptr) {
    return;
  }

  // Format into a fixed buffer; names longer than 1023 chars are truncated.
  char name[1024];
  va_list vararg;
  va_start(vararg, fmt);
  vsnprintf(name, sizeof(name), fmt, vararg);
  va_end(vararg);

  // SetThreadDescription() takes a wide string.
  wchar_t wname[1024];
  mbstowcs(wname, name, 1024);
  setThreadDescription(GetCurrentThread(), wname);
  MARL_NAME_THREAD("%s" , name);
}
351 | |
352 | unsigned int Thread::numLogicalCPUs() { |
353 | unsigned int count = 0; |
354 | const auto& groups = getProcessorGroups(); |
355 | for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) { |
356 | const auto& group = groups.groups[groupIdx]; |
357 | count += group.count; |
358 | } |
359 | return count; |
360 | } |
361 | |
362 | #else |
363 | |
// pthread/std::thread-based thread state: owns the affinity, the entry
// function, and the std::thread itself.
class Thread::Impl {
 public:
  // Note: `thread` is declared after `affinity` and `func`, so both members
  // are fully initialized before the new thread starts running the lambda.
  Impl(Affinity&& affinity, Thread::Func&& f)
      : affinity(std::move(affinity)), func(std::move(f)), thread([this] {
          setAffinity();
          func();
        }) {}

  Affinity affinity;
  Func func;
  std::thread thread;

  // Pins the calling thread to the cores in `affinity`. Invoked by the new
  // thread itself before running `func`; a no-op when the affinity is empty.
  // Errors from pthread_setaffinity_np() are deliberately ignored
  // (best-effort pinning).
  void setAffinity() {
    auto count = affinity.count();
    if (count == 0) {
      return;
    }

#if defined(__linux__) && !defined(__ANDROID__)
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (size_t i = 0; i < count; i++) {
      CPU_SET(affinity[i].pthread.index, &cpuset);
    }
    auto thread = pthread_self();
    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
#elif defined(__FreeBSD__)
    cpuset_t cpuset;
    CPU_ZERO(&cpuset);
    for (size_t i = 0; i < count; i++) {
      CPU_SET(affinity[i].pthread.index, &cpuset);
    }
    auto thread = pthread_self();
    pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset);
#else
    MARL_ASSERT(!marl::Thread::Affinity::supported,
                "Attempting to use thread affinity on a unsupported platform" );
#endif
  }
};
404 | |
// Creates and starts a thread running `func`; the affinity is applied by the
// new thread itself (see Impl::setAffinity) before `func` runs.
Thread::Thread(Affinity&& affinity, Func&& func)
    : impl(new Thread::Impl(std::move(affinity), std::move(func))) {}
407 | |
Thread::~Thread() {
  // A std::thread destroyed while joinable terminates the process, so an
  // explicit join() is required before destruction (it deletes impl).
  MARL_ASSERT(!impl, "Thread::join() was not called before destruction" );
}
411 | |
412 | void Thread::join() { |
413 | impl->thread.join(); |
414 | delete impl; |
415 | impl = nullptr; |
416 | } |
417 | |
// Names the calling thread using the printf-style format string, for
// debuggers and profilers.
void Thread::setName(const char* fmt, ...) {
  // Format into a fixed buffer; names longer than 1023 chars are truncated.
  // NOTE(review): many platforms cap thread names much shorter (e.g. 16
  // bytes on Linux), in which case pthread_setname_np truncates or fails.
  char name[1024];
  va_list vararg;
  va_start(vararg, fmt);
  vsnprintf(name, sizeof(name), fmt, vararg);
  va_end(vararg);

#if defined(__APPLE__)
  // macOS can only name the calling thread — no thread argument.
  pthread_setname_np(name);
#elif defined(__FreeBSD__)
  pthread_set_name_np(pthread_self(), name);
#elif !defined(__Fuchsia__)
  // Fuchsia does not provide pthread_setname_np.
  pthread_setname_np(pthread_self(), name);
#endif

  MARL_NAME_THREAD("%s" , name);
}
435 | |
436 | unsigned int Thread::numLogicalCPUs() { |
437 | return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN)); |
438 | } |
439 | |
440 | #endif // OS |
441 | |
// Move constructor: takes ownership of rhs's impl, leaving rhs unjoinable so
// its destructor is a no-op.
Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
  rhs.impl = nullptr;
}
445 | |
446 | Thread& Thread::operator=(Thread&& rhs) { |
447 | if (impl) { |
448 | delete impl; |
449 | impl = nullptr; |
450 | } |
451 | impl = rhs.impl; |
452 | rhs.impl = nullptr; |
453 | return *this; |
454 | } |
455 | |
456 | } // namespace marl |
457 | |