/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*!
 * \file threading_backend.cc
 * \brief Native threading backend
 */
#include <tvm/runtime/logging.h>
#include <tvm/runtime/threading_backend.h>

#if defined(__linux__) || defined(__ANDROID__)
#if __ANDROID_API__ >= 21
#include <pthread.h>
#endif
#include <fstream>
#include <sstream>
#endif
#if defined(__linux__)
#include <sched.h>
#endif
#if defined(__hexagon__)
extern "C" {
#include <qurt_hvx.h>
}
#include <dlfcn.h>
#include <qurt.h>
#include <stdlib.h>
#define HEXAGON_STACK_SIZE 65536
#define HEXAGON_STACK_ALIGNMENT 32
#endif
#include <algorithm>
#include <thread>
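// Sentinel meaning "the calling thread"; SetThreadAffinity() translates it
// to pthread_self() on pthread-based platforms.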
#define CURRENT_THREAD_HANDLE (static_cast<std::thread::native_handle_type>(0))
namespace tvm {
namespace runtime {
namespace threading {
#ifdef __hexagon__
// pthreads are broken on older versions of QuRT, so we need to use the
// native thread APIs instead of std::thread.
class QuRTThread {
  typedef std::function<void()> Callback;

 public:
  explicit QuRTThread(Callback worker_callback) : worker_callback_(std::move(worker_callback)) {
    static int id = 1;
    qurt_thread_attr_t attr;
    char name[32];
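    // QuRT threads do not allocate their own stack: the caller must supply an
    // aligned buffer that stays alive for the thread's lifetime (freed in the
    // destructor below).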
    int ret = posix_memalign(&stack_, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE);
    CHECK_EQ(ret, 0);
    // When a std::function<> is cast to bool,
    // it indicates whether it stores a callable target
    CHECK_EQ(static_cast<bool>(worker_callback_), true);
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE);
    qurt_thread_attr_set_stack_addr(&attr, stack_);
    snprintf(name, sizeof(name), "worker %d", id++);
    qurt_thread_attr_set_name(&attr, name);
    ret = qurt_thread_create(&thread_, &attr, (void (*)(void*))RunFunction, this);
    CHECK_EQ(ret, QURT_EOK);
  }
  QuRTThread(QuRTThread&& other)
      : thread_(other.thread_),
        worker_callback_(std::move(other.worker_callback_)),
        stack_(other.stack_) {
    other.thread_ = 0;
    other.stack_ = nullptr;
  }
  ~QuRTThread() {
    if (thread_) {
      join();
    }
    if (stack_) {
      free(stack_);
    }
  }
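  // A thread cannot join itself, so report the thread as joinable only when
  // the caller is a different thread.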
  bool joinable() const { return qurt_thread_get_id() != thread_; }
  void join() {
    int status;
    qurt_thread_join(thread_, &status);
  }

 private:
  static void RunFunction(QuRTThread* qrt_thread) {
    qrt_thread->worker_callback_();
    qurt_thread_exit(QURT_EOK);
  }
  qurt_thread_t thread_;
  Callback worker_callback_;
  void* stack_ = nullptr;
};
#endif  // __hexagon__
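// Per-thread override for MaxConcurrency(); 0 means "not set". Written via
// SetMaxConcurrency() below.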
thread_local int max_concurrency = 0;
class ThreadGroup::Impl {
 public:
  Impl(int num_workers, std::function<void(int)> worker_callback, bool exclude_worker0)
      : num_workers_(num_workers) {
    ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads.";
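    // When exclude_worker0 is true, worker 0 runs on the caller's thread, so
    // thread spawning starts at index 1 (the bool converts to 1 here).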
    for (int i = exclude_worker0; i < num_workers_; ++i) {
      threads_.emplace_back([worker_callback, i] { worker_callback(i); });
    }
    InitSortedOrder();
  }
  ~Impl() { Join(); }

  void Join() {
    for (auto& t : threads_) {
      if (t.joinable()) t.join();
    }
  }

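  // Returns the number of workers that will actually be used, never more than
  // the num_workers_ this group was constructed with.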
  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
                std::vector<unsigned int> cpus) {
    int num_workers_used = 0;
    switch (mode) {
      case kLittle:
        num_workers_used = little_count_;
        break;
      case kBig:
        num_workers_used = big_count_;
        break;
      case kSpecifyOneCorePerThread:
      case kSpecifyThreadShareAllCore:
        num_workers_used = cpus.size();
        sorted_order_ = cpus;
        break;
      default:
        // use default
        num_workers_used = threading::MaxConcurrency();
    }
    // if a specific number was given, use that
    if (nthreads) {
      num_workers_used = nthreads;
    }
    // if MaxConcurrency restricted the number of workers (e.g., due to
    // hyperthreading), respect the restriction. On CPUs with N logical cores
    // and N/2 physical cores this will set affinity to the first N/2 logical
    // ones.
    num_workers_used = std::min(num_workers_, num_workers_used);
    SetAffinity(exclude_worker0, mode);
    return num_workers_used;
  }

 private:
  void SetThreadAffinity(std::thread::native_handle_type thread,
                         const std::vector<unsigned int>& ids) {
#if defined(__linux__) || defined(__ANDROID__)
    if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) {
      thread = pthread_self();
    }
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (auto id : ids) {
      CPU_SET(id, &cpuset);
    }
#if defined(__ANDROID__)
#if __ANDROID_API__ >= 21
    pid_t tid = pthread_gettid_np(thread);
#else
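    // Pre-API-21 bionic has no pthread_gettid_np(), so recover the kernel tid
    // from the leading fields of bionic's private pthread layout. This mirrors
    // an internal struct and is inherently fragile.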
    typedef struct {
      void* next;
      void* pred;
      pid_t tid;
    } pthread_internal;
    pid_t tid = reinterpret_cast<pthread_internal*>(thread)->tid;
#endif
    if (sched_setaffinity(tid, sizeof(cpu_set_t), &cpuset) != 0) {
      LOG(WARNING) << "sched_setaffinity failed";
    }
#else
    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
#endif
#endif
  }

  // Bind worker threads to disjoint cores. If worker 0 is offloaded to the
  // main thread, i.e. exclude_worker0 is true, the main thread is additionally
  // given affinity over the intended cores.
  void SetAffinity(bool exclude_worker0, AffinityMode mode) {
#ifndef __hexagon__
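    // TVM_BIND_THREADS: setting it to any value other than 1 disables
    // affinity pinning entirely; leaving it unset keeps pinning enabled.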
    const char* val = getenv("TVM_BIND_THREADS");
    if (val != nullptr && atoi(val) != 1) {
      return;
    }
    // Do not set affinity if there are more workers than found cores and mode is not kSpecify*.
    if (sorted_order_.size() < static_cast<unsigned int>(num_workers_)) {
      switch (mode) {
        // When the mode is kSpecifyOneCorePerThread or kSpecifyThreadShareAllCore, we should
        // let the threads share all the cpu cores.
        case kSpecifyOneCorePerThread:
        case kSpecifyThreadShareAllCore:
          for (unsigned i = 0; i < threads_.size(); ++i) {
            SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
          }
          if (exclude_worker0) {  // the main thread runs tasks as worker 0
            SetMainThreadFullCpuAffinity(mode);
          }
          break;
        case kLittle:
        case kBig:
        default:
          LOG(WARNING) << "The thread affinity cannot be set when the number of workers "
                       << "is larger than the number of available cores in the system.";
          break;
      }
    } else {
      ICHECK_GE(sorted_order_.size(), static_cast<unsigned int>(num_workers_));
      switch (mode) {
        case kSpecifyThreadShareAllCore:
          for (unsigned i = 0; i < threads_.size(); ++i) {
            SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
          }
          break;
        case kLittle:
        case kBig:
        case kSpecifyOneCorePerThread:
          for (unsigned i = 0; i < threads_.size(); ++i) {
            bool reverse = mode == kLittle;
            unsigned core_id;
            if (reverse) {
              core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
            } else {
              core_id = sorted_order_[i + exclude_worker0];
            }
            SetThreadAffinity(threads_[i].native_handle(), {core_id});
          }
          break;
      }
      if (exclude_worker0) {  // the main thread runs tasks as worker 0
        // The main thread is left free to migrate across the intended cores.
        // Typically the OS will schedule it onto core 0, which sits idle while
        // the other workers are running.
        // See the comment inside SetThreadFullCpuAffinity for more detail.
        SetMainThreadFullCpuAffinity(mode);
      }
    }
#endif  // __hexagon__
  }

  void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) {
    // For example, on a system with 2xA72 + 4xA53 (core ids 0-5, where 4 and 5
    // are the A72 big cores), the config_threadpool API can restrict execution
    // to the 4xA53 cores only. sorted_order_ will then be [4, 5, 0, 1, 2, 3].
    // By the time this is called, SetAffinity has already pinned the worker
    // threads to the little cores, so the TVM main thread should likewise run
    // on the little cores, not on the big cores (4, 5).

    // Note: this works well on x86 too. Because x86 doesn't have big.LITTLE,
    // our implementation uses kBig mode by default, which lets the main thread
    // run on the intended cores.
#ifndef __hexagon__
    std::vector<unsigned> ids;
    switch (mode) {
      case kSpecifyOneCorePerThread:
      case kSpecifyThreadShareAllCore:
        for (size_t i = 0; i < sorted_order_.size(); ++i) {
          ids.push_back(sorted_order_[i]);
        }
        break;
      case kLittle:
        for (int i = 0; i < little_count_; ++i) {
          ids.push_back(sorted_order_[sorted_order_.size() - i - 1]);
        }
        break;
      case kBig:
        int num_cpu_workers = std::min(MaxConcurrency(), big_count_);
        for (int i = 0; i < num_cpu_workers; ++i) {
          ids.push_back(sorted_order_[i]);
        }
        break;
    }
    SetThreadAffinity(thread, ids);
#endif  // __hexagon__
  }

  void SetMainThreadFullCpuAffinity(AffinityMode mode) {
    SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode);
  }

  void InitSortedOrder() {
    unsigned int threads = std::thread::hardware_concurrency();
#if defined(__hexagon__)
    // With unsigned PDs, getting the number of available hardware threads
    // is not supported in earlier versions of QuRT. In such cases assume 4.
    if (threads == 0) threads = 4;
#endif
    std::vector<std::pair<unsigned int, int64_t>> max_freqs;

    for (unsigned int i = 0; i < threads; ++i) {
      int64_t cur_freq = 0;
#if defined(__linux__) || defined(__ANDROID__)
      std::ostringstream filepath;
      // According to https://www.kernel.org/doc/Documentation/cpu-freq/user-guide.txt,
      // cpuinfo_max_freq is preferable to scaling_max_freq for our purposes:
      // scaling values can change dynamically according to "policy limits",
      // whereas we are looking for a persistent characterization of the cores.
      filepath << "/sys/devices/system/cpu/cpu" << i << "/cpufreq/cpuinfo_max_freq";
      std::ifstream ifs(filepath.str());
      if (!ifs.fail()) {
        if (!(ifs >> cur_freq)) {
          cur_freq = -1;
        }
        ifs.close();
      }
#endif
      max_freqs.push_back(std::make_pair(i, cur_freq));
    }

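    // Order cores by descending max frequency, breaking ties by core id, so
    // that big cores come first in sorted_order_.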
    auto fcmpbyfreq = [](const std::pair<unsigned int, int64_t>& a,
                         const std::pair<unsigned int, int64_t>& b) {
      return a.second == b.second ? a.first < b.first : a.second > b.second;
    };
    std::sort(max_freqs.begin(), max_freqs.end(), fcmpbyfreq);
    int64_t big_freq = max_freqs.begin()->second;
    int64_t little_freq = max_freqs.rbegin()->second;
    for (auto it = max_freqs.begin(); it != max_freqs.end(); it++) {
      sorted_order_.push_back(it->first);
      if (big_freq == it->second) {
        big_count_++;
      }
      if (big_freq != little_freq && little_freq == it->second) {
        little_count_++;
      }
    }
    if (big_count_ + little_count_ != static_cast<int>(sorted_order_.size())) {
      big_count_ = static_cast<int>(sorted_order_.size()) - little_count_;
      LOG(WARNING) << "More than two distinct frequencies detected! Forced big_count_ to "
                   << big_count_;
    }
  }

  int num_workers_;
#if defined(__hexagon__)
  std::vector<QuRTThread> threads_;
#else
  std::vector<std::thread> threads_;
#endif
  std::vector<unsigned int> sorted_order_;
  int big_count_ = 0;
  int little_count_ = 0;
};

ThreadGroup::ThreadGroup(int num_workers, std::function<void(int)> worker_callback,
                         bool exclude_worker0)
    : impl_(new ThreadGroup::Impl(num_workers, worker_callback, exclude_worker0)) {}
ThreadGroup::~ThreadGroup() { delete impl_; }
void ThreadGroup::Join() { impl_->Join(); }

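// Hypothetical usage sketch (assuming the AffinityMode enumerators declared in
// tvm/runtime/threading_backend.h): pin four workers, one core each, to cores
// 0-3, keeping worker 0 on the caller's thread:
//   group.Configure(ThreadGroup::kSpecifyOneCorePerThread, /*nthreads=*/4,
//                   /*exclude_worker0=*/true, {0, 1, 2, 3});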
int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
                           std::vector<unsigned int> cpus) {
  return impl_->Configure(mode, nthreads, exclude_worker0, cpus);
}

void Yield() {
#ifdef __hexagon__
  // QuRT doesn't have a yield API, so instead we sleep for the minimum amount
  // of time to let the OS schedule another thread. std::this_thread::yield()
  // compiles down to an empty function.
  qurt_sleep(1);
#else
  std::this_thread::yield();
#endif
}

/*!
 * \brief Set the maximum level of concurrency to be used by the thread pool.
 */
void SetMaxConcurrency(int value) {
  if (value < 0) {
    LOG(WARNING) << "The maximum concurrency value '" << value
                 << "' cannot be negative; the setting was not applied.";
    return;
  }
  max_concurrency = value;
}
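/*!
 * \brief Return the number of workers to use. Resolution order: the value set
 * via SetMaxConcurrency(), then TVM_NUM_THREADS, then OMP_NUM_THREADS, then
 * std::thread::hardware_concurrency() (halved on x86 to skip hyper-threads).
 * The result is always at least 1.
 */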
int MaxConcurrency() {
  int max_concurrency = 1;
  if (tvm::runtime::threading::max_concurrency != 0) {
    max_concurrency = tvm::runtime::threading::max_concurrency;
  } else {
    const char* val = getenv("TVM_NUM_THREADS");
    if (val == nullptr) {
      val = getenv("OMP_NUM_THREADS");
    }
    if (val != nullptr) {
      max_concurrency = atoi(val);
    } else {
      max_concurrency = std::thread::hardware_concurrency();
#if defined(_M_X64) || defined(__x86_64__)
      max_concurrency /= 2;  // ignore hyper-threading
#elif defined(__hexagon__)
      // Ideally max_concurrency is set to the total count of 128B HVX units
      // available. This prevents threads that fail to lock an HVX unit from
      // scheduling work on the scalar cores instead of HVX.
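      // qurt_hvx_get_units() returns a packed bit field; bits 8-15 hold the
      // count of 128-byte HVX contexts.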
      int num_hvx128_contexts = (qurt_hvx_get_units() >> 8) & 0xFF;
      // With unsigned PDs, getting the number of available hardware threads
      // is not supported in earlier versions of QuRT. In such cases assume
      // the number of HVX units available. If running on the simulator, set
      // max_concurrency to 1.
      if (max_concurrency == 0) {
        if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
          max_concurrency = 1;
        } else {
          max_concurrency = num_hvx128_contexts;
        }
      } else {
        // If hardware_concurrency has already set max_concurrency to a
        // non-zero value, make sure it is not greater than the number of
        // HVX units available.
        max_concurrency = std::min(num_hvx128_contexts, max_concurrency);
      }
#endif
    }
  }
  return std::max(max_concurrency, 1);
}

}  // namespace threading
}  // namespace runtime
}  // namespace tvm