/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "gloo/cuda_allreduce_ring_chunked.h"

#include "gloo/cuda_collectives_device.h"
#include "gloo/cuda_collectives_host.h"
#include "gloo/cuda_private.h"

namespace gloo {

template <typename T, typename W>
struct CudaAllreduceRingChunked<T, W>::ChunkContext {
  ChunkContext(
      typename W::Pointer&& scratch,
      std::unique_ptr<LocalOp<T> >&& reduceOp,
      std::unique_ptr<LocalOp<T> >&& broadcastOp)
      : scratch(std::move(scratch)),
        length(this->scratch.getCount()),
        reduceOp(std::move(reduceOp)),
        broadcastOp(std::move(broadcastOp)) {}
  ChunkContext(ChunkContext&& other) = default;

  // Instances cannot be copied or copy-assigned
  ChunkContext(const ChunkContext&) = delete;
  ChunkContext& operator=(const ChunkContext&) = delete;

  // Pointer to chunk in scratch buffer
  typename W::Pointer scratch;
  const size_t length;

  // The operations used for the local device reduce before running the
  // algorithm and the local device broadcast afterwards.
  std::unique_ptr<LocalOp<T> > reduceOp;
  std::unique_ptr<LocalOp<T> > broadcastOp;
};

template <typename T, typename W>
CudaAllreduceRingChunked<T, W>::CudaAllreduceRingChunked(
    const std::shared_ptr<Context>& context,
    const std::vector<T*>& ptrs,
    const int count,
    const std::vector<cudaStream_t>& streams)
    : Algorithm(context),
      count_(count),
      bytes_(count * sizeof(T)),
      synchronizeDeviceOutputs_(streams.size() == 0),
      fn_(CudaReductionFunction<T>::sum) {
  auto newStream = true;
  if (streams.size() > 0) {
    GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
    newStream = false;
  }

  for (auto i = 0; i < ptrs.size(); i++) {
    auto ptr = CudaDevicePointer<T>::create(ptrs[i], count_);
    if (newStream) {
      streams_.push_back(CudaStream(ptr.getDeviceID()));
    } else {
      streams_.push_back(CudaStream(ptr.getDeviceID(), streams[i]));
    }
    devicePtrs_.push_back(std::move(ptr));
  }

  // Determine chunk size. Use chunks of no fewer than 256 elements
  // (i.e. 1024 bytes for 4-byte types such as float).
  constexpr unsigned long minSize = 256;
  chunks_ = this->contextSize_ * 2;
#ifdef _WIN32
  chunkSize_ = std::max((size_t)minSize, (size_t)((count_ + chunks_ - 1) / chunks_));
#else
  chunkSize_ = std::max(minSize, (count_ + chunks_ - 1) / chunks_);
#endif
  chunkBytes_ = chunkSize_ * sizeof(T);
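  // Worked example (hypothetical sizes): with contextSize_ == 4 there are
  // chunks_ == 8 chunks, and for count_ == 1,000,000 floats this gives
  // chunkSize_ == max(256, ceil(1000000 / 8)) == 125000 elements per chunk,
  // i.e. chunkBytes_ == 500000.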

  // Workspace specific initialization (see below)
  init();

  for (auto offset = 0; offset < count_; offset += chunkSize_) {
    auto length = chunkSize_;
    if (offset + length <= count_) {
      // Chunk completely in range, full chunk.
    } else {
      // Chunk partially in range, partial chunk.
      length = count_ - offset;
    }

    chunkContext_.push_back(
        ChunkContext(
            scratch_.range(offset, length),
            cudaDeviceReduce(
                streams_, devicePtrs_, scratch_, fn_, offset, length),
            cudaDeviceBroadcast(
                streams_, devicePtrs_, scratch_, offset, length)));
  }
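
  // Every chunk now has a dedicated local reduce op (device buffers into
  // scratch_) and a broadcast op (scratch_ back to the device buffers) for
  // its [offset, offset + length) range.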

  if (this->contextSize_ == 1) {
    return;
  }

  auto& leftPair = this->getLeftPair();
  auto& rightPair = this->getRightPair();
  for (auto i = 0; i < 2; i++) {
    auto slot = this->context_->nextSlot();

    // Buffer to send to (rank+1).
    sendDataBuf_[i] =
      rightPair->createSendBuffer(slot, *scratch_, bytes_);
    // Buffer that (rank-1) writes to.
    recvDataBuf_[i] =
      leftPair->createRecvBuffer(slot, *inbox_[i], chunkBytes_);
  }

  // Dummy buffers for localized barrier.
  // Before sending to the right, we only need to know that the node
  // on the right is done using the inbox that's about to be written
  // into. No need for a global barrier.
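  // These notifications travel opposite to the data ring: each node sends
  // its ready signal to the left neighbor and waits for the corresponding
  // signal from the right neighbor.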
  auto notificationSlot = this->context_->nextSlot();
  sendNotificationBuf_ =
    leftPair->createSendBuffer(notificationSlot, &dummy_, sizeof(dummy_));
  recvNotificationBuf_ =
    rightPair->createRecvBuffer(notificationSlot, &dummy_, sizeof(dummy_));
}
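
// Usage sketch (illustrative only, not part of the library): buffer names
// and sizes are placeholders. An empty streams vector tells the constructor
// to create its own streams and synchronize device outputs.
//
//   std::vector<float*> ptrs = {outputOnGpu0, outputOnGpu1};
//   std::vector<cudaStream_t> streams;  // empty: algorithm-managed streams
//   CudaAllreduceRingChunked<float, CudaHostWorkspace<float> > algorithm(
//       context, ptrs, count, streams);
//   algorithm.run();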

template <typename T, typename W>
CudaAllreduceRingChunked<T, W>::~CudaAllreduceRingChunked() {
}

template <typename T, typename W>
void CudaAllreduceRingChunked<T, W>::run() {
  CudaDeviceGuard guard;
  CudaStream& stream = *scratchStream_;

  // Kick off local reduction for each chunk.
  // The result is stored in scratch_ at the corresponding chunk offset.
  // Make sure to iterate over the chunks in the order they will be sent.
  for (auto i = 0; i < chunks_; i++) {
    const auto chunkOffset = getChunkOffset(i);
    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];
      context.reduceOp->runAsync();
    }
  }

  if (this->contextSize_ == 1) {
    // Wait for the local reduction to complete, then broadcast each chunk
    // to all devices.
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        auto& context = chunkContext_[chunkOffset];
        context.reduceOp->wait();
        context.broadcastOp->runAsync();
      }
    }

    // Wait for broadcast to complete
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        auto& context = chunkContext_[chunkOffset];
        context.broadcastOp->wait();
      }
    }
    return;
  }
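
  // What follows is a chunked ring allreduce: a reduce-scatter style first
  // pass in which each chunk accumulates contributions as it travels around
  // the ring, followed by a second pass that circulates the fully reduced
  // chunks to every node.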

  // First pass reduces a chunk in each round
  for (auto round = 0; round < chunks_; round++) {
    const auto chunkOffset = getChunkOffset(round);

    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];

      // Wait for the local reduction to complete
      // When using the host workspace this also makes sure the reduction
      // result is copied into the host side scratch buffer.
      context.reduceOp->wait();

      // Reduce chunk from previous round. Nothing to do for initial rounds.
      if (round >= 2) {
        // Wait for inbox write to complete
        recvDataBuf_[chunkOffset & 1]->waitRecv();

        // Reduce
        fn_->call(
            context.scratch,
            inbox_[chunkOffset & 1],
            context.scratch.getCount(),
            stream);
        stream.wait();
      }
    } else {
      // Empty chunk but still need to wait on the inbox write to ensure the
      // algorithm progresses. Nothing to do for initial rounds.
      if (round >= 2) {
        recvDataBuf_[chunkOffset & 1]->waitRecv();
      }
    }

    // Skip buffer passing notifications in initial rounds
    if (round >= 2) {
      // Send notification to node on the left that
      // this node is ready for an inbox write.
      sendNotificationBuf_->send();

      // Wait for notification from node on the right
      // to be sure this node can start an inbox write.
      recvNotificationBuf_->waitRecv();
    }

    // Copy accumulated chunk
    copyChunkAtOffset(chunkOffset);
  }
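
  // At this point each node holds fully reduced values for the chunk pair it
  // processed in the final two rounds, and the copies issued at the end of
  // those rounds have already pushed that pair into the right neighbor's
  // inbox, seeding the broadcast pass below.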

  // Second pass around the ring to broadcast result.
  for (int round = 0; round < chunks_; round++) {
    const auto chunkOffset = getChunkOffset(round);

    if (chunkOffset < chunkContext_.size()) {
      auto& context = chunkContext_[chunkOffset];

      // End at chunks_-2 since that's where the accumulation
      // stopped in the previous set of rounds.
      if (round < (chunks_ - 2)) {
        // Wait for inbox write to complete
        recvDataBuf_[chunkOffset & 1]->waitRecv();

        // Copy chunk from inbox to scratch space
        stream.copyAsync(context.scratch, inbox_[chunkOffset & 1]);
        stream.wait();
      }

      // Broadcast chunk to devices. Do this in all rounds with a non-empty
      // chunk.
      context.broadcastOp->runAsync();
    } else {
      // Empty chunk but still need to wait on the inbox write to ensure the
      // algorithm progresses.
      if (round < (chunks_ - 2)) {
        recvDataBuf_[chunkOffset & 1]->waitRecv();
      }
    }

    // Skip copying in the final four rounds (the last two chunk pairs);
    // those copies would no longer be consumed by the node on the right.
    if (round < (chunks_ - 4)) {
      // Send notification to node on the left that
      // this node is ready for an inbox write.
      sendNotificationBuf_->send();

      // Wait for notification from node on the right
      // to be sure this node can start an inbox write.
      recvNotificationBuf_->waitRecv();

      // Copy accumulated chunk
      copyChunkAtOffset(chunkOffset);
    }
  }

  // Final barrier to make sure every node has finished.
  // Otherwise, a second allreduce call might interfere
  // with one that is still in progress on some nodes.
  sendNotificationBuf_->send();
  recvNotificationBuf_->waitRecv();

  // If running synchronously, wait for all chunk broadcasts to complete
  if (synchronizeDeviceOutputs_) {
    for (auto i = 0; i < chunks_; i++) {
      const auto chunkOffset = getChunkOffset(i);
      if (chunkOffset < chunkContext_.size()) {
        chunkContext_[chunkOffset].broadcastOp->wait();
      }
    }
  }
}

template <typename T, typename W>
int CudaAllreduceRingChunked<T, W>::getChunkOffset(int round) {
  // Imagine a square grid with chunks of memory laid out vertically and nodes
  // horizontally. The diagonal of this grid marks which node sends which
  // chunks of memory in the prelude. Processing happens by moving this
  // diagonal forward and letting it wrap around the edge. This means that the
  // node with rank 0 will process the last chunk at round 2. This explains
  // why we subtract the round in the offset equation below.
  //
  // Because we're dealing with double buffering in this implementation, we
  // have twice the number of chunks and process them in pairs. This explains
  // why we ignore the LSB on the round number when subtracting it. The LSB is
  // later added to flip back and forth between the two buffers for this pair
  // of chunks. The number of chunks is finally added to make sure we can wrap
  // correctly (no modulo of a negative number).
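  //
  // Worked example: with contextSize_ == 2 (so chunks_ == 4), rank 0 visits
  // chunk offsets 0, 1, 2, 3 over rounds 0..3, while rank 1 visits 2, 3, 0, 1,
  // so in every round the two ranks operate on disjoint chunk pairs.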
  return ((2 * this->contextRank_) - (round & ~0x1) + (round & 0x1) + chunks_) %
      chunks_;
}

template <typename T, typename W>
void CudaAllreduceRingChunked<T, W>::copyChunkAtOffset(int chunkOffset) {
  // Populate inbox of next participant in the ring.
  size_t offset;
  size_t length;
  if (chunkOffset < chunkContext_.size()) {
    const auto& context = chunkContext_[chunkOffset];
    offset = chunkOffset * chunkSize_;
    length = context.length;
  } else {
    // When nothing is put on the wire for empty chunks, @pietern
    // has seen this algorithm hang. This is probably related to the
    // chunk iteration order described in the run function.
    // Chunk out of range, so copy _something_.
    offset = 0;
    length = 1;
  }

  // Initiate write to inbox of node on the right.
  sendDataBuf_[chunkOffset & 0x1]->send(offset * sizeof(T), length * sizeof(T));
}

template <typename T, typename W>
template <typename U>
void CudaAllreduceRingChunked<T, W>::init(
    typename std::enable_if<std::is_same<U, CudaHostWorkspace<T> >::value,
        typename U::Pointer>::type*) {
  // Since reduction is executed on the CPU, the scratch space
  // where the reduction is accumulated is a new host side buffer.
  scratch_ = W::Pointer::alloc(count_);
  scratchStream_ = &streams_[0];

  // Allocate inboxes
  for (auto i = 0; i < 2; i++) {
    inbox_[i] = W::Pointer::alloc(chunkSize_);
  }
}

template <typename T, typename W>
template <typename U>
void CudaAllreduceRingChunked<T, W>::init(
    typename std::enable_if<std::is_same<U, CudaDeviceWorkspace<T> >::value,
        typename U::Pointer>::type*) {
  // The networking adapter does DMA to/from GPU memory, so we should reduce
  // onto the device that's closest to the networking adapter bound
  // to our context. This uses PCI distance to find the closest GPU.
  auto index = findCudaDevicePointerClosestToDevice(
      devicePtrs_, this->context_->getDevice());
  scratch_ = CudaDevicePointer<T>::create(devicePtrs_[index]);
  scratchStream_ = &streams_[index];

  // Allocate inboxes
  CudaDeviceScope scope(scratch_.getDeviceID());
  for (auto i = 0; i < 2; i++) {
    inbox_[i] = W::Pointer::alloc(chunkSize_);
  }
}

// Instantiate templates
#define INSTANTIATE_TEMPLATE(T)                                       \
template class CudaAllreduceRingChunked<T, CudaHostWorkspace<T> >;   \
template class CudaAllreduceRingChunked<T, CudaDeviceWorkspace<T> >;

INSTANTIATE_TEMPLATE(int8_t);
INSTANTIATE_TEMPLATE(uint8_t);
INSTANTIATE_TEMPLATE(int32_t);
INSTANTIATE_TEMPLATE(int64_t);
INSTANTIATE_TEMPLATE(uint64_t);
INSTANTIATE_TEMPLATE(float);
INSTANTIATE_TEMPLATE(double);
INSTANTIATE_TEMPLATE(float16);

} // namespace gloo