/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient
 * is silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated subclass of
 * ConditionalAccumulatorBase. It implements the virtual arithmetic methods
 * used for aggregating, averaging, allocating, and returning indexed slices.
 */
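// Illustrative sketch of the aggregation (informal example, not a usage
// guide): if one applied gradient has indices {0, 2} and a later one has
// indices {2, 5}, the accumulator ends up holding indices {0, 2, 5}; the
// slice at index 2 is the sum of both contributions and, when the average is
// taken, is divided by its per-slice count of 2, while the slices at indices
// 0 and 5 are divided by 1.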
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name, const string& reduction_type)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name, reduction_type),
        accum_val_(std::make_unique<Tensor>()) {}

 protected:
  std::unique_ptr<std::vector<int64_t>> accum_idx_vec_;
  std::unique_ptr<std::vector<int>> count_element_;

  std::unique_ptr<Tensor> accum_val_;

  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;

  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64_t grad_val_dims = tensor_val->dims();
    int64_t grad_dims = grad_val_dims;

    // Compare with provided shape
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64_t>();
      for (int64_t i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64_t i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64_t>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i, " exceeded limits of shape",
              "; index is ", tensor_idx->vec<int64_t>()(i), " exceeded ",
              shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64_t accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64_t i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64_t i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return OkStatus();
  }

  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t nnz = grad_idx->dim_size(0);

    // Assign indices
    accum_idx_vec_ = std::make_unique<std::vector<int64_t>>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64_t>()(i));
    }

    // Assign values to accum_val_tensor
    OP_REQUIRES_OK(
        ctx, ctx->allocate_temp(dtype_, grad_val->shape(), accum_val_.get()));
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    count_element_ = std::make_unique<std::vector<int>>(nnz, 1);

    // Do not need shape; assume that the op has checked that the shapes match,
    // so grad's shape == shape_
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t accum_nnz = accum_idx_vec_->size();
    const int64_t grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) Do a pass over the inputs, and append values and indices to vectors.
    std::vector<std::tuple<Source, int64_t, int64_t>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be ordered along
    // increasing dimension number.
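    // Illustrative example (assuming sorted indices, as noted above): with
    // accum_idx_vec_ = {0, 2} and gradient indices {2, 5}, this pass produces
    // entries_to_copy = {(from_accum, 0, -1), (from_accum_and_grad, 1, 0),
    // (from_grad, -1, 1)}, i.e. the merged slices are
    // {accum[0], accum[1] + grad[0], grad[1]} at indices {0, 2, 5}.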
    int64_t i = 0, j = 0;
    int64_t sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_.get(), grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor
    std::vector<int64_t>* sum_indices_vec = new std::vector<int64_t>();
    sum_indices_vec->reserve(sum_nnz);

    std::vector<int>* sum_counts = new std::vector<int>();
    sum_counts->reserve(sum_nnz);

    Tensor* sum_tensor = new Tensor();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(ctx, ctx->allocate_temp(dtype_, sum_shape, sum_tensor));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64_t num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64_t idx_a = std::get<1>(entries_to_copy[i]);
      const int64_t idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64_t>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep output, i.e., switch pointers to point to new data structures
    // representing the sum
    // Indices
    accum_idx_vec_.reset(sum_indices_vec);
    // Values
    accum_val_.reset(sum_tensor);
    // Counts
    count_element_.reset(sum_counts);

    // No need to copy shape, since shape remains the same after sum.
  }

  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64_t nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(
        &accum_flat(0,0), &accum_flat(nnz,0), &accum_flat(0,0),
        std::bind2nd(std::divides<T>(),
                     TypeConverter<T, int>::ConvertUToT(this->counter_)));
    */

    // Option 2: average element-wise
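    // Each slice is divided by its own occurrence count rather than by the
    // global counter_. Illustrative example: a slice that appeared in 2 of
    // the 3 gradients applied so far has count_element_ == 2 and is divided
    // by 2, not by counter_ == 3.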
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64_t i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64_t nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64_t>* a_idx, const Tensor* b_idx,
                 const int64_t a_row, const int64_t b_row) {
    const int64_t a = a_idx->at(a_row);
    const int64_t b = b_idx->vec<int64_t>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64_t nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64_t>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64_t accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
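    // Illustrative example: if shape_ is [10, -1] and accum_val_ has shape
    // [nnz, 4], the returned shape tensor is [10, 4]; if shape_ is unspecified
    // (shape_.dims() == 0), the first dim is reported as -1.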
    shape_tensor->flat<int64_t>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64_t i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64_t>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_