1 | // Copyright 2015 The Gemmlowp Authors. All Rights Reserved. |
2 | // |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | // you may not use this file except in compliance with the License. |
5 | // You may obtain a copy of the License at |
6 | // |
7 | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | // |
9 | // Unless required by applicable law or agreed to in writing, software |
10 | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | // See the License for the specific language governing permissions and |
13 | // limitations under the License. |
14 | |
15 | // multi_thread_gemm.h: Multi-threaded GEMM entry point. |
16 | // Readers note: To understand this file, it is useful to first |
17 | // read and understand the much simpler single_thread_gemm.h. |
18 | |
19 | #ifndef GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_ |
20 | #define GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_ |
21 | |
22 | #include <atomic> // NOLINT |
23 | #include <chrono> // NOLINT |
24 | #include <thread> // NOLINT |
25 | #include <vector> |
26 | |
27 | #include "single_thread_gemm.h" |
28 | |
29 | namespace gemmlowp { |
30 | |
31 | // This value was empirically derived on an end-to-end application benchmark. |
// That this number of cycles means that we may be busy-waiting for
// substantially longer than a scheduler timeslice's duration is not
// necessarily surprising. The idea is to quickly pick up new work after
// having finished the previous workload. When the new work is within the same
// GEMM as the previous work, the time interval that we might be busy-waiting
// is very small, so for that purpose it would be more than enough to
// busy-wait for 1 million cycles. That is all we would observe on a GEMM
// benchmark. However, in a real
39 | // application, after having finished a GEMM, we might do unrelated work for |
40 | // a little while, then start on a new GEMM. Think of a neural network |
41 | // application performing inference, where many but not all layers are |
42 | // implemented by a GEMM. In such cases, our worker threads might be idle for |
43 | // longer periods of time before having work again. If we let them passively |
44 | // wait, on a mobile device, the CPU scheduler might aggressively clock down |
// or even turn off the CPU cores that they were running on. That would result
// in a long delay the next time those cores need to be turned back on for the
// next GEMM. So we need to strike a balance that reflects typical time
// intervals between consecutive GEMM invocations, not just intra-GEMM
// considerations.
49 | // Of course, we need to balance keeping CPUs spinning longer to resume work |
50 | // faster, versus passively waiting to conserve power. |
51 | const int kMaxBusyWaitNOPs = 4 * 1000 * 1000; |
52 | |
// On X86 and ARM platforms we may use NOP instructions to measure how long
// we have been busy-waiting.
55 | |
56 | #if defined(GEMMLOWP_ALLOW_INLINE_ASM) && !defined(GEMMLOWP_NO_BUSYWAIT) && \ |
57 | (defined(GEMMLOWP_ARM) || defined(GEMMLOWP_X86)) |
58 | |
59 | #define GEMMLOWP_NOP "nop\n" |
60 | |
61 | #define GEMMLOWP_STRING_CONCAT_4(X) X X X X |
62 | #define GEMMLOWP_NOP4 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP) |
63 | #define GEMMLOWP_NOP16 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP4) |
64 | #define GEMMLOWP_NOP64 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP16) |
65 | |
66 | inline int DoSomeNOPs() { |
67 | asm volatile(GEMMLOWP_NOP64); |
68 | return 64; |
69 | } |
70 | |
71 | #undef GEMMLOWP_STRING_CONCAT_4 |
72 | #undef GEMMLOWP_NOP64 |
73 | #undef GEMMLOWP_NOP16 |
74 | #undef GEMMLOWP_NOP4 |
75 | #undef GEMMLOWP_NOP |
76 | |
77 | #else // May not use asm NOP. |
78 | |
79 | // If we can't use NOPs, let's use a non-inline function call as a basic |
80 | // thing that has some vaguely known, nonzero cost. |
81 | GEMMLOWP_NOINLINE |
82 | inline int DoSomeNOPs() { |
83 | // Pretend that calling an empty function takes as long as 16 NOPs... |
84 | return 16; |
85 | } |
86 | #endif |
87 | |
88 | // Waits until *var != initial_value. |
89 | // |
90 | // Returns the new value of *var. The guarantee here is that |
91 | // the return value is different from initial_value, and that that |
92 | // new value has been taken by *var at some point during the |
93 | // execution of this function. There is no guarantee that this is |
94 | // still the value of *var when this function returns, since *var is |
95 | // not assumed to be guarded by any lock. |
96 | // |
97 | // First does some busy-waiting for a fixed number of no-op cycles, |
98 | // then falls back to passive waiting for the given condvar, guarded |
99 | // by the given mutex. |
100 | // |
101 | // The idea of doing some initial busy-waiting is to help get |
102 | // better and more consistent multithreading benefits for small GEMM sizes. |
// Busy-waiting helps ensure that if we need to wake up soon after having
104 | // started waiting, then we can wake up quickly (as opposed to, say, |
105 | // having to wait to be scheduled again by the OS). On the other hand, |
106 | // we must still eventually revert to passive waiting for longer waits |
107 | // (e.g. worker threads having finished a GEMM and waiting until the next GEMM) |
108 | // so as to avoid permanently spinning. |
109 | // |
110 | template <typename T> |
111 | T WaitForVariableChange(std::atomic<T>* var, T initial_value, |
112 | pthread_cond_t* cond, pthread_mutex_t* mutex) { |
113 | // First, trivial case where the variable already changed value. |
114 | T new_value = var->load(std::memory_order_acquire); |
115 | if (new_value != initial_value) { |
116 | return new_value; |
117 | } |
118 | // Then try busy-waiting. |
119 | int nops = 0; |
120 | while (nops < kMaxBusyWaitNOPs) { |
121 | nops += DoSomeNOPs(); |
122 | new_value = var->load(std::memory_order_acquire); |
123 | if (new_value != initial_value) { |
124 | return new_value; |
125 | } |
126 | } |
127 | |
128 | // Finally, do real passive waiting. |
129 | pthread_mutex_lock(mutex); |
130 | new_value = var->load(std::memory_order_acquire); |
131 | while (new_value == initial_value) { |
132 | pthread_cond_wait(cond, mutex); |
133 | new_value = var->load(std::memory_order_acquire); |
134 | } |
135 | pthread_mutex_unlock(mutex); |
136 | return new_value; |
137 | } |
138 | |
// A BlockingCounter lets one thread wait for N events to occur.
// This is how the master thread waits for all the worker threads
// to have finished working.
// The waiting is done using a naive spin-wait on the atomic
// count_ hitting the value 0. This is acceptable because in our usage
144 | // pattern, BlockingCounter is used only to synchronize threads after |
145 | // short-lived tasks (performing parts of the same GEMM). It is not used |
146 | // for synchronizing longer waits (resuming work on the next GEMM). |
147 | class BlockingCounter { |
148 | public: |
149 | BlockingCounter() : count_(0) {} |
150 | |
151 | // Sets/resets the counter; initial_count is the number of |
152 | // decrementing events that the Wait() call will be waiting for. |
153 | void Reset(std::size_t initial_count) { |
154 | std::size_t old_count_value = count_.load(std::memory_order_relaxed); |
155 | assert(old_count_value == 0); |
156 | (void)old_count_value; |
157 | count_.store(initial_count, std::memory_order_release); |
158 | } |
159 | |
  // Decrements the counter; returns true if the decremented count has hit
  // zero, at which point the thread spinning in Wait() will observe it and
  // unblock. Otherwise (if the decremented count is still nonzero),
  // returns false.
164 | bool DecrementCount() { |
165 | std::size_t old_count_value = |
166 | count_.fetch_sub(1, std::memory_order_acq_rel); |
167 | assert(old_count_value > 0); |
168 | std::size_t count_value = old_count_value - 1; |
169 | return count_value == 0; |
170 | } |
171 | |
172 | // Waits for the N other threads (N having been set by Reset()) |
173 | // to hit the BlockingCounter. |
174 | void Wait() { |
175 | ScopedProfilingLabel label("BlockingCounter::Wait" ); |
176 | // Busy-wait until the count value is 0. |
177 | int nops = 0; |
178 | while (count_.load(std::memory_order_acquire)) { |
179 | nops += DoSomeNOPs(); |
180 | if (nops > kMaxBusyWaitNOPs) { |
181 | nops = 0; |
182 | // If we are unlucky, the blocking thread (that calls DecrementCount) |
183 | // and the blocked thread (here, calling Wait) may be scheduled on |
184 | // the same CPU, so the busy-waiting of the present thread may prevent |
185 | // the blocking thread from resuming and unblocking. |
        // If we are even unluckier, the priority of the present thread
187 | // might be higher than that of the blocking thread, so just yielding |
188 | // wouldn't allow the blocking thread to resume. So we sleep for |
189 | // a substantial amount of time in that case. Notice that we only |
190 | // do so after having busy-waited for kMaxBusyWaitNOPs, which is |
191 | // typically several milliseconds, so sleeping 1 more millisecond |
192 | // isn't terrible at that point. |
193 | // |
194 | // How this is mitigated in practice: |
195 | // In practice, it is well known that the application should be |
196 | // conservative in choosing how many threads to tell gemmlowp to use, |
197 | // as it's hard to know how many CPU cores it will get to run on, |
198 | // on typical mobile devices. |
199 | // It seems impossible for gemmlowp to make this choice automatically, |
200 | // which is why gemmlowp's default is to use only 1 thread, and |
201 | // applications may override that if they know that they can count on |
202 | // using more than that. |
203 | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
204 | } |
205 | } |
206 | } |
207 | |
208 | private: |
209 | std::atomic<std::size_t> count_; |
210 | }; |
211 | |
212 | // A workload for a worker. |
213 | struct Task { |
214 | Task() : local_allocator(nullptr) {} |
215 | virtual ~Task() {} |
216 | virtual void Run() = 0; |
217 | Allocator* local_allocator; |
218 | }; |
219 | |
220 | // A worker thread. |
221 | class Worker { |
222 | public: |
223 | enum class State { |
224 | ThreadStartup, // The initial state before the thread main loop runs. |
225 | Ready, // Is not working, has not yet received new work to do. |
226 | HasWork, // Has work to do. |
227 | ExitAsSoonAsPossible // Should exit at earliest convenience. |
228 | }; |
229 | |
230 | explicit Worker(BlockingCounter* counter_to_decrement_when_ready) |
231 | : task_(nullptr), |
232 | state_(State::ThreadStartup), |
233 | counter_to_decrement_when_ready_(counter_to_decrement_when_ready) { |
234 | pthread_cond_init(&state_cond_, nullptr); |
235 | pthread_mutex_init(&state_mutex_, nullptr); |
236 | pthread_create(&thread_, nullptr, ThreadFunc, this); |
237 | } |
238 | |
239 | ~Worker() { |
240 | ChangeState(State::ExitAsSoonAsPossible); |
241 | pthread_join(thread_, nullptr); |
242 | pthread_cond_destroy(&state_cond_); |
243 | pthread_mutex_destroy(&state_mutex_); |
244 | } |
245 | |
246 | // Changes State; may be called from either the worker thread |
247 | // or the master thread; however, not all state transitions are legal, |
248 | // which is guarded by assertions. |
249 | // |
250 | // The Task argument is to be used only with new_state==HasWork. |
251 | // It specifies the Task being handed to this Worker. |
252 | void ChangeState(State new_state, Task* task = nullptr) { |
253 | ScopedProfilingLabel label("Worker::ChangeState" ); |
254 | pthread_mutex_lock(&state_mutex_); |
255 | State old_state = state_.load(std::memory_order_relaxed); |
256 | assert(old_state != new_state); |
257 | switch (old_state) { |
258 | case State::ThreadStartup: |
259 | assert(new_state == State::Ready); |
260 | break; |
261 | case State::Ready: |
262 | assert(new_state == State::HasWork || |
263 | new_state == State::ExitAsSoonAsPossible); |
264 | break; |
265 | case State::HasWork: |
266 | assert(new_state == State::Ready || |
267 | new_state == State::ExitAsSoonAsPossible); |
268 | break; |
269 | default: |
270 | abort(); |
271 | } |
272 | switch (new_state) { |
273 | case State::Ready: |
274 | if (task_) { |
275 | // Doing work is part of reverting to 'ready' state. |
276 | task_->Run(); |
277 | task_ = nullptr; |
278 | } |
279 | break; |
280 | case State::HasWork: |
281 | assert(!task_); |
282 | task->local_allocator = &local_allocator_; |
283 | task_ = task; |
284 | break; |
285 | default: |
286 | break; |
287 | } |
288 | state_.store(new_state, std::memory_order_relaxed); |
289 | pthread_cond_broadcast(&state_cond_); |
290 | pthread_mutex_unlock(&state_mutex_); |
291 | if (new_state == State::Ready) { |
292 | counter_to_decrement_when_ready_->DecrementCount(); |
293 | } |
294 | } |
295 | |
296 | // Thread entry point. |
297 | void ThreadFunc() { |
298 | ScopedProfilingLabel label("Worker::ThreadFunc" ); |
299 | |
300 | ChangeState(State::Ready); |
301 | |
302 | // Thread main loop |
303 | while (true) { |
304 | // Get a state to act on |
305 | // In the 'Ready' state, we have nothing to do but to wait until |
306 | // we switch to another state. |
307 | State state_to_act_upon = WaitForVariableChange( |
308 | &state_, State::Ready, &state_cond_, &state_mutex_); |
309 | |
310 | // We now have a state to act on, so act. |
311 | switch (state_to_act_upon) { |
312 | case State::HasWork: |
313 | // Got work to do! So do it, and then revert to 'Ready' state. |
314 | ChangeState(State::Ready); |
315 | break; |
316 | case State::ExitAsSoonAsPossible: |
317 | return; |
318 | default: |
319 | abort(); |
320 | } |
321 | } |
322 | } |
323 | |
324 | static void* ThreadFunc(void* arg) { |
325 | static_cast<Worker*>(arg)->ThreadFunc(); |
326 | return nullptr; |
327 | } |
328 | |
  // Called by the master thread to give this worker work to do.
330 | void StartWork(Task* task) { ChangeState(State::HasWork, task); } |
331 | |
332 | private: |
333 | // The underlying thread. |
334 | pthread_t thread_; |
335 | |
336 | // The task to be worked on. |
337 | Task* task_; |
338 | |
339 | // The condition variable and mutex guarding state changes. |
340 | pthread_cond_t state_cond_; |
341 | pthread_mutex_t state_mutex_; |
342 | |
343 | // The state enum tells if we're currently working, waiting for work, etc. |
344 | // Its concurrent accesses by the worker and main threads are guarded by |
345 | // state_mutex_, and can thus use memory_order_relaxed. This still needs |
346 | // to be a std::atomic because we use WaitForVariableChange. |
347 | std::atomic<State> state_; |
348 | |
  // Each worker thread has its own local allocator, so that workers can
  // allocate temporary buffers without blocking each other.
351 | Allocator local_allocator_; |
352 | |
  // Pointer to the master thread's BlockingCounter object, used to notify
  // the master thread when this worker switches to the 'Ready' state.
355 | BlockingCounter* const counter_to_decrement_when_ready_; |
356 | }; |
357 | |
// A very simple pool of workers that only allows the very
359 | // specific parallelization pattern that we use here: |
360 | // a fixed number of workers can be given work, and one then |
361 | // waits for all of them to finish. |
362 | // |
363 | // See MultiThreadGemmContextBase for how other WorkersPool implementations can |
364 | // be used. |
365 | class WorkersPool { |
366 | public: |
367 | WorkersPool() {} |
368 | |
369 | ~WorkersPool() { |
370 | for (auto w : workers_) { |
371 | delete w; |
372 | } |
373 | } |
374 | |
375 | // Just executes the tasks. Does not destroy them. Similar to |
376 | // ruy::ThreadPool::Execute. |
377 | template <typename TaskType> |
378 | void Execute(int tasks_count, TaskType* tasks) { |
379 | assert(tasks_count >= 1); |
380 | // One of the tasks will be run on the current thread. |
381 | std::size_t workers_count = tasks_count - 1; |
382 | CreateWorkers(workers_count); |
383 | assert(workers_count <= workers_.size()); |
384 | counter_to_decrement_when_ready_.Reset(workers_count); |
385 | for (std::size_t i = 0; i < tasks_count - 1; i++) { |
386 | workers_[i]->StartWork(&tasks[i]); |
387 | } |
388 | // Execute the remaining workload immediately on the current thread. |
389 | Task* task = &tasks[tasks_count - 1]; |
390 | task->local_allocator = &main_thread_task_allocator_; |
391 | task->Run(); |
392 | // Wait for the workers submitted above to finish. |
393 | counter_to_decrement_when_ready_.Wait(); |
394 | } |
395 | |
396 | // Legacy: executes the tasks and destroys them |
397 | void LegacyExecuteAndDestroyTasks(const std::vector<Task*>& tasks) { |
398 | std::size_t tasks_count = tasks.size(); |
399 | assert(tasks_count >= 1); |
400 | // One of the tasks will be run on the current thread. |
401 | std::size_t workers_count = tasks_count - 1; |
402 | CreateWorkers(workers_count); |
403 | assert(workers_count <= workers_.size()); |
404 | counter_to_decrement_when_ready_.Reset(workers_count); |
    for (std::size_t i = 0; i < tasks_count - 1; i++) {
406 | workers_[i]->StartWork(tasks[i]); |
407 | } |
408 | // Execute the remaining workload immediately on the current thread. |
409 | Task* task = tasks[tasks_count - 1]; |
410 | task->local_allocator = &main_thread_task_allocator_; |
411 | task->Run(); |
412 | // Wait for the workers submitted above to finish. |
413 | counter_to_decrement_when_ready_.Wait(); |
414 | // Cleanup tasks (best to do this from the same thread that allocated |
415 | // the memory). |
416 | std::for_each(tasks.begin(), tasks.end(), [](Task* task) { delete task; }); |
417 | } |
418 | |
419 | // Legacy old name of LegacyExecuteAndDestroyTasks |
420 | void Execute(const std::vector<Task*>& tasks) { |
421 | LegacyExecuteAndDestroyTasks(tasks); |
422 | } |
423 | |
424 | private: |
425 | // Ensures that the pool has at least the given count of workers. |
426 | // If any new worker has to be created, this function waits for it to |
427 | // be ready. |
428 | void CreateWorkers(std::size_t workers_count) { |
429 | if (workers_.size() >= workers_count) { |
430 | return; |
431 | } |
432 | counter_to_decrement_when_ready_.Reset(workers_count - workers_.size()); |
433 | while (workers_.size() < workers_count) { |
434 | workers_.push_back(new Worker(&counter_to_decrement_when_ready_)); |
435 | } |
436 | counter_to_decrement_when_ready_.Wait(); |
437 | } |
438 | |
439 | // copy construction disallowed |
440 | WorkersPool(const WorkersPool&) = delete; |
441 | |
442 | // The workers in this pool. They are owned by the pool: |
443 | // the pool creates workers and destroys them in its destructor. |
444 | std::vector<Worker*> workers_; |
445 | |
446 | // The BlockingCounter used to wait for the workers. |
447 | BlockingCounter counter_to_decrement_when_ready_; |
448 | |
449 | // For N-threaded operations, we will use only N-1 worker threads |
450 | // while the last task will be run directly on the main thread. |
451 | // It will then use this main_thread_task_allocator_; having a |
452 | // dedicated allocator for that (separate from the base allocator_) |
  // allows using the same code for all tasks regardless of which
454 | // thread they run on. |
455 | Allocator main_thread_task_allocator_; |
456 | }; |
457 | |
458 | // The task we use to implement a multi-threaded Gemm: a block of the |
459 | // RHS has been packed by the master thread; each worker thread |
460 | // then has to pack a block of the LHS and accumulate the Gemm of these |
461 | // packed LHS and RHS blocks. |
462 | template <typename KernelFormat, typename InputScalar, typename OutputScalar, |
463 | typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder, |
464 | MapOrder ResultOrder, typename LhsOffset, typename RhsOffset, |
465 | typename OutputPipelineType, typename GemmContextType> |
466 | struct GemmWithPackedRhsTask : Task { |
467 | typedef PackedSideBlock<typename KernelFormat::Lhs> PackedLhs; |
468 | typedef PackedSideBlock<typename KernelFormat::Rhs> PackedRhs; |
469 | GemmWithPackedRhsTask(GemmContextType* _context, const KernelBase& _kernel, |
470 | const MatrixMap<const InputScalar, LhsOrder>& _lhs, |
471 | const PackedRhs& _packed_rhs, |
472 | MatrixMap<OutputScalar, ResultOrder>* _result, |
473 | const MatrixBlockBounds& _result_block, |
474 | const LhsOffset& _lhs_offset, |
475 | const RhsOffset& _rhs_offset, |
476 | const BlockParams& _block_params, |
477 | const OutputPipelineType& _output_pipeline) |
478 | : context(_context), |
479 | kernel(_kernel), |
480 | lhs(_lhs), |
481 | packed_rhs(_packed_rhs), |
482 | result(*_result), |
483 | result_block(_result_block), |
484 | lhs_offset(_lhs_offset), |
485 | rhs_offset(_rhs_offset), |
486 | block_params(_block_params), |
487 | output_pipeline(_output_pipeline) {} |
488 | |
489 | void Run() override { |
490 | ScopedProfilingLabel label("GemmWithPackedRhsTask" ); |
491 | |
492 | const int rows = result_block.rows; |
493 | const int cols = result_block.cols; |
494 | const int depth = lhs.cols(); |
495 | |
496 | PackedLhs packed_lhs(Side::Lhs, local_allocator, block_params); |
497 | |
498 | PackedResult packed_result(local_allocator, block_params); |
499 | |
500 | local_allocator->Commit(); |
501 | |
502 | for (int c = 0; c < cols; c += block_params.l2_cols) { |
503 | int cs = std::min(block_params.l2_cols, cols - c); |
504 | |
505 | for (int r = 0; r < rows; r += block_params.l2_rows) { |
506 | int rs = std::min(block_params.l2_rows, rows - r); |
507 | |
508 | PackLhs(&packed_lhs, lhs.block(r, 0, rs, depth)); |
509 | |
510 | Compute(kernel, block_params, &packed_result, packed_lhs, packed_rhs, |
511 | depth); |
512 | |
513 | auto curr_result_block = MatrixBlockBounds( |
514 | result_block.start_row + r, result_block.start_col + c, rs, cs); |
515 | UnpackResult<KernelFormat>( |
516 | &result, curr_result_block, packed_result, depth, |
517 | packed_lhs.sums_of_each_slice(), packed_rhs.sums_of_each_slice(), |
518 | lhs_offset.block(curr_result_block.start_row, rs), |
519 | rhs_offset.block(curr_result_block.start_col, cs), output_pipeline); |
520 | } |
521 | } |
522 | |
523 | local_allocator->Decommit(); |
524 | } |
525 | |
526 | const GemmContextType* context; |
527 | const KernelBase& kernel; |
528 | const MatrixMap<const InputScalar, LhsOrder> lhs; |
529 | const PackedRhs packed_rhs; |
530 | MatrixMap<OutputScalar, ResultOrder> result; |
531 | const MatrixBlockBounds result_block; |
532 | const LhsOffset& lhs_offset; |
533 | const RhsOffset& rhs_offset; |
534 | const BlockParams& block_params; |
535 | const OutputPipelineType& output_pipeline; |
536 | }; |
537 | |
538 | // This base class for multi-threading allows subclasses to implement their own |
539 | // workers_pool() method. See MultiThreadGemmContext below for an example; |
540 | // any other implementation of workers_pool() must return an object with the |
541 | // same public methods as WorkersPool. |
542 | class MultiThreadGemmContextBase : public SingleThreadGemmContext { |
543 | public: |
544 | void set_max_num_threads(int n) { max_num_threads_ = n; } |
545 | |
546 | int max_num_threads() const { return max_num_threads_; } |
547 | |
548 | protected: |
  // The maximum number of threads to use (this count includes
  // the master thread itself, not just additional worker threads).
551 | // The default value 1 means single-threading. That is the default |
552 | // because gemmlowp's primary target is mobile hardware, where thermal |
553 | // constraints usually mean that it may not be realistic to use more |
554 | // than 1 CPU core even if multiple cores are present. |
555 | // The special value 0 means try to detect the number of hardware threads. |
556 | // Note: this assumes that all CPU cores are equivalent. That assumption |
557 | // is defeated on big.LITTLE ARM devices, where we have no API to query |
558 | // the number of big cores (which is typically what we would want to use, |
559 | // leaving aside above-mentioned thermal issues). That is the other reason |
560 | // why the best compromise here is to let max_num_threads_ default to 1, |
561 | // so users who want multi-threading have to make the decision of how many |
562 | // threads to use by themselves. |
563 | int max_num_threads_ = 1; |
564 | }; |
565 | |
566 | class MultiThreadGemmContext : public MultiThreadGemmContextBase { |
567 | public: |
568 | WorkersPool* workers_pool() { return &workers_pool_; } |
569 | |
570 | private: |
571 | // The workers pool used by MultiThreadGemm. Making |
572 | // this part of the context allows it to be persistent, |
573 | // avoiding recreating threads on every Gemm. |
574 | WorkersPool workers_pool_; |
575 | }; |
576 | |
577 | // Determines how many threads should be used for a given Gemm |
578 | // operation. |
579 | template <int KernelRows> |
580 | inline int HowManyThreads(int max_num_threads, int rows, int cols, int depth) { |
581 | // Early-exit in the default case where multi-threading is disabled. |
582 | if (max_num_threads == 1) { |
583 | return 1; |
584 | } |
585 | |
586 | // Determine the maximum number of threads. |
587 | int max_count = GetHardwareConcurrency(max_num_threads); |
588 | |
589 | // Basic calculation: take into account max pool size, and |
590 | // how many rows we have to feed our kernel. |
591 | // The motivation for an absolute minimum number of rows per thread, |
  // potentially higher than KernelRows, is that very thin per-thread workloads
  // currently defeat assumptions of the AddMod generator, resulting
594 | // in substantial bias in TestWithRealData on 24 threads. |
595 | // Ideally, the AddMod generator should be aware of global (r,c) coordinates |
596 | // so as to be independent of the number of threads. |
597 | static const int AbsoluteMinRowsPerThread = 16; |
598 | static const int MinRowsPerThread = KernelRows > AbsoluteMinRowsPerThread |
599 | ? KernelRows |
600 | : AbsoluteMinRowsPerThread; |
601 | int thread_count = std::min(max_count, CeilQuotient(rows, MinRowsPerThread)); |
602 | |
603 | // At this point for small products we already have thread_count==1 so |
604 | // we can avoid doing more work; otherwise, we still want to check |
605 | // that the cubic size (rows*cols*depth) is big enough to keep |
606 | // workers_ busy. |
607 | if (thread_count > 1) { |
608 | // Empirically determined value. |
609 | static const std::uint64_t min_cubic_size_per_thread = 64 * 1024; |
610 | |
    // Widen to 64 bits before multiplying: the product of all three sizes
    // could overflow a 32-bit integer.
612 | const std::uint64_t cubic_size = |
613 | std::uint64_t(rows) * std::uint64_t(cols) * std::uint64_t(depth); |
614 | |
615 | thread_count = |
616 | std::min(thread_count, int(cubic_size / min_cubic_size_per_thread)); |
617 | |
618 | if (thread_count < 1) { |
619 | thread_count = 1; |
620 | } |
621 | } |
622 | |
623 | assert(thread_count > 0 && thread_count <= max_count); |
624 | return thread_count; |
625 | } |
626 | |
627 | // The main multi-threaded Gemm function. |
628 | // To understand it, first read the code of SingleThreadGemm(). |
629 | // The parallelization scheme used here is to have this master function |
630 | // pack a block of RHS and then start worker threads to pack a block of LHS |
631 | // each, and accumulate the corresponding products. |
632 | template <typename KernelFormat, typename InputScalar, typename OutputScalar, |
633 | typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder, |
634 | MapOrder ResultOrder, typename LhsOffset, typename RhsOffset, |
635 | typename OutputPipelineType, typename GemmContextType> |
636 | void MultiThreadGemm(GemmContextType* context, const KernelBase& kernel, |
637 | const MatrixMap<const InputScalar, LhsOrder>& lhs, |
638 | const MatrixMap<const InputScalar, RhsOrder>& rhs, |
639 | MatrixMap<OutputScalar, ResultOrder>* result, |
640 | const LhsOffset& lhs_offset, const RhsOffset& rhs_offset, |
641 | const OutputPipelineType& output_pipeline) { |
642 | ScopedProfilingLabel label("gemmlowp::MultiThreadGemm" ); |
643 | |
644 | assert(lhs.cols() == rhs.rows()); |
645 | |
646 | int rows = result->rows(); |
647 | int cols = result->cols(); |
648 | int depth = lhs.cols(); |
649 | |
650 | // zero sizes should have been caught earlier and early-returned. |
651 | assert(rows > 0); |
652 | assert(cols > 0); |
653 | assert(depth > 0); |
654 | |
655 | // The case of rows<cols should have been caught earlier and transposed. |
656 | assert(rows >= cols); |
657 | |
658 | const int thread_count = HowManyThreads<KernelFormat::kRows>( |
659 | context->max_num_threads(), rows, cols, depth); |
660 | if (thread_count == 1) { |
661 | return SingleThreadGemm<KernelFormat, InputScalar, OutputScalar, |
662 | BitDepthParams>(context, kernel, lhs, rhs, result, |
663 | lhs_offset, rhs_offset, |
664 | output_pipeline); |
665 | } |
666 | assert(thread_count > 1); |
667 | |
668 | // Simple 1:1 mapping of tasks to physical cores, which is very important |
  // to getting good multithreaded performance, especially for not-very-large
  // GEMMs, and particularly on Android.
671 | const int task_count = thread_count; |
672 | |
673 | Allocator* allocator = context->allocator(); |
674 | auto* workers_pool = context->workers_pool(); |
675 | |
676 | BlockParams block_params; |
677 | block_params.Init<KernelFormat>( |
678 | rows, cols, depth, task_count, context->l1_bytes_to_use(), |
679 | context->l2_bytes_to_use(), context->l2_rhs_factor()); |
680 | |
681 | PackedSideBlock<typename KernelFormat::Rhs> packed_rhs(Side::Rhs, allocator, |
682 | block_params); |
683 | allocator->Commit(); |
684 | |
685 | // We loop over large blocks of the RHS. |
686 | for (int c = 0; c < cols; c += block_params.l2_cols) { |
687 | int cs = std::min(block_params.l2_cols, cols - c); |
688 | |
689 | // Pack a large block of the RHS. |
690 | PackRhs(&packed_rhs, rhs.block(0, c, depth, cs)); |
691 | |
692 | // Give work to each worker. |
693 | std::vector<Task*> tasks; |
694 | int next_start_row = 0; |
695 | for (int n = 0; n < task_count; ++n) { |
696 | int start_row = next_start_row; |
697 | next_start_row = std::min( |
698 | rows, RoundUp<KernelFormat::kRows>(rows * (n + 1) / task_count)); |
699 | |
700 | int block_rows = next_start_row - start_row; |
701 | auto lhs_block = lhs.block(start_row, 0, block_rows, depth); |
702 | typedef GemmWithPackedRhsTask<KernelFormat, InputScalar, OutputScalar, |
703 | BitDepthParams, LhsOrder, RhsOrder, |
704 | ResultOrder, LhsOffset, RhsOffset, |
705 | OutputPipelineType, GemmContextType> |
706 | TaskType; |
707 | tasks.push_back( |
708 | new TaskType(context, kernel, lhs_block, packed_rhs, result, |
709 | MatrixBlockBounds(start_row, c, block_rows, cs), |
710 | lhs_offset, rhs_offset, block_params, output_pipeline)); |
711 | } |
712 | // Execute the work on the workers (and partially on this thread). |
713 | workers_pool->Execute(tasks); |
714 | } |
715 | |
716 | allocator->Decommit(); |
717 | } |
718 | |
719 | } // namespace gemmlowp |
720 | |
721 | #endif // GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_ |
722 | |