/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if defined(ENABLE_ONEDNN_OPENMP) && defined(ENABLE_MKL) && defined(_OPENMP)
#ifndef DNNL_AARCH64_USE_ACL
// Using LLVM's OpenMP header
#include "external/llvm_openmp/include/omp.h"
/* Define EIGEN_DONT_PARALLELIZE so that Eigen does not also parallelize with
the OpenMP runtime included above; see
https://eigen.tuxfamily.org/dox/TopicMultiThreading.html for details. It does
not have any negative impact on performance. */
#define EIGEN_DONT_PARALLELIZE
#else
#include "omp.h"  // NOLINT
#endif
#endif  // ENABLE_ONEDNN_OPENMP && ENABLE_MKL && _OPENMP

#include "absl/base/call_once.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/str_split.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/threadpool_device.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/allocator_registry.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.pb.h"
#include "tensorflow/core/framework/tensor_util.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/graph/types.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"
#include "tensorflow/core/util/util.h"

#ifdef INTEL_MKL
#include "tensorflow/core/common_runtime/mkl_cpu_allocator.h"
#include "tensorflow/core/platform/cpu_info.h"
#endif  // INTEL_MKL

namespace tensorflow {

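// ThreadPoolDevice is the standard CPU device. TensorFlow names CPU devices
// like "/job:localhost/replica:0/task:0/device:CPU:0", and kernels placed on
// this device run on the intra-op threadpool configured via SessionOptions.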
ThreadPoolDevice::ThreadPoolDevice(const SessionOptions& options,
                                   const string& name, Bytes memory_limit,
                                   const DeviceLocality& locality,
                                   Allocator* allocator)
    : LocalDevice(options, Device::BuildDeviceAttributes(
                               name, DEVICE_CPU, memory_limit, locality)),
      allocator_(allocator),
      scoped_allocator_mgr_(new ScopedAllocatorMgr(name)) {
  auto s = NodeFileWriter::GetNodeFileWriterIfEnabled(name, env());
  if (!s.ok()) {
    LOG(ERROR) << s.status();
  } else {
    node_file_writer_ = *s;
    if (node_file_writer_) {
      LOG(INFO) << "Writing NodeDefs to file: "
                << node_file_writer_->filename();
    }
  }

#if defined(ENABLE_ONEDNN_OPENMP) && defined(INTEL_MKL)
  // Early return when MKL is disabled.
  if (!IsMKLEnabled()) return;
#ifdef _OPENMP
  const char* user_omp_threads = getenv("OMP_NUM_THREADS");
  static absl::once_flag num_threads_setting_flag;
  if (user_omp_threads == nullptr) {
    // OMP_NUM_THREADS controls MKL's intra-op parallelization.
    // Default to the number of available physical cores.
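    // e.g. with 16 schedulable CPUs and 2 hyperthreads per core this sets
    // (16 + 2 - 1) / 2 == 8 OpenMP threads, one per physical core.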
    const int mkl_intra_op = port::NumSchedulableCPUs();
    const int ht = port::NumHyperthreadsPerCore();
    absl::call_once(num_threads_setting_flag, omp_set_num_threads,
                    (mkl_intra_op + ht - 1) / ht);
  }

#ifndef DNNL_AARCH64_USE_ACL
  const char* user_kmp_blocktime = getenv("KMP_BLOCKTIME");
  static absl::once_flag blocktime_setting_flag;
  if (user_kmp_blocktime == nullptr) {
    // Sets the time, in milliseconds, that a thread should wait after
    // completing the execution of a parallel region, before sleeping.
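    // A small blocktime lets idle OpenMP workers sleep quickly instead of
    // spinning, which reduces contention with the Eigen threadpool.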
    absl::call_once(blocktime_setting_flag, kmp_set_blocktime, 1);
  }
#endif

#endif  // _OPENMP
#endif  // defined(ENABLE_ONEDNN_OPENMP) && defined(INTEL_MKL)
}

ThreadPoolDevice::~ThreadPoolDevice() {}

Allocator* ThreadPoolDevice::GetAllocator(AllocatorAttributes attr) {
  return allocator_;
}

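// Returns the ScopedAllocator instance registered for `attr.scope_id` in the
// container for `step_id`. A positive scope_id is assigned when scoped
// allocation is set up (e.g. by the ScopedAllocatorOptimizer grappler pass),
// so any other value here indicates a caller bug.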
Allocator* ThreadPoolDevice::GetScopedAllocator(AllocatorAttributes attr,
                                                int64_t step_id) {
  if (attr.scope_id > 0) {
    return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
        attr.scope_id);
  }
  LOG(FATAL) << "Unexpected call to ThreadPoolDevice::GetScopedAllocator "
             << "attr.scope_id = " << attr.scope_id;
  return allocator_;
}

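// Materializes `tensor_proto` into a Tensor backed by this device's
// allocator. A minimal usage sketch (illustrative, not from this file),
// assuming an existing float Tensor `t` and a Device* `device`:
//
//   TensorProto proto;
//   t.AsProtoField(&proto);
//   Tensor parsed;
//   Status s = device->MakeTensorFromProto(proto, AllocatorAttributes(),
//                                          &parsed);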
Status ThreadPoolDevice::MakeTensorFromProto(
    const TensorProto& tensor_proto, const AllocatorAttributes alloc_attrs,
    Tensor* tensor) {
  if (tensor_proto.dtype() > 0 && tensor_proto.dtype() <= DataType_MAX) {
    Tensor parsed(tensor_proto.dtype());
    if (parsed.FromProto(allocator_, tensor_proto)) {
      *tensor = std::move(parsed);
      return OkStatus();
    }
  }
  return errors::InvalidArgument("Cannot parse tensor from proto: ",
                                 tensor_proto.DebugString());
}

void ThreadPoolDevice::CopyTensorInSameDevice(
    const Tensor* input_tensor, Tensor* output_tensor,
    const DeviceContext* device_context, StatusCallback done) {
  if (input_tensor->NumElements() != output_tensor->NumElements()) {
    done(errors::Internal(
        "CPU->CPU copy shape mismatch: input=", input_tensor->shape(),
        ", output=", output_tensor->shape()));
    return;
  }
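  // The output tensor is pre-allocated by the caller; DeepCopy duplicates the
  // buffer contents, so input and output do not share storage afterwards.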
  tensor::DeepCopy(*input_tensor, output_tensor);
  done(OkStatus());
}

namespace {
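// Returns the set of op type names listed in the TF_CPU_DEBUG_OPS_TO_LOG
// environment variable as a comma-separated list, e.g. (illustrative values)
//   TF_CPU_DEBUG_OPS_TO_LOG="MatMul,Conv2D"
// Inputs and outputs of kernels whose type is in the set are logged at INFO.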
const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() {
  auto* result = new absl::flat_hash_set<std::string>;
  const char* env = getenv("TF_CPU_DEBUG_OPS_TO_LOG");
  if (!env) {
    return result;
  }

  std::vector<absl::string_view> ops = absl::StrSplit(env, ',');
  LOG(INFO) << "Will log inputs & outputs from the following ops: ";
  for (absl::string_view op : ops) {
    result->insert(std::string(op));
    LOG(INFO) << " |" << op << "|";
  }

  return result;
}

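// Decides whether a kernel's inputs and outputs should be logged. The set is
// built only once; C++11 guarantees thread-safe initialization of
// function-local statics, so concurrent Compute calls are safe here.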
bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) {
  static const absl::flat_hash_set<std::string>& ops_to_log =
      *GetOpsToLogFromEnv();
  static const bool is_empty = ops_to_log.empty();
  if (is_empty) {
    return false;
  }
  return ops_to_log.count(op_kernel->type_string()) > 0;
}
}  // namespace

void ThreadPoolDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
  bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);

  if (should_log_inputs_and_outputs) {
    LogInputs(op_kernel, context);
  }

  op_kernel->Compute(context);

  if (context->status().ok() && node_file_writer_) {
    Status s = node_file_writer_->RecordNodeExecution(op_kernel, context);
    if (!s.ok()) {
      LOG(ERROR) << s;
      context->SetStatus(s);
    }
  }

  if (should_log_inputs_and_outputs) {
    LogOutputs(op_kernel, context);
  }
}

void ThreadPoolDevice::ComputeAsync(AsyncOpKernel* op_kernel,
                                    OpKernelContext* context,
                                    AsyncOpKernel::DoneCallback done) {
  bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);

  if (should_log_inputs_and_outputs) {
    LogInputs(op_kernel, context);
    AsyncOpKernel::DoneCallback parent_done = done;
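    // Wrap the caller's callback so outputs are logged after the async kernel
    // completes; the wrapper runs on whatever thread eventually invokes done.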
    done = [this, parent_done, op_kernel, context]() {
      LogOutputs(op_kernel, context);
      parent_done();
    };
  }

  op_kernel->ComputeAsync(context, done);
}

215
216void ThreadPoolDevice::LogInputs(OpKernel* op_kernel,
217 OpKernelContext* context) {
218 LOG(INFO) << "Inputs for " << op_kernel->name() << " (total "
219 << context->num_inputs() << "):";
220 for (int i = 0; i < context->num_inputs(); i++) {
221 if (!context->has_input(i)) {
222 LOG(INFO) << "input # " << i << " is absent";
223 continue;
224 }
225 LOG(INFO) << "input # " << i;
226 LOG(INFO) << context->input(i).DebugString(-1);
227 }
228 LOG(INFO) << "";
229}
230
void ThreadPoolDevice::LogOutputs(OpKernel* op_kernel,
                                  OpKernelContext* context) {
  if (!context->status().ok()) {
    LOG(INFO) << op_kernel->name()
              << " failed: " << context->status().error_message();
    return;
  }

  LOG(INFO) << "Outputs for " << op_kernel->name() << " (total "
            << context->num_outputs() << "):";
  for (int i = 0; i < context->num_outputs(); i++) {
    Tensor* output = context->mutable_output(i);
    if (output == nullptr) {
      LOG(INFO) << "output # " << i << " is null";
    } else {
      LOG(INFO) << "output # " << i;
      LOG(INFO) << output->DebugString(-1);
    }
  }
  LOG(INFO) << "";
}

#ifdef INTEL_MKL
namespace {
class MklCPUAllocatorFactory : public AllocatorFactory {
 public:
  bool NumaEnabled() override { return false; }

  Allocator* CreateAllocator() override { return new MklCPUAllocator; }

  // Note: Ignores numa_node, for now.
  SubAllocator* CreateSubAllocator(int numa_node) override {
    return new MklSubAllocator;
  }
};

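// Priority 200 outranks the default CPU allocator (registered at priority
// 100), so MklCPUAllocator is used when oneDNN is enabled; priority 50 keeps
// it below the default allocator otherwise.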
REGISTER_MEM_ALLOCATOR("MklCPUAllocator", (IsMKLEnabled() ? 200 : 50),
                       MklCPUAllocatorFactory);

}  // namespace
#endif  // INTEL_MKL

}  // namespace tensorflow