1 | /** |
2 | * Copyright (c) 2017-present, Facebook, Inc. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under the BSD-style license found in the |
6 | * LICENSE file in the root directory of this source tree. |
7 | */ |
8 | |
9 | #pragma once |
10 | |
11 | #include "gloo/common/common.h" |
12 | #include "gloo/common/logging.h" |
13 | #include "gloo/cuda.h" |
14 | |
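// Local (intra-process) reduce and broadcast operations on CUDA buffers,
// staged through host memory: device ranges are asynchronously copied
// into host buffers, combined on the CPU (reduce) or fanned out to the
// devices (broadcast), and results are copied back where needed.
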
15 | namespace gloo { |
16 | |
17 | // Forward declaration |
18 | template <typename T, typename Dst> |
19 | class CudaLocalHostReduce; |
20 | |
// Partial specialization for a device pointer target.
//
// Every input device range is asynchronously copied into its own
// temporary host buffer, the buffers are reduced on the CPU into
// tmpPtrs_[0], and the result is copied back to the target device range.
22 | template <typename T> |
23 | class CudaLocalHostReduce<T, CudaDevicePointer<T> > : public LocalOp<T> { |
24 | public: |
25 | CudaLocalHostReduce( |
26 | std::vector<CudaStream>& streams, |
27 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
28 | CudaDevicePointer<T>& targetPtr, |
29 | const CudaReductionFunction<T>* fn, |
30 | size_t offset, |
31 | size_t count) |
32 | : streams_(streams), |
33 | targetPtr_(targetPtr.range(offset, count)), |
34 | offset_(offset), |
35 | count_(count), |
36 | fn_(fn) { |
37 | // Incorporate offset/count into devicePtrs |
38 | devicePtrs_.reserve(devicePtrs.size()); |
39 | for (const auto& ptr : devicePtrs) { |
40 | devicePtrs_.push_back(ptr.range(offset, count)); |
41 | } |
    // Allocate N temporary host buffers to asynchronously copy the
    // device ptrs into.
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
44 | tmpPtrs_.push_back(CudaHostPointer<T>::alloc(count)); |
45 | } |
46 | } |
47 | |
48 | virtual void runAsync() { |
49 | // Asynchronously copy device memory to host |
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
51 | streams_[i].copyAsync(tmpPtrs_[i], devicePtrs_[i]); |
52 | } |
53 | // Reduce specified pointers into tmpPtrs_[0] |
54 | streams_[0].wait(); |
    for (size_t i = 1; i < devicePtrs_.size(); i++) {
56 | streams_[i].wait(); |
57 | fn_->call(tmpPtrs_[0], tmpPtrs_[i], count_, streams_[i]); |
58 | } |
59 | // Copy final reduction back to device |
60 | streams_[0].copyAsync(targetPtr_, tmpPtrs_[0]); |
61 | } |
62 | |
63 | virtual void wait() { |
    // The reduction itself runs on the CPU, but we still have to wait
    // for the asynchronous copy of the result back to the device.
66 | streams_[0].wait(); |
67 | } |
68 | |
69 | protected: |
70 | std::vector<CudaStream>& streams_; |
71 | std::vector<CudaDevicePointer<T> > devicePtrs_; |
72 | CudaDevicePointer<T> targetPtr_; |
73 | const size_t offset_; |
74 | const size_t count_; |
75 | const CudaReductionFunction<T>* fn_; |
76 | |
77 | // Temporary buffers used for async memory copies |
78 | std::vector<CudaHostPointer<T> > tmpPtrs_; |
79 | }; |
80 | |
// Partial specialization for a host pointer target.
//
// The first device range is copied directly into the target host buffer;
// the remaining N-1 ranges are staged through temporary host buffers and
// reduced into the target on the CPU. No copy back to the device is
// needed, so runAsync() completes the entire operation.
82 | template <typename T> |
83 | class CudaLocalHostReduce<T, CudaHostPointer<T> > : public LocalOp<T> { |
84 | public: |
85 | CudaLocalHostReduce( |
86 | std::vector<CudaStream>& streams, |
87 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
88 | CudaHostPointer<T>& targetPtr, |
89 | const CudaReductionFunction<T>* fn, |
90 | size_t offset, |
91 | size_t count) |
92 | : streams_(streams), |
93 | targetPtr_(targetPtr.range(offset, count)), |
94 | offset_(offset), |
95 | count_(count), |
96 | fn_(fn) { |
97 | // Incorporate offset/count into devicePtrs |
98 | devicePtrs_.reserve(devicePtrs.size()); |
99 | for (const auto& ptr : devicePtrs) { |
100 | devicePtrs_.push_back(ptr.range(offset, count)); |
101 | } |
    // Allocate N-1 temporary host buffers to asynchronously copy the
    // device ptrs into; the first device ptr is copied straight into
    // the target host buffer.
    for (size_t i = 1; i < devicePtrs_.size(); i++) {
104 | tmpPtrs_.push_back(CudaHostPointer<T>::alloc(count)); |
105 | } |
106 | } |
107 | |
108 | virtual void runAsync() { |
109 | // Asynchronously copy device memory to host |
110 | streams_[0].copyAsync(targetPtr_, devicePtrs_[0]); |
    for (size_t i = 1; i < devicePtrs_.size(); i++) {
112 | streams_[i].copyAsync(tmpPtrs_[i-1], devicePtrs_[i]); |
113 | } |
114 | // Reduce specified pointers into targetPtr_ |
115 | streams_[0].wait(); |
    for (size_t i = 1; i < devicePtrs_.size(); i++) {
117 | streams_[i].wait(); |
118 | fn_->call(targetPtr_, tmpPtrs_[i-1], count_, streams_[i]); |
119 | } |
120 | } |
121 | |
122 | virtual void wait() { |
    // runAsync() already waits on every stream and performs the
    // reduction on the CPU, so there is nothing left to wait for.
124 | } |
125 | |
126 | protected: |
127 | std::vector<CudaStream>& streams_; |
128 | std::vector<CudaDevicePointer<T> > devicePtrs_; |
129 | CudaHostPointer<T> targetPtr_; |
130 | const size_t offset_; |
131 | const size_t count_; |
132 | const CudaReductionFunction<T>* fn_; |
133 | |
134 | // Temporary buffers used for async memory copies |
135 | std::vector<CudaHostPointer<T> > tmpPtrs_; |
136 | }; |
137 | |
138 | // Forward declaration |
139 | template <typename T, typename Src> |
140 | class CudaLocalHostBroadcast; |
141 | |
// Partial specialization for a device pointer source.
//
// The source device range is asynchronously copied to every destination
// device range, each on its own stream.
143 | template <typename T> |
144 | class CudaLocalHostBroadcast<T, CudaDevicePointer<T> > : public LocalOp<T> { |
145 | public: |
146 | CudaLocalHostBroadcast( |
147 | std::vector<CudaStream>& streams, |
148 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
149 | CudaDevicePointer<T>& sourcePtr, |
150 | size_t offset, |
151 | size_t count) |
152 | : streams_(streams), |
153 | sourcePtr_(sourcePtr.range(offset, count)) { |
154 | // Incorporate offset/count into devicePtrs |
155 | devicePtrs_.reserve(devicePtrs.size()); |
156 | for (const auto& ptr : devicePtrs) { |
157 | devicePtrs_.push_back(ptr.range(offset, count)); |
158 | } |
159 | } |
160 | |
161 | virtual void runAsync() { |
162 | // Asynchronously copy source to device ptrs |
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
164 | streams_[i].copyAsync(devicePtrs_[i], sourcePtr_); |
165 | } |
166 | } |
167 | |
168 | virtual void wait() { |
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
170 | streams_[i].wait(); |
171 | } |
172 | } |
173 | |
174 | protected: |
175 | std::vector<CudaStream>& streams_; |
176 | std::vector<CudaDevicePointer<T> > devicePtrs_; |
177 | CudaDevicePointer<T> sourcePtr_; |
178 | }; |
179 | |
// Partial specialization for a host pointer source.
//
// The source host range is asynchronously copied to every destination
// device range, each on its own stream.
181 | template <typename T> |
182 | class CudaLocalHostBroadcast<T, CudaHostPointer<T> > : public LocalOp<T> { |
183 | public: |
184 | CudaLocalHostBroadcast( |
185 | std::vector<CudaStream>& streams, |
186 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
187 | CudaHostPointer<T>& sourcePtr, |
188 | size_t offset, |
189 | size_t count) |
190 | : streams_(streams), |
191 | sourcePtr_(sourcePtr.range(offset, count)) { |
192 | // Incorporate offset/count into devicePtrs |
193 | devicePtrs_.reserve(devicePtrs.size()); |
194 | for (const auto& ptr : devicePtrs) { |
195 | devicePtrs_.push_back(ptr.range(offset, count)); |
196 | } |
197 | } |
198 | |
199 | virtual void runAsync() { |
200 | // Asynchronously copy host memory to device |
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
202 | streams_[i].copyAsync(devicePtrs_[i], sourcePtr_); |
203 | } |
204 | } |
205 | |
206 | virtual void wait() { |
    for (size_t i = 0; i < devicePtrs_.size(); i++) {
208 | streams_[i].wait(); |
209 | } |
210 | } |
211 | |
212 | protected: |
213 | std::vector<CudaStream>& streams_; |
214 | std::vector<CudaDevicePointer<T> > devicePtrs_; |
215 | CudaHostPointer<T> sourcePtr_; |
216 | }; |
217 | |
218 | template <typename T, typename Dst> |
219 | std::unique_ptr<LocalOp<T> > cudaHostReduce( |
220 | std::vector<CudaStream>& streams, |
221 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
222 | Dst& targetPtr, |
223 | const CudaReductionFunction<T>* fn, |
224 | size_t offset, |
225 | size_t count) { |
226 | GLOO_ENFORCE_EQ(streams.size(), devicePtrs.size()); |
227 | // Simple copy operation if there is only a single device pointer. |
228 | if (devicePtrs.size() == 1) { |
229 | return make_unique< |
230 | CudaLocalMemcpy<T, CudaDevicePointer<T>, Dst> >( |
231 | streams[0], |
232 | devicePtrs[0], |
233 | targetPtr, |
234 | offset, |
235 | count); |
236 | } |
237 | return make_unique<CudaLocalHostReduce<T, Dst> >( |
238 | streams, |
239 | devicePtrs, |
240 | targetPtr, |
241 | fn, |
242 | offset, |
243 | count); |
244 | } |
245 | |
246 | template <typename T, typename Src> |
247 | std::unique_ptr<LocalOp<T> > cudaHostBroadcast( |
248 | std::vector<CudaStream>& streams, |
249 | std::vector<CudaDevicePointer<T> >& devicePtrs, |
250 | Src& sourcePtr, |
251 | size_t offset, |
252 | size_t count) { |
253 | GLOO_ENFORCE_EQ(streams.size(), devicePtrs.size()); |
254 | // Simple copy operation if there is only a single device pointer. |
255 | if (devicePtrs.size() == 1) { |
256 | return make_unique< |
257 | CudaLocalMemcpy<T, Src, CudaDevicePointer<T> > >( |
258 | streams[0], |
259 | sourcePtr, |
260 | devicePtrs[0], |
261 | offset, |
262 | count); |
263 | } |
264 | return make_unique<CudaLocalHostBroadcast<T, Src> >( |
265 | streams, |
266 | devicePtrs, |
267 | sourcePtr, |
268 | offset, |
269 | count); |
270 | } |
271 | |
272 | } // namespace gloo |
273 | |