/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient
 * is silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated subclass of
 * ConditionalAccumulatorBase. It implements the virtual arithmetic methods
 * used for aggregating, averaging, allocating, and returning indexed slices.
 */
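//
// Purely illustrative sketch of the accumulation semantics, assuming the
// "MEAN" reduction type (each returned slice is divided by the number of
// gradients that contributed to it; see DivideAccumGradByCounter below):
//
//   apply grad A: indices = [0, 2], values = [[1, 1], [2, 2]]
//   apply grad B: indices = [2, 5], values = [[4, 4], [6, 6]]
//
//   TryTakeGrad (once both gradients are in) returns
//     indices = [0, 2, 5]
//     values  = [[1, 1], [3, 3], [6, 6]]
//
// because index 2 received two contributions (2 + 4, averaged to 3), while
// indices 0 and 5 each received a single one.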
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name, const string& reduction_type)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name, reduction_type),
        accum_val_(std::make_unique<Tensor>()) {}

 protected:
  std::unique_ptr<std::vector<int64_t>> accum_idx_vec_;
  std::unique_ptr<std::vector<int>> count_element_;

  std::unique_ptr<Tensor> accum_val_;

  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;

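  // Validates the (indices, values, shape) tuple of an incoming gradient
  // against the accumulator's partially known shape_ and, once gradients have
  // been accumulated, against the accumulated value. For example, with
  // shape_ = [-1, 2] a gradient with dense shape [10, 2] passes, while one
  // with dense shape [10, 3] is rejected with an InvalidArgument error.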
  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64_t grad_val_dims = tensor_val->dims();
    int64_t grad_dims = grad_val_dims;

    // Compare with provided shape
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64_t>();
      for (int64_t i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64_t i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64_t>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i, " exceeded limits of shape",
              "; index is ", tensor_idx->vec<int64_t>()(i), " exceeded ",
              shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64_t accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64_t i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64_t i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return OkStatus();
  }

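  // Initializes the accumulator from the first accepted gradient: copies its
  // indices into accum_idx_vec_ and its values into accum_val_, and starts
  // every per-slice count at 1.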
  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t nnz = grad_idx->dim_size(0);

    // Assign indices
    accum_idx_vec_ = std::make_unique<std::vector<int64_t>>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64_t>()(i));
    }

    // Assign values to accum_val_tensor
    OP_REQUIRES_OK(
        ctx, ctx->allocate_temp(dtype_, grad_val->shape(), accum_val_.get()));
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    count_element_ = std::make_unique<std::vector<int>>(nnz, 1);

    // The shape is not stored here; assume the op has already checked that
    // the shapes match, so grad's shape == shape_.
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t accum_nnz = accum_idx_vec_->size();
    const int64_t grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) Do a pass over the inputs, and append values and indices to vectors
    std::vector<std::tuple<Source, int64_t, int64_t>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be sorted by
    // increasing row index.
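    // Illustrative example: with accum_idx_vec_ = [0, 3, 7] and incoming
    // grad indices [3, 5], the merge below yields indices [0, 3, 5, 7] with
    // sources [from_accum, from_accum_and_grad, from_grad, from_accum].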
    int64_t i = 0, j = 0;
    int64_t sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_.get(), grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor.
    // Owning pointers are used so nothing leaks if allocation fails below.
    auto sum_indices_vec = std::make_unique<std::vector<int64_t>>();
    sum_indices_vec->reserve(sum_nnz);

    auto sum_counts = std::make_unique<std::vector<int>>();
    sum_counts->reserve(sum_nnz);

    auto sum_tensor = std::make_unique<Tensor>();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(ctx,
                   ctx->allocate_temp(dtype_, sum_shape, sum_tensor.get()));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64_t num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64_t idx_a = std::get<1>(entries_to_copy[i]);
      const int64_t idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64_t>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep the output, i.e., hand the newly built data structures
    // representing the sum over to the accumulator
    // Indices
    accum_idx_vec_ = std::move(sum_indices_vec);
    // Values
    accum_val_ = std::move(sum_tensor);
    // Counts
    count_element_ = std::move(sum_counts);

    // No need to copy shape, since shape remains the same after sum.
  }

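  // Divides each accumulated slice by the number of gradients that
  // contributed to it (count_element_), producing an element-wise mean. For
  // example, if three gradients were applied but a given index appeared in
  // only two of them, that slice is divided by 2 rather than by counter_.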
  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64_t nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(&accum_flat(0, 0), &accum_flat(nnz, 0), &accum_flat(0, 0),
                   [this](const T& v) {
                     return v / TypeConverter<T, int>::ConvertUToT(counter_);
                   });
    */

    // Option 2: average element-wise
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64_t i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

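  // Reads the "gradient_indices", "gradient_values" and, when has_known_shape
  // is set, "gradient_shape" inputs of the op, checks their basic invariants,
  // and packages them into a heap-allocated tuple that is later released by
  // CleanUpGradTensor. Returns false, after recording an error status on ctx,
  // if any check fails.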
  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64_t nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64_t>* a_idx, const Tensor* b_idx,
                 const int64_t a_row, const int64_t b_row) {
    const int64_t a = a_idx->at(a_row);
    const int64_t b = b_idx->vec<int64_t>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64_t nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64_t>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64_t accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
    shape_tensor->flat<int64_t>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64_t i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64_t>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_