1 | /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #include "tensorflow/core/distributed_runtime/worker_cache_logger.h" |
17 | |
18 | #include "tensorflow/core/common_runtime/step_stats_collector.h" |
19 | #include "tensorflow/core/framework/allocation_description.pb.h" |
20 | #include "tensorflow/core/framework/tensor_description.pb.h" |
21 | #include "tensorflow/core/lib/strings/strcat.h" |
22 | #include "tensorflow/core/lib/strings/stringprintf.h" |
23 | #include "tensorflow/core/platform/mutex.h" |
24 | #include "tensorflow/core/platform/types.h" |
25 | |
26 | namespace tensorflow { |
27 | |
namespace {

// Upper bound on how many distinct step_ids may have buffered RPC logs at
// the same time. WorkerCacheLogger::Save() drops every buffered log once the
// number of tracked steps exceeds this value.
// TODO(mrry): Make this configurable if necessary.
constexpr int32_t kWorkerCacheLoggerLimit = 1 << 10;

}  // namespace
33 | |
34 | void WorkerCacheLogger::SetLogging(bool v) { |
35 | mutex_lock l(count_mu_); |
36 | if (v) { |
37 | ++want_logging_count_; |
38 | } else { |
39 | --want_logging_count_; |
40 | // If RPCs get canceled, it may be possible for the count |
41 | // to go negative. This should not be a fatal error, since |
42 | // logging is non-critical. |
43 | if (want_logging_count_ < 0) want_logging_count_ = 0; |
44 | } |
45 | } |
46 | |
47 | void WorkerCacheLogger::ClearLogs() { |
48 | mutex_lock l(mu_); |
49 | ClearLogsWithLock(); |
50 | } |
51 | |
52 | void WorkerCacheLogger::ClearLogsWithLock() { |
53 | for (auto& iter : log_map_) { |
54 | delete iter.second.collector; |
55 | } |
56 | log_map_.clear(); |
57 | } |
58 | |
59 | bool WorkerCacheLogger::RetrieveLogs(int64_t step_id, StepStats* ss) { |
60 | mutex_lock l(mu_); |
61 | LogMap::iterator iter = log_map_.find(step_id); |
62 | if (iter != log_map_.end()) { |
63 | iter->second.collector->FinalizeAndSwap(ss); |
64 | delete iter->second.collector; |
65 | log_map_.erase(iter); |
66 | return true; |
67 | } |
68 | return false; |
69 | } |
70 | |
71 | void WorkerCacheLogger::Save(const string& device, int64_t step_id, |
72 | NodeExecStats* ns) { |
73 | mutex_lock l(mu_); |
74 | StepLog* sl = &log_map_[step_id]; |
75 | if (!sl->collector) { |
76 | sl->collector = new StepStatsCollector(&sl->step_stats); |
77 | } |
78 | sl->collector->Save(device, ns); |
79 | if (log_map_.size() > kWorkerCacheLoggerLimit) { |
80 | // Something's gone wrong. Just empty the cache. |
81 | ClearLogsWithLock(); |
82 | } |
83 | } |
84 | |
85 | void WorkerCacheLogger::RecordRecvTensor(int64_t step_id, int64_t start_usecs, |
86 | int64_t end_usecs, |
87 | const string& tensor_name, |
88 | const string& src_device, |
89 | const string& dst_device, |
90 | int64_t bytes) { |
91 | RecordDataTransfer(step_id, start_usecs, end_usecs, tensor_name, src_device, |
92 | dst_device, bytes, "" , "RecvTensor" ); |
93 | } |
94 | |
95 | void WorkerCacheLogger::RecordDataTransfer(int64_t step_id, int64_t start_usecs, |
96 | int64_t end_usecs, |
97 | const string& tensor_name, |
98 | const string& src_device, |
99 | const string& dst_device, |
100 | int64_t bytes, const string& details, |
101 | const string& transfer_method_name) { |
102 | NodeExecStats* ns = new NodeExecStats; |
103 | ns->set_node_name(transfer_method_name); |
104 | int64_t elapsed_usecs = end_usecs - start_usecs; |
105 | if (details.empty()) { |
106 | auto byte_string = strings::StrCat("[" , bytes, "B] " ); |
107 | if (bytes >= 0.1 * 1048576.0) { |
108 | byte_string = strings::Printf("[%.1fMB] " , bytes / 1048576.0); |
109 | } |
110 | float mbs_rate = (8.0 * static_cast<float>(bytes)) / elapsed_usecs; |
111 | auto rate_string = (mbs_rate >= 1000.0) |
112 | ? strings::Printf("[%.1fGb/s] " , mbs_rate / 1000.0) |
113 | : strings::Printf("[%fMb/s] " , mbs_rate); |
114 | auto label = strings::StrCat(byte_string, rate_string, tensor_name, |
115 | " from " , src_device, " to " , dst_device); |
116 | ns->set_timeline_label(label); |
117 | } else { |
118 | ns->set_timeline_label(details); |
119 | } |
120 | |
121 | ns->set_all_start_micros(start_usecs); |
122 | ns->set_op_start_rel_micros(0); |
123 | ns->set_op_end_rel_micros(elapsed_usecs); |
124 | ns->set_all_end_rel_micros(elapsed_usecs); |
125 | NodeOutput* no = ns->add_output(); |
126 | no->set_slot(0); |
127 | no->mutable_tensor_description() |
128 | ->mutable_allocation_description() |
129 | ->set_requested_bytes(bytes); |
130 | Save(dst_device, step_id, ns); |
131 | } |
132 | |
133 | } // namespace tensorflow |
134 | |