/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 | |
16 | #include "tensorflow/core/framework/run_handler_util.h" |
17 | |
18 | #include <cmath> |
19 | |
20 | #include "tensorflow/core/lib/strings/numbers.h" |
21 | #include "tensorflow/core/platform/logging.h" |
22 | #include "tensorflow/core/platform/str_util.h" |
23 | |
24 | namespace tensorflow { |
25 | |
26 | double ParamFromEnvWithDefault(const char* var_name, double default_value) { |
27 | const char* val = std::getenv(var_name); |
28 | double num; |
29 | return (val && strings::safe_strtod(val, &num)) ? num : default_value; |
30 | } |
31 | |
32 | std::vector<double> ParamFromEnvWithDefault(const char* var_name, |
33 | std::vector<double> default_value) { |
34 | const char* val = std::getenv(var_name); |
35 | if (!val) { |
36 | return default_value; |
37 | } |
38 | std::vector<string> splits = str_util::Split(val, "," ); |
39 | std::vector<double> result; |
40 | result.reserve(splits.size()); |
41 | for (auto& split : splits) { |
42 | double num; |
43 | if (strings::safe_strtod(split, &num)) { |
44 | result.push_back(num); |
45 | } else { |
46 | LOG(ERROR) << "Wrong format for " << var_name << ". Use default value." ; |
47 | return default_value; |
48 | } |
49 | } |
50 | return result; |
51 | } |
52 | |
53 | std::vector<int> ParamFromEnvWithDefault(const char* var_name, |
54 | std::vector<int> default_value) { |
55 | const char* val = std::getenv(var_name); |
56 | if (!val) { |
57 | return default_value; |
58 | } |
59 | std::vector<string> splits = str_util::Split(val, "," ); |
60 | std::vector<int> result; |
61 | result.reserve(splits.size()); |
62 | for (auto& split : splits) { |
63 | int num; |
64 | if (strings::safe_strto32(split, &num)) { |
65 | result.push_back(num); |
66 | } else { |
67 | LOG(ERROR) << "Wrong format for " << var_name << ". Use default value." ; |
68 | return default_value; |
69 | } |
70 | } |
71 | return result; |
72 | } |
73 | |
74 | bool ParamFromEnvBoolWithDefault(const char* var_name, bool default_value) { |
75 | const char* val = std::getenv(var_name); |
76 | return (val) ? str_util::Lowercase(val) == "true" : default_value; |
77 | } |
78 | |
// Partitions `num_threads` threads into (possibly overlapping) half-open
// ranges [start_vec[i], end_vec[i]) — one per active request — where older
// requests (smaller i) receive proportionally more threads. Request i is
// weighted W[i] = num_active_requests - i, and each request's range length is
// its weight's share of the threads, rounded up and bounded below by
// `min_threads_per_request`. `start_vec`/`end_vec` must already have at least
// `num_active_requests` entries.
void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads,
                                    int min_threads_per_request,
                                    std::vector<std::uint_fast32_t>* start_vec,
                                    std::vector<std::uint_fast32_t>* end_vec) {
  // Sum of the arithmetic series W[0] + ... + W[n-1] = n*(n+1)/2.
  const float weight_sum =
      0.5f * num_active_requests * (num_active_requests + 1);
  // Threads available per unit of request weight.
  const float threads_per_weight = static_cast<float>(num_threads) / weight_sum;
  min_threads_per_request = std::max(1, min_threads_per_request);
  float prev_prefix_weight = 0.0;
  for (int i = 0; i < num_active_requests; ++i) {
    // Prefix sum of weights W[0..i], in closed form; request i's own weight
    // is the difference of consecutive prefix sums.
    const float prefix_weight =
        static_cast<float>(i + 1) *
        (num_active_requests - static_cast<float>(i) * 0.5f);
    const float weight = prefix_weight - prev_prefix_weight;
    // Quantize the demand by rounding up while honoring the per-request
    // minimum. The small epsilon keeps ceil(..) from bumping values that are
    // already integral (e.g. 4.0) up to the next integer.
    const int demand = std::max(
        min_threads_per_request,
        static_cast<int>(std::ceil(weight * threads_per_weight - 0.00001f)));
    // [begin, finish): floor of the real start position, then adjusted so the
    // range has length `demand` and stays within [0, num_threads).
    int begin = prev_prefix_weight * threads_per_weight;
    const int finish = std::min(num_threads, begin + demand);
    begin = std::max(0, std::min(begin, finish - demand));
    start_vec->at(i) = begin;
    end_vec->at(i) = finish;
    prev_prefix_weight = prefix_weight;
  }
}
112 | |
// Assigns each of `num_threads` threads a half-open stealing range
// [start_vec[i], end_vec[i]): the group of threads it steals work from first.
// Threads are grouped into consecutive domains of size
// min(min_threads_per_domain, num_threads); the final domain is anchored to
// the tail so it always keeps full size. `start_vec`/`end_vec` must already
// have at least `num_threads` entries.
void ComputeInterOpStealingRanges(int num_threads, int min_threads_per_domain,
                                  std::vector<std::uint_fast32_t>* start_vec,
                                  std::vector<std::uint_fast32_t>* end_vec) {
  const int domain_size = std::min(min_threads_per_domain, num_threads);
  unsigned domain_begin = 0;
  unsigned domain_end = domain_size;
  for (int tid = 0; tid < num_threads; ++tid) {
    // Move to the next domain once tid walks past the current one.
    if (tid >= domain_end) {
      if (domain_end + domain_size < num_threads) {
        domain_begin = domain_end;
        domain_end += domain_size;
      } else {
        // Last domain: clamp to the end and extend backwards to full size.
        domain_end = num_threads;
        domain_begin = domain_end - domain_size;
      }
    }
    start_vec->at(tid) = domain_begin;
    end_vec->at(tid) = domain_end;
  }
}
132 | |
133 | std::vector<int> ChooseRequestsWithExponentialDistribution( |
134 | int num_active_requests, int num_threads) { |
135 | // Fraction of the total threads that will be evenly distributed across |
136 | // requests. The rest of threads will be exponentially distributed across |
137 | // requests. |
138 | static const double kCapacityFractionForEvenDistribution = |
139 | ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_EVEN_FRACTION" , 0.5); |
140 | |
141 | // For the threads that will be exponentially distributed across requests, |
142 | // a request will get allocated (kPowerBase - 1) times as much threads as |
143 | // threads allocated to all requests that arrive after it. For example, the |
144 | // oldest request will be allocated num_threads*(kPowerBase-1)/kPowerBase |
145 | // number of threads. |
146 | static const double kPowerBase = |
147 | ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_POWER_BASE" , 2.0); |
148 | |
149 | static const int kMinEvenThreadsFromEnv = static_cast<int>( |
150 | ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MIN_EVEN_THREADS" , 1)); |
151 | static const int kMaxEvenThreadsFromEnv = static_cast<int>( |
152 | ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MAX_EVEN_THREADS" , 3)); |
153 | |
154 | std::vector<int> request_idx_list; |
155 | request_idx_list.resize(num_threads); |
156 | // Each request gets at least this number of threads that steal from it first. |
157 | int min_threads_per_request = |
158 | num_threads * kCapacityFractionForEvenDistribution / num_active_requests; |
159 | min_threads_per_request = |
160 | std::max(kMinEvenThreadsFromEnv, min_threads_per_request); |
161 | min_threads_per_request = |
162 | std::min(kMaxEvenThreadsFromEnv, min_threads_per_request); |
163 | |
164 | int num_remaining_threads = |
165 | std::max(0, num_threads - num_active_requests * min_threads_per_request); |
166 | int request_idx = -1; |
167 | int num_threads_next_request = 0; |
168 | |
169 | for (int tid = 0; tid < num_threads; ++tid) { |
170 | if (num_threads_next_request <= 0) { |
171 | request_idx = std::min(num_active_requests - 1, request_idx + 1); |
172 | int = |
173 | std::ceil(num_remaining_threads * (kPowerBase - 1.0) / kPowerBase); |
174 | num_remaining_threads -= num_extra_threads_next_request; |
175 | num_threads_next_request = |
176 | num_extra_threads_next_request + min_threads_per_request; |
177 | } |
178 | num_threads_next_request--; |
179 | request_idx_list[tid] = request_idx; |
180 | } |
181 | return request_idx_list; |
182 | } |
183 | |
184 | } // namespace tensorflow |
185 | |