1/**
2 * Copyright (c) 2018-present, Facebook, Inc.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#include "gloo/cuda_allreduce_bcube.h"
10
11#include "gloo/cuda_collectives_device.h"
12#include "gloo/cuda_collectives_host.h"
13#include "gloo/cuda_private.h"
14
15#include <sstream>
16#ifndef _WIN32
17#include <unistd.h>
18#endif
19
// Debug-only tracing helpers. When DEBUG is defined these expand to calls to
// the printStageBuffer/printStepBuffer members; the SEND/RECV variants rely on
// local variables (step, destRank, srcRank, sendCount, recvCount, ptrOffset)
// being in scope at the expansion site inside run(). Without DEBUG they
// compile away to nothing.
#ifdef DEBUG
#define DEBUG_PRINT_STAGE(stage) \
  do { \
    printStageBuffer(stage); \
  } while (false)
#define DEBUG_PRINT_SEND(stage) \
  do { \
    printStepBuffer( \
        stage, step, myRank_, destRank, &scratch_[0], sendCount, ptrOffset); \
  } while (false)
#define DEBUG_PRINT_RECV(stage) \
  do { \
    printStepBuffer( \
        stage, \
        step, \
        srcRank, \
        myRank_, \
        &recvBufs_[recvBufIdx_[srcRank]][0], \
        recvCount); \
  } while (false)
#else
#define DEBUG_PRINT_STAGE(stage)
#define DEBUG_PRINT_SEND(stage)
#define DEBUG_PRINT_RECV(stage)
#endif
45
46namespace gloo {
47
48template <typename T, typename W>
49CudaAllreduceBcube<T, W>::CudaAllreduceBcube(
50 const std::shared_ptr<Context>& context,
51 const std::vector<T*>& ptrs,
52 const int count,
53 const std::vector<cudaStream_t>& streams,
54 const CudaReductionFunction<T>* fn)
55 : Algorithm(context),
56 myRank_(this->context_->rank),
57 base_(this->context_->base ? this->context_->base : 2),
58 nodes_(this->contextSize_),
59 totalNumElems_(count),
60 bytes_(totalNumElems_ * sizeof(T)),
61 steps_(computeSteps(nodes_, base_)),
62 fn_(fn),
63 recvBufs_(steps_ * base_) {
64 auto newStream = true;
65 if (streams.size() > 0) {
66 GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
67 newStream = false;
68 }
69 for (auto i = 0; i < ptrs.size(); i++) {
70 auto ptr = CudaDevicePointer<T>::create(ptrs[i], totalNumElems_);
71 if (newStream) {
72 streams_.push_back(CudaStream(ptr.getDeviceID()));
73 } else {
74 streams_.push_back(CudaStream(ptr.getDeviceID(), streams[i]));
75 }
76 devicePtrs_.push_back(std::move(ptr));
77 }
78 // Workspace specific initialization (see below)
79 init();
80 if (nodes_ == 1) {
81 return;
82 }
83 setupNodes();
84 /*
85 * Reserve max needed number of context slots. Up to 2 slots per process
86 * pair are needed (one for regular sends and one for notifications). For
87 * simplicity, the same mapping is used on all processes so that the slots
88 * trivially match across processes
89 */
90 int slotOffset_ = this->context_->nextSlot(
91 2 * this->contextSize_ * (this->contextSize_ - 1));
92
93 int bufIdx = 0;
94 for (int step = 0; step < steps_; ++step) {
95 for (int destRank : getPeersPerStep(myRank_, step)) {
96 int recvSize = std::max(
97 getNumElemsPerStep(myRank_, step),
98 getNumElemsPerStep(destRank, step));
99 auto& pair = this->context_->getPair(destRank);
100 auto slot = slotOffset_ +
101 2 * (std::min(myRank_, destRank) * nodes_ +
102 std::max(myRank_, destRank));
103 sendDataBufs_[destRank] = pair->createSendBuffer(slot, *scratch_, bytes_);
104 recvBufs_[bufIdx] = W::Pointer::alloc(recvSize);
105 recvDataBufs_[destRank] = pair->createRecvBuffer(
106 slot, &recvBufs_[bufIdx][0], recvSize * sizeof(T));
107 recvBufIdx_[destRank] = bufIdx;
108 ++bufIdx;
109 ++slot;
110 sendNotificationBufs_[destRank] =
111 pair->createSendBuffer(slot, &dummy_, sizeof(dummy_));
112 recvNotificationBufs_[destRank] =
113 pair->createRecvBuffer(slot, &dummy_, sizeof(dummy_));
114 } // nodes
115 } // steps
116}
117
118template <typename T, typename W>
119void CudaAllreduceBcube<T, W>::run() {
120 CudaDeviceGuard guard;
121 CudaStream& stream = *scratchStream_;
122
123 localReduceOp_->run();
124
125 if (nodes_ == 1) {
126 GLOO_ENFORCE(
127 localBroadcastOp_,
128 "localBroadcastOp must be initialized for single machine");
129 localBroadcastOp_->run();
130 return;
131 }
132
133 // Reduce-scatter
134 DEBUG_PRINT_STAGE("start");
135 for (int step = 0; step < steps_; ++step) {
136 const auto& peerRanks = getPeersPerStep(myRank_, step);
137 for (int destRank : peerRanks) {
138 int sendCount = getNumElemsPerStep(destRank, step);
139 int ptrOffset = getPtrOffsetPerStep(destRank, step);
140 DEBUG_PRINT_SEND("reduce-scatter");
141 sendDataBufs_[destRank]->send(
142 ptrOffset * sizeof(T), sendCount * sizeof(T));
143 } // sends within group
144
145 for (int srcRank : peerRanks) {
146 int recvCount = getNumElemsPerStep(myRank_, step);
147 int ptrOffset = getPtrOffsetPerStep(myRank_, step);
148 recvDataBufs_[srcRank]->waitRecv();
149 DEBUG_PRINT_RECV("reduce-scatter");
150 auto recvBufAtOffset =
151 recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
152 auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
153 fn_->call(scratchAtOffset, recvBufAtOffset, recvCount, stream);
154 stream.wait();
155 /*
156 * Send notification to the pair we just received from that
157 * we're done dealing with the receive buffer.
158 */
159 sendNotificationBufs_[srcRank]->send();
160 } // recvs within group and reduces
161 } // reduce-scatter steps
162
163 DEBUG_PRINT_STAGE("reduce-scattered");
164
165 // All-gather
166 for (int step = steps_ - 1; step >= 0; --step) {
167 const auto& peerRanks = getPeersPerStep(myRank_, step);
168 for (int destRank : peerRanks) {
169 int sendCount = getNumElemsPerStep(myRank_, step);
170 int ptrOffset = getPtrOffsetPerStep(myRank_, step);
171 /*
172 * Wait for notification from the peer to make sure we can send data
173 * without risking any overwrites in its receive buffer.
174 */
175 recvNotificationBufs_[destRank]->waitRecv();
176 DEBUG_PRINT_SEND("all-gather");
177 sendDataBufs_[destRank]->send(
178 ptrOffset * sizeof(T), sendCount * sizeof(T));
179 }
180
181 for (int srcRank : peerRanks) {
182 int recvCount = getNumElemsPerStep(srcRank, step);
183 int ptrOffset = getPtrOffsetPerStep(srcRank, step);
184 recvDataBufs_[srcRank]->waitRecv();
185 DEBUG_PRINT_RECV("all-gather");
186 auto recvBufAtOffset =
187 recvBufs_[recvBufIdx_[srcRank]].range(0, recvCount);
188 auto scratchAtOffset = scratch_.range(ptrOffset, recvCount);
189 stream.copyAsync(scratchAtOffset, recvBufAtOffset);
190 stream.wait();
191 if (step == 0) {
192 /*
193 * Send notification to the pair we just received from that
194 * we're done dealing with the receive buffer.``
195 */
196 sendNotificationBufs_[srcRank]->send();
197 }
198 } // recvs within group and reduces
199 } // all-gather steps
200
201 DEBUG_PRINT_STAGE("all-reduced");
202
203 localBroadcastOp_->runAsync();
204 localBroadcastOp_->wait();
205
206 /*
207 * Wait for notifications from our peers within the block to make
208 * sure we can send data immediately without risking overwriting
209 * data in its receive buffer before it consumed that data.
210 */
211 for (int peerRank : getPeersPerStep(myRank_, 0)) {
212 recvNotificationBufs_[peerRank]->waitRecv();
213 }
214}
215
216template <typename T, typename W>
217int CudaAllreduceBcube<T, W>::computeSteps(int nodes, int peers) {
218 float lg2n = log2(nodes);
219 float lg2p = log2(peers);
220 return ceil(lg2n / lg2p);
221}
222
// Gate for all the debug-print helpers below; permanently disabled as
// checked in. Change locally (e.g. to `rank == 0`) when debugging.
template <typename T, typename W>
bool CudaAllreduceBcube<T, W>::printCheck(int /* rank */) {
  return false;
}
227
228template <typename T, typename W>
229void CudaAllreduceBcube<T, W>::printBreak(T* p, int x) {
230 if (0 == x % wordsPerLine) {
231 std::cout << std::endl
232 << &p[x] << " " << std::setfill('0') << std::setw(5) << x << ": ";
233 } else if (0 == x % wordsPerSection) {
234 std::cout << "- ";
235 }
236}
237
238template <typename T, typename W>
239void CudaAllreduceBcube<T, W>::printElems(T* p, int count, int start) {
240 auto alignedStart = (start / wordsPerLine) * wordsPerLine;
241 for (int x = alignedStart; x < start + count; ++x) {
242 printBreak(p, x);
243 if (x < start) {
244 std::cout << "..... ";
245 } else {
246 std::cout << std::setfill('0') << std::setw(5) << p[x] << " ";
247 }
248 }
249}
250
251template <typename T, typename W>
252void CudaAllreduceBcube<T, W>::printStageBuffer(const std::string& msg) {
253 if (printCheck(myRank_)) {
254 std::cout << "rank (" << myRank_ << ") " << msg << ": ";
255 printElems(&scratch_[0], totalNumElems_);
256 std::cout << std::endl;
257 }
258}
259
260template <typename T, typename W>
261void CudaAllreduceBcube<T, W>::printStepBuffer(
262 const std::string& stage,
263 int step,
264 int srcRank,
265 int destRank,
266 T* p,
267 int count,
268 int start) {
269 if (printCheck(myRank_)) {
270 std::cout << stage << ": step (" << step << ") "
271 << "srcRank (" << srcRank << ") -> "
272 << "destRank (" << destRank << "): ";
273 printElems(p, count, start);
274 std::cout << std::endl;
275 }
276}
277
// Peers of the given rank at the given step (delegates to that rank's Node).
template <typename T, typename W>
const std::vector<int>& CudaAllreduceBcube<T, W>::getPeersPerStep(
    int rank,
    int step) {
  return allNodes_[rank].getPeersPerStep(step);
}

// Number of elements the given rank owns at the given step.
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getNumElemsPerStep(int rank, int step) {
  return allNodes_[rank].getNumElemsPerStep(step);
}

// Element offset of the given rank's chunk at the given step.
template <typename T, typename W>
int CudaAllreduceBcube<T, W>::getPtrOffsetPerStep(int rank, int step) {
  return allNodes_[rank].getPtrOffsetPerStep(step);
}
294
295template <typename T, typename W>
296void CudaAllreduceBcube<T, W>::createNodes() {
297 for (int rank = 0; rank < nodes_; ++rank) {
298 allNodes_.emplace_back(rank, steps_);
299 }
300}
301
/*
 * Splits the group's element range across its peers and records, on every
 * node in the group, the per-step peer list, chunk size, and buffer offset.
 * @param step The step being configured
 * @param group The group whose member nodes are being set up
 */
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::updateGroupNodes(
    int step,
    const cuda::bcube::Group& group) {
  const std::vector<int>& peers = group.getNodeRanks();
  const int peersSz = peers.size();
  int ptrOffset = group.getPtrOffset();
  // Equal share per peer; never below one element even when there are more
  // peers than elements.
  int count = group.getNumElems() / peersSz;
  const int countRem = group.getNumElems() % peersSz;
  if (0 == count) {
    count = 1;
  }
  for (int i = 0; i < peersSz; ++i) {
    cuda::bcube::Node& node = allNodes_[peers[i]];
    if (peersSz - 1 != i) { // if not the last node in group
      node.setPerStepAttributes(step, peers, count, ptrOffset);
      ptrOffset += count;
    } else {
      /*
       * The last node gets the remainder elements if the number of
       * elements is not exactly divisible by the number of peers
       */
      node.setPerStepAttributes(step, peers, count + countRem, ptrOffset);
      ptrOffset += count + countRem;
    }
    // Offsets wrap around the end of the buffer.
    ptrOffset %= totalNumElems_;
  }
}
330
/*
 * Builds the full per-step communication plan: creates one Node per rank,
 * then for every step forms groups of peers (spaced peerDistance apart) and
 * assigns each group member its peers, chunk size, and offset for that step.
 */
template <typename T, typename W>
void CudaAllreduceBcube<T, W>::setupNodes() {
  // Create all the nodes upfront
  createNodes();

  // Now we actually try to set up the nodes
  int peerDistance = 1;
  for (int step = 0; step < steps_; ++step) {
    std::vector<cuda::bcube::Group> groups;
    // Iterate over all the nodes to identify the first node of each group
    for (int rank = 0; rank < nodes_; ++rank) {
      const cuda::bcube::Node& firstNode = allNodes_[rank];
      // Only the ones with no peers assigned yet for this step would be a
      // first node
      if (0 == firstNode.getPeersPerStep(step).size()) {
        // Create a new group
        groups.emplace_back(
            step, firstNode, peerDistance, base_, nodes_, totalNumElems_);
        // check the size to keep lint happy :/
        if (0 < groups.size()) {
          // Iterate over all the peer nodes and set them up for the step
          updateGroupNodes(step, groups.back());
        }
      } // if (0 == firstNode ...
    } // for (int rank = 0..
    // Done iterating over all the nodes. Update peerDistance for next step.
    peerDistance *= base_;
  } // for (int step ...
} // setupNodes
359
/*
 * Workspace-specific initialization for the host-side workspace: allocates a
 * host scratch buffer and builds the local reduce/broadcast ops.
 */
template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
    typename std::enable_if<
        std::is_same<U, CudaHostWorkspace<T>>::value,
        typename U::Pointer>::type*) {
  // Since reduction is executed on the CPU, the scratch space
  // where they are accumulated is a new host side buffer.
  scratch_ = W::Pointer::alloc(totalNumElems_);
  scratchStream_ = &streams_[0];

  // Set up local reduction and broadcast operations on the host.
  // If devicePtrs_.size() == 1 these functions construct an op that
  // executes a memcpy such that scratch_ always holds the result.
  // Below the threshold, reduce/broadcast on the host; above it, on the
  // device.
  if (bytes_ < kOnDeviceThreshold) {
    localReduceOp_ =
        cudaHostReduce(streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaHostBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  } else {
    localReduceOp_ = cudaDeviceReduce(
        streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  }
}
386
/*
 * Workspace-specific initialization for the device-side workspace: picks the
 * device pointer closest to the networking adapter as the scratch buffer and
 * builds device-side reduce/broadcast ops when there are multiple pointers.
 */
template <typename T, typename W>
template <typename U>
void CudaAllreduceBcube<T, W>::init(
    typename std::enable_if<
        std::is_same<U, CudaDeviceWorkspace<T>>::value,
        typename U::Pointer>::type*) {
  // The networking adapter does DMA to/from GPU memory, so we should reduce
  // onto the device that's closest to the networking adapter bound
  // to our context. This uses PCI distance to find closest GPU.
  auto index = findCudaDevicePointerClosestToDevice(
      devicePtrs_, this->context_->getDevice());
  scratch_ = CudaDevicePointer<T>::create(devicePtrs_[index]);
  scratchStream_ = &streams_[index];

  // Set up local reduction and broadcast operations on the device.
  // When running with a device workspace we intend to never leave the device.
  //
  // NOTE(review): with a single device pointer, scratch_ aliases that pointer
  // and both ops are left unset — callers must tolerate null ops here.

  if (devicePtrs_.size() > 1) {
    localReduceOp_ = cudaDeviceReduce(
        streams_, devicePtrs_, scratch_, fn_, 0, totalNumElems_);
    localBroadcastOp_ =
        cudaDeviceBroadcast(streams_, devicePtrs_, scratch_, 0, totalNumElems_);
  }
}
411
412namespace cuda {
413namespace bcube {
414
415Node::Node(int rank, int steps) : rank_(rank) {
416 for (int i = 0; i < steps; ++i) {
417 peersPerStep_.emplace_back();
418 }
419 numElemsPerStep_.reserve(steps);
420 ptrOffsetPerStep_.reserve(steps);
421}
422
// This node's rank in the context.
int Node::getRank() const {
  return rank_;
}
426
/*
 * Records this node's per-step state: the other ranks in its group, the
 * number of elements it owns for the step, and their offset in the buffer.
 * @param step The step being configured
 * @param peerRanks All ranks in the group (this node itself is filtered out)
 * @param numElems Number of elements this node owns in this step
 * @param offset Element offset of this node's chunk in the buffer
 */
void Node::setPerStepAttributes(
    int step,
    const std::vector<int>& peerRanks,
    int numElems,
    int offset) {
  // Keep only the other ranks; a node is not its own peer.
  for (int peerRank : peerRanks) {
    if (peerRank != rank_) {
      peersPerStep_[step].emplace_back(peerRank);
    }
  }
  // NOTE(review): operator[] requires these vectors to have size > step; the
  // constructor only reserve()s them — confirm they are properly sized.
  numElemsPerStep_[step] = numElems;
  ptrOffsetPerStep_[step] = offset;
}
440
// Ranks this node exchanges data with at the given step.
const std::vector<int>& Node::getPeersPerStep(int step) const {
  return peersPerStep_[step];
}

// Number of elements this node owns at the given step.
int Node::getNumElemsPerStep(int step) const {
  return numElemsPerStep_[step];
}

// Element offset of this node's chunk at the given step.
int Node::getPtrOffsetPerStep(int step) const {
  return ptrOffsetPerStep_[step];
}
452
/*
 * Builds one communication group for a step: the member ranks (spaced
 * peerDistance apart starting at firstNode), the group's starting offset,
 * and the number of elements it covers.
 */
Group::Group(
    int step,
    const Node& firstNode,
    int peerDistance,
    int base,
    int nodes,
    int totalNumElems)
      // Step 0 starts at the head of the buffer; later steps continue from
      // the first node's offset in the previous step.
    : nodeRanks_(getNodeRanks(firstNode.getRank(), peerDistance, base, nodes)),
      ptrOffset_((0 == step) ? 0 : firstNode.getPtrOffsetPerStep(step - 1)),
      numElems_(
          computeNumElems(step, firstNode, nodeRanks_.size(), totalNumElems)) {}
464
// Ranks of all the nodes participating in this group.
const std::vector<int>& Group::getNodeRanks() const {
  return nodeRanks_;
}

// Element offset where this group's range begins.
int Group::getPtrOffset() const {
  return ptrOffset_;
}

// Number of elements this group covers.
int Group::getNumElems() const {
  return numElems_;
}
476
477int Group::computeNumElems(
478 int step,
479 const Node& firstNode,
480 int peers,
481 int count) {
482 int groupCount = (0 == step) ? count : firstNode.getNumElemsPerStep(step - 1);
483 return std::max(groupCount, peers);
484}
485
486std::vector<int> Group::getNodeRanks(
487 int firstNodeRank,
488 int peerDistance,
489 int base,
490 int nodes) const {
491 std::vector<int> groupPeers;
492 for (int i = 0; i < base; ++i) {
493 int peerRank = firstNodeRank + i * peerDistance;
494 if (peerRank < nodes) {
495 groupPeers.emplace_back(peerRank);
496 }
497 }
498 return groupPeers;
499}
500
501} // namespace bcube
502} // namespace cuda
503
// Explicitly instantiate the algorithm for every supported element type, in
// both the host-side and device-side workspace variants.
#define INSTANTIATE_TEMPLATE(T) \
  template class CudaAllreduceBcube<T, CudaHostWorkspace<T>>; \
  template class CudaAllreduceBcube<T, CudaDeviceWorkspace<T>>;

INSTANTIATE_TEMPLATE(int8_t);
INSTANTIATE_TEMPLATE(uint8_t);
INSTANTIATE_TEMPLATE(int32_t);
INSTANTIATE_TEMPLATE(int64_t);
INSTANTIATE_TEMPLATE(uint64_t);
INSTANTIATE_TEMPLATE(float);
INSTANTIATE_TEMPLATE(double);
INSTANTIATE_TEMPLATE(float16);
516
517} // namespace gloo
518