1 | /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #ifndef TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_ |
17 | #define TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_ |
18 | |
19 | #define EIGEN_USE_THREADS |
20 | |
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/kernels/concat_lib.h"
#include "tensorflow/core/util/work_sharder.h"
25 | |
26 | namespace tensorflow { |
27 | |
// ElementCopier must be a struct with a single Copy function, which is passed
// the output pointer, input pointer, input index, and number of elements to
// copy from input to output.
//
// ConcatCPUImpl concatenates the 2D `inputs` along their second (column)
// dimension into `output`: output row r is input[0] row r, then input[1]
// row r, and so on.  Assumes every input has the same number of rows as
// `output` (the code reads rows [0, output->dimension(0)) of each input).
// Small copies run inline on the calling thread; larger ones are sharded
// across the device's CPU worker thread pool.
template <typename T, typename ElementCopier>
void ConcatCPUImpl(
    DeviceBase* d,
    const std::vector<std::unique_ptr<typename TTypes<T, 2>::ConstMatrix>>&
        inputs,
    int64_t cost_per_unit, ElementCopier copier,
    typename TTypes<T, 2>::Matrix* output) {
  size_t num_inputs = inputs.size();

  // sizes[j] is the number of columns contributed by input j; row_size is
  // their sum, i.e. the number of elements in one output row.
  std::vector<ptrdiff_t> sizes;
  sizes.reserve(num_inputs);
  int64_t row_size = 0;
  for (const auto& input : inputs) {
    sizes.push_back(input->dimension(1));
    row_size += sizes.back();
  }

  // cost_per_unit is estimated bytes to copy per output array element (for
  // strings this includes an estimate of the number of bytes of the actual
  // string data, as well).
  const int64_t estimated_total_cost = output->size() * cost_per_unit;

  // Heuristic: cap parallelism at 4 threads, and at roughly one thread per
  // 16KB of estimated copy cost, so cheap concats (or a pool with no
  // threads) take the inline single-threaded path below.
  auto worker_threads = d->tensorflow_cpu_worker_threads();
  int num_threads = std::min(4, worker_threads->num_threads);
  num_threads = static_cast<int>(
      std::min<int64_t>(num_threads, estimated_total_cost / 16384));
  // Single threaded mode.
  // TODO(dga): Deduplicate this code w.r.t. sharded code below.
  if (num_threads == 0) {
    T* out = &(*output)(0, 0);
    // Per-input read cursors; inp[j] advances by sizes[j] after each row.
    std::vector<const T*> inp;
    inp.reserve(num_inputs);
    for (const auto& input : inputs) {
      inp.push_back(&(*input)(0, 0));
    }
    const int64_t dim0 = output->dimension(0);
    // For each output row, append every input's slice of that row in turn.
    for (int64_t i = 0; i < dim0; ++i) {
      for (int64_t j = 0; j < num_inputs; ++j) {
        auto size = sizes[j];
        copier.Copy(out, inp[j], j, size);
        out += size;
        inp[j] += size;
      }
    }
    return;
  }

  // Sharded mode.  `work` copies the half-open element range [start, end) of
  // the flattened output (row-major: element index == row * row_size + col,
  // as the pointer arithmetic below relies on).  Shard() partitions
  // output->size() elements across the worker threads, so a shard may begin
  // and/or end in the middle of a row.
  auto work = [&row_size, &sizes, &inputs, &output, &copier, &num_inputs](
                  int64_t start, int64_t end) {
    // Number of complete rows that precede `start`; `out` initially points
    // at the beginning of the row containing `start`.
    int64_t skipped_rows = start / row_size;
    T* out = output->data() + skipped_rows * row_size;
    T* out_start = output->data() + start;
    T* out_end = output->data() + end;

    // Handle partial row at start
    if (out < out_start) {
      for (size_t j = 0; j < num_inputs; ++j) {
        ptrdiff_t size = sizes[j];
        ptrdiff_t offset = out_start - out;
        if (size <= offset) {
          // Input j's slice of this row ends before `start`: skip it whole.
          out += size;
          continue;
        }
        const T* inp = &(*inputs[j])(skipped_rows, 0);
        if (offset > 0) {
          // `start` lands inside input j's slice: skip its leading part.
          out += offset;
          inp += offset;
          size -= offset;
        }
        // Clamp to the shard's end, in case [start, end) sits inside a
        // single row; once clamped to zero, nothing further to copy.
        size = std::min(size, out_end - out);
        if (size <= 0) break;
        copier.Copy(out, inp, j, size);
        out += size;
      }
      ++skipped_rows;
    }
    if (out == out_end) return;
    // Invariants after the partial row: `out` is row-aligned, inside the
    // shard, and strictly before its end.
    CHECK(out >= out_start);
    CHECK(out < out_end);

    // Copy remaining data.
    // Per-input read cursors positioned at the first whole row this shard
    // owns; inp[j] advances by sizes[j] after each row, as in the
    // single-threaded path.
    std::vector<const T*> inp;
    inp.reserve(num_inputs);
    for (const auto& input : inputs) {
      inp.push_back(&(*input)(skipped_rows, 0));
    }
    const int64_t dim0 = output->dimension(0);
    for (int64_t i = skipped_rows; i < dim0; ++i) {
      for (int64_t j = 0; j < num_inputs; ++j) {
        // Clamp the final copy so this shard stops exactly at `end` (the
        // next shard handles the rest of that row).
        ptrdiff_t size = std::min(sizes[j], out_end - out);
        copier.Copy(out, inp[j], j, size);
        out += size;
        inp[j] += size;
        if (out == out_end) return;
      }
    }
  };
  Shard(worker_threads->num_threads, worker_threads->workers, output->size(),
        cost_per_unit, work);
}
132 | |
133 | } // namespace tensorflow |
134 | |
135 | #endif // TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_ |
136 | |