1 | /* Copyright 2022 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | #include "tensorflow/core/data/metric_utils.h" |
16 | |
17 | #include <algorithm> |
18 | #include <cstdint> |
19 | #include <string> |
20 | #include <vector> |
21 | |
22 | #include "absl/time/time.h" |
23 | #include "tensorflow/core/data/utils.h" |
24 | #include "tensorflow/core/framework/dataset.h" |
25 | #include "tensorflow/core/framework/metrics.h" |
26 | #include "tensorflow/core/framework/tensor.h" |
27 | #include "tensorflow/core/framework/types.h" |
28 | #include "tensorflow/core/platform/env.h" |
29 | #include "tensorflow/core/platform/mutex.h" |
30 | |
31 | namespace tensorflow { |
32 | namespace data { |
33 | namespace { |
34 | |
35 | // Safely subtracts `x` from `y` avoiding underflow. |
36 | uint64_t safe_sub(uint64_t x, uint64_t y) { return x >= y ? x - y : 0; } |
37 | |
38 | } // namespace |
39 | |
40 | IteratorMetricsCollector::IteratorMetricsCollector( |
41 | const std::string& device_type, const Env& env) |
42 | : device_type_(device_type), env_(env) {} |
43 | |
44 | absl::Time IteratorMetricsCollector::RecordStart() { |
45 | const uint64_t start_time_us = env_.NowMicros(); |
46 | if (!ShouldCollectMetrics()) { |
47 | return absl::FromUnixMicros(start_time_us); |
48 | } |
49 | |
50 | mutex_lock l(mu_); |
51 | if (end_time_us_ == 0) { |
52 | // We initialize `end_time_us_` to the start time of the first request to |
53 | // make it possible to use the delta between `end_time_us_` and subsequent |
54 | // `GetNext()` end time to incrementally collect the duration of the |
55 | // iterator's lifetime. |
56 | end_time_us_ = start_time_us; |
57 | } |
58 | uint64_t gap_time_us = 0; |
59 | if (num_active_calls_ == 0) { |
60 | first_start_time_us_ = start_time_us; |
61 | gap_time_us = safe_sub(start_time_us, end_time_us_); |
62 | } |
63 | metrics::RecordTFDataIteratorGap(gap_time_us); |
64 | num_active_calls_++; |
65 | return absl::FromUnixMicros(start_time_us); |
66 | } |
67 | |
68 | void IteratorMetricsCollector::RecordStop(absl::Time start_time, |
69 | const std::vector<Tensor>& output) { |
70 | if (!ShouldCollectMetrics()) { |
71 | return; |
72 | } |
73 | |
74 | const uint64_t end_time_us = env_.NowMicros(); |
75 | AddLatencySample(safe_sub(end_time_us, absl::ToUnixMicros(start_time))); |
76 | IncrementThroughput(GetTotalBytes(output)); |
77 | mutex_lock l(mu_); |
78 | metrics::RecordTFDataIteratorLifetime(safe_sub(end_time_us, end_time_us_)); |
79 | end_time_us_ = std::max(end_time_us_, end_time_us); |
80 | num_active_calls_--; |
81 | if (num_active_calls_ == 0) { |
82 | metrics::RecordTFDataIteratorBusy( |
83 | safe_sub(end_time_us_, first_start_time_us_)); |
84 | } |
85 | } |
86 | |
87 | bool IteratorMetricsCollector::ShouldCollectMetrics() const { |
88 | return device_type_ == DEVICE_CPU; |
89 | } |
90 | |
91 | } // namespace data |
92 | } // namespace tensorflow |
93 | |