1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/distributed_runtime/worker_cache_logger.h"
17
18#include "tensorflow/core/common_runtime/step_stats_collector.h"
19#include "tensorflow/core/framework/allocation_description.pb.h"
20#include "tensorflow/core/framework/tensor_description.pb.h"
21#include "tensorflow/core/lib/strings/strcat.h"
22#include "tensorflow/core/lib/strings/stringprintf.h"
23#include "tensorflow/core/platform/mutex.h"
24#include "tensorflow/core/platform/types.h"
25
26namespace tensorflow {
27
namespace {
// Maximum number of step_ids for which RPC logs can be maintained. When
// WorkerCacheLogger::Save() finds more than this many steps in its log map,
// it discards every pending step log. Typed as size_t so that it compares
// cleanly (no signed/unsigned mismatch) against the map's size().
// TODO(mrry): Make this configurable if necessary.
constexpr size_t kWorkerCacheLoggerLimit = 1 << 10;
}  // namespace
33
34void WorkerCacheLogger::SetLogging(bool v) {
35 mutex_lock l(count_mu_);
36 if (v) {
37 ++want_logging_count_;
38 } else {
39 --want_logging_count_;
40 // If RPCs get canceled, it may be possible for the count
41 // to go negative. This should not be a fatal error, since
42 // logging is non-critical.
43 if (want_logging_count_ < 0) want_logging_count_ = 0;
44 }
45}
46
47void WorkerCacheLogger::ClearLogs() {
48 mutex_lock l(mu_);
49 ClearLogsWithLock();
50}
51
52void WorkerCacheLogger::ClearLogsWithLock() {
53 for (auto& iter : log_map_) {
54 delete iter.second.collector;
55 }
56 log_map_.clear();
57}
58
59bool WorkerCacheLogger::RetrieveLogs(int64_t step_id, StepStats* ss) {
60 mutex_lock l(mu_);
61 LogMap::iterator iter = log_map_.find(step_id);
62 if (iter != log_map_.end()) {
63 iter->second.collector->FinalizeAndSwap(ss);
64 delete iter->second.collector;
65 log_map_.erase(iter);
66 return true;
67 }
68 return false;
69}
70
71void WorkerCacheLogger::Save(const string& device, int64_t step_id,
72 NodeExecStats* ns) {
73 mutex_lock l(mu_);
74 StepLog* sl = &log_map_[step_id];
75 if (!sl->collector) {
76 sl->collector = new StepStatsCollector(&sl->step_stats);
77 }
78 sl->collector->Save(device, ns);
79 if (log_map_.size() > kWorkerCacheLoggerLimit) {
80 // Something's gone wrong. Just empty the cache.
81 ClearLogsWithLock();
82 }
83}
84
85void WorkerCacheLogger::RecordRecvTensor(int64_t step_id, int64_t start_usecs,
86 int64_t end_usecs,
87 const string& tensor_name,
88 const string& src_device,
89 const string& dst_device,
90 int64_t bytes) {
91 RecordDataTransfer(step_id, start_usecs, end_usecs, tensor_name, src_device,
92 dst_device, bytes, "", "RecvTensor");
93}
94
95void WorkerCacheLogger::RecordDataTransfer(int64_t step_id, int64_t start_usecs,
96 int64_t end_usecs,
97 const string& tensor_name,
98 const string& src_device,
99 const string& dst_device,
100 int64_t bytes, const string& details,
101 const string& transfer_method_name) {
102 NodeExecStats* ns = new NodeExecStats;
103 ns->set_node_name(transfer_method_name);
104 int64_t elapsed_usecs = end_usecs - start_usecs;
105 if (details.empty()) {
106 auto byte_string = strings::StrCat("[", bytes, "B] ");
107 if (bytes >= 0.1 * 1048576.0) {
108 byte_string = strings::Printf("[%.1fMB] ", bytes / 1048576.0);
109 }
110 float mbs_rate = (8.0 * static_cast<float>(bytes)) / elapsed_usecs;
111 auto rate_string = (mbs_rate >= 1000.0)
112 ? strings::Printf("[%.1fGb/s] ", mbs_rate / 1000.0)
113 : strings::Printf("[%fMb/s] ", mbs_rate);
114 auto label = strings::StrCat(byte_string, rate_string, tensor_name,
115 " from ", src_device, " to ", dst_device);
116 ns->set_timeline_label(label);
117 } else {
118 ns->set_timeline_label(details);
119 }
120
121 ns->set_all_start_micros(start_usecs);
122 ns->set_op_start_rel_micros(0);
123 ns->set_op_end_rel_micros(elapsed_usecs);
124 ns->set_all_end_rel_micros(elapsed_usecs);
125 NodeOutput* no = ns->add_output();
126 no->set_slot(0);
127 no->mutable_tensor_description()
128 ->mutable_allocation_description()
129 ->set_requested_bytes(bytes);
130 Save(dst_device, step_id, ns);
131}
132
133} // namespace tensorflow
134