1 | /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #if defined(ENABLE_ONEDNN_OPENMP) && defined(ENABLE_MKL) && defined(_OPENMP) |
17 | #ifndef DNNL_AARCH64_USE_ACL |
18 | // Using LLVM's OpenMP header |
19 | #include "external/llvm_openmp/include/omp.h" |
20 | /* Added EIGEN_DONT_PARALLELIZE to avoid duplicating omp.h, please refer to |
21 | this link https://eigen.tuxfamily.org/dox/TopicMultiThreading.html for more |
22 | info. It does not have any negative impact on performance. */ |
23 | #define EIGEN_DONT_PARALLELIZE |
24 | #else |
25 | #include "omp.h" // NOLINT |
26 | #endif |
#endif  // ENABLE_ONEDNN_OPENMP && ENABLE_MKL && _OPENMP
28 | |
29 | #include "absl/base/call_once.h" |
30 | #include "absl/container/flat_hash_set.h" |
31 | #include "tensorflow/core/common_runtime/local_device.h" |
32 | #include "tensorflow/core/common_runtime/scoped_allocator.h" |
33 | #include "tensorflow/core/common_runtime/scoped_allocator_mgr.h" |
34 | #include "tensorflow/core/common_runtime/threadpool_device.h" |
35 | #include "tensorflow/core/framework/allocator.h" |
36 | #include "tensorflow/core/framework/allocator_registry.h" |
37 | #include "tensorflow/core/framework/device_base.h" |
38 | #include "tensorflow/core/framework/op_kernel.h" |
39 | #include "tensorflow/core/framework/tensor.pb.h" |
40 | #include "tensorflow/core/framework/tensor_util.h" |
41 | #include "tensorflow/core/framework/types.h" |
42 | #include "tensorflow/core/graph/types.h" |
43 | #include "tensorflow/core/lib/hash/hash.h" |
44 | #include "tensorflow/core/platform/tracing.h" |
45 | #include "tensorflow/core/platform/types.h" |
46 | #include "tensorflow/core/public/session_options.h" |
47 | #include "tensorflow/core/util/util.h" |
48 | |
49 | #ifdef INTEL_MKL |
50 | #include "tensorflow/core/common_runtime/mkl_cpu_allocator.h" |
51 | #include "tensorflow/core/platform/cpu_info.h" |
52 | #endif // INTEL_MKL |
53 | |
54 | namespace tensorflow { |
55 | |
// Constructs a CPU device backed by the process-wide thread pool.
// `allocator` is borrowed (not owned here) and is used for all tensor
// allocations on this device; `memory_limit` and `locality` are forwarded
// into the DeviceAttributes built for this device.
ThreadPoolDevice::ThreadPoolDevice(const SessionOptions& options,
                                   const string& name, Bytes memory_limit,
                                   const DeviceLocality& locality,
                                   Allocator* allocator)
    : LocalDevice(options, Device::BuildDeviceAttributes(
                               name, DEVICE_CPU, memory_limit, locality)),
      allocator_(allocator),
      scoped_allocator_mgr_(new ScopedAllocatorMgr(name)) {
  // Optionally dump executed NodeDefs for debugging. Failure to create the
  // writer is logged but deliberately does not abort device construction.
  auto s = NodeFileWriter::GetNodeFileWriterIfEnabled(name, env());
  if (!s.ok()) {
    LOG(ERROR) << s.status();
  } else {
    node_file_writer_ = *s;
    if (node_file_writer_) {
      LOG(INFO) << "Writing NodeDefs to file: "
                << node_file_writer_->filename();
    }
  }

#if defined(ENABLE_ONEDNN_OPENMP) && defined(INTEL_MKL)
  // Early return when MKL is disabled
  if (!IsMKLEnabled()) return;
#ifdef _OPENMP
  // Respect a user-provided OMP_NUM_THREADS; otherwise size the OpenMP pool
  // exactly once per process (devices may be constructed more than once).
  const char* user_omp_threads = getenv("OMP_NUM_THREADS");
  static absl::once_flag num_threads_setting_flag;
  if (user_omp_threads == nullptr) {
    // OMP_NUM_THREADS controls MKL's intra-op parallelization
    // Default to available physical cores
    const int mkl_intra_op = port::NumSchedulableCPUs();
    const int ht = port::NumHyperthreadsPerCore();
    // Ceiling division: schedulable CPUs / hyperthreads-per-core.
    absl::call_once(num_threads_setting_flag, omp_set_num_threads,
                    (mkl_intra_op + ht - 1) / ht);
  }

#ifndef DNNL_AARCH64_USE_ACL
  // KMP_BLOCKTIME is only honored when using LLVM's OpenMP runtime (the
  // non-ACL branch of the include block at the top of this file).
  const char* user_kmp_blocktime = getenv("KMP_BLOCKTIME");
  static absl::once_flag blocktime_setting_flag;
  if (user_kmp_blocktime == nullptr) {
    // Sets the time, in milliseconds, that a thread should wait,
    // after completing the execution of a parallel region, before sleeping.
    absl::call_once(blocktime_setting_flag, kmp_set_blocktime, 1);
  }
#endif

#endif  // _OPENMP
#endif  // defined(ENABLE_ONEDNN_OPENMP) && defined(INTEL_MKL)
}
103 | |
104 | ThreadPoolDevice::~ThreadPoolDevice() {} |
105 | |
// Returns the device's single CPU allocator. `attr` is unused: every
// allocation request on this device is served by the same allocator.
Allocator* ThreadPoolDevice::GetAllocator(AllocatorAttributes attr) {
  return allocator_;
}
109 | |
110 | Allocator* ThreadPoolDevice::GetScopedAllocator(AllocatorAttributes attr, |
111 | int64_t step_id) { |
112 | if (attr.scope_id > 0) { |
113 | return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance( |
114 | attr.scope_id); |
115 | } |
116 | LOG(FATAL) << "Unexpected call to ThreadPoolDevice::GetScopedAllocator " |
117 | << "attr.scope_id = " << attr.scope_id; |
118 | return allocator_; |
119 | } |
120 | |
121 | Status ThreadPoolDevice::MakeTensorFromProto( |
122 | const TensorProto& tensor_proto, const AllocatorAttributes alloc_attrs, |
123 | Tensor* tensor) { |
124 | if (tensor_proto.dtype() > 0 && tensor_proto.dtype() <= DataType_MAX) { |
125 | Tensor parsed(tensor_proto.dtype()); |
126 | if (parsed.FromProto(allocator_, tensor_proto)) { |
127 | *tensor = std::move(parsed); |
128 | return OkStatus(); |
129 | } |
130 | } |
131 | return errors::InvalidArgument("Cannot parse tensor from proto: " , |
132 | tensor_proto.DebugString()); |
133 | } |
134 | |
135 | void ThreadPoolDevice::CopyTensorInSameDevice( |
136 | const Tensor* input_tensor, Tensor* output_tensor, |
137 | const DeviceContext* device_context, StatusCallback done) { |
138 | if (input_tensor->NumElements() != output_tensor->NumElements()) { |
139 | done(errors::Internal( |
140 | "CPU->CPU copy shape mismatch: input=" , input_tensor->shape(), |
141 | ", output=" , output_tensor->shape())); |
142 | return; |
143 | } |
144 | tensor::DeepCopy(*input_tensor, output_tensor); |
145 | done(OkStatus()); |
146 | } |
147 | |
148 | namespace { |
149 | const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() { |
150 | auto* result = new absl::flat_hash_set<std::string>; |
151 | const char* env = getenv("TF_CPU_DEBUG_OPS_TO_LOG" ); |
152 | if (!env) { |
153 | return result; |
154 | } |
155 | |
156 | std::vector<absl::string_view> ops = absl::StrSplit(env, ','); |
157 | LOG(INFO) << "Will log inputs & outputs from the following ops: " ; |
158 | for (absl::string_view op : ops) { |
159 | result->insert(std::string(op)); |
160 | LOG(INFO) << " |" << op << "|" ; |
161 | } |
162 | |
163 | return result; |
164 | } |
165 | |
166 | bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) { |
167 | static const absl::flat_hash_set<std::string>& ops_to_log = |
168 | *GetOpsToLogFromEnv(); |
169 | static const bool is_empty = ops_to_log.empty(); |
170 | if (is_empty) { |
171 | return false; |
172 | } |
173 | return ops_to_log.count(op_kernel->type_string()); |
174 | } |
175 | } // namespace |
176 | |
177 | void ThreadPoolDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) { |
178 | bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel); |
179 | |
180 | if (should_log_inputs_and_outputs) { |
181 | LogInputs(op_kernel, context); |
182 | } |
183 | |
184 | op_kernel->Compute(context); |
185 | |
186 | if (context->status().ok() && node_file_writer_) { |
187 | Status s = node_file_writer_->RecordNodeExecution(op_kernel, context); |
188 | if (!s.ok()) { |
189 | LOG(ERROR) << s; |
190 | context->SetStatus(s); |
191 | } |
192 | } |
193 | |
194 | if (should_log_inputs_and_outputs) { |
195 | LogOutputs(op_kernel, context); |
196 | } |
197 | } |
198 | |
199 | void ThreadPoolDevice::ComputeAsync(AsyncOpKernel* op_kernel, |
200 | OpKernelContext* context, |
201 | AsyncOpKernel::DoneCallback done) { |
202 | bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel); |
203 | |
204 | if (should_log_inputs_and_outputs) { |
205 | LogInputs(op_kernel, context); |
206 | AsyncOpKernel::DoneCallback parent_done = done; |
207 | done = [this, parent_done, op_kernel, context]() { |
208 | LogOutputs(op_kernel, context); |
209 | parent_done(); |
210 | }; |
211 | } |
212 | |
213 | op_kernel->ComputeAsync(context, done); |
214 | } |
215 | |
216 | void ThreadPoolDevice::LogInputs(OpKernel* op_kernel, |
217 | OpKernelContext* context) { |
218 | LOG(INFO) << "Inputs for " << op_kernel->name() << " (total " |
219 | << context->num_inputs() << "):" ; |
220 | for (int i = 0; i < context->num_inputs(); i++) { |
221 | if (!context->has_input(i)) { |
222 | LOG(INFO) << "input # " << i << " is absent" ; |
223 | continue; |
224 | } |
225 | LOG(INFO) << "input # " << i; |
226 | LOG(INFO) << context->input(i).DebugString(-1); |
227 | } |
228 | LOG(INFO) << "" ; |
229 | } |
230 | |
231 | void ThreadPoolDevice::LogOutputs(OpKernel* op_kernel, |
232 | OpKernelContext* context) { |
233 | if (!context->status().ok()) { |
234 | LOG(INFO) << op_kernel->name() |
235 | << " failed: " << context->status().error_message(); |
236 | return; |
237 | } |
238 | |
239 | LOG(INFO) << "Outputs for " << op_kernel->name() << " (total " |
240 | << context->num_inputs() << "):" ; |
241 | for (int i = 0; i < context->num_outputs(); i++) { |
242 | Tensor* output = context->mutable_output(i); |
243 | if (output == nullptr) { |
244 | LOG(INFO) << "output # " << i << " is null" ; |
245 | } else { |
246 | LOG(INFO) << "output # " << i; |
247 | LOG(INFO) << output->DebugString(-1); |
248 | } |
249 | } |
250 | LOG(INFO) << "" ; |
251 | } |
252 | |
253 | #ifdef INTEL_MKL |
254 | namespace { |
255 | class MklCPUAllocatorFactory : public AllocatorFactory { |
256 | public: |
257 | bool NumaEnabled() override { return false; } |
258 | |
259 | Allocator* CreateAllocator() override { return new MklCPUAllocator; } |
260 | |
261 | // Note: Ignores numa_node, for now. |
262 | virtual SubAllocator* CreateSubAllocator(int numa_node) { |
263 | return new MklSubAllocator; |
264 | } |
265 | }; |
266 | |
267 | REGISTER_MEM_ALLOCATOR("MklCPUAllocator" , (IsMKLEnabled() ? 200 : 50), |
268 | MklCPUAllocatorFactory); |
269 | |
270 | } // namespace |
271 | #endif // INTEL_MKL |
272 | |
273 | } // namespace tensorflow |
274 | |