/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include "gloo/common/common.h"
#include "gloo/common/logging.h"
#include "gloo/cuda.h"

namespace gloo {

// Forward declaration
template <typename T, typename Dst>
class CudaLocalHostReduce;

// Partial specialization for device pointer target
template <typename T>
class CudaLocalHostReduce<T, CudaDevicePointer<T> > : public LocalOp<T> {
 public:
  CudaLocalHostReduce(
      std::vector<CudaStream>& streams,
      std::vector<CudaDevicePointer<T> >& devicePtrs,
      CudaDevicePointer<T>& targetPtr,
      const CudaReductionFunction<T>* fn,
      size_t offset,
      size_t count)
      : streams_(streams),
        targetPtr_(targetPtr.range(offset, count)),
        offset_(offset),
        count_(count),
        fn_(fn) {
    // Incorporate offset/count into devicePtrs
    devicePtrs_.reserve(devicePtrs.size());
    for (const auto& ptr : devicePtrs) {
      devicePtrs_.push_back(ptr.range(offset, count));
    }
    // Allocate N temporary host buffers to asynchronously
    // copy device memory into.
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      tmpPtrs_.push_back(CudaHostPointer<T>::alloc(count));
    }
  }

  virtual void runAsync() {
    // Asynchronously copy device memory to host
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      streams_[i].copyAsync(tmpPtrs_[i], devicePtrs_[i]);
    }
    // Reduce specified pointers into tmpPtrs_[0]
    streams_[0].wait();
    for (auto i = 1; i < devicePtrs_.size(); i++) {
      streams_[i].wait();
      fn_->call(tmpPtrs_[0], tmpPtrs_[i], count_, streams_[i]);
    }
    // Copy final reduction back to device
    streams_[0].copyAsync(targetPtr_, tmpPtrs_[0]);
  }

  virtual void wait() {
    // Reduction happens on CPU but we still have to wait for the
    // memory copy of the result back to the device.
    streams_[0].wait();
  }

 protected:
  std::vector<CudaStream>& streams_;
  std::vector<CudaDevicePointer<T> > devicePtrs_;
  CudaDevicePointer<T> targetPtr_;
  const size_t offset_;
  const size_t count_;
  const CudaReductionFunction<T>* fn_;

  // Temporary buffers used for async memory copies
  std::vector<CudaHostPointer<T> > tmpPtrs_;
};
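
// The device-target reduce above stages data through host memory in three
// steps: (1) every device range is copied asynchronously into its own
// temporary host buffer, (2) the host buffers are reduced into tmpPtrs_[0]
// on the CPU as their copies finish, and (3) the result is copied back to
// the device target. A rough sketch of the equivalent raw CUDA runtime
// calls, using hypothetical float buffers dev[i]/host[i]/target and a sum
// reduction (illustration only, not part of this header):
//
//   for (int i = 0; i < n; i++) {
//     cudaMemcpyAsync(host[i], dev[i], count * sizeof(float),
//                     cudaMemcpyDeviceToHost, stream[i]);
//   }
//   cudaStreamSynchronize(stream[0]);
//   for (int i = 1; i < n; i++) {
//     cudaStreamSynchronize(stream[i]);
//     for (size_t j = 0; j < count; j++) {
//       host[0][j] += host[i][j];
//     }
//   }
//   cudaMemcpyAsync(target, host[0], count * sizeof(float),
//                   cudaMemcpyHostToDevice, stream[0]);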

// Partial specialization for host pointer target
template <typename T>
class CudaLocalHostReduce<T, CudaHostPointer<T> > : public LocalOp<T> {
 public:
  CudaLocalHostReduce(
      std::vector<CudaStream>& streams,
      std::vector<CudaDevicePointer<T> >& devicePtrs,
      CudaHostPointer<T>& targetPtr,
      const CudaReductionFunction<T>* fn,
      size_t offset,
      size_t count)
      : streams_(streams),
        targetPtr_(targetPtr.range(offset, count)),
        offset_(offset),
        count_(count),
        fn_(fn) {
    // Incorporate offset/count into devicePtrs
    devicePtrs_.reserve(devicePtrs.size());
    for (const auto& ptr : devicePtrs) {
      devicePtrs_.push_back(ptr.range(offset, count));
    }
    // Allocate N-1 temporary host buffers to asynchronously
    // copy device memory into.
    for (auto i = 1; i < devicePtrs_.size(); i++) {
      tmpPtrs_.push_back(CudaHostPointer<T>::alloc(count));
    }
  }

  virtual void runAsync() {
    // Asynchronously copy device memory to host
    streams_[0].copyAsync(targetPtr_, devicePtrs_[0]);
    for (auto i = 1; i < devicePtrs_.size(); i++) {
      streams_[i].copyAsync(tmpPtrs_[i-1], devicePtrs_[i]);
    }
    // Reduce specified pointers into targetPtr_
    streams_[0].wait();
    for (auto i = 1; i < devicePtrs_.size(); i++) {
      streams_[i].wait();
      fn_->call(targetPtr_, tmpPtrs_[i-1], count_, streams_[i]);
    }
  }

  virtual void wait() {
    // Because the reduction happens on the CPU, this op is synchronous.
  }

 protected:
  std::vector<CudaStream>& streams_;
  std::vector<CudaDevicePointer<T> > devicePtrs_;
  CudaHostPointer<T> targetPtr_;
  const size_t offset_;
  const size_t count_;
  const CudaReductionFunction<T>* fn_;

  // Temporary buffers used for async memory copies
  std::vector<CudaHostPointer<T> > tmpPtrs_;
};
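
// Compared to the device-target variant, the host-target variant above
// needs only N-1 staging buffers: devicePtrs_[0] is copied straight into
// the host target, and the remaining staged buffers are reduced into it
// on the CPU. Because that CPU reduction completes inside runAsync(), the
// result is already in host memory when runAsync() returns, which is why
// wait() has nothing left to do.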

// Forward declaration
template <typename T, typename Src>
class CudaLocalHostBroadcast;

// Specialization for device pointer source
template <typename T>
class CudaLocalHostBroadcast<T, CudaDevicePointer<T> > : public LocalOp<T> {
 public:
  CudaLocalHostBroadcast(
      std::vector<CudaStream>& streams,
      std::vector<CudaDevicePointer<T> >& devicePtrs,
      CudaDevicePointer<T>& sourcePtr,
      size_t offset,
      size_t count)
      : streams_(streams),
        sourcePtr_(sourcePtr.range(offset, count)) {
    // Incorporate offset/count into devicePtrs
    devicePtrs_.reserve(devicePtrs.size());
    for (const auto& ptr : devicePtrs) {
      devicePtrs_.push_back(ptr.range(offset, count));
    }
  }

  virtual void runAsync() {
    // Asynchronously copy source to device ptrs
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      streams_[i].copyAsync(devicePtrs_[i], sourcePtr_);
    }
  }

  virtual void wait() {
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      streams_[i].wait();
    }
  }

 protected:
  std::vector<CudaStream>& streams_;
  std::vector<CudaDevicePointer<T> > devicePtrs_;
  CudaDevicePointer<T> sourcePtr_;
};
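
// The broadcast above is a plain fan-out: the source range is copied to
// every destination device pointer on that pointer's stream, and wait()
// synchronizes all of those streams. Roughly equivalent raw CUDA runtime
// calls, with hypothetical device buffers dev[i] and source src (sketch
// only, not part of this header):
//
//   for (int i = 0; i < n; i++) {
//     cudaMemcpyAsync(dev[i], src, count * sizeof(float),
//                     cudaMemcpyDeviceToDevice, stream[i]);
//   }
//   for (int i = 0; i < n; i++) {
//     cudaStreamSynchronize(stream[i]);
//   }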

// Specialization for host pointer source
template <typename T>
class CudaLocalHostBroadcast<T, CudaHostPointer<T> > : public LocalOp<T> {
 public:
  CudaLocalHostBroadcast(
      std::vector<CudaStream>& streams,
      std::vector<CudaDevicePointer<T> >& devicePtrs,
      CudaHostPointer<T>& sourcePtr,
      size_t offset,
      size_t count)
      : streams_(streams),
        sourcePtr_(sourcePtr.range(offset, count)) {
    // Incorporate offset/count into devicePtrs
    devicePtrs_.reserve(devicePtrs.size());
    for (const auto& ptr : devicePtrs) {
      devicePtrs_.push_back(ptr.range(offset, count));
    }
  }

  virtual void runAsync() {
    // Asynchronously copy host memory to device
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      streams_[i].copyAsync(devicePtrs_[i], sourcePtr_);
    }
  }

  virtual void wait() {
    for (auto i = 0; i < devicePtrs_.size(); i++) {
      streams_[i].wait();
    }
  }

 protected:
  std::vector<CudaStream>& streams_;
  std::vector<CudaDevicePointer<T> > devicePtrs_;
  CudaHostPointer<T> sourcePtr_;
};
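
// The host-source variant above is identical in structure; only the copy
// direction differs (conceptually host-to-device rather than
// device-to-device in the sketch above, with a pinned host src).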

template <typename T, typename Dst>
std::unique_ptr<LocalOp<T> > cudaHostReduce(
    std::vector<CudaStream>& streams,
    std::vector<CudaDevicePointer<T> >& devicePtrs,
    Dst& targetPtr,
    const CudaReductionFunction<T>* fn,
    size_t offset,
    size_t count) {
  GLOO_ENFORCE_EQ(streams.size(), devicePtrs.size());
  // Simple copy operation if there is only a single device pointer.
  if (devicePtrs.size() == 1) {
    return make_unique<
      CudaLocalMemcpy<T, CudaDevicePointer<T>, Dst> >(
        streams[0],
        devicePtrs[0],
        targetPtr,
        offset,
        count);
  }
  return make_unique<CudaLocalHostReduce<T, Dst> >(
      streams,
      devicePtrs,
      targetPtr,
      fn,
      offset,
      count);
}
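
// Example of how the factory above is typically driven (illustrative
// sketch only; it assumes the CudaDevicePointer<T>::create,
// CudaStream(device), CudaHostPointer<T>::alloc and
// CudaReductionFunction<T>::sum helpers declared in gloo/cuda.h, plus
// hypothetical device buffers buf0/buf1 holding `count` floats):
//
//   std::vector<CudaStream> streams;
//   std::vector<CudaDevicePointer<float> > devicePtrs;
//   devicePtrs.push_back(CudaDevicePointer<float>::create(buf0, count));
//   devicePtrs.push_back(CudaDevicePointer<float>::create(buf1, count));
//   for (const auto& ptr : devicePtrs) {
//     streams.push_back(CudaStream(ptr.getDeviceID()));
//   }
//   auto result = CudaHostPointer<float>::alloc(count);
//   auto op = cudaHostReduce(
//       streams, devicePtrs, result,
//       CudaReductionFunction<float>::sum, 0, count);
//   op->runAsync();
//   op->wait();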

template <typename T, typename Src>
std::unique_ptr<LocalOp<T> > cudaHostBroadcast(
    std::vector<CudaStream>& streams,
    std::vector<CudaDevicePointer<T> >& devicePtrs,
    Src& sourcePtr,
    size_t offset,
    size_t count) {
  GLOO_ENFORCE_EQ(streams.size(), devicePtrs.size());
  // Simple copy operation if there is only a single device pointer.
  if (devicePtrs.size() == 1) {
    return make_unique<
      CudaLocalMemcpy<T, Src, CudaDevicePointer<T> > >(
        streams[0],
        sourcePtr,
        devicePtrs[0],
        offset,
        count);
  }
  return make_unique<CudaLocalHostBroadcast<T, Src> >(
      streams,
      devicePtrs,
      sourcePtr,
      offset,
      count);
}
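
// cudaHostBroadcast is driven the same way, with the copy direction
// reversed. Reusing the streams/devicePtrs from the sketch above and a
// host-side source buffer (sketch only):
//
//   auto src = CudaHostPointer<float>::alloc(count);
//   // ... fill src with the data to broadcast ...
//   auto op = cudaHostBroadcast(streams, devicePtrs, src, 0, count);
//   op->runAsync();
//   op->wait();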

} // namespace gloo