/* Copyright 2019 Google LLC. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// The 'middle-end' in ruy. See TrMul function comment.

#include "ruy/trmul.h"

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <vector>

#include "ruy/allocator.h"
#include "ruy/block_map.h"
#include "ruy/check_macros.h"
#include "ruy/cpu_cache_params.h"
#include "ruy/cpuinfo.h"
#include "ruy/ctx.h"
#include "ruy/denormal.h"
#include "ruy/mat.h"
#include "ruy/matrix.h"
#include "ruy/mul_params.h"
#include "ruy/opt_set.h"
#include "ruy/profiler/instrumentation.h"
#include "ruy/side_pair.h"
#include "ruy/size_util.h"
#include "ruy/thread_pool.h"
#include "ruy/trace.h"
#include "ruy/tune.h"

namespace ruy {

namespace {

// Enum to track the packing status of a block of the LHS or RHS matrix.
enum class PackingStatus : std::uint8_t {
  kNotStarted,  // No thread has started packing this block yet.
  kInProgress,  // Some thread is currently packing this block.
  kFinished     // This block has already been packed.
};

// TrMulTask is the task that a ruy thread runs to perform the TrMul operation.
class TrMulTask final : public Task {
 public:
  TrMulTask(TrMulParams* params, const BlockMap& block_map,
            std::atomic<int>* atomic_block_id, int thread_id,
            bool need_atomics,
            SidePair<std::atomic<PackingStatus>*> packing_status,
            TuningResolver* tuning_resolver, Allocator* local_allocator,
            CpuInfo* cpuinfo)
      : params_(params),
        block_map_(block_map),
        atomic_block_id_(atomic_block_id),
        thread_id_(thread_id),
        need_atomics_(need_atomics),
        packing_status_(packing_status),
        tuning_resolver_(tuning_resolver),
        local_allocator_(local_allocator),
        local_already_packed_{nullptr, nullptr},
        cpuinfo_(cpuinfo) {}

  // Thread main function. This is one thread's share of the TrMul work.
  void Run() override {
    RUY_TRACE_SCOPE_NAME("TrMulTask::Run");
    RUY_TRACE_SET_THEAD_ID(thread_id_);
    // Allocate and initialize `local_already_packed_`.
    for (Side side : {Side::kLhs, Side::kRhs}) {
      if (!params_->is_prepacked[side]) {
        const int size = NumBlocksPerSide(side, block_map_);
        local_allocator_->Allocate(size, &local_already_packed_[side]);
        memset(local_already_packed_[side], 0, size * sizeof(bool));
      }
    }

    const Tuning tuning = tuning_resolver_->Resolve(cpuinfo_);
    const int num_blocks = NumBlocks(block_map_);

    // Each thread initially reserves the block whose id equals its own
    // thread id.
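    // (Correspondingly, TrMul initializes the shared atomic_block_id counter
    // to the thread count, so the fetch_add below hands out block ids
    // starting just past these initially reserved ones.)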
    int block_id = thread_id_;
    // Loop until all blocks have been computed.
    while (block_id < num_blocks) {
      RUY_TRACE_SCOPE_NAME("Main loop iteration");
      // Reserve the next block to handle, hiding the latency of this atomic
      // op.
      const int next_block_id =
          atomic_block_id_->fetch_add(1, std::memory_order_relaxed);
      // Get coordinates of the current block to handle, in "block space".
      SidePair<int> block;
      GetBlockByIndex(block_map_, block_id, &block);
      // Get coordinates of the current block to handle, in matrix space.
      SidePair<int> start, end;
      GetBlockMatrixCoords(block_map_, block, &start, &end);
      RUY_TRACE_INFO(TRMUL_TASK_MAIN_LOOP_GOT_BLOCK_COORDS);
      // Maybe pack the current LHS/RHS block, if not already packed.
      EnsurePacked(block, start, end, tuning);
      // Actually do the matrix multiplication work.
      params_->RunKernel(tuning, start, end);
      // Move on to the next block as obtained by the atomic increment
      // at the start of this while loop iteration.
      block_id = next_block_id;
    }

    local_allocator_->FreeAll();
  }

 private:
  // Tries to pack a block, without blocking.
  // If the block was already packed, returns true.
  // If packing of the block had not yet been started, packs it and returns
  // true.
  // If the block is currently being packed by another thread, returns false.
  bool TryPack(Side side, int block, int start, int end, Tuning tuning) {
    if (params_->is_prepacked[side]) {
      return true;
    }
    if (!local_already_packed_[side][block]) {
      if (need_atomics_) {
        // Explanation of this compare_exchange_strong operation:
        // This atomically performs all of the following:
        // 1. Read `status` with "acquire" memory order.
        //    * This read uses "acquire" because both memory orders specified
        //      have "acquire" as their read component.
        // 2. Compare (bitwise) with `exchanged_status`.
        // 3. If equal, store the value kInProgress to `status` with "release"
        //    memory order, and return true, so we take this 'if' branch.
        //    * This store uses "release" because of the _rel part in
        //      memory_order_acq_rel, passed as the first memory order
        //      argument.
        // 4. If not equal, store the loaded value of `status` to
        //    `exchanged_status` with "relaxed" semantics, and return false,
        //    so we take the 'else' branch.
        //    * This store uses "relaxed" because the second memory order
        //      argument, memory_order_acquire, implies no particular store
        //      semantics. "relaxed" is acceptable here because this stores
        //      to a local stack variable.
        //
        // Rationale for compare_exchange_strong as opposed to
        // compare_exchange_weak:
        // The spurious-failure case with compare_exchange_weak would actually
        // happen a lot here, because the atomic 'status' bytes are stored
        // contiguously in arrays and neighboring values will be accessed
        // by multiple threads concurrently. On a typical ARM CPU, an
        // exclusives reservation granule is 64 bytes, so a lot of false
        // sharing may happen. Using compare_exchange_weak would thus result
        // in TryPack often returning 'false' when it could instead have done
        // the packing work and returned 'true'. Heuristically, that is not a
        // good thing. Moreover, it would change the TryPack contract,
        // loosening it and making it harder for the caller to reason about.
        // Finally, the overhead of atomic operations is mitigated by the
        // enclosing check on local_already_packed_, so maybe the overhead of
        // compare_exchange_strong isn't such a problem. But we don't really
        // know for sure; that would be interesting to experiment with
        // further.
        PackingStatus exchanged_status = PackingStatus::kNotStarted;
        std::atomic<PackingStatus>& status = packing_status_[side][block];
        if (status.compare_exchange_strong(
                exchanged_status, PackingStatus::kInProgress,
                std::memory_order_acq_rel, std::memory_order_acquire)) {
          // In this branch, the status was kNotStarted and we just atomically
          // changed it to kInProgress as we are about to handle the packing
          // ourselves.
          RUY_TRACE_INFO(TRYPACK_PACKING);
          params_->RunPack(side, tuning, start, end);
          status.store(PackingStatus::kFinished, std::memory_order_release);
        } else if (exchanged_status == PackingStatus::kInProgress) {
          // Another thread is currently packing this block.
          RUY_TRACE_INFO(TRYPACK_ANOTHER_THREAD_PACKING);
          return false;
        } else {
          RUY_TRACE_INFO(TRYPACK_PACKED_BY_ANOTHER_THREAD);
        }
        RUY_DCHECK(status.load(std::memory_order_acquire) ==
                   PackingStatus::kFinished);
      } else {
        // Single-threaded case: no need for expensive atomics;
        // local_already_packed_ is already the source of truth.
        params_->RunPack(side, tuning, start, end);
      }
      local_already_packed_[side][block] = true;
    } else {
      RUY_TRACE_INFO(TRYPACK_PREVIOUSLY_PACKED);
    }
    return true;
  }

  // Ensures that both the LHS and RHS blocks required by the specified block
  // are packed. In the event that they are already being packed by other
  // threads, this function may perform the packing of some other block while
  // waiting for those other threads to finish packing the requested blocks.
  void EnsurePacked(const SidePair<int>& block, const SidePair<int>& start,
                    const SidePair<int>& end, Tuning tuning) {
#if RUY_OPT(PACK_AHEAD)
    SidePair<int> next_runahead_block{block[Side::kLhs] + 1,
                                      block[Side::kRhs] + 1};
    Side next_runahead_side = Side::kLhs;
#endif
    while (true) {
      bool both_sides_packed = true;
      for (Side side : {Side::kLhs, Side::kRhs}) {
        both_sides_packed &=
            TryPack(side, block[side], start[side], end[side], tuning);
      }
      if (both_sides_packed) {
        break;
      }
#if RUY_OPT(PACK_AHEAD)
      RUY_TRACE_INFO(ENSURE_PACKED_ENTER_RUN_AHEAD);
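      // A required block is still being packed by another thread. Rather than
      // spin-wait, do some useful work in the meantime: try to pack the next
      // block along one side, alternating between LHS and RHS on successive
      // iterations ("run-ahead" packing).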
      const Side runahead_side = next_runahead_side;
      const int runahead_block = next_runahead_block[runahead_side];
      next_runahead_side = OtherSide(next_runahead_side);
      if (runahead_block >= NumBlocksPerSide(runahead_side, block_map_)) {
        continue;
      }
      int runahead_block_start, runahead_block_end;
      GetBlockMatrixCoords(runahead_side, block_map_, runahead_block,
                           &runahead_block_start, &runahead_block_end);
      TryPack(runahead_side, runahead_block, runahead_block_start,
              runahead_block_end, tuning);
      next_runahead_block[runahead_side] = runahead_block + 1;
#endif
    }
    RUY_TRACE_INFO(ENSURE_PACKED_END);
  }

  TrMulParams* params_;
  const BlockMap& block_map_;
  std::atomic<int>* atomic_block_id_;
  int thread_id_;
  bool need_atomics_;
  SidePair<std::atomic<PackingStatus>*> packing_status_;
  TuningResolver* tuning_resolver_;
  Allocator* local_allocator_;

  // Local indicators of packedness to avoid the overhead of atomic ops.
  SidePair<bool*> local_already_packed_;

  CpuInfo* cpuinfo_;
};

int GetTentativeThreadCount(Ctx* ctx, int rows, int cols, int depth) {
#if RUY_PLATFORM_EMSCRIPTEN
  // b/139927184, std::thread constructor raises exception
  return 1;
#endif
  RUY_TRACE_SCOPE;
  // Empirically determined rule for reasonable number of
  // threads to use. This is proportional to the number of arithmetic ops
  // in this Mul (product of the 3 sizes).
  // Be defensive here by explicitly promoting operands to int64 to avoid the
  // pitfall of `int64 result = x * y;` overflowing as x and y are still
  // narrow.
  const std::int64_t rows_i64 = rows;
  const std::int64_t cols_i64 = cols;
  const std::int64_t depth_i64 = depth;
  const std::int64_t problem_size = rows_i64 * cols_i64 * depth_i64;
  // Division is cheap when the denominator is constant.
  static constexpr std::int64_t kSizePerAdditionalThread = 32768;
  std::int64_t tentative_thread_count =
      problem_size / kSizePerAdditionalThread;
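  // For example, a 1000x1000 Mul with depth 100 has a problem_size of 10^8,
  // suggesting roughly 3000 threads at this point; the clamping below then
  // brings that into [1, ctx->max_num_threads()].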
  // tentative_thread_count is still an int64, not necessarily in the range of
  // type int. It probably is, as long as kSizePerAdditionalThread is large,
  // but that constant might change in the future.
  tentative_thread_count = std::max<std::int64_t>(tentative_thread_count, 1);
  tentative_thread_count =
      std::min<std::int64_t>(tentative_thread_count, ctx->max_num_threads());
  // Now tentative_thread_count must be in the range of type int, because
  // ctx->max_num_threads() is.
  RUY_DCHECK_LE(tentative_thread_count, std::numeric_limits<int>::max());
  return tentative_thread_count;
}

bool GetUseSimpleLoop(int tentative_thread_count, int rows, int cols,
                      int depth, int lhs_scalar_size, int rhs_scalar_size,
                      const CpuCacheParams& cpu_cache_params) {
  RUY_TRACE_SCOPE;
  if (tentative_thread_count == 1) {
    if (IsObviouslyLinearTraversal(rows, cols, depth, lhs_scalar_size,
                                   rhs_scalar_size, cpu_cache_params)) {
      RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_TRUE);
      return true;
    }
  }
  RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_FALSE);
  return false;
}

}  // namespace

// TrMul is the ruy middle-end. It contains the high-level logic to perform
// a ruy::Mul's work, down to calls to back-end Kernel and Pack functions.
// This includes determining how many threads to use, computing the BlockMap,
// and executing tasks on a thread pool. The TrMul function itself runs on the
// main thread; the code that potentially runs on worker threads is in
// TrMulTask::Run().
void TrMul(Ctx* ctx, TrMulParams* params) {
  RUY_TRACE_SCOPE;
  profiler::ScopeLabel label(
      "TrMul (Path=0x%x, max_num_threads=%d, is_prepacked=(%d,%d))",
      static_cast<int>(params->path), ctx->max_num_threads(),
      params->is_prepacked[Side::kLhs], params->is_prepacked[Side::kRhs]);

  PEMat& packed_lhs = params->packed_matrix[Side::kLhs];
  PEMat& packed_rhs = params->packed_matrix[Side::kRhs];
  EMat& lhs = params->src[Side::kLhs];
  EMat& rhs = params->src[Side::kRhs];

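  // This is TrMul, so the LHS is in transposed form: its columns correspond
  // to the destination's rows, and its rows to the accumulation depth.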
  const int rows = lhs.layout.cols;
  const int cols = rhs.layout.cols;
  const int depth = lhs.layout.rows;

  const int tentative_thread_count =
      GetTentativeThreadCount(ctx, rows, cols, depth);
  const auto& cpu_cache_params = ctx->mutable_cpuinfo()->CacheParams();

  // Suppress denormals to avoid computation inefficiency.
  // Note this only handles the denormal suppression on the main thread. As
  // for worker threads, the suppression is handled in each thread's main
  // loop. See the corresponding code in thread_pool.cc for details.
  ScopedSuppressDenormals suppress_denormals;

  // Case of running this TrMul as a simple loop.
  // This is a good place to start reading this function: all the rest
  // of this function is just an optimized, but functionally equivalent,
  // version of that.
  if (GetUseSimpleLoop(tentative_thread_count, rows, cols, depth,
                       lhs.data_type.size, rhs.data_type.size,
                       cpu_cache_params)) {
    profiler::ScopeLabel label_simple("TrMulImpl, simple loop");
    Tuning tuning = ctx->GetMainThreadTuning();
    RUY_TRACE_INFO(TRMUL_SIMPLE_LOOP);

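    // The packed matrices' layouts are rounded up to multiples of the kernel
    // block size, hence the name `rounded_dims` for their dimensions.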
    const SidePair<int> origin{0, 0};
    const SidePair<int> rounded_dims{packed_lhs.layout.cols,
                                     packed_rhs.layout.cols};
    for (Side side : {Side::kLhs, Side::kRhs}) {
      if (!params->is_prepacked[side]) {
        params->RunPack(side, tuning, origin[side], rounded_dims[side]);
      }
    }
    params->RunKernel(tuning, origin, rounded_dims);
    return;
  }

  profiler::ScopeLabel label_general("TrMulImpl, general case");
  RUY_TRACE_INFO(TRMUL_GENERAL_CASE);
  Allocator* main_allocator = ctx->GetMainAllocator();

  // Initialize block map.
  BlockMap block_map;
  MakeBlockMap(packed_lhs.layout.cols, packed_rhs.layout.cols, depth,
               packed_lhs.layout.kernel.cols, packed_rhs.layout.kernel.cols,
               packed_lhs.data_type.size, packed_rhs.data_type.size,
               tentative_thread_count, cpu_cache_params, &block_map);

  // Initialize per-thread state.
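  // Note: the BlockMap is the source of truth for the thread count; it may
  // have settled on fewer threads than the tentative count above, e.g. when
  // there are only a few blocks to distribute.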
  const int thread_count = block_map.thread_count;
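  // Packing-status atomics are only needed when more than one thread may race
  // to pack the same block.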
  const bool need_atomics = thread_count > 1;
  ctx->EnsureThreadSpecificResources(thread_count);
  for (int i = 0; i < thread_count; i++) {
    ctx->GetThreadSpecificTuningResolver(i)->SetTuning(ctx->explicit_tuning());
  }

  // In the need_atomics case, allocate and initialize atomic values tracking
  // the packing status of blocks.
  SidePair<std::atomic<PackingStatus>*> packing_status{nullptr, nullptr};
  if (need_atomics) {
    for (Side side : {Side::kLhs, Side::kRhs}) {
      if (!params->is_prepacked[side]) {
        const int size = NumBlocksPerSide(side, block_map);
        main_allocator->Allocate(size, &packing_status[side]);
        for (int i = 0; i < size; i++) {
          packing_status[side][i].store(PackingStatus::kNotStarted,
                                        std::memory_order_relaxed);
        }
      }
    }
  }

  // Create the atomic block id. Allocate it using the Allocator so that we
  // get the alignment ensuring that it sits alone in its exclusives
  // reservation granule.
  std::atomic<int>* atomic_block_id;
  main_allocator->Allocate(1, &atomic_block_id);
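  // Initialize the counter to thread_count: each TrMulTask implicitly starts
  // with the block whose id equals its thread id, so the shared counter hands
  // out ids from thread_count upwards.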
  atomic_block_id->store(thread_count);

  // Create task objects. We allocate a single buffer and then use
  // placement-new to construct N TrMulTask objects within it. To avoid having
  // the Clang CFI sanitizer complain about a TrMulTask* pointer temporarily
  // pointing to garbage, we keep the pointer a plain char* until finished
  // constructing.
  char* tasks_buf =
      main_allocator->Allocate<char>(thread_count * sizeof(TrMulTask));
  for (int i = 0; i < thread_count; i++) {
    auto* allocator = ctx->GetThreadSpecificAllocator(i);
    auto* tuning_resolver = ctx->GetThreadSpecificTuningResolver(i);
    new (tasks_buf + i * sizeof(TrMulTask)) TrMulTask(
        params, block_map, atomic_block_id, i, need_atomics, packing_status,
        tuning_resolver, allocator, ctx->mutable_cpuinfo());
  }
  TrMulTask* tasks = reinterpret_cast<TrMulTask*>(tasks_buf);

  // Do the computation.
  ctx->mutable_thread_pool()->Execute(thread_count, tasks);
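  // The calling (main) thread participates in executing these tasks as well,
  // which is why denormal suppression was set up on this thread above, while
  // worker threads handle it in their own main loop (see thread_pool.cc).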

  // Finish up.
  for (int i = 0; i < thread_count; i++) {
    tasks[i].~TrMulTask();
  }
}

}  // namespace ruy