/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include <math.h>
#include <stddef.h>
#include <string.h>
#include <cstring>
#include <iomanip>
#include <unordered_map>

#include "gloo/algorithm.h"
#include "gloo/common/error.h"
#include "gloo/context.h"
#include "gloo/cuda.h"
#include "gloo/cuda_workspace.h"

namespace gloo {

namespace cuda {
namespace bcube {

class Node;
class Group;

} // namespace bcube
} // namespace cuda

/**
 * This is the main allreduce implementation. Bcube is a scheme in which nodes
 * are divided into groups. In the reduce-scatter stage, within each group, a
 * node peers with `base - 1` other nodes. In the first step data is reduced
 * between the nodes within a group. In the next step each node of a group
 * peers with `base - 1` nodes from exclusively different groups. Since each
 * node already holds data reduced within its own group, communicating with it
 * is equivalent to communicating with `base` nodes/groups from the previous
 * step. This process continues until all groups are covered, which requires
 * log_base(n) steps. In each step a node reduces totalNumElems_ / (base^step)
 * elements. At the end of the reduce-scatter stage each node has fully reduced
 * a chunk of elements. The all-gather stage then follows the reverse of the
 * reduce-scatter schedule to distribute the reduced data back to all nodes.
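 *
 * As a worked illustration of the schedule described above (derived from the
 * description, not additional behavior): with 4 nodes, base = 2 and
 * totalNumElems_ = 8 there are log_2(4) = 2 steps. In the first reduce-scatter
 * step each node peers with the one other node in its group and reduces
 * 8 / 2 = 4 elements; in the second step it peers with a node from the other
 * group and reduces 8 / 4 = 2 elements. Each node then holds a fully reduced
 * chunk of 2 elements, and the two all-gather steps run the same schedule in
 * reverse so that every node ends up with all 8 reduced elements.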
 */
template <typename T, typename W = CudaHostWorkspace<T>>
class CudaAllreduceBcube : public Algorithm {
 public:
  CudaAllreduceBcube(
      const std::shared_ptr<Context>& context,
      const std::vector<T*>& ptrs,
      const int count,
      const std::vector<cudaStream_t>& streams = std::vector<cudaStream_t>(),
      const CudaReductionFunction<T>* fn = CudaReductionFunction<T>::sum);

  virtual ~CudaAllreduceBcube() = default;

  virtual void run() override;

 private:
  /**
   * Number of words to be printed per section by printElems
   */
  static constexpr int wordsPerSection = 4;
  /**
   * Number of words to be printed per line by printElems
   */
  static constexpr int wordsPerLine = 4 * wordsPerSection;
  /**
   * The rank of the current node
   */
  const int myRank_{0};
  /**
   * Number of nodes in a typical group
   */
  const int base_{2};
  /**
   * Total number of nodes
   */
  const int nodes_{0};
  /**
   * Pointers to the elements on each device
   */
  std::vector<CudaDevicePointer<T>> devicePtrs_;
  /**
   * Streams to allow async copy operations with all the device pointers
   */
  std::vector<CudaStream> streams_;
  /**
   * Since reduction is executed on the CPU, the scratch space where the
   * results are accumulated is a host-side buffer.
   */
  typename W::Pointer scratch_;
  /**
   * Stream for async copy operations involving the scratch buffer
   */
  CudaStream* scratchStream_;
  /**
   * Total number of elements to process
   */
  const int totalNumElems_{0};
  /**
   * Total number of bytes to process
   */
  const int bytes_{0};
  /**
   * Total number of steps
   */
  const size_t steps_{0};
  /**
   * The reduce operation function
   */
  const CudaReductionFunction<T>* fn_{nullptr};
  /**
   * List of actual buffers for incoming data
   */
  std::vector<typename W::Pointer> recvBufs_;
  /**
   * Map of rank to incoming buffer index in recvBufs_
   */
  std::unordered_map<int, int> recvBufIdx_;
  /**
   * Map of rank to Buffer which will be used for outgoing data
   */
  std::unordered_map<int, std::unique_ptr<transport::Buffer>> sendDataBufs_;
  /**
   * Map of rank to Buffer which will be used for incoming data
   */
  std::unordered_map<int, std::unique_ptr<transport::Buffer>> recvDataBufs_;
  /**
   * Performs the local reduce operation across multiple local device pointers
   */
  std::unique_ptr<LocalOp<T>> localReduceOp_;
  /**
   * Performs the local broadcast operation across multiple local device
   * pointers
   */
  std::unique_ptr<LocalOp<T>> localBroadcastOp_;
  /**
   * Dummy data used to signal end of one setup
   */
  int dummy_;
  /**
   * Map of rank to Buffer which will be used for outgoing synchronization data
   * at the end of reduce-scatter and all-gather
   */
  std::unordered_map<int, std::unique_ptr<transport::Buffer>>
      sendNotificationBufs_;
  /**
   * Map of rank to Buffer which will be used for incoming synchronization data
   * at the end of reduce-scatter and all-gather
   */
  std::unordered_map<int, std::unique_ptr<transport::Buffer>>
      recvNotificationBufs_;
  /**
   * List of all the nodes
   */
  std::vector<cuda::bcube::Node> allNodes_;

  /**
   * Compute the number of steps required in reduce-scatter and all-gather
   * (each)
   * @param nodes The total number of nodes
   * @param peers The maximum number of peers in a group
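   *
   * For illustration only (assuming the step count is the smallest s such
   * that peers^s >= nodes, as the class comment's log_base(n) formula
   * suggests): computeSteps(8, 2) would be 3 and computeSteps(9, 3) would
   * be 2.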
   */
  static int computeSteps(int nodes, int peers);

  /**
   * Gate to make sure only the designated node(s) print logs
   * @param rank Rank of the current node
   */
  static bool printCheck(int /*rank*/);

  /**
   * Prints a break given the offset of an element about to be printed
   * @param p Pointer to the elements
   * @param x The current word offset into the buffer
   */
  static void printBreak(T* p, int x);
  /**
   * Pretty prints a list of elements
   * @param p Pointer to the elements
   * @param count The number of elements to be printed
   * @param start The offset from which to print
   */
  static void printElems(T* p, int count, int start = 0);
  /**
   * Prints contents of the ptrs array at a particular stage
   * @param msg Custom message to be printed
   */
  void printStageBuffer(const std::string& msg);
  /**
   * Prints the specified buffer during a step
   * @param stage Name of the stage being printed
   * @param step The step when the buffer is being printed
   * @param srcRank The sender of the data
   * @param destRank The receiver of the data
   * @param p Pointer to the buffer to be printed
   * @param count Number of elements to be printed
   * @param start The offset from which to print
   */
  void printStepBuffer(
      const std::string& stage,
      int step,
      int srcRank,
      int destRank,
      T* p,
      int count,
      int start = 0);
  /**
   * Get all the peers of the node with the specified rank
   * @param rank Rank of the node for which peers are needed
   * @param step The step for which we need to get peers
   * @return List of ranks of all peer nodes
   */
  const std::vector<int>& getPeersPerStep(int rank, int step);
  /**
   * Get the count of elements the specified node needs to process in the
   * specified step
   * @param rank Rank of the node for which count is requested
   * @param step The step for which we are querying count
   */
  int getNumElemsPerStep(int rank, int step);
  /**
   * Get the offset into the ptrs array from which the specified node starts
   * processing in the specified step
   * @param rank Rank of the node for which offset is requested
   * @param step The step for which we are querying offset
   */
  int getPtrOffsetPerStep(int rank, int step);
  /**
   * Creates all the nodes with sequential ranks
   */
  void createNodes();
  /**
   * Updates the peer, count and offset values for all the nodes in a group
   * @param step The step for which we are updating the values
   * @param group The group object with all peer, count and offset data
   */
  void updateGroupNodes(int step, const cuda::bcube::Group& group);
  /**
   * Set up all the nodes
   * This function does the following:
   * - Create the nodes
   * - Compute and store the elements per group in each step
   * - Set up all the nodes
   */
  void setupNodes();

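  /**
   * The two init() overloads below are selected via SFINAE on the workspace
   * type: the first participates in overload resolution only when W is
   * CudaHostWorkspace<T> (host-side scratch space), the second only when W is
   * CudaDeviceWorkspace<T> (device-side scratch space).
   */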
  template <typename U = W>
  void init(
      typename std::enable_if<
          std::is_same<U, CudaHostWorkspace<T>>::value,
          typename U::Pointer>::type* = 0);

  template <typename U = W>
  void init(
      typename std::enable_if<
          std::is_same<U, CudaDeviceWorkspace<T>>::value,
          typename U::Pointer>::type* = 0);
};
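
// A minimal usage sketch (illustrative only): how the gloo context, CUDA
// buffers and streams are created is application specific and not shown here,
// and `deviceBuffer` is a hypothetical pointer to `count` floats on the GPU.
//
//   std::shared_ptr<gloo::Context> context = ...; // fully connected context
//   std::vector<float*> ptrs = {deviceBuffer};    // one pointer per local GPU
//   const int count = 1024;                       // elements per pointer
//
//   gloo::CudaAllreduceBcube<float> allreduce(context, ptrs, count);
//   allreduce.run(); // with the default fn, every rank now holds the sum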

namespace cuda {
namespace bcube {

/**
 * This is a helper class. We create one object for each node participating in
 * the allreduce operation, with its respective rank. It encapsulates
 * information related to the processing of elements: how many elements need to
 * be sent from or received at which offset by a particular node, and at which
 * offset they are to be reduced, in each step.
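 *
 * For instance (an illustration of the scheme, not a guarantee of the exact
 * assignment), with 4 nodes and base = 2, the Node for rank 0 would typically
 * record rank 1 as its peer for the first step and rank 2 as its peer for the
 * second step, together with the element count and ptrs offset it handles in
 * each of those steps.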
 */
class Node {
 public:
  explicit Node(int rank, int steps);
  /**
   * Get the rank of this node
   */
  int getRank() const;
  /**
   * Used to record all the peer nodes, the number of elements to process and
   * the offset from which data in the original ptr buffer will be processed by
   * this node in a particular step. This is to be done as part of the setup()
   * function only.
   * @param step The step for which we are recording attributes
   * @param peerRanks All peer ranks. This would contain self too, so we need
   * to filter that out.
   * @param numElems The number of elements this node will be processing in
   * the step
   * @param offset The offset in the ptrs array
   */
  void setPerStepAttributes(
      int step,
      const std::vector<int>& peerRanks,
      int numElems,
      int offset);
  /**
   * Get all the nodes this node peers with in a particular step
   * @param step The step for which we need to get peers
   * @return List of ranks of all peer nodes
   */
  const std::vector<int>& getPeersPerStep(int step) const;
  /**
   * Get the count of elements this node needs to process in the specified step
   * @param step The step for which we are querying count
   */
  int getNumElemsPerStep(int step) const;
  /**
   * Get the offset into the ptrs array from which this node starts processing
   * in the specified step
   * @param step The step for which we are querying offset
   */
  int getPtrOffsetPerStep(int step) const;

 private:
  /**
   * Rank of this node
   */
  const int rank_;
  /**
   * A vector of lists of ranks (value) of the nodes this node peers with in a
   * given step (index)
   */
  std::vector<std::vector<int>> peersPerStep_;
  /**
   * A vector of the number of elements (value) this node needs to process in a
   * given step (index). This could be the number of elements to be received
   * and reduced by a node and correspondingly sent by its peers during a step
   * of the reduce-scatter stage, or, similarly, the number of elements
   * received and copied into the ptrs_ array by a node and correspondingly
   * sent by its peers during a step of the all-gather stage.
   */
  std::vector<int> numElemsPerStep_;
  /**
   * A vector of offsets (value) within the ptrs_ array from which data needs
   * to be processed by this node in a given step (index). This is used by
   * peers to send data from the ptrs_ array to this node and with the reduce
   * function during the reduce-scatter stage, or to send elements to peers
   * from the ptrs_ array during the all-gather stage.
   */
  std::vector<int> ptrOffsetPerStep_;
};

/**
 * This is another helper class. As part of each step of processing we divide
 * the nodes into multiple groups. This class helps track the properties of a
 * group, such as which nodes are part of it, how many elements all of its
 * nodes collectively need to process, and at what offset.
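 *
 * For instance (illustrative only), with 4 nodes and base = 2, the first step
 * could form the groups {0, 1} and {2, 3} (peer distance 1), while the second
 * step could form {0, 2} and {1, 3} (peer distance 2), so that each node then
 * pairs with a node from a different first-step group.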
 */
class Group {
 public:
  Group(
      int step,
      const Node& firstNode,
      int peerDistance,
      int base,
      int nodes,
      int totalNumElems);
  /**
   * Simple getter for all the nodes in the group
   * @return List of ranks of nodes in the group
   */
  const std::vector<int>& getNodeRanks() const;
  /**
   * Get the offset from which the group should process data
   * @return Offset in the ptrs array
   */
  int getPtrOffset() const;
  /**
   * Get the number of elements this group is supposed to process
   * @return Count of elements (in ptr or receive buffers)
   */
  int getNumElems() const;

 private:
  const std::vector<int> nodeRanks_;
  const int ptrOffset_;
  const int numElems_;
  /**
   * Computes the number of elements this group needs to process. If this is
   * the first step we start with all the elements. For subsequent steps it is
   * the number of elements processed by a single node in the previous step. If
   * this value is smaller than the number of peers in the group, simply use
   * the number of peers as the count so that at least one element is
   * exchanged. Note that in this case some nodes may end up duplicating work,
   * as the ptrOffset wraps around totalNumElems_ in the updateGroupNodes()
   * function.
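   *
   * Worked example derived from the description above (illustrative only):
   * with totalNumElems_ = 8 and base = 2, the group count is 8 in the first
   * step and 8 / 2 = 4 in the second step (the share reduced by a single node
   * in the first step). With totalNumElems_ = 2 the second step would yield 1,
   * which is smaller than the 2 peers in the group, so 2 is used instead and
   * the offset wraps around.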
   * @param step The current step
   * @param firstNode The first node in the group
   * @param peers The total number of peers in the group
   * @param count The total number of elements to be processed by this node
   * @return The number of elements to be processed by this group
   */
  static int
  computeNumElems(int step, const Node& firstNode, int peers, int count);
  /**
   * Determines all the nodes in a group in a particular step
   * @param firstNodeRank The rank of the first node in the group
   * @param peerDistance The distance between the ranks of the peers in the
   * group
   * @param base The number of nodes in a typical group
   * @param nodes The total number of nodes
   * @return List of ranks of the nodes in the group
   */
  std::vector<int>
  getNodeRanks(int firstNodeRank, int peerDistance, int base, int nodes) const;
};

} // namespace bcube
} // namespace cuda

} // namespace gloo