/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
#define TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_

#define EIGEN_USE_THREADS

#include "third_party/eigen3/Eigen/Core"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using CPUDevice = Eigen::ThreadPoolDevice;

namespace functor {

// Compute reduction over outer dimensions.
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , DN] where i belongs to the set [1, N].
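//
// Example usage (an illustrative sketch, not taken from existing callers):
// reduce a float tensor of shape [8, 32, 64] over its two leading dimensions
// into a float output of shape [64], summing values. Here `d` is assumed to
// be an Eigen::ThreadPoolDevice and the input/output tensors are assumed to
// be allocated by the caller with the shapes shown.
//
//   functor::ReduceOuterDimensions<float, float, float,
//                                  Eigen::internal::scalar_sum_op<float>>
//       redux;
//   Eigen::DSizes<Eigen::Index, 3> dims(8, 32, 64);
//   redux(d, dims, input, &output);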
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor>
struct ReduceOuterDimensions {
  ReduceOuterDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output) const {
    // Compute inner and outer dim after reshaping into 2d tensor.
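    // E.g. an input of shape [2, 3, 4] reduced to an output of shape [4]
    // gives outer_dim = 2 * 3 = 6 and inner_dim = 4.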
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, outer_dim = 1;
    for (int i = 0; i < num_dims - num_output_dims; ++i)
      outer_dim *= input_dims[i];
    for (int i = num_dims - num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == outer_dim) {
      // Nothing to reduce: pass the input straight through to the output.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Get the number of threads in the device's thread pool.
    const Eigen::Index num_threads = device.numThreads();

    // If the inner dimension is large enough, parallelize the reduction over
    // it directly.
    // TODO(ezhulenev): There seem to be no benefits in going this route. Check
    // if this can be improved, or use a better heuristic?
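    // Block along the inner dimension: each block reduces all outer rows for
    // its contiguous slice of the inner dimension, writing directly into the
    // final accumulation buffer.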
    if (inner_dim > num_threads * 32) {
      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = num_threads;

      // Block size along the inner dimension.
      const Eigen::Index inner_block_size = Eigen::divup(inner_dim, num_blocks);
      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
      Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index> buffer(
          {inner_dim});
      buffer.setZero();
      AccumT* buffer_data = buffer.data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, outer_dim, num_blocks, inner_block_size,
                            input_data, buffer_data](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index inner_dim_start = start * inner_block_size;
        Eigen::Index inner_dim_limit = limit * inner_block_size;
        inner_dim_limit = std::min(inner_dim, inner_dim_limit);
        Eigen::Index my_job_len = inner_dim_limit - inner_dim_start;

        const InputT* my_job_start = input_data + inner_dim_start;
        Buffer buf(buffer_data + inner_dim_start, my_job_len);

        for (Eigen::Index i = 0; i < outer_dim; ++i) {
          auto in = Input(my_job_start + i * inner_dim, my_job_len);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_dim * inner_block_size;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Write final result to the output.
      output->template flat<OutputT>() =
          buffer.template cast<OutputT>().reshape(output_dims);
    } else {
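      // Block along the outer dimension: each block reduces its share of the
      // outer rows into a private [inner_dim] slice of a shared buffer; the
      // per-block slices are combined after the parallel phase.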
      // Compute block size along the outer dimension for efficiency.
      const Eigen::Index parallel_cell_size = inner_dim;
      const Eigen::Index total_workload = outer_dim * inner_dim;
      const Eigen::Index max_parallelism = total_workload / parallel_cell_size;

      const Eigen::Index min_block_workload = 2000;
      const Eigen::Index min_block_size =
          Eigen::divup(min_block_workload, parallel_cell_size);
      const Eigen::Index max_num_blocks = std::min(
          max_parallelism, Eigen::divup(total_workload, min_block_size));

      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

      // Block size along the outer dimension.
      const Eigen::Index outer_block_size = Eigen::divup(outer_dim, num_blocks);

      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
      Tensor buffer(DataTypeToEnum<AccumT>::v(), {num_blocks, inner_dim});
      buffer.template flat<AccumT>().setZero();
      AccumT* buffer_data = buffer.template flat<AccumT>().data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, num_blocks, outer_block_size,
                            buffer_data, input_data, outer_dim](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index outer_dim_start = start * outer_block_size;
        Eigen::Index outer_dim_limit = limit * outer_block_size;
        outer_dim_limit = std::min(outer_dim, outer_dim_limit);

        Buffer buf(buffer_data + start * inner_dim, inner_dim);
        for (Eigen::Index i = outer_dim_start; i < outer_dim_limit; ++i) {
          auto in = Input(input_data + i * inner_dim, inner_dim);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_block_size * inner_dim;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Aggregate partial results from temporary buffer into first block.
      auto buf0 = Buffer(buffer_data, inner_dim);
      // Combine the per-block buffers sequentially, as the inner dimension is
      // not large in this case.
      for (int i = 1; i < num_blocks; ++i) {
        auto buf = Buffer(buffer_data + i * inner_dim, inner_dim);
        buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                          const decltype(buf)>(buf0, buf);
      }
      // Write final result to the output.
      output->template flat<OutputT>() =
          buf0.template cast<OutputT>().reshape(output_dims);
    }
  }
};

// Compute a reduction that keeps only a contiguous run of middle dimensions
// (i.e. reduce along a set of leading and trailing axes).
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , Dj] where i and j belong to the set [1, N].
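//
// Example usage (an illustrative sketch, not taken from existing callers):
// reduce a float tensor of shape [8, 32, 64] over its first and last
// dimensions, keeping the middle dimension of size 32 and summing values.
// Here `d` is assumed to be an Eigen::ThreadPoolDevice and the input/output
// tensors are assumed to be allocated by the caller with the shapes shown.
//
//   functor::ReduceMiddleDimensions<float, float, float,
//                                   Eigen::internal::scalar_sum_op<float>,
//                                   Eigen::internal::SumReducer<float>>
//       redux;
//   Eigen::DSizes<Eigen::Index, 3> dims(8, 32, 64);
//   redux(d, dims, input, &output, /*axis_begin_dim=*/1);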
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor, typename Reducer>
struct ReduceMiddleDimensions {
  ReduceMiddleDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output,
                  const int axis_begin_dim) const {
    // Compute dims after reshaping into 3d tensor.
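    // E.g. an input of shape [2, 3, 4, 5] with axis_begin_dim = 1 and an
    // output of shape [3, 4] gives outer_dim = 2, middle_dim = 3 * 4 = 12 and
    // inner_dim = 5.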
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, middle_dim = 1, outer_dim = 1;
    for (int i = 0; i < axis_begin_dim; ++i) outer_dim *= input_dims[i];
    for (int i = axis_begin_dim; i < axis_begin_dim + num_output_dims; ++i)
      middle_dim *= input_dims[i];
    for (int i = axis_begin_dim + num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == inner_dim * outer_dim) {
      // Nothing to reduce: pass the input straight through to the output.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Compute block size along the outer dimension for efficiency.
    const Eigen::Index parallel_cell_size = inner_dim;
    const Eigen::Index max_parallelism = outer_dim * middle_dim;
    const Eigen::Index total_workload = max_parallelism * inner_dim;

    const Eigen::Index min_block_workload = 2000;
    const Eigen::Index min_block_size =
        Eigen::divup(min_block_workload, parallel_cell_size);
    const Eigen::Index max_num_blocks =
        std::min(max_parallelism, Eigen::divup(total_workload, min_block_size));

    // Do not create more blocks than there are threads in a pool.
    const Eigen::Index num_threads = device.numThreads();
    const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

    // Block size in flattened input elements processed by each block.
    const Eigen::Index outer_block_size =
        Eigen::divup(total_workload, num_blocks);

    const InputT* input_data = input.template flat<InputT>().data();

    // Allocate temporary buffer for partial reductions.
    Eigen::Tensor<AccumT, 2> buffer(num_blocks, middle_dim);
    buffer.setZero();
    AccumT* buffer_data = buffer.data();

    using Buffer = Eigen::TensorMap<Eigen::Tensor<AccumT, 1>>;
    using Input = Eigen::TensorMap<Eigen::Tensor<const InputT, 1>>;

    Eigen::array<Eigen::Index, 1> reduction_axis = {0};
    Reducer reducer;
    const BinaryFunctor binary_op;

    const auto compute = [inner_dim, middle_dim, input_data, buffer_data,
                          total_workload, num_blocks, outer_block_size,
                          reduction_axis, reducer, binary_op](
                             Eigen::Index start, Eigen::Index limit) -> void {
      DCHECK(start >= 0 && limit <= num_blocks);
      Eigen::Index block_start = start * outer_block_size;
      Eigen::Index block_limit = limit * outer_block_size;
      block_limit = std::min(total_workload, block_limit);
      Buffer buf(buffer_data + start * middle_dim, middle_dim);

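      // Split the flat range [block_start, block_limit) into an unaligned
      // head, a run of full rows of length inner_dim, and an unaligned tail.
      // Each piece is reduced with `reducer` and folded into the bucket of
      // its middle-dimension coordinate via `binary_op`.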
      const Eigen::Index align_start =
          ((block_start + inner_dim - 1) / inner_dim) * inner_dim;
      const Eigen::Index align_end = (block_limit / inner_dim) * inner_dim;

      Eigen::Index coordinate = block_start / inner_dim % middle_dim;
      Eigen::Tensor<AccumT, 0> reduced =
          Input(&input_data[block_start], align_start - block_start)
              .reduce(reduction_axis, reducer)
              .template cast<AccumT>();

      buf(coordinate) = binary_op(buf(coordinate), reduced(0));

      coordinate = align_start / inner_dim % middle_dim;
      for (Eigen::Index i = align_start; i < align_end; i += inner_dim) {
        reduced = Input(&input_data[i], inner_dim)
                      .reduce(reduction_axis, reducer)
                      .template cast<AccumT>();
        buf(coordinate) = binary_op(buf(coordinate), reduced(0));
        ++coordinate;
        if (middle_dim == coordinate) coordinate = 0;
      }

      reduced = Input(&input_data[align_end], block_limit - align_end)
                    .reduce(reduction_axis, reducer)
                    .template cast<AccumT>();
      buf(coordinate) = binary_op(buf(coordinate), reduced(0));
    };

    // Compute cost of reducing a single block.
    const Eigen::Index compute_size = outer_block_size * inner_dim;
    const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
    const Eigen::TensorOpCost cost(
        compute_input_bytes,
        0,  // We'll be mostly writing to L1, assume store cost is 0
        compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

    device.parallelFor(num_blocks, cost, compute);

    using Output = Eigen::TensorMap<
        Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;
    // Aggregate partial results from temporary buffer into first block.
    auto buf0 = Output(buffer_data, middle_dim);
    // TODO(ezhulenev): Parallelize this loop for large inner dimensions?
    for (int i = 1; i < num_blocks; ++i) {
      auto buf = Output(buffer_data + i * middle_dim, middle_dim);
      buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                        const decltype(buf)>(buf0, buf);
    }

    // Write final result to the output.
    output->template flat<OutputT>() =
        buf0.template cast<OutputT>().reshape(output_dims);
  }
};

}  // namespace functor
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_