/**
 * Copyright (c) 2018-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "gloo/cuda_allreduce_bcube.h"

#include "gloo/cuda_collectives_device.h"
#include "gloo/cuda_collectives_host.h"
#include "gloo/cuda_private.h"

#include <cmath>
#include <iomanip>
#include <iostream>
#include <sstream>
#ifndef _WIN32
#include <unistd.h>
#endif

#ifdef DEBUG
#define DEBUG_PRINT_STAGE(stage) \
  do { \
    printStageBuffer(stage); \
  } while (false)
#define DEBUG_PRINT_SEND(stage) \
  do { \
    printStepBuffer( \
        stage, step, myRank_, destRank, &scratch_[0], sendCount, ptrOffset); \
  } while (false)
#define DEBUG_PRINT_RECV(stage) \
  do { \
    printStepBuffer( \
        stage, \
        step, \
        srcRank, \
        myRank_, \
        &recvBufs_[recvBufIdx_[srcRank]][0], \
        recvCount); \
  } while (false)
#else
#define DEBUG_PRINT_STAGE(stage)
#define DEBUG_PRINT_SEND(stage)
#define DEBUG_PRINT_RECV(stage)
#endif

namespace gloo {

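/*
 * This algorithm implements allreduce as a reduce-scatter phase followed by
 * an all-gather phase over a "bcube" topology: in each of the
 * ceil(log_base(nodes)) steps, every rank belongs to a group of up to
 * `base` ranks and exchanges a chunk of the buffer with each of its up to
 * (base - 1) peers in that group.
 */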
template <typename T, typename W>
CudaAllreduceBcube<T, W>::CudaAllreduceBcube(
    const std::shared_ptr<Context>& context,
    const std::vector<T*>& ptrs,
    const int count,
    const std::vector<cudaStream_t>& streams,
    const CudaReductionFunction<T>* fn)
    : Algorithm(context),
      myRank_(this->context_->rank),
      base_(this->context_->base ? this->context_->base : 2),
      nodes_(this->contextSize_),
      totalNumElems_(count),
      bytes_(totalNumElems_ * sizeof(T)),
      steps_(computeSteps(nodes_, base_)),
      fn_(fn),
      recvBufs_(steps_ * base_) {
  auto newStream = true;
  if (streams.size() > 0) {
    GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
    newStream = false;
  }
  for (size_t i = 0; i < ptrs.size(); i++) {
    auto ptr = CudaDevicePointer<T>::create(ptrs[i], totalNumElems_);
    if (newStream) {
      streams_.push_back(CudaStream(ptr.getDeviceID()));
    } else {
      streams_.push_back(CudaStream(ptr.getDeviceID(), streams[i]));
    }
    devicePtrs_.push_back(std::move(ptr));
  }
  // Workspace-specific initialization (see the init() overloads below)
  init();
  if (nodes_ == 1) {
    return;
  }
  setupNodes();
  /*
   * Reserve the maximum needed number of context slots. Up to 2 slots per
   * process pair are needed (one for regular sends and one for
   * notifications). For simplicity, the same mapping is used on all
   * processes so that the slots trivially match across processes.
   */
  int slotOffset = this->context_->nextSlot(
      2 * this->contextSize_ * (this->contextSize_ - 1));

  int bufIdx = 0;
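  /*
   * Each unordered pair (min, max) of ranks maps to a unique pair of
   * consecutive slots: slotOffset + 2 * (min * nodes_ + max) carries data,
   * and the slot right after it carries notifications. Both endpoints
   * compute the same value, so the send/recv buffers match up trivially.
   */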
  for (int step = 0; step < steps_; ++step) {
    for (int destRank : getPeersPerStep(myRank_, step)) {
      int recvSize = std::max(
          getNumElemsPerStep(myRank_, step),
          getNumElemsPerStep(destRank, step));
      auto& pair = this->context_->getPair(destRank);
      auto slot = slotOffset +
          2 * (std::min(myRank_, destRank) * nodes_ +
               std::max(myRank_, destRank));
      sendDataBufs_[destRank] = pair->createSendBuffer(slot, *scratch_, bytes_);
      recvBufs_[bufIdx] = W::Pointer::alloc(recvSize);
      recvDataBufs_[destRank] = pair->createRecvBuffer(
          slot, &recvBufs_[bufIdx][0], recvSize * sizeof(T));
      recvBufIdx_[destRank] = bufIdx;
      ++bufIdx;
      ++slot;
      sendNotificationBufs_[destRank] =
          pair->createSendBuffer(slot, &dummy_, sizeof(dummy_));
      recvNotificationBufs_[destRank] =
          pair->createRecvBuffer(slot, &dummy_, sizeof(dummy_));
    } // nodes
  } // steps
}

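/*
 * A minimal usage sketch (hypothetical names; assumes a fully connected
 * Context, a device allocation of `count` floats, and that the header
 * defaults `fn` to a sum reduction):
 *
 *   std::vector<float*> ptrs = {deviceBuffer};
 *   CudaAllreduceBcube<float, CudaHostWorkspace<float>> algo(
 *       context, ptrs, count);
 *   algo.run(); // ptrs now holds the elementwise reduction across all ranks
 */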
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::run() {
  CudaDeviceGuard guard;
  CudaStream& stream = *scratchStream_;

  // May be unset (e.g. device workspace with a single device pointer)
  if (localReduceOp_) {
    localReduceOp_->run();
  }

  if (nodes_ == 1) {
    GLOO_ENFORCE(
        localBroadcastOp_,
        "localBroadcastOp must be initialized for single machine");
    localBroadcastOp_->run();
    return;
  }

  // Reduce-scatter
  DEBUG_PRINT_STAGE("start");
  for (int step = 0; step < steps_; ++step) {
    const auto& peerRanks = getPeersPerStep(myRank_, step);
    for (int destRank : peerRanks) {
      int sendCount = getNumElemsPerStep(destRank, step);
      int ptrOffset = getPtrOffsetPerStep(destRank, step);
      DEBUG_PRINT_SEND("reduce-scatter");
      sendDataBufs_[destRank]->send(
          ptrOffset * sizeof(T), sendCount * sizeof(T));
    } // sends within group

    for (int srcRank : peerRanks) {
      int recvCount = getNumElemsPerStep(myRank_, step);
      int ptrOffset = getPtrOffsetPerStep(myRank_, step);
      recvDataBufs_[srcRank]->waitRecv();
      DEBUG_PRINT_RECV("reduce-scatter");
      auto recvBufAtOffset =
          recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
      auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
      fn_->call(scratchAtOffset, recvBufAtOffset, recvCount, stream);
      stream.wait();
      /*
       * Send notification to the pair we just received from that
       * we're done dealing with the receive buffer.
       */
      sendNotificationBufs_[srcRank]->send();
    } // recvs within group and reduces
  } // reduce-scatter steps

  DEBUG_PRINT_STAGE("reduce-scattered");

  // All-gather
  for (int step = steps_ - 1; step >= 0; --step) {
    const auto& peerRanks = getPeersPerStep(myRank_, step);
    for (int destRank : peerRanks) {
      int sendCount = getNumElemsPerStep(myRank_, step);
      int ptrOffset = getPtrOffsetPerStep(myRank_, step);
      /*
       * Wait for notification from the peer to make sure we can send data
       * without risking any overwrites in its receive buffer.
       */
      recvNotificationBufs_[destRank]->waitRecv();
      DEBUG_PRINT_SEND("all-gather");
      sendDataBufs_[destRank]->send(
          ptrOffset * sizeof(T), sendCount * sizeof(T));
    }

    for (int srcRank : peerRanks) {
      int recvCount = getNumElemsPerStep(srcRank, step);
      int ptrOffset = getPtrOffsetPerStep(srcRank, step);
      recvDataBufs_[srcRank]->waitRecv();
      DEBUG_PRINT_RECV("all-gather");
      auto recvBufAtOffset =
          recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
      auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
      stream.copyAsync(scratchAtOffset, recvBufAtOffset);
      stream.wait();
      if (step == 0) {
        /*
         * Send notification to the pair we just received from that
         * we're done dealing with the receive buffer.
         */
        sendNotificationBufs_[srcRank]->send();
      }
    } // recvs within group and copies
  } // all-gather steps

  DEBUG_PRINT_STAGE("all-reduced");

  if (localBroadcastOp_) {
    localBroadcastOp_->runAsync();
    localBroadcastOp_->wait();
  }

  /*
   * Wait for notifications from our peers within the block to make sure
   * the next invocation can send data immediately without risking an
   * overwrite of data they have not yet consumed from their receive
   * buffers.
   */
  for (int peerRank : getPeersPerStep(myRank_, 0)) {
    recvNotificationBufs_[peerRank]->waitRecv();
  }
}

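/*
 * The number of communication steps is ceil(log_base(nodes)), computed via
 * the change of base: log2(nodes) / log2(base). For example, nodes = 8 with
 * base = 2 gives 3 steps; nodes = 9 with base = 3 gives 2 steps.
 */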
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::computeSteps(int nodes, int peers) {
  float lg2n = log2(nodes);
  float lg2p = log2(peers);
  return ceil(lg2n / lg2p);
}

template <typename T, typename W>
bool CudaAllreduceBcube<T, W>::printCheck(int /* rank */) {
  return false;
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printBreak(T* p, int x) {
  if (0 == x % wordsPerLine) {
    std::cout << std::endl
              << &p[x] << " " << std::setfill('0') << std::setw(5) << x
              << ": ";
  } else if (0 == x % wordsPerSection) {
    std::cout << "- ";
  }
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printElems(T* p, int count, int start) {
  auto alignedStart = (start / wordsPerLine) * wordsPerLine;
  for (int x = alignedStart; x < start + count; ++x) {
    printBreak(p, x);
    if (x < start) {
      std::cout << "..... ";
    } else {
      std::cout << std::setfill('0') << std::setw(5) << p[x] << " ";
    }
  }
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printStageBuffer(const std::string& msg) {
  if (printCheck(myRank_)) {
    std::cout << "rank (" << myRank_ << ") " << msg << ": ";
    printElems(&scratch_[0], totalNumElems_);
    std::cout << std::endl;
  }
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::printStepBuffer(
    const std::string& stage,
    int step,
    int srcRank,
    int destRank,
    T* p,
    int count,
    int start) {
  if (printCheck(myRank_)) {
    std::cout << stage << ": step (" << step << ") "
              << "srcRank (" << srcRank << ") -> "
              << "destRank (" << destRank << "): ";
    printElems(p, count, start);
    std::cout << std::endl;
  }
}

template <typename T, typename W>
const std::vector<int>& CudaAllreduceBcube<T, W>::getPeersPerStep(
    int rank,
    int step) {
  return allNodes_[rank].getPeersPerStep(step);
}

template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getNumElemsPerStep(int rank, int step) {
  return allNodes_[rank].getNumElemsPerStep(step);
}

template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getPtrOffsetPerStep(int rank, int step) {
  return allNodes_[rank].getPtrOffsetPerStep(step);
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::createNodes() {
  for (int rank = 0; rank < nodes_; ++rank) {
    allNodes_.emplace_back(rank, steps_);
  }
}

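/*
 * Divides the group's element range evenly across its peers for the given
 * step: every node owns `count` elements (at least one), and the last node
 * in the group additionally takes the remainder. Offsets wrap around
 * modulo totalNumElems_.
 */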
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::updateGroupNodes(
    int step,
    const cuda::bcube::Group& group) {
  const std::vector<int>& peers = group.getNodeRanks();
  const int peersSz = peers.size();
  int ptrOffset = group.getPtrOffset();
  int count = group.getNumElems() / peersSz;
  const int countRem = group.getNumElems() % peersSz;
  if (0 == count) {
    count = 1;
  }
  for (int i = 0; i < peersSz; ++i) {
    cuda::bcube::Node& node = allNodes_[peers[i]];
    if (peersSz - 1 != i) { // if not the last node in the group
      node.setPerStepAttributes(step, peers, count, ptrOffset);
      ptrOffset += count;
    } else {
      /*
       * The last node gets the remainder if the number of elements is
       * not exactly divisible by the number of peers.
       */
      node.setPerStepAttributes(step, peers, count + countRem, ptrOffset);
      ptrOffset += count + countRem;
    }
    ptrOffset %= totalNumElems_;
  }
}

template <typename T, typename W>
void CudaAllreduceBcube<T, W>::setupNodes() {
  // Create all the nodes upfront
  createNodes();

  // Now we actually try to set up the nodes
  int peerDistance = 1;
  for (int step = 0; step < steps_; ++step) {
    std::vector<cuda::bcube::Group> groups;
    // Iterate over all the nodes to identify the first node of each group
    for (int rank = 0; rank < nodes_; ++rank) {
      const cuda::bcube::Node& firstNode = allNodes_[rank];
      // Only the nodes with no peers set for this step can be first nodes
      if (0 == firstNode.getPeersPerStep(step).size()) {
        // Create a new group
        groups.emplace_back(
            step, firstNode, peerDistance, base_, nodes_, totalNumElems_);
        // check the size to keep lint happy :/
        if (0 < groups.size()) {
          // Iterate over all the peer nodes and set them up for this step
          updateGroupNodes(step, groups.back());
        }
      } // if (0 == firstNode ...
    } // for (int rank = 0..
    // Done iterating over all the nodes. Update peerDistance for next step.
    peerDistance *= base_;
  } // for (int step ...
} // setupNodes

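/*
 * The two init() overloads below are selected at compile time via enable_if
 * on the workspace type: CudaHostWorkspace reduces into a host-side scratch
 * buffer, while CudaDeviceWorkspace keeps the data on the GPU and uses one
 * of the input pointers as scratch (so with a single device pointer no
 * local reduce/broadcast ops are needed at all).
 */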
template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
    typename std::enable_if<
        std::is_same<U, CudaHostWorkspace<T>>::value,
        typename U::Pointer>::type*) {
  // Since the reduction is executed on the CPU, the scratch space where the
  // partial results are accumulated is a new host-side buffer.
  scratch_ = W::Pointer::alloc(totalNumElems_);
  scratchStream_ = &streams_[0];

  // Set up local reduction and broadcast operations on the host.
  // If devicePtrs_.size() == 1 these functions construct an op that
  // executes a memcpy such that scratch_ always holds the result.
  if (bytes_ < kOnDeviceThreshold) {
    localReduceOp_ =
        cudaHostReduce(streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaHostBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  } else {
    localReduceOp_ = cudaDeviceReduce(
        streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  }
}

template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
    typename std::enable_if<
        std::is_same<U, CudaDeviceWorkspace<T>>::value,
        typename U::Pointer>::type*) {
  // The networking adapter does DMA to/from GPU memory, so we should reduce
  // onto the device that's closest to the networking adapter bound
  // to our context. This uses PCI distance to find the closest GPU.
  auto index = findCudaDevicePointerClosestToDevice(
      devicePtrs_, this->context_->getDevice());
  scratch_ = CudaDevicePointer<T>::create(devicePtrs_[index]);
  scratchStream_ = &streams_[index];

  // Set up local reduction and broadcast operations on the device.
  // When running with a device workspace we intend never to leave the device.
  if (devicePtrs_.size() > 1) {
    localReduceOp_ = cudaDeviceReduce(
        streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  }
}

namespace cuda {
namespace bcube {

Node::Node(int rank, int steps) : rank_(rank) {
  for (int i = 0; i < steps; ++i) {
    peersPerStep_.emplace_back();
  }
  // Use resize() (not reserve()) so that the operator[] writes in
  // setPerStepAttributes() stay within the vectors' bounds.
  numElemsPerStep_.resize(steps);
  ptrOffsetPerStep_.resize(steps);
}

int Node::getRank() const {
  return rank_;
}

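/*
 * Records this node's view of a single step: its peers within the group
 * (excluding itself), the number of elements it owns in that step, and the
 * offset at which they start.
 */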
void Node::setPerStepAttributes(
    int step,
    const std::vector<int>& peerRanks,
    int numElems,
    int offset) {
  for (int peerRank : peerRanks) {
    if (peerRank != rank_) {
      peersPerStep_[step].emplace_back(peerRank);
    }
  }
  numElemsPerStep_[step] = numElems;
  ptrOffsetPerStep_[step] = offset;
}

const std::vector<int>& Node::getPeersPerStep(int step) const {
  return peersPerStep_[step];
}

int Node::getNumElemsPerStep(int step) const {
  return numElemsPerStep_[step];
}

int Node::getPtrOffsetPerStep(int step) const {
  return ptrOffsetPerStep_[step];
}

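/*
 * A Group is the set of up to `base` ranks that exchange data with each
 * other in a given step. Its pointer offset and element count are carried
 * over from the first node's state after the previous step (the whole
 * buffer at step 0).
 */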
Group::Group(
    int step,
    const Node& firstNode,
    int peerDistance,
    int base,
    int nodes,
    int totalNumElems)
    : nodeRanks_(getNodeRanks(firstNode.getRank(), peerDistance, base, nodes)),
      ptrOffset_((0 == step) ? 0 : firstNode.getPtrOffsetPerStep(step - 1)),
      numElems_(
          computeNumElems(step, firstNode, nodeRanks_.size(), totalNumElems)) {}

const std::vector<int>& Group::getNodeRanks() const {
  return nodeRanks_;
}

int Group::getPtrOffset() const {
  return ptrOffset_;
}

int Group::getNumElems() const {
  return numElems_;
}

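/*
 * The group operates on the first node's chunk from the previous step (the
 * full buffer at step 0), but never on fewer elements than it has peers,
 * so that every peer owns at least one element.
 */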
int Group::computeNumElems(
    int step,
    const Node& firstNode,
    int peers,
    int count) {
  int groupCount = (0 == step) ? count : firstNode.getNumElemsPerStep(step - 1);
  return std::max(groupCount, peers);
}

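/*
 * Collects up to `base` ranks spaced `peerDistance` apart, starting from
 * the first node's rank and dropping any that fall outside [0, nodes).
 */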
std::vector<int> Group::getNodeRanks(
    int firstNodeRank,
    int peerDistance,
    int base,
    int nodes) const {
  std::vector<int> groupPeers;
  for (int i = 0; i < base; ++i) {
    int peerRank = firstNodeRank + i * peerDistance;
    if (peerRank < nodes) {
      groupPeers.emplace_back(peerRank);
    }
  }
  return groupPeers;
}

} // namespace bcube
} // namespace cuda

#define INSTANTIATE_TEMPLATE(T)                               \
  template class CudaAllreduceBcube<T, CudaHostWorkspace<T>>; \
  template class CudaAllreduceBcube<T, CudaDeviceWorkspace<T>>;

INSTANTIATE_TEMPLATE(int8_t);
INSTANTIATE_TEMPLATE(uint8_t);
INSTANTIATE_TEMPLATE(int32_t);
INSTANTIATE_TEMPLATE(int64_t);
INSTANTIATE_TEMPLATE(uint64_t);
INSTANTIATE_TEMPLATE(float);
INSTANTIATE_TEMPLATE(double);
INSTANTIATE_TEMPLATE(float16);

} // namespace gloo