1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | |
20 | /*! |
21 | * \file threading_backend.cc |
22 | * \brief Native threading backend |
23 | */ |
24 | #include <tvm/runtime/logging.h> |
25 | #include <tvm/runtime/threading_backend.h> |
26 | |
27 | #if defined(__linux__) || defined(__ANDROID__) |
28 | #if __ANDROID_API__ >= 21 |
29 | #include <pthread.h> |
30 | #endif |
31 | #include <fstream> |
32 | #include <sstream> |
33 | #else |
34 | #endif |
35 | #if defined(__linux__) |
36 | #include <sched.h> |
37 | #endif |
38 | #if defined(__hexagon__) |
39 | extern "C" { |
40 | #include <qurt_hvx.h> |
41 | } |
42 | #include <dlfcn.h> |
43 | #include <qurt.h> |
44 | #include <stdlib.h> |
45 | #define HEXAGON_STACK_SIZE 65536 |
46 | #define HEXAGON_STACK_ALIGNMENT 32 |
47 | #endif |
48 | #include <algorithm> |
49 | #include <thread> |
50 | #define CURRENT_THREAD_HANDLE (static_cast<std::thread::native_handle_type>(0)) |
51 | namespace tvm { |
52 | namespace runtime { |
53 | namespace threading { |
#ifdef __hexagon__
// pthreads are broken on older versions of qurt, so
// we need to use native APIs instead of std::threads
//
// Minimal std::thread-like wrapper around a QuRT native thread. Movable but
// not copyable (the user-declared move constructor suppresses the implicit
// copy operations). Owns the thread's stack buffer and frees it on
// destruction.
class QuRTThread {
  typedef std::function<void()> Callback;

 public:
  // Spawn a thread running \p worker_callback on a freshly allocated stack
  // of HEXAGON_STACK_SIZE bytes aligned to HEXAGON_STACK_ALIGNMENT.
  explicit QuRTThread(Callback worker_callback) : worker_callback_(worker_callback) {
    // Monotonic counter used only to build a human-readable thread name.
    // NOTE(review): not synchronized; presumably all QuRTThreads are created
    // from a single thread -- confirm against callers.
    static int id = 1;
    qurt_thread_attr_t attr;
    char name[32];
    int ret = posix_memalign(&stack_, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE);
    CHECK_EQ(ret, 0);
    // When a std::function<> is cast to bool,
    // it indicates whether it stores a callable target
    CHECK_EQ((bool)worker_callback_, true);
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE);
    qurt_thread_attr_set_stack_addr(&attr, stack_);
    snprintf(name, sizeof(name), "worker %d", id++);
    qurt_thread_attr_set_name(&attr, name);
    ret = qurt_thread_create(&thread_, &attr, (void (*)(void*))RunFunction, this);
    CHECK_EQ(ret, QURT_EOK);
  }
  // Move: steal the thread handle and stack, and null them out in the source
  // so its destructor neither joins nor frees.
  QuRTThread(QuRTThread&& other)
      : thread_(other.thread_),
        worker_callback_(std::move(other.worker_callback_)),
        stack_(other.stack_) {
    other.thread_ = 0;
    other.stack_ = nullptr;
  }
  // Join the thread (if any) before releasing its stack.
  ~QuRTThread() {
    if (thread_) {
      join();
    }
    if (stack_) {
      free(stack_);
    }
  }
  // A thread is joinable unless the caller *is* that thread.
  // NOTE(review): assumes the value returned by qurt_thread_get_id() is
  // comparable with the qurt_thread_t produced by qurt_thread_create --
  // confirm against the QuRT API docs.
  bool joinable() const { return qurt_thread_get_id() != thread_; }
  // Block until the thread exits; the exit status is discarded.
  void join() {
    int status;
    qurt_thread_join(thread_, &status);
  }

 private:
  // Thread entry point: run the stored callback, then terminate this thread.
  static void RunFunction(QuRTThread* qrt_thread) {
    qrt_thread->worker_callback_();
    qurt_thread_exit(QURT_EOK);
  }
  qurt_thread_t thread_;      // native handle; 0 after being moved from
  Callback worker_callback_;  // work executed on the spawned thread
  void* stack_ = nullptr;     // aligned stack buffer owned by this wrapper
};
#endif  // __hexagon__
// Thread-local override for the maximum concurrency; 0 means "not set", in
// which case MaxConcurrency() falls back to environment variables and
// hardware detection. Written by SetMaxConcurrency().
thread_local int max_concurrency = 0;
110 | class ThreadGroup::Impl { |
111 | public: |
112 | Impl(int num_workers, std::function<void(int)> worker_callback, bool exclude_worker0) |
113 | : num_workers_(num_workers) { |
114 | ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads." ; |
115 | for (int i = exclude_worker0; i < num_workers_; ++i) { |
116 | threads_.emplace_back([worker_callback, i] { worker_callback(i); }); |
117 | } |
118 | InitSortedOrder(); |
119 | } |
120 | ~Impl() { Join(); } |
121 | |
122 | void Join() { |
123 | for (auto& t : threads_) { |
124 | if (t.joinable()) t.join(); |
125 | } |
126 | } |
127 | |
128 | int Configure(AffinityMode mode, int nthreads, bool exclude_worker0, |
129 | std::vector<unsigned int> cpus) { |
130 | int num_workers_used = 0; |
131 | switch (mode) { |
132 | case kLittle: |
133 | num_workers_used = little_count_; |
134 | break; |
135 | case kBig: |
136 | num_workers_used = big_count_; |
137 | break; |
138 | case kSpecifyOneCorePerThread: |
139 | case kSpecifyThreadShareAllCore: |
140 | num_workers_used = cpus.size(); |
141 | sorted_order_ = cpus; |
142 | break; |
143 | default: |
144 | // use default |
145 | num_workers_used = threading::MaxConcurrency(); |
146 | } |
147 | // if a specific number was given, use that |
148 | if (nthreads) { |
149 | num_workers_used = nthreads; |
150 | } |
151 | // if MaxConcurrency restricted the number of workers (e.g., due to |
152 | // hyperthreading), respect the restriction. On CPUs with N logical cores |
153 | // and N/2 physical cores this will set affinity to the first N/2 logical |
154 | // ones. |
155 | num_workers_used = std::min(num_workers_, num_workers_used); |
156 | SetAffinity(exclude_worker0, mode); |
157 | return num_workers_used; |
158 | } |
159 | |
160 | private: |
161 | void SetThreadAffinity(std::thread::native_handle_type thread, |
162 | const std::vector<unsigned int>& ids) { |
163 | #if defined(__linux__) || defined(__ANDROID__) |
164 | if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { |
165 | thread = pthread_self(); |
166 | } |
167 | cpu_set_t cpuset; |
168 | CPU_ZERO(&cpuset); |
169 | for (auto id : ids) { |
170 | CPU_SET(id, &cpuset); |
171 | } |
172 | #if defined(__ANDROID__) |
173 | #if __ANDROID_API__ >= 21 |
174 | pid_t tid = pthread_gettid_np(thread); |
175 | #else |
176 | typedef struct { |
177 | void* next; |
178 | void* pred; |
179 | pid_t tid; |
180 | } pthread_internal; |
181 | pid_t tid = reinterpret_cast<pthread_internal*>(thread)->tid; |
182 | #endif |
183 | if (sched_setaffinity(tid, sizeof(cpu_set_t), &cpuset) != 0) { |
184 | LOG(WARNING) << "sched_setaffinity failed" ; |
185 | } |
186 | #else |
187 | pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); |
188 | #endif |
189 | #endif |
190 | } |
191 | |
192 | // bind worker threads to disjoint cores |
193 | // if worker 0 is offloaded to main, i.e. exclude_worker0 is true, |
194 | // the main thread is bound to core 0. |
195 | void SetAffinity(bool exclude_worker0, AffinityMode mode) { |
196 | #ifndef __hexagon__ |
197 | const char* val = getenv("TVM_BIND_THREADS" ); |
198 | if (val != nullptr && atoi(val) != 1) { |
199 | return; |
200 | } |
201 | // Do not set affinity if there are more workers than found cores and mode is not kSpecify*. |
202 | if (sorted_order_.size() < static_cast<unsigned int>(num_workers_)) { |
203 | switch (mode) { |
204 | // When the mode is kSpecifyOneCorePerThread or kSpecifyThreadShareAllCore, we should |
205 | // let the threads share all the cpu cores. |
206 | case kSpecifyOneCorePerThread: |
207 | case kSpecifyThreadShareAllCore: |
208 | for (unsigned i = 0; i < threads_.size(); ++i) { |
209 | SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); |
210 | } |
211 | if (exclude_worker0) { // main thread run task |
212 | SetMainThreadFullCpuAffinity(mode); |
213 | } |
214 | break; |
215 | case kLittle: |
216 | case kBig: |
217 | default: |
218 | LOG(WARNING) << "The thread affinity cannot be set when the number of workers" |
219 | << "is larger than the number of available cores in the system." ; |
220 | break; |
221 | } |
222 | } else { |
223 | ICHECK_GE(sorted_order_.size(), num_workers_); |
224 | switch (mode) { |
225 | case kSpecifyThreadShareAllCore: |
226 | for (unsigned i = 0; i < threads_.size(); ++i) { |
227 | SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); |
228 | } |
229 | break; |
230 | case kLittle: |
231 | case kBig: |
232 | case kSpecifyOneCorePerThread: |
233 | for (unsigned i = 0; i < threads_.size(); ++i) { |
234 | bool reverse = mode == kLittle; |
235 | unsigned core_id; |
236 | if (reverse) { |
237 | core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1]; |
238 | } else { |
239 | core_id = sorted_order_[i + exclude_worker0]; |
240 | } |
241 | SetThreadAffinity(threads_[i].native_handle(), {core_id}); |
242 | } |
243 | break; |
244 | } |
245 | if (exclude_worker0) { // main thread run task |
246 | // Main thread will have free migration on needed cores. |
247 | // Typically, the OS will schedule the main thread to run at core 0, |
248 | // which is idle, when other workers are running. |
249 | // See the comment inside SetMainThreadFullCpuAffinity function to get more detail. |
250 | SetMainThreadFullCpuAffinity(mode); |
251 | } |
252 | } |
253 | #endif // __hexagon__ |
254 | } |
255 | |
256 | void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { |
257 | // For example, we have 2xA72 + 4xA53 (id is 0 - 5, 4, 5 is A72 big core) |
258 | // And we use config_threadpool API to set we will only use 4xA53. |
259 | // The sorted_order will be [4, 5, 0, 1, 2, 3]. |
260 | // When to call this API, we have spawn threads on little cores for other workers |
261 | // in SetAffinity function. And for tvm main thread, it should also run on little cores, |
262 | // not big cores (4, 5). |
263 | |
264 | // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, |
265 | // our implementation will use kBig mode by default and will let main thread |
266 | // run on intended cores. |
267 | #ifndef __hexagon__ |
268 | std::vector<unsigned> ids; |
269 | switch (mode) { |
270 | case kSpecifyOneCorePerThread: |
271 | case kSpecifyThreadShareAllCore: |
272 | for (size_t i = 0; i < sorted_order_.size(); ++i) { |
273 | ids.push_back(sorted_order_[i]); |
274 | } |
275 | break; |
276 | case kLittle: |
277 | for (int i = 0; i < little_count_; ++i) { |
278 | ids.push_back(sorted_order_[sorted_order_.size() - i - 1]); |
279 | } |
280 | break; |
281 | case kBig: |
282 | int num_cpu_workers = std::min(MaxConcurrency(), big_count_); |
283 | for (int i = 0; i < num_cpu_workers; ++i) { |
284 | ids.push_back(sorted_order_[i]); |
285 | } |
286 | break; |
287 | } |
288 | SetThreadAffinity(thread, ids); |
289 | #endif // __hexagon__ |
290 | } |
291 | |
292 | void SetMainThreadFullCpuAffinity(AffinityMode mode) { |
293 | SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode); |
294 | } |
295 | |
296 | void InitSortedOrder() { |
297 | unsigned int threads = std::thread::hardware_concurrency(); |
298 | #if defined(__hexagon__) |
299 | // With unsigned PDs, getting the number of available hardware threads |
300 | // is not supported in earlier versions of QuRT. In such cases assume 4. |
301 | if (threads == 0) threads = 4; |
302 | #endif |
303 | std::vector<std::pair<unsigned int, int64_t>> max_freqs; |
304 | |
305 | for (unsigned int i = 0; i < threads; ++i) { |
306 | int64_t cur_freq = 0; |
307 | #if defined(__linux__) || defined(__ANDROID__) |
308 | std::ostringstream filepath; |
309 | // according to https://www.kernel.org/doc/Documentation/cpu-freq/user-guide.txt |
310 | // it's better to use cpuinfo_max_freq instead of scaling_max_freq for our |
311 | // purposes since scaling values can be changed dynamically according "policy limits" |
312 | // while we are looking for persistent definition of cores |
313 | filepath << "/sys/devices/system/cpu/cpu" << i << "/cpufreq/cpuinfo_max_freq" ; |
314 | std::ifstream ifs(filepath.str()); |
315 | if (!ifs.fail()) { |
316 | if (!(ifs >> cur_freq)) { |
317 | cur_freq = -1; |
318 | } |
319 | ifs.close(); |
320 | } |
321 | #endif |
322 | max_freqs.push_back(std::make_pair(i, cur_freq)); |
323 | } |
324 | |
325 | auto fcmpbyfreq = [](const std::pair<unsigned int, int64_t>& a, |
326 | const std::pair<unsigned int, int64_t>& b) { |
327 | return a.second == b.second ? a.first < b.first : a.second > b.second; |
328 | }; |
329 | std::sort(max_freqs.begin(), max_freqs.end(), fcmpbyfreq); |
330 | int64_t big_freq = max_freqs.begin()->second; |
331 | int64_t little_freq = max_freqs.rbegin()->second; |
332 | for (auto it = max_freqs.begin(); it != max_freqs.end(); it++) { |
333 | sorted_order_.push_back(it->first); |
334 | if (big_freq == it->second) { |
335 | big_count_++; |
336 | } |
337 | if (big_freq != little_freq && little_freq == it->second) { |
338 | little_count_++; |
339 | } |
340 | } |
341 | if (big_count_ + little_count_ != static_cast<int>(sorted_order_.size())) { |
342 | big_count_ = static_cast<int>(sorted_order_.size()) - little_count_; |
343 | LOG(WARNING) << "more than two frequencies detected! Forced big_count_ to " << big_count_; |
344 | } |
345 | } |
346 | |
347 | int num_workers_; |
348 | #if defined(__hexagon__) |
349 | std::vector<QuRTThread> threads_; |
350 | #else |
351 | std::vector<std::thread> threads_; |
352 | #endif |
353 | std::vector<unsigned int> sorted_order_; |
354 | int big_count_ = 0; |
355 | int little_count_ = 0; |
356 | }; |
357 | |
358 | ThreadGroup::ThreadGroup(int num_workers, std::function<void(int)> worker_callback, |
359 | bool exclude_worker0) |
360 | : impl_(new ThreadGroup::Impl(num_workers, worker_callback, exclude_worker0)) {} |
// Impl is held through a raw owning pointer declared in the header; joining
// happens in Impl's destructor, then the object is released here.
ThreadGroup::~ThreadGroup() { delete impl_; }
// Block until every worker thread in the group has exited.
void ThreadGroup::Join() { impl_->Join(); }
363 | |
364 | int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0, |
365 | std::vector<unsigned int> cpus) { |
366 | return impl_->Configure(mode, nthreads, exclude_worker0, cpus); |
367 | } |
368 | |
/*!
 * \brief Relinquish the rest of the calling thread's time slice.
 */
void Yield() {
#ifndef __hexagon__
  std::this_thread::yield();
#else
  // QuRT has no yield primitive, and std::this_thread::yield() compiles to
  // an empty function there; sleeping for the minimum interval gives the
  // scheduler a chance to run another thread.
  qurt_sleep(1);
#endif
}
379 | |
380 | /*! |
381 | * \brief Set the maximum number of available cores. |
382 | */ |
383 | void SetMaxConcurrency(int value) { |
384 | if (value < 0) { |
385 | LOG(WARNING) << "The value of maximum concurrency '" << value << "' can not be negative " |
386 | << "the setting of maximum concurrency is not success." ; |
387 | return; |
388 | } |
389 | max_concurrency = value; |
390 | } |
391 | int MaxConcurrency() { |
392 | int max_concurrency = 1; |
393 | if (tvm::runtime::threading::max_concurrency != 0) { |
394 | max_concurrency = tvm::runtime::threading::max_concurrency; |
395 | } else { |
396 | const char* val = getenv("TVM_NUM_THREADS" ); |
397 | if (val == nullptr) { |
398 | val = getenv("OMP_NUM_THREADS" ); |
399 | } |
400 | if (val != nullptr) { |
401 | max_concurrency = atoi(val); |
402 | } else { |
403 | max_concurrency = std::thread::hardware_concurrency(); |
404 | #if defined(_M_X64) || defined(__x86_64__) |
405 | max_concurrency /= 2; // ignore hyper-threading |
406 | #elif defined(__hexagon__) |
407 | // Ideally max_concurrency is set to the total count of 128B |
408 | // HVX units available. This prevenets threads unable to lock |
409 | // an HVX unit from scheduling work on the Scalar cores instead |
410 | // of HVX. |
411 | int num_hvx128_contexts = (qurt_hvx_get_units() >> 8) & 0xFF; |
412 | // With unsigned PDs, getting the number of available hardware threads |
413 | // is not supported in earlier versions of QuRT. In such cases assume |
414 | // the number of HVX units available. If running on simulator, set |
415 | // max_concurrency to 1. |
416 | if (max_concurrency == 0) { |
417 | if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7" )) { |
418 | max_concurrency = 1; |
419 | } else { |
420 | max_concurrency = num_hvx128_contexts; |
421 | } |
422 | } else { |
423 | // If the hardware_concurrency has already set the max_concurrency to |
424 | // a non-zero value then make sure it is not greater than the number |
425 | // of HVX units available. |
426 | max_concurrency = std::min(num_hvx128_contexts, max_concurrency); |
427 | } |
428 | #endif |
429 | } |
430 | } |
431 | return std::max(max_concurrency, 1); |
432 | } |
433 | |
434 | } // namespace threading |
435 | } // namespace runtime |
436 | } // namespace tvm |
437 | |