/*******************************************************************************
* Copyright 2022 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include <unordered_map>

#include "dnnl_test_common.hpp"
#include "gtest/gtest.h"

#include "oneapi/dnnl/dnnl.hpp"

namespace dnnl {

using dim = memory::dim;
using dt = memory::data_type;
using tag = memory::format_tag;

enum class task_kind_t {
    conv_fwd,
};

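// Cache key built from a small sequence of integral values (e.g. an engine
// handle and a task kind). Element hashes are combined with XOR, which is
// sufficient for the handful of distinct keys this test creates.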
class key_t {
public:
    key_t() = default;

    template <typename T>
    key_t(T t) {
        values_.push_back(static_cast<uint64_t>(t));
    }

    template <typename T, typename U>
    key_t(T t, U u) {
        values_.push_back(static_cast<uint64_t>(t));
        values_.push_back(static_cast<uint64_t>(u));
    }

    template <typename T>
    key_t append(T t) {
        auto ret = *this;
        ret.values_.push_back(static_cast<uint64_t>(t));
        return ret;
    }

    size_t get_hash() const {
        size_t ret = 0;
        for (auto v : values_) {
            ret ^= std::hash<uint64_t>()(v);
        }
        return ret;
    }

    bool operator==(const key_t &other) const {
        if (values_.size() != other.values_.size()) return false;
        for (size_t i = 0; i < values_.size(); i++) {
            if (values_[i] != other.values_[i]) return false;
        }
        return true;
    }

private:
    std::vector<uint64_t> values_;
};

struct key_hash_t {
    size_t operator()(const key_t &key) const { return key.get_hash(); }
};

struct key_equal_t {
    bool operator()(const key_t &a, const key_t &b) const { return a == b; }
};

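// Simple cache of objects of type T (engine, stream, or primitive) keyed by
// key_t. set() keeps the first object stored for a key; later calls with the
// same key are no-ops.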
template <typename T>
class resource_manager_t {
public:
    bool has(const key_t &key = 0) const { return cache_.count(key) != 0; }

    const T &get(const key_t &key = 0) const { return cache_.at(key); }

    void set(const key_t &key, const T &obj) {
        if (has(key)) return;
        cache_.emplace(key, obj);
    }

private:
    std::unordered_map<key_t, T, key_hash_t, key_equal_t> cache_;
};

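// Base class for a unit of test work. Engines, streams, and primitives may be
// shared across tasks through the static resource managers below; a single
// static mutex serializes access to those shared caches.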
class task_t {
public:
    virtual ~task_t() = default;
    virtual task_kind_t kind() const = 0;
    static std::mutex &mutex() { return mutex_; }

    virtual void create() = 0;
    virtual void execute() = 0;
    virtual void validate() = 0;

    void set_reuse_engine(bool value) { reuse_engine_ = value; }
    void set_reuse_stream(bool value) { reuse_stream_ = value; }
    void set_reuse_primitive(bool value) { reuse_primitive_ = value; }

    engine create_engine() const {
        key_t key;
        return create_object<engine>(reuse_engine_, key, engine_mgr_,
                [&] { return engine(get_test_engine_kind(), 0); });
    }

    stream create_stream(const engine &eng) const {
        key_t key(reinterpret_cast<uint64_t>(eng.get()));
        return create_object<stream>(
                reuse_stream_, key, stream_mgr_, [&] { return stream(eng); });
    }

    template <typename T>
    primitive create_primitive(const typename T::primitive_desc &pd) {
        key_t engine_key(reinterpret_cast<uint64_t>(pd.get_engine().get()));
        key_t key = engine_key.append(kind());
        return create_object<primitive>(
                reuse_primitive_, key, primitive_mgr_, [&] { return T(pd); });
    }

    memory create_memory(
            const memory::desc &d, const engine &eng, int value = 0) {
        auto ret = memory(d, eng);
        fill_memory(ret, value);
        return ret;
    }

protected:
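    // Returns the cached object for `key` when `reuse` is set and the cache
    // already holds one; otherwise builds a new object with `func`. The
    // result is stored so that later tasks asking for reuse can pick it up.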
    template <typename T, typename F>
    static T create_object(bool reuse, const key_t &key,
            resource_manager_t<T> &mgr, const F &func) {
        std::lock_guard<std::mutex> lock(mutex_);
        T ret;
        if (reuse && mgr.has(key)) {
            ret = mgr.get(key);
        } else {
            ret = func();
        }
        mgr.set(key, ret);
        return ret;
    }

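    // Fills the memory with `value`, treating the buffer as densely packed
    // f32 elements.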
    void fill_memory(const memory &mem, float value) {
        size_t sz = mem.get_desc().get_size();
        int elems = (int)(sz / sizeof(float));
        auto *ptr = mem.map_data<float>();
        GTEST_EXPECT_NE(ptr, nullptr);
        for (int i = 0; i < elems; i++) {
            ptr[i] = value;
        }
        mem.unmap_data(ptr);
    }

    static resource_manager_t<engine> engine_mgr_;
    static resource_manager_t<stream> stream_mgr_;
    static resource_manager_t<primitive> primitive_mgr_;
    static std::mutex mutex_;

    bool reuse_engine_ = false;
    bool reuse_stream_ = false;
    bool reuse_primitive_ = false;
};

resource_manager_t<engine> task_t::engine_mgr_;
resource_manager_t<stream> task_t::stream_mgr_;
resource_manager_t<primitive> task_t::primitive_mgr_;
std::mutex task_t::mutex_;

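// Forward convolution task with constant-filled inputs so the output can be
// checked analytically: with the weights set to 1 and no padding, every
// output element equals fill_value_ * IC * KH * KW.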
class conv_fwd_task_t : public task_t {
public:
    conv_fwd_task_t(int idx) : fill_value_(idx % 5) {}

    task_kind_t kind() const override { return task_kind_t::conv_fwd; }

    void create() override {
        eng_ = create_engine();

        memory::dims src_dims = {N, IC, IH, IW};
        memory::dims wei_dims = {OC, IC, KH, KW};
        memory::dims dst_dims = {N, OC, OH, OW};
        memory::dims strides = {SH, SW};
        memory::dims padding_l = {PH, PW};
        memory::dims padding_r = {PH, PW};
        auto src_md = memory::desc(src_dims, dt::f32, tag::nhwc);
        auto wei_md = memory::desc(wei_dims, dt::f32, tag::oihw);
        auto dst_md = memory::desc(dst_dims, dt::f32, tag::nhwc);

        primitive_attr attr;
        attr.set_scratchpad_mode(scratchpad_mode::user);

        pd_ = convolution_forward::primitive_desc(eng_,
                prop_kind::forward_training, algorithm::convolution_direct,
                src_md, wei_md, memory::desc(), dst_md, strides, padding_l,
                padding_r, attr);

        prim_ = create_primitive<convolution_forward>(pd_);

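        // The user scratchpad mode requires the test to allocate a scratchpad
        // buffer of the size reported by the primitive descriptor.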
        size_t sz = pd_.scratchpad_desc().get_size();
        auto scratchpad = memory(memory::desc({(dim)sz}, dt::u8, tag::x), eng_);
        args_.emplace(DNNL_ARG_SCRATCHPAD, scratchpad);

        auto src_mem = create_memory(src_md, eng_, fill_value_);
        auto wei_mem = create_memory(wei_md, eng_, 1);
        auto dst_mem = create_memory(dst_md, eng_, 0);

        args_.emplace(DNNL_ARG_SRC, src_mem);
        args_.emplace(DNNL_ARG_WEIGHTS, wei_mem);
        args_.emplace(DNNL_ARG_DST, dst_mem);
    }

    void execute() override {
        auto strm = create_stream(eng_);
        prim_.execute(strm, args_);
        strm.wait();
    }

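    // Every output element must equal fill_value_ * IC * KH * KW since the
    // source is filled with fill_value_, the weights with 1, and the
    // convolution uses no padding.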
    void validate() override {
        auto &dst = args_.at(DNNL_ARG_DST);
        auto *ptr = dst.map_data<float>();
        GTEST_EXPECT_NE(ptr, nullptr);
        int elems = N * OC * OH * OW;
        bool ok = true;
        for (int i = 0; i < elems; i++) {
            if (ptr[i] != fill_value_ * IC * KH * KW) {
                ok = false;
                break;
            }
        }
        dst.unmap_data(ptr);
        EXPECT_TRUE(ok);
    }

private:
    static const dim N = 1;
    static const dim OC = 32;
    static const dim IC = 32;
    static const dim KH = 3, KW = 3;
    static const dim OH = 4, OW = 8;
    static const dim IH = OH + KH - 1, IW = OW + KW - 1;
    static const dim SH = 1, SW = 1;
    static const dim PH = 0, PW = 0;

    int fill_value_;
    engine eng_;
    convolution_forward::primitive_desc pd_;
    primitive prim_;
    std::unordered_map<int, memory> args_;
};

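// Creates ntasks convolution tasks up front, with some of them sharing
// engines, streams, and primitives, and then executes them concurrently from
// nthreads threads.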
class test_concurrency_t : public ::testing::Test {
protected:
    void SetUp() override {
#ifdef _WIN32
        SKIP_IF(get_test_engine_kind() == engine::kind::gpu,
                "GPU Windows is temporarily disabled due to long execution "
                "time.");
#endif
        SKIP_IF_CUDA(true, "Concurrent execution is not supported with CUDA.");

        // This test doesn't work properly under SDE.
        const int len = 1024;
        char value_str[len];
        if (gtest_getenv("SDE_COMMAND_LINE", value_str, len) > 0)
            SKIP_IF(true, "Skipping concurrency test since executed under SDE");

        for (int i = 0; i < ntasks; i++) {
            auto task = std::make_shared<conv_fwd_task_t>(i);

            task->set_reuse_engine(i % 2 == 0);
            task->set_reuse_stream(i % 3 == 0);
            task->set_reuse_primitive(i % 5 == 0);

            task->create();
            tasks_.emplace_back(task);
        }

        Test();
    }

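    // Splits the tasks into contiguous chunks, one chunk per thread, executes
    // all chunks in parallel, and validates the results on the main thread.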
    void Test() {
        std::vector<std::thread> threads;
        for (int i = 0; i < nthreads; i++) {
            int step = (ntasks + nthreads - 1) / nthreads;
            int beg = i * step;
            int end = std::min(beg + step, ntasks);
            threads.emplace_back([&, beg, end]() {
                for (int j = beg; j < end; j++)
                    tasks_[j]->execute();
            });
        }

        for (auto &t : threads) {
            t.join();
        }

        for (int i = 0; i < ntasks; i++) {
            tasks_[i]->validate();
        }
    }

    static const int ntasks;
    static const int nthreads;
    std::vector<std::shared_ptr<task_t>> tasks_;
    engine eng;
    stream strm;
};

const int test_concurrency_t::ntasks = 1000;
const int test_concurrency_t::nthreads = []() {
    int res = 100;
#ifdef _OPENMP
    const int hc = std::thread::hardware_concurrency();
    // On systems that have many cores this test may not work properly
    // due to high number of created threads (nthreads * primitive threads).
    // Only OpenMP runtime is affected (probably only GOMP).
    // Error message:
    // libgomp: Thread creation failed: Resource temporarily unavailable
    res = hc > 60 ? 10 : res;
#endif
    return res;
}();

TEST_F(test_concurrency_t, Basic) {}

} // namespace dnnl