/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "gloo/cuda_allreduce_ring_chunked.h"

#include "gloo/cuda_collectives_device.h"
#include "gloo/cuda_collectives_host.h"
#include "gloo/cuda_private.h"

namespace gloo {

template <typename T, typename W>
struct CudaAllreduceRingChunked<T, W>::ChunkContext {
  ChunkContext(
      typename W::Pointer&& scratch,
      std::unique_ptr<LocalOp<T> >&& reduceOp,
      std::unique_ptr<LocalOp<T> >&& broadcastOp)
      : scratch(std::move(scratch)),
        length(this->scratch.getCount()),
        reduceOp(std::move(reduceOp)),
        broadcastOp(std::move(broadcastOp)) {}
  ChunkContext(ChunkContext&& other) = default;

  // Instances cannot be copied or copy-assigned
  ChunkContext(const ChunkContext&) = delete;
  ChunkContext& operator=(const ChunkContext&) = delete;

  // Pointer to chunk in scratch buffer
  typename W::Pointer scratch;
  const size_t length;

  // The operations used for the local device reduce before running the
  // algorithm and the local device broadcast after it completes.
  std::unique_ptr<LocalOp<T> > reduceOp;
  std::unique_ptr<LocalOp<T> > broadcastOp;
};

template <typename T, typename W>
CudaAllreduceRingChunked<T, W>::CudaAllreduceRingChunked(
    const std::shared_ptr<Context>& context,
    const std::vector<T*>& ptrs,
    const int count,
    const std::vector<cudaStream_t>& streams)
    : Algorithm(context),
      count_(count),
      bytes_(count * sizeof(T)),
      synchronizeDeviceOutputs_(streams.size() == 0),
      fn_(CudaReductionFunction<T>::sum) {
  auto newStream = true;
  if (streams.size() > 0) {
    GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
    newStream = false;
  }

  for (auto i = 0; i < ptrs.size(); i++) {
    auto ptr = CudaDevicePointer<T>::create(ptrs[i], count_);
    if (newStream) {
      streams_.push_back(CudaStream(ptr.getDeviceID()));
    } else {
      streams_.push_back(CudaStream(ptr.getDeviceID(), streams[i]));
    }
    devicePtrs_.push_back(std::move(ptr));
  }

  // Determine chunk size. Use chunks of no fewer than 256 elements
  // (1024 bytes in the case of 4-byte types such as float).
  constexpr unsigned long minSize = 256;
  chunks_ = this->contextSize_ * 2;
#ifdef _WIN32
  chunkSize_ =
      std::max((size_t)minSize, (size_t)((count_ + chunks_ - 1) / chunks_));
#else
  chunkSize_ = std::max(minSize, (count_ + chunks_ - 1) / chunks_);
#endif
  chunkBytes_ = chunkSize_ * sizeof(T);
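
  // Worked example (illustrative numbers only): with count_ = 1000 elements
  // and contextSize_ = 4, we get chunks_ = 8 and
  // chunkSize_ = max(256, ceil(1000 / 8)) = 256. The chunk loop below then
  // creates only four chunk contexts (offsets 0, 256, 512, and a partial one
  // at 768), so chunkContext_.size() may be smaller than chunks_; the
  // "chunkOffset < chunkContext_.size()" checks in run() handle the
  // remaining, empty chunks.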

  // Workspace specific initialization (see below)
  init();

  for (auto offset = 0; offset < count_; offset += chunkSize_) {
    auto length = chunkSize_;
    if (offset + length <= count_) {
      // Chunk completely in range, full chunk.
    } else {
      // Chunk partially in range, partial chunk.
      length = count_ - offset;
    }

    chunkContext_.push_back(
        ChunkContext(
            scratch_.range(offset, length),
            cudaDeviceReduce(
                streams_, devicePtrs_, scratch_, fn_, offset, length),
            cudaDeviceBroadcast(
                streams_, devicePtrs_, scratch_, offset, length)));
  }

  if (this->contextSize_ == 1) {
    return;
  }

  auto& leftPair = this->getLeftPair();
  auto& rightPair = this->getRightPair();
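
  // Two send/receive buffer pairs implement the double buffering used by
  // run(): the chunk parity (chunkOffset & 1) selects which inbox a round
  // uses, so data for the next chunk can arrive while the current chunk is
  // still being reduced.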
  for (auto i = 0; i < 2; i++) {
    auto slot = this->context_->nextSlot();

    // Buffer to send to (rank+1).
    sendDataBuf_[i] =
        rightPair->createSendBuffer(slot, *scratch_, bytes_);
    // Buffer that (rank-1) writes to.
    recvDataBuf_[i] =
        leftPair->createRecvBuffer(slot, *inbox_[i], chunkBytes_);
  }

  // Dummy buffers for localized barrier.
  // Before sending to the right, we only need to know that the node
  // on the right is done using the inbox that's about to be written
  // into. No need for a global barrier.
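  // Concretely, a send on sendNotificationBuf_ tells the left neighbor that
  // this rank's inbox may be written again, and a completed receive on
  // recvNotificationBuf_ means the right neighbor's inbox is free to be
  // written.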
  auto notificationSlot = this->context_->nextSlot();
  sendNotificationBuf_ =
      leftPair->createSendBuffer(notificationSlot, &dummy_, sizeof(dummy_));
  recvNotificationBuf_ =
      rightPair->createRecvBuffer(notificationSlot, &dummy_, sizeof(dummy_));
}

template <typename T, typename W>
CudaAllreduceRingChunked<T, W>::~CudaAllreduceRingChunked() {
}

template <typename T, typename W>
void CudaAllreduceRingChunked<T, W>::run() {
  CudaDeviceGuard guard;
  CudaStream& stream = *scratchStream_;

  // Kick off local reduction for each chunk.
  // The result is stored in scratch_ at the corresponding chunk offset.
  // Make sure to iterate over the chunks in the order they will be sent.
  for (auto i = 0; i < chunks_; i++) {
    const auto chunkOffset = getChunkOffset(i);
    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];
      context.reduceOp->runAsync();
    }
  }

  if (this->contextSize_ == 1) {
    // Wait for the local reduction to complete, then broadcast the chunk to
    // the devices.
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        auto& context = chunkContext_[chunkOffset];
        context.reduceOp->wait();
        context.broadcastOp->runAsync();
      }
    }

    // Wait for broadcast to complete
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        auto& context = chunkContext_[chunkOffset];
        context.broadcastOp->wait();
      }
    }
    return;
  }

  // First pass reduces a chunk in each round
  for (auto round = 0; round < chunks_; round++) {
    const auto chunkOffset = getChunkOffset(round);

    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];

      // Wait for the local reduction to complete
      // When using the host workspace this also makes sure the reduction
      // result is copied into the host side scratch buffer.
      context.reduceOp->wait();

      // Reduce chunk from previous round. Nothing to do for initial rounds.
      if (round >= 2) {
        // Wait for inbox write to complete
        recvDataBuf_[chunkOffset & 1]->waitRecv();

        // Reduce
        fn_->call(
            context.scratch,
            inbox_[chunkOffset & 1],
            context.scratch.getCount(),
            stream);
        stream.wait();
      }
    } else {
      // Empty chunk but still need to wait on the inbox write to ensure the
      // algorithm progresses. Nothing to do for initial rounds.
      if (round >= 2) {
        recvDataBuf_[chunkOffset & 1]->waitRecv();
      }
    }

    // Skip buffer passing notifications in initial rounds
    if (round >= 2) {
      // Send notification to node on the left that
      // this node is ready for an inbox write.
      sendNotificationBuf_->send();

      // Wait for notification from node on the right
      // to be sure this node can start an inbox write.
      recvNotificationBuf_->waitRecv();
    }

    // Copy accumulated chunk
    copyChunkAtOffset(chunkOffset);
  }

  // Second pass around the ring to broadcast result.
  for (int round = 0; round < chunks_; round++) {
    const auto chunkOffset = getChunkOffset(round);

    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];

      // End at chunks_-2 since that's where the accumulation
      // stopped in the previous set of rounds.
      if (round < (chunks_ - 2)) {
        // Wait for inbox write to complete
        recvDataBuf_[chunkOffset & 1]->waitRecv();

        // Copy chunk from inbox to scratch space
        stream.copyAsync(context.scratch, inbox_[chunkOffset & 1]);
        stream.wait();
      }

      // Broadcast chunk to devices. Do this in all rounds with a non-empty
      // chunk.
      context.broadcastOp->runAsync();
    } else {
      // Empty chunk but still need to wait on the inbox write to ensure the
      // algorithm progresses.
      if (round < (chunks_ - 2)) {
        recvDataBuf_[chunkOffset & 1]->waitRecv();
      }
    }

    // Skip copying in the last two rounds of the data exchange
    // (which ends at round chunks_ - 2; see above).
    if (round < (chunks_ - 4)) {
      // Send notification to node on the left that
      // this node is ready for an inbox write.
      sendNotificationBuf_->send();

      // Wait for notification from node on the right
      // to be sure this node can start an inbox write.
      recvNotificationBuf_->waitRecv();

      // Copy accumulated chunks
      copyChunkAtOffset(chunkOffset);
    }
  }

  // Final barrier to make sure every node has finished.
  // Otherwise, a second allreduce call might interfere
  // with one that is still in progress on some nodes.
  sendNotificationBuf_->send();
  recvNotificationBuf_->waitRecv();

  // If running synchronously, wait for all chunk broadcasts to complete
  if (synchronizeDeviceOutputs_) {
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        chunkContext_[chunkOffset].broadcastOp->wait();
      }
    }
  }
}

template <typename T, typename W>
int CudaAllreduceRingChunked<T, W>::getChunkOffset(int round) {
  // Imagine a square grid with chunks of memory laid out vertically and nodes
  // horizontally. The diagonal of this grid marks which node sends which
  // chunks of memory in the prelude. Processing happens by moving this
  // diagonal forward and having it wrap around the edge. This means that the
  // node with rank 0 at round 2 will process the last chunk. This explains
  // why we subtract the round in the offset equation below.
  //
  // Because we're dealing with double buffering in this implementation, we
  // have twice the number of chunks and process them in pairs. This explains
  // why we ignore the LSB on the round number when subtracting it. The LSB is
  // later added to flip back and forth between the two buffers for this pair
  // of chunks. The number of chunks is finally added to make sure we can wrap
  // correctly (no modulo against negative number).
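  //
  // For example (illustrative values), with contextSize_ = 4 (so chunks_ = 8)
  // and contextRank_ = 1, the round-to-chunk mapping produced by the formula
  // below is 2, 3, 0, 1, 6, 7, 4, 5 for rounds 0 through 7.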
  return ((2 * this->contextRank_) - (round & ~0x1) + (round & 0x1) + chunks_) %
      chunks_;
}

template <typename T, typename W>
void CudaAllreduceRingChunked<T, W>::copyChunkAtOffset(int chunkOffset) {
  // Populate inbox of next participant in the ring.
  size_t offset;
  size_t length;
  if (chunkOffset < chunkContext_.size()) {
    const auto& context = chunkContext_[chunkOffset];
    offset = chunkOffset * chunkSize_;
    length = context.length;
  } else {
    // When nothing is put on the wire for empty chunks, @pietern has seen
    // this algorithm hang. This is probably related to the chunk iteration
    // order described in the run function.
    // Chunk out of range, copy _something_.
    offset = 0;
    length = 1;
  }

  // Initiate write to inbox of node on the right.
  sendDataBuf_[chunkOffset & 0x1]->send(offset * sizeof(T), length * sizeof(T));
}

template <typename T, typename W>
template <typename U>
void CudaAllreduceRingChunked<T, W>::init(
    typename std::enable_if<std::is_same<U, CudaHostWorkspace<T> >::value,
                            typename U::Pointer>::type*) {
  // Since reduction is executed on the CPU, the scratch space
  // where the reduction is accumulated is a new host side buffer.
  scratch_ = W::Pointer::alloc(count_);
  scratchStream_ = &streams_[0];

  // Allocate inboxes
  for (auto i = 0; i < 2; i++) {
    inbox_[i] = W::Pointer::alloc(chunkSize_);
  }
}

template <typename T, typename W>
template <typename U>
void CudaAllreduceRingChunked<T, W>::init(
    typename std::enable_if<std::is_same<U, CudaDeviceWorkspace<T> >::value,
                            typename U::Pointer>::type*) {
  // The networking adapter does DMA to/from GPU memory, so we should reduce
  // onto the device that's closest to the networking adapter bound to our
  // context. This uses PCI distance to find the closest GPU.
  auto index = findCudaDevicePointerClosestToDevice(
      devicePtrs_, this->context_->getDevice());
  scratch_ = CudaDevicePointer<T>::create(devicePtrs_[index]);
  scratchStream_ = &streams_[index];

  // Allocate inboxes
  CudaDeviceScope scope(scratch_.getDeviceID());
  for (auto i = 0; i < 2; i++) {
    inbox_[i] = W::Pointer::alloc(chunkSize_);
  }
}

// Instantiate templates
#define INSTANTIATE_TEMPLATE(T)                                         \
  template class CudaAllreduceRingChunked<T, CudaHostWorkspace<T> >;   \
  template class CudaAllreduceRingChunked<T, CudaDeviceWorkspace<T> >;

INSTANTIATE_TEMPLATE(int8_t);
INSTANTIATE_TEMPLATE(uint8_t);
INSTANTIATE_TEMPLATE(int32_t);
INSTANTIATE_TEMPLATE(int64_t);
INSTANTIATE_TEMPLATE(uint64_t);
INSTANTIATE_TEMPLATE(float);
INSTANTIATE_TEMPLATE(double);
INSTANTIATE_TEMPLATE(float16);

} // namespace gloo