/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_
#define TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "tensorflow/tsl/platform/env.h"
#include "tensorflow/tsl/platform/macros.h"
#include "tensorflow/tsl/platform/threadpool_interface.h"
#include "tensorflow/tsl/platform/types.h"

namespace Eigen {
class Allocator;
class ThreadPoolInterface;
struct ThreadPoolDevice;

template <typename Environment>
class ThreadPoolTempl;
}  // namespace Eigen

namespace tsl {
namespace thread {

struct EigenEnvironment;

class ThreadPool {
 public:
  // Scheduling strategies for ParallelFor. The strategy governs how the given
  // units of work are distributed among the available threads in the
  // threadpool.
  enum class SchedulingStrategy {
    // The Adaptive scheduling strategy chooses shard sizes based on the cost
    // of each unit of work and the cost model of the underlying threadpool
    // device.
    //
    // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
    // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
    // creates too many shards and CPU time will be dominated by per-shard
    // overhead, such as Context creation. Underestimating may not fully make
    // use of the specified parallelism, and may also cause inefficiencies due
    // to load balancing issues and stragglers.
    kAdaptive,
    // The Fixed Block Size scheduling strategy shards the given units of work
    // into shards of fixed size. In case the total number of units is not
    // evenly divisible by 'block_size', at most one of the shards may be of
    // smaller size. The exact number of shards may be found by a call to
    // NumShardsUsedByFixedBlockSizeScheduling.
    //
    // Each shard may be executed on a different thread in parallel, depending
    // on the number of threads available in the pool. Note that when there
    // aren't enough threads in the pool to achieve full parallelism, function
    // calls will be automatically queued.
    kFixedBlockSize
  };

  // Contains additional parameters for either the Adaptive or the Fixed Block
  // Size scheduling strategy.
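  //
  // Example (a minimal sketch; the cost and block-size values below are
  // illustrative assumptions, not recommendations):
  //
  //   // Adaptive scheduling: supply a cost estimate, leave block_size unset.
  //   SchedulingParams adaptive(SchedulingStrategy::kAdaptive,
  //                             /*cost_per_unit=*/100, absl::nullopt);
  //   // Fixed block size scheduling: supply a block size, leave cost unset.
  //   SchedulingParams fixed(SchedulingStrategy::kFixedBlockSize,
  //                          absl::nullopt, /*block_size=*/128);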
  class SchedulingParams {
   public:
    explicit SchedulingParams(SchedulingStrategy strategy,
                              absl::optional<int64_t> cost_per_unit,
                              absl::optional<int64_t> block_size)
        : strategy_(strategy),
          cost_per_unit_(cost_per_unit),
          block_size_(block_size) {}

    SchedulingStrategy strategy() const { return strategy_; }
    absl::optional<int64_t> cost_per_unit() const { return cost_per_unit_; }
    absl::optional<int64_t> block_size() const { return block_size_; }

   private:
    // The underlying Scheduling Strategy for which this instance contains
    // additional parameters.
    SchedulingStrategy strategy_;

    // The estimated cost per unit of work in number of CPU cycles (or
    // nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling
    // strategy.
    absl::optional<int64_t> cost_per_unit_;

    // The block size of each shard. Only applicable for Fixed Block Size
    // scheduling strategy.
    absl::optional<int64_t> block_size_;
  };

  // Constructs a pool that contains "num_threads" threads with specified
  // "name". env->StartThread() is used to create individual threads with the
  // given ThreadOptions. If "low_latency_hint" is true, the thread pool
  // implementation may use it as a hint that lower latency is preferred at the
  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
  // wait. Conversely, if the threadpool is used to schedule high-latency
  // operations such as I/O, the hint should be set to false.
  //
  // REQUIRES: num_threads > 0
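  //
  // Example (a minimal sketch; the pool name and thread count below are
  // illustrative assumptions):
  //
  //   tsl::thread::ThreadPool pool(tsl::Env::Default(), tsl::ThreadOptions(),
  //                                "example_pool", /*num_threads=*/4,
  //                                /*low_latency_hint=*/true);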
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads, bool low_latency_hint,
             Eigen::Allocator* allocator = nullptr);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const std::string& name, int num_threads);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads with the given ThreadOptions.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads);

  // Constructs a pool that wraps around the thread::ThreadPoolInterface
  // instance provided by the caller. The caller retains ownership of
  // `user_threadpool` and must ensure that it outlives the ThreadPool
  // instance.
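  //
  // Example (a sketch; `my_pool` is assumed to be a caller-owned object that
  // implements thread::ThreadPoolInterface and outlives the wrapper):
  //
  //   tsl::thread::ThreadPool wrapper(&my_pool);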
  explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Schedules fn() for execution in the pool of threads.
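  //
  // Example (illustrative; `pool` is a ThreadPool constructed as above, and
  // DoSomeWork is a hypothetical function):
  //
  //   pool.Schedule([] { DoSomeWork(); });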
  void Schedule(std::function<void()> fn);

  // Sets the per-thread steal partitions used by ScheduleWithHint. Each pair
  // gives the half-open range [start, limit) of thread ids from which the
  // corresponding thread may steal work.
  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions);

  // Schedules fn() for execution in the pool, preferring threads whose ids
  // lie in the half-open range [start, limit).
  void ScheduleWithHint(std::function<void()> fn, int start, int limit);

  // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
  // with these parameters.
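  //
  // For example, total = 10 and block_size = 3 yield 4 shards: three shards
  // of size 3 covering [0, 9) and one smaller shard covering [9, 10).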
  int NumShardsUsedByFixedBlockSizeScheduling(const int64_t total,
                                              const int64_t block_size);

  // Returns the number of shards used by calling TransformRangeConcurrently
  // with these parameters.
  // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
  int NumShardsUsedByTransformRangeConcurrently(const int64_t block_size,
                                                const int64_t total);

  // ParallelFor shards the "total" units of work, assuming each unit takes
  // roughly "cost_per_unit" cycles to complete. Each unit of work is indexed
  // 0, 1, ..., total - 1. Each shard contains 1 or more units of work, and
  // the total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism, and may also cause inefficiencies due to load balancing
  // issues and stragglers.
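  //
  // Example (a minimal sketch; the cost estimate of 50 cycles per unit is an
  // illustrative assumption):
  //
  //   std::vector<float> out(1000);
  //   pool.ParallelFor(out.size(), /*cost_per_unit=*/50,
  //                    [&out](int64_t start, int64_t end) {
  //                      for (int64_t i = start; i < end; ++i) {
  //                        out[i] = static_cast<float>(i) * i;
  //                      }
  //                    });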
  void ParallelFor(int64_t total, int64_t cost_per_unit,
                   const std::function<void(int64_t, int64_t)>& fn);

  // Similar to ParallelFor above, but takes the specified scheduling strategy
  // into account.
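  //
  // Example (illustrative; `total` and `fn` are as in the overload above, and
  // the work is sharded into fixed blocks of 100 units):
  //
  //   SchedulingParams params(SchedulingStrategy::kFixedBlockSize,
  //                           absl::nullopt, /*block_size=*/100);
  //   pool.ParallelFor(total, params, fn);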
  void ParallelFor(int64_t total, const SchedulingParams& scheduling_params,
                   const std::function<void(int64_t, int64_t)>& fn);

  // Same as ParallelFor with Fixed Block Size scheduling strategy.
  // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
  void TransformRangeConcurrently(
      const int64_t block_size, const int64_t total,
      const std::function<void(int64_t, int64_t)>& fn);

  // Shards the "total" units of work. For more details, see "ParallelFor".
  //
  // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
  // This is because some work can happen on the caller thread while the
  // threads in the pool are also being used.
  //
  // The caller can allocate NumThreads() + 1 separate buffers, one for each
  // possible id. Each thread can safely write to the buffer given by its id
  // without synchronization. However, the worker fn may be called multiple
  // times sequentially with the same id.
  //
  // At most NumThreads() unique ids will actually be used, and only a few may
  // be used for small workloads. If each buffer is expensive, the buffers
  // should be stored in an array initially filled with null, and a buffer
  // should be allocated by fn the first time that the id is used.
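  //
  // Example (a sketch of the per-id-buffer pattern described above; `data`
  // and the cost estimate are assumptions):
  //
  //   std::vector<double> partial_sums(pool.NumThreads() + 1, 0.0);
  //   pool.ParallelForWithWorkerId(
  //       data.size(), /*cost_per_unit=*/100,
  //       [&](int64_t start, int64_t end, int id) {
  //         for (int64_t i = start; i < end; ++i) partial_sums[id] += data[i];
  //       });
  //   // Reduce partial_sums after the parallel loop completes.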
  void ParallelForWithWorkerId(
      int64_t total, int64_t cost_per_unit,
      const std::function<void(int64_t, int64_t, int)>& fn);

  // Similar to ParallelForWithWorkerId above, but takes the specified
  // scheduling strategy into account.
  void ParallelForWithWorkerId(
      int64_t total, const SchedulingParams& scheduling_params,
      const std::function<void(int64_t, int64_t, int)>& fn);

  // Returns the number of threads in the pool.
  int NumThreads() const;

  // Returns current thread id between 0 and NumThreads() - 1, if called from a
  // thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;

  // If the ThreadPool implementation is compatible with
  // Eigen::ThreadPoolInterface, returns a non-null pointer. The caller does
  // not own the object the returned pointer points to, and must not delete it.
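  //
  // Example (a common pattern for building an Eigen device on top of this
  // pool; Eigen::ThreadPoolDevice comes from Eigen's Tensor module):
  //
  //   Eigen::ThreadPoolDevice device(pool.AsEigenThreadPool(),
  //                                  pool.NumThreads());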
  Eigen::ThreadPoolInterface* AsEigenThreadPool() const;

 private:
  // Divides the work represented by the range [0, total) into k shards.
  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
  // Each shard may be executed on a different thread in parallel, depending on
  // the number of threads available in the pool.
  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
  // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
  // Requires 0 < block_size <= total.
  void ParallelForFixedBlockSizeScheduling(
      const int64_t total, const int64_t block_size,
      const std::function<void(int64_t, int64_t)>& fn);

  // underlying_threadpool_ is the user_threadpool if user_threadpool is
  // provided in the constructor. Otherwise it is the eigen_threadpool_.
  Eigen::ThreadPoolInterface* underlying_threadpool_;
  // eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
  // user_threadpool is not provided in the constructor.
  std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
  std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
  TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

}  // namespace thread
}  // namespace tsl

#endif  // TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_