1// Copyright 2019 The Marl Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "marl/thread.h"
16
17#include "marl/debug.h"
18#include "marl/defer.h"
19#include "marl/trace.h"
20
21#include <algorithm> // std::sort
22
23#include <cstdarg>
24#include <cstdio>
25
26#if defined(_WIN32)
27#define WIN32_LEAN_AND_MEAN 1
28#include <windows.h>
29#include <array>
30#include <cstdlib> // mbstowcs
31#include <limits> // std::numeric_limits
32#include <vector>
33#undef max
34#elif defined(__APPLE__)
35#include <mach/thread_act.h>
36#include <pthread.h>
37#include <unistd.h>
38#include <thread>
39#elif defined(__FreeBSD__)
40#include <pthread.h>
41#include <pthread_np.h>
42#include <unistd.h>
43#include <thread>
44#else
45#include <pthread.h>
46#include <unistd.h>
47#include <thread>
48#endif
49
50namespace {
51
52struct CoreHasher {
53 inline uint64_t operator()(const marl::Thread::Core& core) const {
54 return core.pthread.index;
55 }
56};
57
58} // anonymous namespace
59
60namespace marl {
61
#if defined(_WIN32)
// Upper bounds on core / group counts, derived from the bit-width of the
// corresponding Thread::Core::windows fields.
static constexpr size_t MaxCoreCount =
    std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL;
static constexpr size_t MaxGroupCount =
    std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL;
// A KAFFINITY bitmask holds one bit per core within a processor group, so
// every bit position must be representable by Core::windows.index.
static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount,
              "Thread::Core::windows.index is too small");

namespace {
// Asserts that a BOOL-returning Win32 call succeeded. The (void)res keeps
// builds where MARL_ASSERT compiles away free of unused-variable warnings.
#define CHECK_WIN32(expr) \
  do { \
    auto res = expr; \
    (void)res; \
    MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
                (int)GetLastError()); \
  } while (false)

// Describes a single Windows processor group.
struct ProcessorGroup {
  unsigned int count; // number of logical processors in this group.
  KAFFINITY affinity; // affinity mask.
};

// Fixed-capacity list of processor groups; entries [0, count) are valid.
struct ProcessorGroups {
  std::array<ProcessorGroup, MaxGroupCount> groups;
  size_t count;
};

// Queries the system's processor groups once and caches the result
// (thread-safe via C++11 function-local static initialization).
const ProcessorGroups& getProcessorGroups() {
  static ProcessorGroups groups = [] {
    ProcessorGroups out = {};
    // NOTE(review): assumes 32 entries suffice for the RelationGroup query;
    // if the buffer were too small the call would fail and CHECK_WIN32 would
    // fire in debug builds — confirm this bound against large systems.
    SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
    DWORD size = sizeof(info);
    CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
    // The API rewrites 'size' to the number of bytes actually returned.
    DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
    for (DWORD i = 0; i < count; i++) {
      if (info[i].Relationship == RelationGroup) {
        auto groupCount = info[i].Group.ActiveGroupCount;
        for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
          auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
          out.groups[out.count++] = ProcessorGroup{
              groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
          MARL_ASSERT(out.count <= MaxGroupCount, "Group index overflow");
        }
      }
    }
    return out;
  }();
  return groups;
}
}  // namespace
#endif  // defined(_WIN32)
113
114////////////////////////////////////////////////////////////////////////////////
// Thread::Affinity
116////////////////////////////////////////////////////////////////////////////////
117
// Constructs an empty affinity set using the given allocator.
Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {}
// Move constructor: steals the other set's core list.
Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {}
// Copy constructor: copies the core list into storage from 'allocator'.
Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator)
    : cores(other.cores, allocator) {}

// Constructs an affinity set from an explicit list of cores.
Thread::Affinity::Affinity(std::initializer_list<Core> list,
                           Allocator* allocator)
    : cores(allocator) {
  cores.reserve(list.size());
  for (auto core : list) {
    cores.push_back(core);
  }
}

// Constructs an affinity set by copying an existing core vector.
Thread::Affinity::Affinity(const containers::vector<Core, 32>& coreList,
                           Allocator* allocator)
    : cores(coreList, allocator) {}
135
136Thread::Affinity Thread::Affinity::all(
137 Allocator* allocator /* = Allocator::Default */) {
138 Thread::Affinity affinity(allocator);
139
140#if defined(_WIN32)
141 const auto& groups = getProcessorGroups();
142 for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
143 const auto& group = groups.groups[groupIdx];
144 Core core;
145 core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx);
146 for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) {
147 if ((group.affinity >> coreIdx) & 1) {
148 core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx);
149 affinity.cores.emplace_back(std::move(core));
150 }
151 }
152 }
153#elif defined(__linux__) && !defined(__ANDROID__)
154 auto thread = pthread_self();
155 cpu_set_t cpuset;
156 CPU_ZERO(&cpuset);
157 if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) {
158 int count = CPU_COUNT(&cpuset);
159 for (int i = 0; i < count; i++) {
160 Core core;
161 core.pthread.index = static_cast<uint16_t>(i);
162 affinity.cores.emplace_back(std::move(core));
163 }
164 }
165#elif defined(__FreeBSD__)
166 auto thread = pthread_self();
167 cpuset_t cpuset;
168 CPU_ZERO(&cpuset);
169 if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) {
170 int count = CPU_COUNT(&cpuset);
171 for (int i = 0; i < count; i++) {
172 Core core;
173 core.pthread.index = static_cast<uint16_t>(i);
174 affinity.cores.emplace_back(std::move(core));
175 }
176 }
177#else
178 static_assert(!supported,
179 "marl::Thread::Affinity::supported is true, but "
180 "Thread::Affinity::all() is not implemented for this platform");
181#endif
182
183 return affinity;
184}
185
186std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf(
187 Affinity&& affinity,
188 Allocator* allocator /* = Allocator::Default */) {
189 struct Policy : public Thread::Affinity::Policy {
190 Affinity affinity;
191 Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
192
193 Affinity get(uint32_t threadId, Allocator* allocator) const override {
194#if defined(_WIN32)
195 auto count = affinity.count();
196 if (count == 0) {
197 return Affinity(affinity, allocator);
198 }
199 auto group = affinity[threadId % affinity.count()].windows.group;
200 Affinity out(allocator);
201 out.cores.reserve(count);
202 for (auto core : affinity.cores) {
203 if (core.windows.group == group) {
204 out.cores.push_back(core);
205 }
206 }
207 return out;
208#else
209 return Affinity(affinity, allocator);
210#endif
211 }
212 };
213
214 return allocator->make_shared<Policy>(std::move(affinity));
215}
216
217std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf(
218 Affinity&& affinity,
219 Allocator* allocator /* = Allocator::Default */) {
220 struct Policy : public Thread::Affinity::Policy {
221 Affinity affinity;
222 Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
223
224 Affinity get(uint32_t threadId, Allocator* allocator) const override {
225 auto count = affinity.count();
226 if (count == 0) {
227 return Affinity(affinity, allocator);
228 }
229 return Affinity({affinity[threadId % affinity.count()]}, allocator);
230 }
231 };
232
233 return allocator->make_shared<Policy>(std::move(affinity));
234}
235
// Returns the number of cores in this affinity set.
size_t Thread::Affinity::count() const {
  return cores.size();
}

// Returns (by value) the core at the given index.
Thread::Core Thread::Affinity::operator[](size_t index) const {
  return cores[index];
}
243
244Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
245 containers::unordered_set<Core, CoreHasher> set(cores.allocator);
246 for (auto core : cores) {
247 set.emplace(core);
248 }
249 for (auto core : other.cores) {
250 if (set.count(core) == 0) {
251 cores.push_back(core);
252 }
253 }
254 std::sort(cores.begin(), cores.end());
255 return *this;
256}
257
258Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) {
259 containers::unordered_set<Core, CoreHasher> set(cores.allocator);
260 for (auto core : other.cores) {
261 set.emplace(core);
262 }
263 for (size_t i = 0; i < cores.size(); i++) {
264 if (set.count(cores[i]) != 0) {
265 cores[i] = cores.back();
266 cores.resize(cores.size() - 1);
267 }
268 }
269 std::sort(cores.begin(), cores.end());
270 return *this;
271}
272
273#if defined(_WIN32)
274
// Windows implementation of Thread: holds the entry function and the Win32
// thread handle.
class Thread::Impl {
 public:
  Impl(Func&& func) : func(std::move(func)) {}
  // Win32 thread entry trampoline; 'self' is the owning Impl*.
  static DWORD WINAPI run(void* self) {
    reinterpret_cast<Impl*>(self)->func();
    return 0;
  }

  Func func;
  // NOTE(review): not initialized here — Thread's constructor assigns it
  // immediately after construction; confirm no path reads it before then.
  HANDLE handle;
};
286
// Creates and starts a Windows thread, optionally bound to the given core
// affinity via a proc-thread attribute list.
Thread::Thread(Affinity&& affinity, Func&& func) {
  SIZE_T size = 0;
  // First call is a size query: it fails but writes the required attribute
  // list size into 'size'.
  InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
  MARL_ASSERT(size > 0,
              "InitializeProcThreadAttributeList() did not give a size");

  std::vector<uint8_t> buffer(size);
  LPPROC_THREAD_ATTRIBUTE_LIST attributes =
      reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(buffer.data());
  CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
  // Tear the attribute list down when this constructor returns.
  defer(DeleteProcThreadAttributeList(attributes));

  GROUP_AFFINITY groupAffinity = {};

  auto count = affinity.count();
  if (count > 0) {
    groupAffinity.Group = affinity[0].windows.group;
    for (size_t i = 0; i < count; i++) {
      auto core = affinity[i];
      // A single Win32 thread can only be bound within one processor group.
      MARL_ASSERT(groupAffinity.Group == core.windows.group,
                  "Cannot create thread that uses multiple affinity groups");
      groupAffinity.Mask |= (1ULL << core.windows.index);
    }
    CHECK_WIN32(UpdateProcThreadAttribute(
        attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
        sizeof(groupAffinity), nullptr, nullptr));
  }

  impl = new Impl(std::move(func));
  // CreateRemoteThreadEx (on the current process) is used because it accepts
  // an attribute list, unlike plain CreateThread.
  // NOTE(review): the returned handle is not checked for NULL — confirm
  // whether creation failure should be asserted here.
  impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
                                      &Impl::run, impl, 0, attributes, nullptr);
}
319
Thread::~Thread() {
  // NOTE(review): unlike the pthread build below, destruction without join()
  // is not asserted here — the handle is closed and the Impl freed even if
  // the thread was never joined. Confirm this asymmetry is intentional.
  if (impl) {
    CloseHandle(impl->handle);
    delete impl;
  }
}

// Blocks until the thread's entry function returns. The handle and Impl are
// released later, by the destructor.
void Thread::join() {
  MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
  WaitForSingleObject(impl->handle, INFINITE);
}
331
// Sets the current thread's name (printf-style) for debuggers / profilers.
void Thread::setName(const char* fmt, ...) {
  // SetThreadDescription is not present on older Windows versions, so it is
  // looked up dynamically; silently do nothing when unavailable.
  static auto setThreadDescription =
      reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
          GetModuleHandle("kernelbase.dll"), "SetThreadDescription"));
  if (setThreadDescription == nullptr) {
    return;
  }

  // Format the name into a fixed-size buffer (truncated if longer).
  char name[1024];
  va_list vararg;
  va_start(vararg, fmt);
  vsnprintf(name, sizeof(name), fmt, vararg);
  va_end(vararg);

  // SetThreadDescription takes a wide string; convert in place.
  wchar_t wname[1024];
  mbstowcs(wname, name, 1024);
  setThreadDescription(GetCurrentThread(), wname);
  MARL_NAME_THREAD("%s", name);
}
351
352unsigned int Thread::numLogicalCPUs() {
353 unsigned int count = 0;
354 const auto& groups = getProcessorGroups();
355 for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
356 const auto& group = groups.groups[groupIdx];
357 count += group.count;
358 }
359 return count;
360}
361
362#else
363
// POSIX implementation of Thread: wraps a std::thread that first applies the
// requested core affinity, then invokes the user function.
class Thread::Impl {
 public:
  // Note: 'affinity' and 'func' are declared before 'thread', so they are
  // fully initialized before the thread starts and may be read by its lambda.
  Impl(Affinity&& affinity, Thread::Func&& f)
      : affinity(std::move(affinity)), func(std::move(f)), thread([this] {
          // Runs on the newly spawned thread.
          setAffinity();
          func();
        }) {}

  Affinity affinity;
  Func func;
  std::thread thread;

  // Applies 'affinity' to the calling thread. No-op when the set is empty.
  // Errors from pthread_setaffinity_np are deliberately ignored.
  void setAffinity() {
    auto count = affinity.count();
    if (count == 0) {
      return;
    }

#if defined(__linux__) && !defined(__ANDROID__)
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (size_t i = 0; i < count; i++) {
      CPU_SET(affinity[i].pthread.index, &cpuset);
    }
    auto thread = pthread_self();
    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
#elif defined(__FreeBSD__)
    cpuset_t cpuset;
    CPU_ZERO(&cpuset);
    for (size_t i = 0; i < count; i++) {
      CPU_SET(affinity[i].pthread.index, &cpuset);
    }
    auto thread = pthread_self();
    pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset);
#else
    MARL_ASSERT(!marl::Thread::Affinity::supported,
                "Attempting to use thread affinity on a unsupported platform");
#endif
  }
};
404
// Creates and starts the thread; it begins running immediately.
Thread::Thread(Affinity&& affinity, Func&& func)
    : impl(new Thread::Impl(std::move(affinity), std::move(func))) {}

Thread::~Thread() {
  // join() deletes impl and nulls the pointer, so a non-null impl here means
  // the thread was never joined — a usage error.
  MARL_ASSERT(!impl, "Thread::join() was not called before destruction");
}

// Blocks until the thread finishes, then releases the implementation.
void Thread::join() {
  impl->thread.join();
  delete impl;
  impl = nullptr;
}
417
// Sets the current thread's name (printf-style) for debuggers / profilers.
void Thread::setName(const char* fmt, ...) {
  // Format the name into a fixed-size buffer (truncated if longer).
  char name[1024];
  va_list vararg;
  va_start(vararg, fmt);
  vsnprintf(name, sizeof(name), fmt, vararg);
  va_end(vararg);

#if defined(__APPLE__)
  // macOS's pthread_setname_np only names the calling thread.
  pthread_setname_np(name);
#elif defined(__FreeBSD__)
  pthread_set_name_np(pthread_self(), name);
#elif !defined(__Fuchsia__)
  // Skipped on Fuchsia; other POSIX platforms take (thread, name).
  pthread_setname_np(pthread_self(), name);
#endif

  MARL_NAME_THREAD("%s", name);
}
435
436unsigned int Thread::numLogicalCPUs() {
437 return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN));
438}
439
440#endif // OS
441
442Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
443 rhs.impl = nullptr;
444}
445
446Thread& Thread::operator=(Thread&& rhs) {
447 if (impl) {
448 delete impl;
449 impl = nullptr;
450 }
451 impl = rhs.impl;
452 rhs.impl = nullptr;
453 return *this;
454}
455
456} // namespace marl
457