1/* Copyright 2022 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15#include "tensorflow/core/data/metric_utils.h"
16
17#include <algorithm>
18#include <cstdint>
19#include <string>
20#include <vector>
21
22#include "absl/time/time.h"
23#include "tensorflow/core/data/utils.h"
24#include "tensorflow/core/framework/dataset.h"
25#include "tensorflow/core/framework/metrics.h"
26#include "tensorflow/core/framework/tensor.h"
27#include "tensorflow/core/framework/types.h"
28#include "tensorflow/core/platform/env.h"
29#include "tensorflow/core/platform/mutex.h"
30
31namespace tensorflow {
32namespace data {
33namespace {
34
35// Safely subtracts `x` from `y` avoiding underflow.
36uint64_t safe_sub(uint64_t x, uint64_t y) { return x >= y ? x - y : 0; }
37
38} // namespace
39
40IteratorMetricsCollector::IteratorMetricsCollector(
41 const std::string& device_type, const Env& env)
42 : device_type_(device_type), env_(env) {}
43
44absl::Time IteratorMetricsCollector::RecordStart() {
45 const uint64_t start_time_us = env_.NowMicros();
46 if (!ShouldCollectMetrics()) {
47 return absl::FromUnixMicros(start_time_us);
48 }
49
50 mutex_lock l(mu_);
51 if (end_time_us_ == 0) {
52 // We initialize `end_time_us_` to the start time of the first request to
53 // make it possible to use the delta between `end_time_us_` and subsequent
54 // `GetNext()` end time to incrementally collect the duration of the
55 // iterator's lifetime.
56 end_time_us_ = start_time_us;
57 }
58 uint64_t gap_time_us = 0;
59 if (num_active_calls_ == 0) {
60 first_start_time_us_ = start_time_us;
61 gap_time_us = safe_sub(start_time_us, end_time_us_);
62 }
63 metrics::RecordTFDataIteratorGap(gap_time_us);
64 num_active_calls_++;
65 return absl::FromUnixMicros(start_time_us);
66}
67
68void IteratorMetricsCollector::RecordStop(absl::Time start_time,
69 const std::vector<Tensor>& output) {
70 if (!ShouldCollectMetrics()) {
71 return;
72 }
73
74 const uint64_t end_time_us = env_.NowMicros();
75 AddLatencySample(safe_sub(end_time_us, absl::ToUnixMicros(start_time)));
76 IncrementThroughput(GetTotalBytes(output));
77 mutex_lock l(mu_);
78 metrics::RecordTFDataIteratorLifetime(safe_sub(end_time_us, end_time_us_));
79 end_time_us_ = std::max(end_time_us_, end_time_us);
80 num_active_calls_--;
81 if (num_active_calls_ == 0) {
82 metrics::RecordTFDataIteratorBusy(
83 safe_sub(end_time_us_, first_start_time_us_));
84 }
85}
86
87bool IteratorMetricsCollector::ShouldCollectMetrics() const {
88 return device_type_ == DEVICE_CPU;
89}
90
91} // namespace data
92} // namespace tensorflow
93