/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_
#define TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "tensorflow/tsl/platform/env.h"
#include "tensorflow/tsl/platform/macros.h"
#include "tensorflow/tsl/platform/threadpool_interface.h"
#include "tensorflow/tsl/platform/types.h"

namespace Eigen {
class Allocator;
class ThreadPoolInterface;
struct ThreadPoolDevice;

template <typename Environment>
class ThreadPoolTempl;
}  // namespace Eigen

namespace tsl {
namespace thread {

struct EigenEnvironment;

class ThreadPool {
 public:
  // Scheduling strategies for ParallelFor. The strategy governs how the given
  // units of work are distributed among the available threads in the
  // threadpool.
  enum class SchedulingStrategy {
    // The Adaptive scheduling strategy adaptively chooses the shard sizes
    // based on the cost of each unit of work and the cost model of the
    // underlying threadpool device.
    //
    // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
    // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
    // creates too many shards, and CPU time will be dominated by per-shard
    // overhead, such as Context creation. Underestimating may not fully make
    // use of the specified parallelism, and may also cause inefficiencies due
    // to load balancing issues and stragglers.
    kAdaptive,
    // The Fixed Block Size scheduling strategy shards the given units of work
    // into shards of fixed size. If the total number of units is not evenly
    // divisible by 'block_size', at most one of the shards may be smaller.
    // The exact number of shards can be obtained from
    // NumShardsUsedByFixedBlockSizeScheduling.
    //
    // Each shard may be executed on a different thread in parallel, depending
    // on the number of threads available in the pool. Note that when there
    // aren't enough threads in the pool to achieve full parallelism, function
    // calls will be automatically queued.
    kFixedBlockSize
  };

  // Contains additional parameters for either the Adaptive or the Fixed Block
  // Size scheduling strategy.
  class SchedulingParams {
   public:
    explicit SchedulingParams(SchedulingStrategy strategy,
                              absl::optional<int64_t> cost_per_unit,
                              absl::optional<int64_t> block_size)
        : strategy_(strategy),
          cost_per_unit_(cost_per_unit),
          block_size_(block_size) {}

    SchedulingStrategy strategy() const { return strategy_; }
    absl::optional<int64_t> cost_per_unit() const { return cost_per_unit_; }
    absl::optional<int64_t> block_size() const { return block_size_; }

   private:
    // The underlying scheduling strategy for which this instance contains
    // additional parameters.
    SchedulingStrategy strategy_;

    // The estimated cost per unit of work in number of CPU cycles (or
    // nanoseconds if not CPU-bound). Only applicable for the Adaptive
    // scheduling strategy.
    absl::optional<int64_t> cost_per_unit_;

    // The block size of each shard. Only applicable for the Fixed Block Size
    // scheduling strategy.
    absl::optional<int64_t> block_size_;
  };

  // Constructs a pool that contains "num_threads" threads with specified
  // "name". env->StartThread() is used to create individual threads with the
  // given ThreadOptions. If "low_latency_hint" is true the thread pool
  // implementation may use it as a hint that lower latency is preferred at
  // the cost of higher CPU usage, e.g. by letting one or more idle threads
  // spin wait. Conversely, if the threadpool is used to schedule high-latency
  // operations like I/O the hint should be set to false.
  //
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads, bool low_latency_hint,
             Eigen::Allocator* allocator = nullptr);
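
  // Example use of the constructor above (an illustrative sketch; the pool
  // name and thread count are arbitrary):
  //
  //   tsl::thread::ThreadPool pool(tsl::Env::Default(), tsl::ThreadOptions(),
  //                                "example_pool", /*num_threads=*/4,
  //                                /*low_latency_hint=*/true);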

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const std::string& name, int num_threads);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads with the given ThreadOptions.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads);

  // Constructs a pool that wraps around the thread::ThreadPoolInterface
  // instance provided by the caller. The caller retains ownership of
  // `user_threadpool` and must ensure its lifetime is longer than the
  // ThreadPool instance.
  explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Schedules fn() for execution in the pool of threads.
  void Schedule(std::function<void()> fn);
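
  // Example (illustrative; DoSomeWork() is a placeholder for the caller's
  // own function):
  //
  //   pool.Schedule([] { DoSomeWork(); });  // runs on some pool thread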

  // Sets the per-thread work-stealing partitions of the underlying Eigen
  // thread pool. Each pair is a [start, limit) range of thread indices from
  // which the corresponding thread may steal work.
  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions);

  // Schedules fn() for execution in the pool of threads, with a hint that
  // fn() should preferably run on a thread in the range [start, limit).
  void ScheduleWithHint(std::function<void()> fn, int start, int limit);

  // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
  // with these parameters.
  int NumShardsUsedByFixedBlockSizeScheduling(const int64_t total,
                                              const int64_t block_size);
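
  // For the function above, per the Fixed Block Size contract (fixed-size
  // shards, at most one smaller): total = 10 with block_size = 3 yields
  // ceil(10/3) = 4 shards, namely [0, 3), [3, 6), [6, 9), and the smaller
  // tail shard [9, 10).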

  // Returns the number of shards used by TransformRangeConcurrently with
  // these parameters.
  // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
  int NumShardsUsedByTransformRangeConcurrently(const int64_t block_size,
                                                const int64_t total);

  // ParallelFor shards the "total" units of work assuming each unit of work
  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is
  // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of
  // work, and the total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards, and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism, and may also cause inefficiencies due to load balancing
  // issues and stragglers.
  void ParallelFor(int64_t total, int64_t cost_per_unit,
                   const std::function<void(int64_t, int64_t)>& fn);
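
  // Example use of the overload above (an illustrative sketch; `v` is assumed
  // to be a caller-owned std::vector of numbers, and 10 cycles per element is
  // an arbitrary estimate):
  //
  //   pool.ParallelFor(v.size(), /*cost_per_unit=*/10,
  //                    [&v](int64_t start, int64_t end) {
  //                      for (int64_t i = start; i < end; ++i) v[i] *= 2;
  //                    });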

  // Similar to ParallelFor above, but takes the specified scheduling strategy
  // into account.
  void ParallelFor(int64_t total, const SchedulingParams& scheduling_params,
                   const std::function<void(int64_t, int64_t)>& fn);
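
  // Example (illustrative): the same kind of loop with fixed shards of 1024
  // units, where `total` and `fn` are as in the sketch above:
  //
  //   ThreadPool::SchedulingParams params(
  //       ThreadPool::SchedulingStrategy::kFixedBlockSize,
  //       /*cost_per_unit=*/absl::nullopt, /*block_size=*/1024);
  //   pool.ParallelFor(total, params, fn);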

  // Same as ParallelFor with the Fixed Block Size scheduling strategy.
  // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
  void TransformRangeConcurrently(
      const int64_t block_size, const int64_t total,
      const std::function<void(int64_t, int64_t)>& fn);

  // Shards the "total" units of work. For more details, see "ParallelFor".
  //
  // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
  // This is because some work can happen on the caller thread while the
  // threads in the pool are also being used.
  //
  // The caller can allocate NumThreads() + 1 separate buffers, one for each
  // thread. Each thread can safely write to the buffer given by its id without
  // synchronization. However, the worker fn may be called multiple times
  // sequentially with the same id.
  //
  // At most NumThreads() unique ids will actually be used, and only a few may
  // be used for small workloads. If each buffer is expensive, the buffers
  // should be stored in an array initially filled with null, and a buffer
  // should be allocated by fn the first time that the id is used.
  void ParallelForWithWorkerId(
      int64_t total, int64_t cost_per_unit,
      const std::function<void(int64_t, int64_t, int)>& fn);
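
  // Example of the per-worker buffer pattern described above (an illustrative
  // sketch; `data` is assumed to be a caller-owned array of `n` int64_t
  // values, and the cost estimate is arbitrary):
  //
  //   std::vector<int64_t> partial(pool.NumThreads() + 1, 0);
  //   pool.ParallelForWithWorkerId(
  //       n, /*cost_per_unit=*/5,
  //       [&](int64_t start, int64_t end, int id) {
  //         // Each worker id writes only to its own slot, so no locking is
  //         // needed even if fn runs several times with the same id.
  //         for (int64_t i = start; i < end; ++i) partial[id] += data[i];
  //       });
  //   int64_t sum =
  //       std::accumulate(partial.begin(), partial.end(), int64_t{0});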

  // Similar to ParallelForWithWorkerId above, but takes the specified
  // scheduling strategy into account.
  void ParallelForWithWorkerId(
      int64_t total, const SchedulingParams& scheduling_params,
      const std::function<void(int64_t, int64_t, int)>& fn);

  // Returns the number of threads in the pool.
  int NumThreads() const;

  // Returns the current thread id between 0 and NumThreads() - 1, if called
  // from a thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;

  // If the ThreadPool implementation is compatible with
  // Eigen::ThreadPoolInterface, returns a non-null pointer. The caller does
  // not own the object the returned pointer points to, and should not attempt
  // to delete it.
  Eigen::ThreadPoolInterface* AsEigenThreadPool() const;

 private:
  // Divides the work represented by the range [0, total) into k shards.
  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
  // Each shard may be executed on a different thread in parallel, depending on
  // the number of threads available in the pool.
  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
  // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
  // Requires 0 < block_size <= total.
  void ParallelForFixedBlockSizeScheduling(
      const int64_t total, const int64_t block_size,
      const std::function<void(int64_t, int64_t)>& fn);

  // underlying_threadpool_ is the user_threadpool if a user_threadpool is
  // provided in the constructor. Otherwise it is the eigen_threadpool_.
  Eigen::ThreadPoolInterface* underlying_threadpool_;
  // eigen_threadpool_ is instantiated and owned by thread::ThreadPool if a
  // user_threadpool is not provided in the constructor.
  std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
  std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
  TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

}  // namespace thread
}  // namespace tsl

#endif  // TENSORFLOW_TSL_PLATFORM_THREADPOOL_H_