1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_
17#define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_
18
19#include <string>
20#include <unordered_map>
21
22#include "tensorflow/core/framework/step_stats.pb.h"
23#include "tensorflow/core/platform/mutex.h"
24#include "tensorflow/core/platform/thread_annotations.h"
25#include "tensorflow/core/platform/types.h"
26
27namespace tensorflow {
28class StepStatsCollector;
29
30// WorkerCacheLogger is a thread-safe utility for use by a WorkerCache
31// to optionally log some selected RPC activity. A single instance
32// should be owned by a WorkerCache, for use by its RemoteWorker
33// instances.
34
35class WorkerCacheLogger {
36 public:
37 // Start/Stop logging activity. This function increments/decrements
38 // a counter so that if two separate steps turn logging on/off,
39 // logging should be on for the union of the durations of both,
40 // regardless of relative timing.
41 void SetLogging(bool v);
42
43 // Discard any saved log data.
44 void ClearLogs();
45
46 // Return logs for the identified step in *ss. Any returned data will no
47 // longer be stored. Returns true iff *ss was modified.
48 bool RetrieveLogs(int64_t step_id, StepStats* ss);
49
50 // Return true if there is any outstanding request for logging on
51 // the RPC channels.
52 bool LoggingActive() {
53 mutex_lock l(count_mu_);
54 return want_logging_count_ > 0;
55 }
56
57 // Generates a NodeExecStats record with the given data, and saves for
58 // later retrieval by RetrieveLogs().
59 void RecordRecvTensor(int64_t step_id, int64_t start_usecs, int64_t end_usecs,
60 const string& tensor_name, const string& src_device,
61 const string& dst_device, int64_t bytes);
62
63 // Generates a NodeExecStats record with the given data, and saves for
64 // later retrieval by RetrieveLogs().
65 void RecordDataTransfer(int64_t step_id, int64_t start_usecs,
66 int64_t end_usecs, const string& tensor_name,
67 const string& src_device, const string& dst_device,
68 int64_t bytes, const string& details,
69 const string& transfer_method_name);
70
71 private:
72 mutex count_mu_;
73 int32 want_logging_count_ TF_GUARDED_BY(count_mu_) = 0;
74
75 struct StepLog {
76 StepStats step_stats;
77 StepStatsCollector* collector;
78 };
79 typedef std::unordered_map<int64_t, StepLog> LogMap;
80 mutex mu_;
81 LogMap log_map_ TF_GUARDED_BY(mu_);
82
83 // Records "ns" in log_map_ under the given device and step.
84 void Save(const string& device, int64_t step_id, NodeExecStats* ns);
85
86 void ClearLogsWithLock() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
87};
88} // namespace tensorflow
89#endif // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_
90