1/* Copyright 2019 Google LLC. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16// The 'middle-end' in ruy. See TrMul function comment.
17
18#include "ruy/trmul.h"
19
20#include <algorithm>
21#include <atomic>
22#include <cstdint>
23#include <cstring>
24#include <limits>
25#include <memory>
26#include <vector>
27
28#include "ruy/allocator.h"
29#include "ruy/block_map.h"
30#include "ruy/check_macros.h"
31#include "ruy/cpu_cache_params.h"
32#include "ruy/cpuinfo.h"
33#include "ruy/ctx.h"
34#include "ruy/denormal.h"
35#include "ruy/mat.h"
36#include "ruy/matrix.h"
37#include "ruy/mul_params.h"
38#include "ruy/opt_set.h"
39#include "ruy/profiler/instrumentation.h"
40#include "ruy/side_pair.h"
41#include "ruy/size_util.h"
42#include "ruy/thread_pool.h"
43#include "ruy/trace.h"
44#include "ruy/tune.h"
45
46namespace ruy {
47
48namespace {
49
50// Enum to track the packingstatus of a block of the LHS or RHS matrix.
51enum class PackingStatus : std::uint8_t {
52 kNotStarted, // No thread has started packing this block yet.
53 kInProgress, // Some thread is currently packing this block.
54 kFinished // This block has already been packed.
55};
56
57// TrMulTask is the task that a ruy thread runs to perform the TrMul operation.
58class TrMulTask final : public Task {
59 public:
60 TrMulTask(TrMulParams* params, const BlockMap& block_map,
61 std::atomic<int>* atomic_block_id, int thread_id, bool need_atomics,
62 SidePair<std::atomic<PackingStatus>*> packing_status,
63 TuningResolver* tuning_resolver, Allocator* local_allocator,
64 CpuInfo* cpuinfo)
65 : params_(params),
66 block_map_(block_map),
67 atomic_block_id_(atomic_block_id),
68 thread_id_(thread_id),
69 need_atomics_(need_atomics),
70 packing_status_(packing_status),
71 tuning_resolver_(tuning_resolver),
72 local_allocator_(local_allocator),
73 local_already_packed_{nullptr, nullptr},
74 cpuinfo_(cpuinfo) {}
75
76 // Thread main function. This is one thread's share of the TrMul work.
77 void Run() override {
78 RUY_TRACE_SCOPE_NAME("TrMulTask::Run");
79 RUY_TRACE_SET_THEAD_ID(thread_id_);
80 // Allocate and initialize `local_packed`.
81 for (Side side : {Side::kLhs, Side::kRhs}) {
82 if (!params_->is_prepacked[side]) {
83 const int size = NumBlocksPerSide(side, block_map_);
84 local_allocator_->Allocate(size, &local_already_packed_[side]);
85 memset(local_already_packed_[side], 0, size * sizeof(bool));
86 }
87 }
88
89 const Tuning tuning = tuning_resolver_->Resolve(cpuinfo_);
90 const int num_blocks = NumBlocks(block_map_);
91
92 // Each thread starts by initially reserving the block whose id
93 // is the thread id.
94 int block_id = thread_id_;
95 // Loop until all blocks have been computed.
96 while (block_id < num_blocks) {
97 RUY_TRACE_SCOPE_NAME("Main loop iteration");
98 // Reserve the next block to handle, hiding the latency of this atomic op.
99 const int next_block_id =
100 atomic_block_id_->fetch_add(1, std::memory_order_relaxed);
101 // Get coordinates of the current block to handle, in "block space".
102 SidePair<int> block;
103 GetBlockByIndex(block_map_, block_id, &block);
104 // Get coordinates of the current block to handle, in matrix space.
105 SidePair<int> start, end;
106 GetBlockMatrixCoords(block_map_, block, &start, &end);
107 RUY_TRACE_INFO(TRMUL_TASK_MAIN_LOOP_GOT_BLOCK_COORDS);
108 // Maybe pack the current LHS/RHS block, if not already packed.
109 EnsurePacked(block, start, end, tuning);
110 // Actually do matrix multiplication work
111 params_->RunKernel(tuning, start, end);
112 // Move on to the next block as obtained by the atomic increment
113 // at the start of this while loop iteration.
114 block_id = next_block_id;
115 }
116
117 local_allocator_->FreeAll();
118 }
119
120 private:
121 // Tries to pack a block, without blocking.
122 // If the block was already packed, returns true.
123 // If the block was not started packing, packs it and returns true.
124 // If the block was being packed by another thread, returns false.
125 bool TryPack(Side side, int block, int start, int end, Tuning tuning) {
126 if (params_->is_prepacked[side]) {
127 return true;
128 }
129 if (!local_already_packed_[side][block]) {
130 if (need_atomics_) {
131 // Explanation of this compare_exchange_strong operation:
132 // This atomically performs all of the following:
133 // 1. Read `status` with "acquire" memory order.
134 // * That this read uses "acquire" is because both memory orders
135 // specified have "acquire" as their read-component.
136 // 2. Compare (bitwise) with `exchanged_status`.
137 // 3. If equal, stores the value kInProgress to `status` with "release"
138 // memory order, and returns true, so we take this 'if' branch.
139 // * That this store uses "release" is because of the _rel part in
140 // memory_order_acq_rel passed as the first memory order argument.
141 // 4. If not equal, stores the loaded value of `status` to
142 // `exchanged_status` with "relaxed" semantics, and returns false,
143 // so we take the 'else' branch.
144 // * That this store uses "relaxed" is because the second memory
145 // order argument, memory_order_acquire, implies no particular
146 // store semantics. "relaxed" is acceptable here because this
147 // stores to a local stack variable.
148 //
149 // Rationale for compare_exchange_strong as opposed to
150 // compare_exchange_weak:
151 // The spurious-failure case with compare_exchange_weak will actually
152 // happen a lot here, because the atomic 'status' bytes are stored
153 // contiguously in arrays and neighboring values will be accessed
154 // by multiple threads concurrently. On a typical ARM CPU, an exclusives
155 // reservation granule is 64 bytes, so a lot of false-sharing may
156 // happen. Using compare_exchange_weak would thus result in often having
157 // TryPack return 'false' when it could instead have done the packing
158 // work and returned 'true'. Heuristically, that is not a good thing.
159 // Moreover, this changes the TryPack contract, loosening it and making
160 // it harder for the caller to reason about. Finally, the overhead of
161 // atomic operations is mitigated by the enclosing check on
162 // local_already_packed, so maybe the overhead of
163 // compare_exchange_strong isn't such a problem. But we don't really
164 // know for sure, that would be interesting to experiment more with.
165 PackingStatus exchanged_status = PackingStatus::kNotStarted;
166 std::atomic<PackingStatus>& status = packing_status_[side][block];
167 if (status.compare_exchange_strong(
168 exchanged_status, PackingStatus::kInProgress,
169 std::memory_order_acq_rel, std::memory_order_acquire)) {
170 // In this branch, the status was kNotStarted and we just atomically
171 // changed it to kInProgress as we are about to handle the packing
172 // ourselves.
173 RUY_TRACE_INFO(TRYPACK_PACKING);
174 params_->RunPack(side, tuning, start, end);
175 status.store(PackingStatus::kFinished, std::memory_order_release);
176 } else if (exchanged_status == PackingStatus::kInProgress) {
177 // Another thread is currently packing this block.
178 RUY_TRACE_INFO(TRYPACK_ANOTHER_THREAD_PACKING);
179 return false;
180 } else {
181 RUY_TRACE_INFO(TRYPACK_PACKED_BY_ANOTHER_THREAD);
182 }
183 RUY_DCHECK(status.load(std::memory_order_acquire) ==
184 PackingStatus::kFinished);
185 } else {
186 // Single-threaded case: no need for expensive atomics,
187 // local_already_packed is the truth already.
188 params_->RunPack(side, tuning, start, end);
189 }
190 local_already_packed_[side][block] = true;
191 } else {
192 RUY_TRACE_INFO(TRYPACK_PREVIOUSLY_PACKED);
193 }
194 return true;
195 }
196
197 // Ensures that both the LHS and RHS blocks required by the specified block
198 // are packed. In the event that they are already being packed on another
199 // threads, this function may perform the packing of some other block while
200 // waiting for that other thread to finish packing the requested block.
201 void EnsurePacked(const SidePair<int>& block, const SidePair<int>& start,
202 const SidePair<int>& end, Tuning tuning) {
203#if RUY_OPT(PACK_AHEAD)
204 SidePair<int> next_runahead_block{block[Side::kLhs] + 1,
205 block[Side::kRhs] + 1};
206 Side next_runahead_side = Side::kLhs;
207#endif
208 while (true) {
209 bool both_sides_packed = true;
210 for (Side side : {Side::kLhs, Side::kRhs}) {
211 both_sides_packed &=
212 TryPack(side, block[side], start[side], end[side], tuning);
213 }
214 if (both_sides_packed) {
215 break;
216 }
217#if RUY_OPT(PACK_AHEAD)
218 RUY_TRACE_INFO(ENSURE_PACKED_ENTER_RUN_AHEAD);
219 const Side runahead_side = next_runahead_side;
220 const int runahead_block = next_runahead_block[runahead_side];
221 next_runahead_side = OtherSide(next_runahead_side);
222 if (runahead_block >= NumBlocksPerSide(runahead_side, block_map_)) {
223 continue;
224 }
225 int runahead_block_start, runahead_block_end;
226 GetBlockMatrixCoords(runahead_side, block_map_, runahead_block,
227 &runahead_block_start, &runahead_block_end);
228 TryPack(runahead_side, runahead_block, runahead_block_start,
229 runahead_block_end, tuning);
230 next_runahead_block[runahead_side] = runahead_block + 1;
231#endif
232 }
233 RUY_TRACE_INFO(ENSURE_PACKED_END);
234 }
235
236 TrMulParams* params_;
237 const BlockMap& block_map_;
238 std::atomic<int>* atomic_block_id_;
239 int thread_id_;
240 bool need_atomics_;
241 SidePair<std::atomic<PackingStatus>*> packing_status_;
242 TuningResolver* tuning_resolver_;
243 Allocator* local_allocator_;
244
245 // Local indicators of packedness to avoid the overhead of atomic ops.
246 SidePair<bool*> local_already_packed_;
247
248 CpuInfo* cpuinfo_;
249};
250
251int GetTentativeThreadCount(Ctx* ctx, int rows, int cols, int depth) {
252#if RUY_PLATFORM_EMSCRIPTEN
253 // b/139927184, std::thread constructor raises exception
254 return 1;
255#endif
256 RUY_TRACE_SCOPE;
257 // Empirically determined rule for reasonable number of
258 // threads to use. This is proportional to the number of arithmetic ops
259 // in this Mul (product of the 3 sizes).
260 // Be defensive here by explicitly promoting operands to int64 to avoid the
261 // pitfall of `int64 result = x * y;` overflowing as x and y are still narrow.
262 const std::int64_t rows_i64 = rows;
263 const std::int64_t cols_i64 = cols;
264 const std::int64_t depth_i64 = depth;
265 const std::int64_t problem_size = rows_i64 * cols_i64 * depth_i64;
266 // Division is cheap when the denominator is constant
267 static constexpr std::int64_t kSizePerAdditionalThread = 32768;
268 std::int64_t tentative_thread_count = problem_size / kSizePerAdditionalThread;
269 // tentative_thread_count is still an int64, still not necessarily in the
270 // range of type int. It probably is as long as kSizePerAdditionalThread is
271 // large, but imagine that that constant might change in the future.
272 tentative_thread_count = std::max<std::int64_t>(tentative_thread_count, 1);
273 tentative_thread_count =
274 std::min<std::int64_t>(tentative_thread_count, ctx->max_num_threads());
275 // now tentative_thread_count must be in the range of type int, because
276 // ctx->max_num_threads() is.
277 RUY_DCHECK_LE(tentative_thread_count, std::numeric_limits<int>::max());
278 return tentative_thread_count;
279}
280
281bool GetUseSimpleLoop(int tentative_thread_count, int rows, int cols, int depth,
282 int lhs_scalar_size, int rhs_scalar_size,
283 const CpuCacheParams& cpu_cache_params) {
284 RUY_TRACE_SCOPE;
285 if (tentative_thread_count == 1) {
286 if (IsObviouslyLinearTraversal(rows, cols, depth, lhs_scalar_size,
287 rhs_scalar_size, cpu_cache_params)) {
288 RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_TRUE);
289 return true;
290 }
291 }
292 RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_FALSE);
293 return false;
294}
295
296} // namespace
297
298// TrMul is the ruy middle-end. It contains the high-level logic to perform
299// a ruy::Mul's work, down to calls to back-end Kernel and Pack functions.
300// This includes determining how many threads to use, computing the BlockMap,
301// executing tasks on a thread-pool. The TrMul function itself runs on the main
302// thread, the code that is potentially running on worker threads is in
303// TrMulTask::Run().
304void TrMul(Ctx* ctx, TrMulParams* params) {
305 RUY_TRACE_SCOPE;
306 profiler::ScopeLabel label(
307 "TrMul (Path=0x%x, max_num_threads=%d, is_prepacked=(%d,%d))",
308 static_cast<int>(params->path), ctx->max_num_threads(),
309 params->is_prepacked[Side::kLhs], params->is_prepacked[Side::kRhs]);
310
311 PEMat& packed_lhs = params->packed_matrix[Side::kLhs];
312 PEMat& packed_rhs = params->packed_matrix[Side::kRhs];
313 EMat& lhs = params->src[Side::kLhs];
314 EMat& rhs = params->src[Side::kRhs];
315
316 const int rows = lhs.layout.cols;
317 const int cols = rhs.layout.cols;
318 const int depth = lhs.layout.rows;
319
320 const int tentative_thread_count =
321 GetTentativeThreadCount(ctx, rows, cols, depth);
322 const auto& cpu_cache_params = ctx->mutable_cpuinfo()->CacheParams();
323
324 // Suppress denormals to avoid computation inefficiency.
325 // Note this only handles the denormal suppression on the main thread. As for
326 // worker threads, the suppression is handled in each thread's main loop. See
327 // the corresponding code in thread_pool.cc for details.
328 ScopedSuppressDenormals suppress_denormals;
329
330 // Case of running this TrMul as a simple loop.
331 // This is a good place to start reading this function: all the rest
332 // of this function is just an optimized, but functionally equivalent,
333 // version of that.
334 if (GetUseSimpleLoop(tentative_thread_count, rows, cols, depth,
335 lhs.data_type.size, rhs.data_type.size,
336 cpu_cache_params)) {
337 profiler::ScopeLabel label_simple("TrMulImpl, simple loop");
338 Tuning tuning = ctx->GetMainThreadTuning();
339 RUY_TRACE_INFO(TRMUL_SIMPLE_LOOP);
340
341 const SidePair<int> origin{0, 0};
342 const SidePair<int> rounded_dims{packed_lhs.layout.cols,
343 packed_rhs.layout.cols};
344 for (Side side : {Side::kLhs, Side::kRhs}) {
345 if (!params->is_prepacked[side]) {
346 params->RunPack(side, tuning, origin[side], rounded_dims[side]);
347 }
348 }
349 params->RunKernel(tuning, origin, rounded_dims);
350 return;
351 }
352
353 profiler::ScopeLabel label_general("TrMulImpl, general case");
354 RUY_TRACE_INFO(TRMUL_GENERAL_CASE);
355 Allocator* main_allocator = ctx->GetMainAllocator();
356
357 // Initialize block map.
358 BlockMap block_map;
359 MakeBlockMap(packed_lhs.layout.cols, packed_rhs.layout.cols, depth,
360 packed_lhs.layout.kernel.cols, packed_rhs.layout.kernel.cols,
361 packed_lhs.data_type.size, packed_rhs.data_type.size,
362 tentative_thread_count, cpu_cache_params, &block_map);
363
364 // Initialize per-thread state.
365 const int thread_count = block_map.thread_count;
366 const bool need_atomics = thread_count > 1;
367 ctx->EnsureThreadSpecificResources(thread_count);
368 for (int i = 0; i < thread_count; i++) {
369 ctx->GetThreadSpecificTuningResolver(i)->SetTuning(ctx->explicit_tuning());
370 }
371
372 // In the need_atomics case, allocate and initialize atomic values tracking
373 // the packing status of blocks.
374 SidePair<std::atomic<PackingStatus>*> packing_status{nullptr, nullptr};
375 if (need_atomics) {
376 for (Side side : {Side::kLhs, Side::kRhs}) {
377 if (!params->is_prepacked[side]) {
378 const int size = NumBlocksPerSide(side, block_map);
379 main_allocator->Allocate(size, &packing_status[side]);
380 for (int i = 0; i < size; i++) {
381 packing_status[side][i].store(PackingStatus::kNotStarted,
382 std::memory_order_relaxed);
383 }
384 }
385 }
386 }
387
388 // Create the atomic block id, allocate it using Allocator so that
389 // we get the alignment ensuring that it sits alone in its exclusives
390 // reservation granule.
391 std::atomic<int>* atomic_block_id;
392 main_allocator->Allocate(1, &atomic_block_id);
393 atomic_block_id->store(thread_count);
394
395 // Create task objects. We allocate a single buffer and then use placement-new
396 // to construct N TrMulTask objects within it. To avoid having the Clang CFI
397 // sanitizer complain about a TrMulTask* pointer temporarily pointing to
398 // garbage, we keep the pointer a plain char* until finished constructing.
399 char* tasks_buf =
400 main_allocator->Allocate<char>(thread_count * sizeof(TrMulTask));
401 for (int i = 0; i < thread_count; i++) {
402 auto* allocator = ctx->GetThreadSpecificAllocator(i);
403 auto* tuning_resolver = ctx->GetThreadSpecificTuningResolver(i);
404 new (tasks_buf + i * sizeof(TrMulTask)) TrMulTask(
405 params, block_map, atomic_block_id, i, need_atomics, packing_status,
406 tuning_resolver, allocator, ctx->mutable_cpuinfo());
407 }
408 TrMulTask* tasks = reinterpret_cast<TrMulTask*>(tasks_buf);
409
410 // Do the computation.
411 ctx->mutable_thread_pool()->Execute(thread_count, tasks);
412
413 // Finish up.
414 for (int i = 0; i < thread_count; i++) {
415 tasks[i].~TrMulTask();
416 }
417}
418
419} // namespace ruy
420