/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*!
 * \file thread_pool.cc
 * \brief Threadpool for multi-threading runtime.
 */
#include <dmlc/thread_local.h>
#include <tvm/runtime/c_backend_api.h>
#include <tvm/runtime/c_runtime_api.h>
#include <tvm/runtime/container/array.h>
#include <tvm/runtime/logging.h>
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>
#include <tvm/runtime/threading_backend.h>
#if TVM_THREADPOOL_USE_OPENMP
#include <omp.h>
#endif
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "../support/utils.h"
const constexpr int kL1CacheBytes = 64;

namespace tvm {
namespace runtime {
namespace {
using support::IsNumber;
constexpr uint32_t kDefaultSpinCount = 300000;

uint32_t GetSpinCount() {
  const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT");
  if (!val) {
    return kDefaultSpinCount;
  }
  return atoi(val);
}
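
// Usage note (illustrative, not exercised by this file): the spin count can be
// tuned per process through the environment before startup, e.g. with a
// hypothetical launch command:
//   TVM_THREAD_POOL_SPIN_COUNT=0 python my_script.py
// A value of 0 makes idle workers block on the condition variable immediately
// instead of busy-waiting in SpscTaskQueue::Pop.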

}  // namespace

// Stride between per-task sync counters; chosen so that each counter sits on
// its own cache line (64 bytes).
constexpr int kSyncStride = 64 / sizeof(std::atomic<int>);

/*!
 * \brief Thread local main environment.
 */
class ParallelLauncher {
 public:
  // Reset the task request.
  void Init(FTVMParallelLambda flambda, void* cdata, int num_task, bool need_sync) {
    num_pending_.store(num_task);
    this->cdata = cdata;
    this->flambda = flambda;
    this->env.num_task = num_task;
    has_error_.store(false);
    // reshape
    if (static_cast<size_t>(num_task) > par_errors_.size()) {
      par_errors_.resize(num_task + 1);
      if (need_sync) {
        delete[] sync_counter_;
        sync_counter_ = new std::atomic<int>[num_task * kSyncStride];
      }
    }
    if (need_sync) {
      for (int i = 0; i < num_task; ++i) {
        sync_counter_[i * kSyncStride].store(0, std::memory_order_relaxed);
      }
      this->env.sync_handle = sync_counter_;
    } else {
      this->env.sync_handle = nullptr;
    }
  }
  ~ParallelLauncher() { delete[] sync_counter_; }
  // Wait for all pending jobs to finish.
  int WaitForJobs() {
    while (num_pending_.load() != 0) {
      tvm::runtime::threading::Yield();
    }
    if (!has_error_.load()) return 0;
    std::ostringstream os;
    for (size_t i = 0; i < par_errors_.size(); ++i) {
      if (par_errors_[i].length() != 0) {
        os << "Task " << i << " error: " << par_errors_[i] << '\n';
        par_errors_[i].clear();
      }
    }
    TVMAPISetLastError(os.str().c_str());
    return -1;
  }
  // Signal that one job has finished with an error.
  void SignalJobError(int task_id) {
    num_pending_.fetch_sub(1);
    par_errors_[task_id] = TVMGetLastError();
    has_error_.store(true);
  }
  // Signal that one job has finished.
  void SignalJobFinish() { num_pending_.fetch_sub(1); }
  // Get thread local version of the store.
  static ParallelLauncher* ThreadLocal() { return dmlc::ThreadLocalStore<ParallelLauncher>::Get(); }
  // The parallel lambda
  FTVMParallelLambda flambda;
  // The closure data
  void* cdata;
  // Local env
  TVMParallelGroupEnv env;
  // Whether this thread is a worker of the pool,
  // used to prevent recursive launch.
  bool is_worker{false};

 private:
  // The number of pending jobs.
  std::atomic<int32_t> num_pending_;
  // Whether an error has been encountered.
  std::atomic<bool> has_error_;
  // The counter page.
  std::atomic<int32_t>* sync_counter_{nullptr};
  // The error messages, one slot per task.
  std::vector<std::string> par_errors_;
};
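
// Illustrative lifecycle of ParallelLauncher (a sketch of how ThreadPool::Launch
// below drives it; not additional runtime logic, and `flambda`/`cdata` stand for
// the caller's parallel lambda and its closure data):
//
//   ParallelLauncher* launcher = ParallelLauncher::ThreadLocal();
//   launcher->Init(flambda, cdata, num_task, /*need_sync=*/true);
//   // ... push {launcher, task_id} entries to the per-worker task queues ...
//   // workers call SignalJobFinish() / SignalJobError() as tasks complete
//   int ret = launcher->WaitForJobs();  // 0 on success, -1 if any task errored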

/*! \brief Lock-free single-producer-single-consumer queue for each thread */
class SpscTaskQueue {
 public:
  /*! \brief The task entry */
  struct Task {
    ParallelLauncher* launcher;
    int32_t task_id;
  };

  SpscTaskQueue() : buffer_(new Task[kRingSize]), head_(0), tail_(0) {}

  ~SpscTaskQueue() { delete[] buffer_; }

  /*!
   * \brief Push a task into the queue and notify the consumer if it is waiting.
   * \param input The task to be enqueued.
   */
  void Push(const Task& input) {
    while (!Enqueue(input)) {
      tvm::runtime::threading::Yield();
    }
    if (pending_.fetch_add(1) == -1) {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.notify_one();
    }
  }

  /*!
   * \brief Pop a task out of the queue; condition-wait if there are no tasks.
   * \param output The pointer to the task to be dequeued.
   * \param spin_count The number of iterations to spin before sleeping.
   * \return Whether pop is successful (true) or we need to exit now (false).
   */
  bool Pop(Task* output, uint32_t spin_count) {
    // Busy wait a bit when the queue is empty.
    // If a new task arrives quickly, this spin keeps the worker from going to sleep.
    // The default spin count is set following the typical OpenMP convention.
    for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
      tvm::runtime::threading::Yield();
    }
    if (pending_.fetch_sub(1) == 0) {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); });
    }
    if (exit_now_.load(std::memory_order_relaxed)) {
      return false;
    }
    const uint32_t head = head_.load(std::memory_order_relaxed);
    // sanity check that the queue is not empty
    ICHECK(tail_.load(std::memory_order_acquire) != head);
    *output = buffer_[head];
    head_.store((head + 1) % kRingSize, std::memory_order_release);
    return true;
  }

  /*!
   * \brief Signal the worker to terminate.
   */
  void SignalForKill() {
    std::lock_guard<std::mutex> lock(mutex_);
    exit_now_.store(true);
    cv_.notify_all();
  }

 protected:
  /*!
   * \brief Lock-free enqueue.
   * \param input The task to be enqueued.
   * \return Whether the task is enqueued.
   */
  bool Enqueue(const Task& input) {
    if (exit_now_.load(std::memory_order_relaxed)) return false;

    const uint32_t tail = tail_.load(std::memory_order_relaxed);

    if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) {
      buffer_[tail] = input;
      tail_.store((tail + 1) % kRingSize, std::memory_order_release);
      return true;
    }
    return false;
  }

  // Cache line padding used to avoid false sharing between the atomic variables.
  typedef char cache_line_pad_t[kL1CacheBytes];
  cache_line_pad_t pad0_;
  // Ring size of the queue; the queue can hold at most kRingSize - 1 items.
  // Defined as a constant for better compiler optimization.
  static constexpr const int kRingSize = 2;
  // pointer to access the items
  Task* const buffer_;

  cache_line_pad_t pad1_;
  // queue head, where one gets a task from the queue
  std::atomic<uint32_t> head_;

  cache_line_pad_t pad2_;
  // queue tail, where one puts a task into the queue
  std::atomic<uint32_t> tail_;

  cache_line_pad_t pad3_;
  // number of pending tasks in the queue
  std::atomic<int8_t> pending_{0};

  cache_line_pad_t pad4_;
  // signal to exit now
  std::atomic<bool> exit_now_{false};

  // internal mutex
  std::mutex mutex_;
  // cv for the consumer
  std::condition_variable cv_;
};
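
// Illustrative producer/consumer pattern for SpscTaskQueue (a sketch mirroring
// ThreadPool::Launch and ThreadPool::RunWorker below; `queue`, `launcher` and
// `spin_count` are placeholders):
//
//   // producer (the launching thread):
//   SpscTaskQueue::Task tsk{launcher, /*task_id=*/1};
//   queue->Push(tsk);  // spins until the one-slot ring buffer has room
//
//   // consumer (the worker thread):
//   SpscTaskQueue::Task task;
//   while (queue->Pop(&task, spin_count)) {  // returns false after SignalForKill()
//     // run the task ...
//   }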

// The thread pool
class ThreadPool {
 public:
  ThreadPool() : num_workers_(tvm::runtime::threading::MaxConcurrency()) {
    const char* exclude_worker0 = getenv("TVM_EXCLUDE_WORKER0");
    if (exclude_worker0 && atoi(exclude_worker0) == 0) {
      exclude_worker0_ = false;
    }
    Init();
  }
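
  // Usage note (illustrative): by default the launching thread runs task 0 itself
  // (exclude_worker0_ == true). Setting the environment variable
  // TVM_EXCLUDE_WORKER0=0 before startup keeps the launching thread out of the
  // worker set, so a dedicated pool thread serves queues_[0] instead.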

  ~ThreadPool() {
    for (std::unique_ptr<SpscTaskQueue>& q : queues_) {
      q->SignalForKill();
    }
    threads_.reset();
  }

  void Reset() {
    for (std::unique_ptr<SpscTaskQueue>& q : queues_) {
      q->SignalForKill();
    }
    // Destroy threads before we destroy the shared queues, otherwise we segfault on macOS
    threads_.reset();
    queues_.clear();
    Init();
  }

  int Launch(FTVMParallelLambda flambda, void* cdata, int num_task, int need_sync) {
    ParallelLauncher* launcher = ParallelLauncher::ThreadLocal();
    ICHECK(!launcher->is_worker)
        << "Cannot launch parallel job inside worker, consider fuse then parallel";
    if (num_task == 0) {
      num_task = num_workers_used_;
    }
    if (need_sync != 0) {
      ICHECK_LE(num_task, num_workers_used_)
          << "Requested number of parallel sync tasks exceeds the number of workers used:"
          << " workers=" << num_workers_used_ << " request=" << num_task;
    }
    launcher->Init(flambda, cdata, num_task, need_sync != 0);
    SpscTaskQueue::Task tsk;
    tsk.launcher = launcher;
    // if worker 0 is run by the main thread, queues_[0] is left unused
    for (int i = exclude_worker0_; i < num_task; ++i) {
      tsk.task_id = i;
      queues_[i]->Push(tsk);
    }
    // use the main thread to run task 0
    if (exclude_worker0_) {
      TVMParallelGroupEnv* penv = &(tsk.launcher->env);
      if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) {
        tsk.launcher->SignalJobFinish();
      } else {
        // the main thread ran task 0, so report the error under task id 0
        tsk.launcher->SignalJobError(0);
      }
    }
    int res = launcher->WaitForJobs();
    return res;
  }

  static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }

  void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
                                 const std::vector<unsigned int>& cpus) {
    // this will also reset the affinity of the ThreadGroup;
    // it may use fewer than the MaxConcurrency number of workers
    num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus);
    // if MaxConcurrency restricted the number of workers (e.g., due to
    // hyperthreading), respect the restriction
    num_workers_used_ = std::min(num_workers_, num_workers_used_);
  }

  int32_t NumThreads() const { return num_workers_used_; }

 private:
  // Shared initialization code
  void Init() {
    for (int i = 0; i < num_workers_; ++i) {
      // The SpscTaskQueue only hosts ONE item at a time
      queues_.emplace_back(std::make_unique<SpscTaskQueue>());
    }
    threads_ = std::make_unique<tvm::runtime::threading::ThreadGroup>(
        num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
        exclude_worker0_ /* include_main_thread */);
    num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
  }

  // Internal worker function.
  void RunWorker(int worker_id) {
    SpscTaskQueue* queue = queues_[worker_id].get();
    SpscTaskQueue::Task task;
    ParallelLauncher::ThreadLocal()->is_worker = true;
    // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on
    // the global first use of the ThreadPool.
    // TODO(tulloch): should we make this configurable via standard APIs?
    static size_t spin_count = GetSpinCount();
    while (queue->Pop(&task, spin_count)) {
      ICHECK(task.launcher != nullptr);
      TVMParallelGroupEnv* penv = &(task.launcher->env);
      void* cdata = task.launcher->cdata;
      if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) {
        task.launcher->SignalJobFinish();
      } else {
        task.launcher->SignalJobError(task.task_id);
      }
    }
  }
  int num_workers_;
  // number of workers used (can be restricted with affinity pref)
  int num_workers_used_;
  // whether to exclude worker 0 and use the main thread to run task 0
  bool exclude_worker0_{true};
  std::vector<std::unique_ptr<SpscTaskQueue>> queues_;
  std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_;
};

/*!
 * \brief args[0] is the AffinityMode, args[1] is the number of threads.
 * args[2] is a list of CPUs which is used to set the CPU affinity.
 */
TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) {
  threading::ThreadGroup::AffinityMode mode =
      static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
  int nthreads = args[1];
  std::vector<unsigned int> cpus;
  if (args.num_args >= 3) {
    Array<String> cpu_array = args[2];
    for (auto cpu : cpu_array) {
      ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number.";
      cpus.push_back(std::stoi(cpu));
    }
  }
  threading::Configure(mode, nthreads, cpus);
});
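
// Illustrative call of the registered function from C++ (a sketch; the mode and
// thread count below are placeholder values):
//
//   const PackedFunc* f = Registry::Get("runtime.config_threadpool");
//   if (f != nullptr) {
//     (*f)(static_cast<int>(threading::ThreadGroup::kBig), /*nthreads=*/4);
//   }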

TVM_REGISTER_GLOBAL("runtime.NumThreads").set_body_typed([]() -> int32_t {
  return threading::NumThreads();
});

namespace threading {

#if TVM_THREADPOOL_USE_OPENMP
/*!
 * \brief Helper function to pin threads to cores when running multiple instances
 * with the OpenMP thread pool.
 *
 * \param mode Affinity mode (currently supports only kSpecifyOneCorePerThread and
 * kSpecifyThreadShareAllCore).
 * \param nthreads The number of threads to use (0 = use all).
 * \param cpus A list of CPU ids to set the 'cpu affinity'.
 *
 */
static void ConfigureOMP(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
                         const std::vector<unsigned int>& cpus) {
#if defined(__linux__) || defined(__ANDROID__)
  const int num_workers = MaxConcurrency();

  if (mode == ThreadGroup::kSpecifyOneCorePerThread) {
#pragma omp parallel num_threads(num_workers)
    {
      int core_id = cpus[omp_get_thread_num()];
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(core_id, &cpuset);
#if defined(__ANDROID__)
      sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
    }
  } else if (mode == ThreadGroup::kSpecifyThreadShareAllCore) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (auto id : cpus) {
      CPU_SET(id, &cpuset);
    }

#pragma omp parallel num_threads(num_workers)
    {
#if defined(__ANDROID__)
      sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
    }
  }
#endif
}

#endif

void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }
/*!
 * \brief Configure the CPU id affinity.
 * \param mode The preferred CPU type (1 = big, -1 = little, -2 = kSpecifyOneCorePerThread,
 * -3 = kSpecifyThreadShareAllCore).
 * \param nthreads The number of threads to use (0 = use all).
 * \param cpus A list of CPUs used to set the 'cpu affinity' for the worker threads.
 *
 */
TVM_DLL void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
                       std::vector<unsigned int> cpus) {
  tvm::runtime::threading::SetMaxConcurrency(cpus.size());
#if !TVM_THREADPOOL_USE_OPENMP
  tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
#else
  ConfigureOMP(mode, nthreads, cpus);
#endif
}
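
// Illustrative direct call of Configure (a sketch; the CPU ids are placeholders):
//
//   std::vector<unsigned int> cpus = {0, 1, 2, 3};
//   tvm::runtime::threading::Configure(
//       tvm::runtime::threading::ThreadGroup::kSpecifyOneCorePerThread,
//       /*nthreads=*/4, cpus);
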
int32_t NumThreads() { return tvm::runtime::ThreadPool::ThreadLocal()->NumThreads(); }
}  // namespace threading
}  // namespace runtime
}  // namespace tvm

int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_task) {
  int num_workers = tvm::runtime::threading::MaxConcurrency();
  if (num_workers == 1) {
    std::atomic<int32_t> sync_counter{0};
    TVMParallelGroupEnv env;
    env.num_task = 1;
    env.sync_handle = &sync_counter;
    (*flambda)(0, &env, cdata);
    return 0;
  } else {
#if !TVM_THREADPOOL_USE_OPENMP
    int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1);
    return res;
#else
    if (num_task == 0) num_task = num_workers;
    omp_set_num_threads(num_task);
#pragma omp parallel num_threads(num_task)
    {
      TVMParallelGroupEnv env;
      env.num_task = num_task;
      (*flambda)(omp_get_thread_num(), &env, cdata);
    }
    return 0;
#endif
  }
}
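
// Illustrative use of the backend API (a sketch; `MyShardedWork` and its payload
// are hypothetical and not part of this file). A host program or generated code
// might call it like this:
//
//   struct Payload { float* data; int n; };
//   int MyShardedWork(int task_id, TVMParallelGroupEnv* penv, void* cdata) {
//     Payload* p = static_cast<Payload*>(cdata);
//     int begin = (p->n * task_id) / penv->num_task;
//     int end = (p->n * (task_id + 1)) / penv->num_task;
//     for (int i = begin; i < end; ++i) p->data[i] *= 2.0f;
//     return 0;  // a non-zero return is reported as a task error
//   }
//
//   Payload payload{buffer, size};
//   TVMBackendParallelLaunch(MyShardedWork, &payload, /*num_task=*/0);  // 0 = use all workers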

int TVMBackendParallelBarrier(int task_id, TVMParallelGroupEnv* penv) {
#if TVM_THREADPOOL_USE_OPENMP
#pragma omp barrier
#else
  using tvm::runtime::kSyncStride;
  int num_task = penv->num_task;
  std::atomic<int>* sync_counter = reinterpret_cast<std::atomic<int>*>(penv->sync_handle);
  int old_counter = sync_counter[task_id * kSyncStride].fetch_add(1, std::memory_order_release);
  for (int i = 0; i < num_task; ++i) {
    if (i != task_id) {
      while (sync_counter[i * kSyncStride].load(std::memory_order_relaxed) <= old_counter) {
        tvm::runtime::threading::Yield();
      }
    }
  }
  std::atomic_thread_fence(std::memory_order_acquire);
#endif
  return 0;
}
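
// Illustrative barrier use inside a parallel lambda (a sketch; `MyTwoPhaseWork`
// is hypothetical). Tasks launched through TVMBackendParallelLaunch run with
// need_sync enabled, so they may synchronize phases like this:
//
//   int MyTwoPhaseWork(int task_id, TVMParallelGroupEnv* penv, void* cdata) {
//     // ... phase 1 ...
//     TVMBackendParallelBarrier(task_id, penv);  // wait until every task finishes phase 1
//     // ... phase 2 may now read the phase-1 results of other tasks ...
//     return 0;
//   }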