1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | |
20 | /*! |
21 | * \file thread_pool.cc |
22 | * \brief Threadpool for multi-threading runtime. |
23 | */ |
24 | #include <dmlc/thread_local.h> |
25 | #include <tvm/runtime/c_backend_api.h> |
26 | #include <tvm/runtime/c_runtime_api.h> |
27 | #include <tvm/runtime/container/array.h> |
28 | #include <tvm/runtime/logging.h> |
29 | #include <tvm/runtime/packed_func.h> |
30 | #include <tvm/runtime/registry.h> |
31 | #include <tvm/runtime/threading_backend.h> |
32 | #if TVM_THREADPOOL_USE_OPENMP |
33 | #include <omp.h> |
34 | #endif |
35 | #include <algorithm> |
36 | #include <atomic> |
37 | #include <condition_variable> |
38 | #include <cstring> |
39 | #include <memory> |
40 | #include <mutex> |
41 | #include <sstream> |
42 | #include <string> |
43 | #include <thread> |
44 | #include <vector> |
45 | |
46 | #include "../support/utils.h" |
47 | const constexpr int kL1CacheBytes = 64; |
48 | |
49 | namespace tvm { |
50 | namespace runtime { |
51 | namespace { |
52 | using support::IsNumber; |
53 | constexpr uint32_t kDefaultSpinCount = 300000; |
54 | |
55 | uint32_t GetSpinCount() { |
  const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT");
57 | if (!val) { |
58 | return kDefaultSpinCount; |
59 | } |
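  // Note that atoi() returns 0 for non-numeric values, which effectively disables spinning.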
60 | return atoi(val); |
61 | } |
62 | |
63 | } // namespace |
64 | |
// stride between per-task sync counters, padded to one cache line to avoid false sharing.
66 | constexpr int kSyncStride = 64 / sizeof(std::atomic<int>); |
67 | |
68 | /*! |
69 | * \brief Thread local main environment. |
70 | */ |
71 | class ParallelLauncher { |
72 | public: |
73 | // Reset the task request. |
74 | void Init(FTVMParallelLambda flambda, void* cdata, int num_task, bool need_sync) { |
75 | num_pending_.store(num_task); |
76 | this->cdata = cdata; |
77 | this->flambda = flambda; |
78 | this->env.num_task = num_task; |
79 | has_error_.store(false); |
    // grow the error buffer (and sync counters) if more tasks are requested than before
81 | if (static_cast<size_t>(num_task) > par_errors_.size()) { |
82 | par_errors_.resize(num_task + 1); |
83 | if (need_sync) { |
84 | delete[] sync_counter_; |
85 | sync_counter_ = new std::atomic<int>[num_task * kSyncStride]; |
86 | } |
87 | } |
88 | if (need_sync) { |
89 | for (int i = 0; i < num_task; ++i) { |
90 | sync_counter_[i * kSyncStride].store(0, std::memory_order_relaxed); |
91 | } |
92 | this->env.sync_handle = sync_counter_; |
93 | } else { |
94 | this->env.sync_handle = nullptr; |
95 | } |
96 | } |
97 | ~ParallelLauncher() { delete[] sync_counter_; } |
  // Wait for all launched jobs to finish.
99 | int WaitForJobs() { |
100 | while (num_pending_.load() != 0) { |
101 | tvm::runtime::threading::Yield(); |
102 | } |
103 | if (!has_error_.load()) return 0; |
104 | std::ostringstream os; |
105 | for (size_t i = 0; i < par_errors_.size(); ++i) { |
106 | if (par_errors_[i].length() != 0) { |
107 | os << "Task " << i << " error: " << par_errors_[i] << '\n'; |
108 | par_errors_[i].clear(); |
109 | } |
110 | } |
111 | TVMAPISetLastError(os.str().c_str()); |
112 | return -1; |
113 | } |
  // Signal that one job has finished with an error.
115 | void SignalJobError(int task_id) { |
116 | num_pending_.fetch_sub(1); |
117 | par_errors_[task_id] = TVMGetLastError(); |
118 | has_error_.store(true); |
119 | } |
120 | // Signal that one job has finished. |
121 | void SignalJobFinish() { num_pending_.fetch_sub(1); } |
122 | // Get thread local version of the store. |
123 | static ParallelLauncher* ThreadLocal() { return dmlc::ThreadLocalStore<ParallelLauncher>::Get(); } |
124 | // The parallel lambda |
125 | FTVMParallelLambda flambda; |
126 | // The closure data |
127 | void* cdata; |
128 | // Local env |
129 | TVMParallelGroupEnv env; |
  // Whether this thread is a worker of the pool;
  // used to prevent recursive launches.
132 | bool is_worker{false}; |
133 | |
134 | private: |
135 | // The pending jobs. |
136 | std::atomic<int32_t> num_pending_; |
  // Whether an error has been encountered.
138 | std::atomic<bool> has_error_; |
139 | // The counter page. |
140 | std::atomic<int32_t>* sync_counter_{nullptr}; |
141 | // The error message |
142 | std::vector<std::string> par_errors_; |
143 | }; |
144 | |
145 | /*! \brief Lock-free single-producer-single-consumer queue for each thread */ |
146 | class SpscTaskQueue { |
147 | public: |
148 | /*! \brief The task entry */ |
149 | struct Task { |
150 | ParallelLauncher* launcher; |
151 | int32_t task_id; |
152 | }; |
153 | |
154 | SpscTaskQueue() : buffer_(new Task[kRingSize]), head_(0), tail_(0) {} |
155 | |
156 | ~SpscTaskQueue() { delete[] buffer_; } |
157 | |
158 | /*! |
   * \brief Push a task into the queue and notify the consumer if it is waiting.
   * \param input The task to be enqueued.
161 | */ |
162 | void Push(const Task& input) { |
163 | while (!Enqueue(input)) { |
164 | tvm::runtime::threading::Yield(); |
165 | } |
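    // pending_ was -1 only if the consumer has already observed an empty queue
    // in Pop() and is sleeping (or about to sleep) on cv_, so the producer must
    // take the mutex and wake it up.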
166 | if (pending_.fetch_add(1) == -1) { |
167 | std::unique_lock<std::mutex> lock(mutex_); |
168 | cv_.notify_one(); |
169 | } |
170 | } |
171 | |
172 | /*! |
   * \brief Pop a task out of the queue; wait on the condition variable if the queue is empty.
174 | * \param output The pointer to the task to be dequeued. |
175 | * \param spin_count The number of iterations to spin before sleep. |
176 | * \return Whether pop is successful (true) or we need to exit now (false). |
177 | */ |
178 | bool Pop(Task* output, uint32_t spin_count) { |
179 | // Busy wait a bit when the queue is empty. |
    // If a new task arrives quickly, this avoids putting the worker to sleep.
    // The default spin count follows the typical OMP convention.
182 | for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) { |
183 | tvm::runtime::threading::Yield(); |
184 | } |
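    // Decrementing pending_ from 0 to -1 marks this consumer as (potentially)
    // sleeping: it blocks until a producer restores pending_ to >= 0 via Push()
    // or an exit is requested.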
185 | if (pending_.fetch_sub(1) == 0) { |
186 | std::unique_lock<std::mutex> lock(mutex_); |
187 | cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); |
188 | } |
189 | if (exit_now_.load(std::memory_order_relaxed)) { |
190 | return false; |
191 | } |
192 | const uint32_t head = head_.load(std::memory_order_relaxed); |
    // sanity check: the queue must not be empty at this point
194 | ICHECK(tail_.load(std::memory_order_acquire) != head); |
195 | *output = buffer_[head]; |
196 | head_.store((head + 1) % kRingSize, std::memory_order_release); |
197 | return true; |
198 | } |
199 | |
200 | /*! |
201 | * \brief Signal to terminate the worker. |
202 | */ |
203 | void SignalForKill() { |
204 | std::lock_guard<std::mutex> lock(mutex_); |
205 | exit_now_.store(true); |
206 | cv_.notify_all(); |
207 | } |
208 | |
209 | protected: |
210 | /*! |
211 | * \brief Lock-free enqueue. |
212 | * \param input The task to be enqueued. |
213 | * \return Whether the task is enqueued. |
214 | */ |
215 | bool Enqueue(const Task& input) { |
216 | if (exit_now_.load(std::memory_order_relaxed)) return false; |
217 | |
218 | const uint32_t tail = tail_.load(std::memory_order_relaxed); |
219 | |
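    // One slot is deliberately left unused so that head_ == tail_ can only mean
    // "empty"; a full ring is detected as (tail + 1) % kRingSize == head_.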
220 | if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) { |
221 | buffer_[tail] = input; |
222 | tail_.store((tail + 1) % kRingSize, std::memory_order_release); |
223 | return true; |
224 | } |
225 | return false; |
226 | } |
227 | |
  // the cache line paddings are used to avoid false sharing between atomic variables
229 | typedef char cache_line_pad_t[kL1CacheBytes]; |
230 | cache_line_pad_t pad0_; |
  // size of the queue; it can hold at most kRingSize - 1 items
  // defined as a constant for better compiler optimization
233 | static constexpr const int kRingSize = 2; |
234 | // pointer to access the item |
235 | Task* const buffer_; |
236 | |
237 | cache_line_pad_t pad1_; |
238 | // queue head, where one gets a task from the queue |
239 | std::atomic<uint32_t> head_; |
240 | |
241 | cache_line_pad_t pad2_; |
  // queue tail, where one puts a task into the queue
243 | std::atomic<uint32_t> tail_; |
244 | |
245 | cache_line_pad_t pad3_; |
246 | // pending tasks in the queue |
247 | std::atomic<int8_t> pending_{0}; |
248 | |
249 | cache_line_pad_t pad4_; |
250 | // signal for exit now |
251 | std::atomic<bool> exit_now_{false}; |
252 | |
253 | // internal mutex |
254 | std::mutex mutex_; |
255 | // cv for consumer |
256 | std::condition_variable cv_; |
257 | }; |
258 | |
259 | // The thread pool |
260 | class ThreadPool { |
261 | public: |
262 | ThreadPool() : num_workers_(tvm::runtime::threading::MaxConcurrency()) { |
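    // By default the launching (main) thread runs task 0 itself and worker 0's
    // queue is unused; setting TVM_EXCLUDE_WORKER0=0 dispatches task 0 to
    // worker thread 0 instead.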
    const char* exclude_worker0 = getenv("TVM_EXCLUDE_WORKER0");
264 | if (exclude_worker0 && atoi(exclude_worker0) == 0) { |
265 | exclude_worker0_ = false; |
266 | } |
267 | Init(); |
268 | } |
269 | |
270 | ~ThreadPool() { |
271 | for (std::unique_ptr<SpscTaskQueue>& q : queues_) { |
272 | q->SignalForKill(); |
273 | } |
274 | threads_.reset(); |
275 | } |
276 | |
277 | void Reset() { |
278 | for (std::unique_ptr<SpscTaskQueue>& q : queues_) { |
279 | q->SignalForKill(); |
280 | } |
    // Destroy threads before we destroy the shared queues, otherwise we segfault on macOS
282 | threads_.reset(); |
283 | queues_.clear(); |
284 | Init(); |
285 | } |
286 | |
287 | int Launch(FTVMParallelLambda flambda, void* cdata, int num_task, int need_sync) { |
288 | ParallelLauncher* launcher = ParallelLauncher::ThreadLocal(); |
    ICHECK(!launcher->is_worker)
        << "Cannot launch a parallel job inside a worker; consider fusing before parallelizing";
291 | if (num_task == 0) { |
292 | num_task = num_workers_used_; |
293 | } |
294 | if (need_sync != 0) { |
      ICHECK_LE(num_task, num_workers_used_)
          << "Requested number of sync tasks exceeds the number of workers used:"
          << " workers=" << num_workers_used_ << " request=" << num_task;
298 | } |
299 | launcher->Init(flambda, cdata, num_task, need_sync != 0); |
300 | SpscTaskQueue::Task tsk; |
301 | tsk.launcher = launcher; |
    // if worker 0 is reserved for the main thread, queues_[0] is never used
303 | for (int i = exclude_worker0_; i < num_task; ++i) { |
304 | tsk.task_id = i; |
305 | queues_[i]->Push(tsk); |
306 | } |
307 | // use the main thread to run task 0 |
308 | if (exclude_worker0_) { |
309 | TVMParallelGroupEnv* penv = &(tsk.launcher->env); |
310 | if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) { |
311 | tsk.launcher->SignalJobFinish(); |
312 | } else { |
313 | tsk.launcher->SignalJobError(tsk.task_id); |
314 | } |
315 | } |
316 | int res = launcher->WaitForJobs(); |
317 | return res; |
318 | } |
319 | |
320 | static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); } |
321 | |
322 | void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads, |
323 | const std::vector<unsigned int>& cpus) { |
324 | // this will also reset the affinity of the ThreadGroup |
    // may use fewer than the MaxConcurrency number of workers
326 | num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus); |
327 | // if MaxConcurrency restricted the number of workers (e.g., due to |
328 | // hyperthreading), respect the restriction |
329 | num_workers_used_ = std::min(num_workers_, num_workers_used_); |
330 | } |
331 | |
332 | int32_t NumThreads() const { return num_workers_used_; } |
333 | |
334 | private: |
335 | // Shared initialization code |
336 | void Init() { |
337 | for (int i = 0; i < num_workers_; ++i) { |
338 | // The SpscTaskQueue only hosts ONE item at a time |
339 | queues_.emplace_back(std::make_unique<SpscTaskQueue>()); |
340 | } |
341 | threads_ = std::make_unique<tvm::runtime::threading::ThreadGroup>( |
342 | num_workers_, [this](int worker_id) { this->RunWorker(worker_id); }, |
343 | exclude_worker0_ /* include_main_thread */); |
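    // Default affinity preference is "big" cores; nthreads = 0 lets the backend
    // choose how many workers to use.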
344 | num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_); |
345 | } |
346 | |
347 | // Internal worker function. |
348 | void RunWorker(int worker_id) { |
349 | SpscTaskQueue* queue = queues_[worker_id].get(); |
350 | SpscTaskQueue::Task task; |
351 | ParallelLauncher::ThreadLocal()->is_worker = true; |
352 | // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on |
353 | // the global first use of the ThreadPool. |
354 | // TODO(tulloch): should we make this configurable via standard APIs? |
355 | static size_t spin_count = GetSpinCount(); |
356 | while (queue->Pop(&task, spin_count)) { |
357 | ICHECK(task.launcher != nullptr); |
358 | TVMParallelGroupEnv* penv = &(task.launcher->env); |
359 | void* cdata = task.launcher->cdata; |
360 | if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) { |
361 | task.launcher->SignalJobFinish(); |
362 | } else { |
363 | task.launcher->SignalJobError(task.task_id); |
364 | } |
365 | } |
366 | } |
367 | int num_workers_; |
368 | // number of workers used (can be restricted with affinity pref) |
369 | int num_workers_used_; |
  // whether to exclude worker 0 and use the main thread to run task 0
371 | bool exclude_worker0_{true}; |
372 | std::vector<std::unique_ptr<SpscTaskQueue>> queues_; |
373 | std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_; |
374 | }; |
375 | |
376 | /*! |
 * \brief args[0] is the AffinityMode, args[1] is the number of threads, and the
 * optional args[2] is a list of CPU ids used to set the CPU affinity.
379 | */ |
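// A minimal sketch of invoking this packed function from C++ (illustrative
// only; the chosen mode and thread count below are assumptions):
//
//   const PackedFunc* config = Registry::Get("runtime.config_threadpool");
//   ICHECK(config != nullptr);
//   (*config)(static_cast<int>(threading::ThreadGroup::kBig), /*nthreads=*/4);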
TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) {
381 | threading::ThreadGroup::AffinityMode mode = |
382 | static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0])); |
383 | int nthreads = args[1]; |
384 | std::vector<unsigned int> cpus; |
385 | if (args.num_args >= 3) { |
386 | Array<String> cpu_array = args[2]; |
387 | for (auto cpu : cpu_array) { |
      ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number.";
389 | cpus.push_back(std::stoi(cpu)); |
390 | } |
391 | } |
392 | threading::Configure(mode, nthreads, cpus); |
393 | }); |
394 | |
TVM_REGISTER_GLOBAL("runtime.NumThreads").set_body_typed([]() -> int32_t {
396 | return threading::NumThreads(); |
397 | }); |
398 | |
399 | namespace threading { |
400 | |
401 | #if TVM_THREADPOOL_USE_OPENMP |
402 | /*! |
 * \brief Helper function to pin threads to cores for multi-instance execution
 * when the OpenMP thread pool is used.
405 | * |
406 | * \param mode Affinity mode (now supports only kSpecifyOneCorePerThread and |
407 | * kSpecifyThreadShareAllCore). |
408 | * \param nthreads The number of threads to use (0 = use all). |
409 | * \param cpus A list of CPU ids to set 'cpu affinity'. |
410 | * |
411 | */ |
412 | static void ConfigureOMP(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, |
413 | const std::vector<unsigned int>& cpus) { |
414 | #if defined(__linux__) || defined(__ANDROID__) |
415 | const int num_workers = MaxConcurrency(); |
416 | |
417 | if (mode == ThreadGroup::kSpecifyOneCorePerThread) { |
418 | #pragma omp parallel num_threads(num_workers) |
419 | { |
420 | int core_id = cpus[omp_get_thread_num()]; |
421 | cpu_set_t cpuset; |
422 | CPU_ZERO(&cpuset); |
423 | CPU_SET(core_id, &cpuset); |
424 | #if defined(__ANDROID__) |
425 | sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset); |
426 | #else |
427 | pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); |
428 | #endif |
429 | } |
430 | } else if (mode == ThreadGroup::kSpecifyThreadShareAllCore) { |
431 | cpu_set_t cpuset; |
432 | CPU_ZERO(&cpuset); |
433 | for (auto id : cpus) { |
434 | CPU_SET(id, &cpuset); |
435 | } |
436 | |
437 | #pragma omp parallel num_threads(num_workers) |
438 | { |
439 | #if defined(__ANDROID__) |
440 | sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset); |
441 | #else |
442 | pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); |
443 | #endif |
444 | } |
445 | } |
446 | #endif |
447 | } |
448 | |
449 | #endif |
450 | |
451 | void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); } |
452 | /*! |
 * \brief Configure the CPU affinity of the worker threads.
454 | * \param mode The preferred CPU type (1 = big, -1 = little, -2 = kSpecifyOneCorePerThread, |
455 | * -3 = kSpecifyThreadShareAllCore). |
456 | * \param nthreads The number of threads to use (0 = use all). |
 * \param cpus A list of CPU ids used to set the CPU affinity of the worker threads.
458 | * |
459 | */ |
460 | TVM_DLL void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, |
461 | std::vector<unsigned int> cpus) { |
462 | tvm::runtime::threading::SetMaxConcurrency(cpus.size()); |
463 | #if !TVM_THREADPOOL_USE_OPENMP |
464 | tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus); |
465 | #else |
466 | ConfigureOMP(mode, nthreads, cpus); |
467 | #endif |
468 | } |
469 | int32_t NumThreads() { return tvm::runtime::ThreadPool::ThreadLocal()->NumThreads(); } |
470 | } // namespace threading |
471 | } // namespace runtime |
472 | } // namespace tvm |
473 | |
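// A minimal sketch of how code generated by TVM typically drives this entry
// point (illustrative only; the lambda and buffer below are assumptions, not
// part of this file):
//
//   int MyParallelLambda(int task_id, TVMParallelGroupEnv* penv, void* cdata) {
//     float* data = static_cast<float*>(cdata);
//     data[task_id] += 1.0f;                             // work on this task's slice
//     return TVMBackendParallelBarrier(task_id, penv);   // sync with the other tasks
//   }
//
//   // num_task = 0 asks the pool to use one task per available worker.
//   TVMBackendParallelLaunch(MyParallelLambda, data, /*num_task=*/0);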
474 | int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_task) { |
475 | int num_workers = tvm::runtime::threading::MaxConcurrency(); |
476 | if (num_workers == 1) { |
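    // With a single worker there is nothing to dispatch: run task 0 inline on
    // the calling thread with a trivial sync handle.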
477 | std::atomic<int32_t> sync_counter{0}; |
478 | TVMParallelGroupEnv env; |
479 | env.num_task = 1; |
480 | env.sync_handle = &sync_counter; |
481 | (*flambda)(0, &env, cdata); |
482 | return 0; |
483 | } else { |
484 | #if !TVM_THREADPOOL_USE_OPENMP |
485 | int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1); |
486 | return res; |
487 | #else |
488 | if (num_task == 0) num_task = num_workers; |
489 | omp_set_num_threads(num_task); |
490 | #pragma omp parallel num_threads(num_task) |
491 | { |
492 | TVMParallelGroupEnv env; |
493 | env.num_task = num_task; |
494 | (*flambda)(omp_get_thread_num(), &env, cdata); |
495 | } |
496 | return 0; |
497 | #endif |
498 | } |
499 | } |
500 | |
501 | int TVMBackendParallelBarrier(int task_id, TVMParallelGroupEnv* penv) { |
502 | #if TVM_THREADPOOL_USE_OPENMP |
503 | #pragma omp barrier |
504 | #else |
505 | using tvm::runtime::kSyncStride; |
506 | int num_task = penv->num_task; |
507 | std::atomic<int>* sync_counter = reinterpret_cast<std::atomic<int>*>(penv->sync_handle); |
508 | int old_counter = sync_counter[task_id * kSyncStride].fetch_add(1, std::memory_order_release); |
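  // Each task bumps its own counter once per barrier; a task may leave only
  // after every other task's counter has also passed old_counter, i.e. all
  // tasks have reached this barrier instance.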
509 | for (int i = 0; i < num_task; ++i) { |
510 | if (i != task_id) { |
511 | while (sync_counter[i * kSyncStride].load(std::memory_order_relaxed) <= old_counter) { |
512 | tvm::runtime::threading::Yield(); |
513 | } |
514 | } |
515 | } |
516 | std::atomic_thread_fence(std::memory_order_acquire); |
517 | #endif |
518 | return 0; |
519 | } |
520 | |