1 | /** |
2 | * Copyright (c) 2017-present, Facebook, Inc. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under the BSD-style license found in the |
6 | * LICENSE file in the root directory of this source tree. |
7 | */ |
8 | |
9 | #pragma once |
10 | |
11 | #include <math.h> |
12 | #include <stddef.h> |
13 | #include <string.h> |
14 | #include <cstring> |
15 | #include <iomanip> |
16 | #include <unordered_map> |
17 | |
18 | #include "gloo/algorithm.h" |
19 | #include "gloo/common/error.h" |
20 | #include "gloo/context.h" |
21 | #include "gloo/cuda.h" |
22 | #include "gloo/cuda_workspace.h" |
23 | |
24 | namespace gloo { |
25 | |
26 | namespace cuda { |
27 | namespace bcube { |
28 | |
29 | class Node; |
30 | class Group; |
31 | |
32 | } // namespace bcube |
33 | } // namespace cuda |
34 | |
35 | /** |
36 | * This is the main allreduce implementation. Bcube is a scheme where nodes are |
37 | * divided in groups. In reduce-scatter stage, in each group, a node peers with |
38 | * `base - 1` other nodes. In the first step data is reduced between nodes |
39 | * within the group. In the next step each node of a group peers with `base - 1` |
40 | * nodes from other exclusively different groups. Since each node would start |
41 | * with reduced data communicating with it would be like communicating with |
42 | * `base` number of nodes/groups from the previous step. This process continues |
43 | * until all the groups are covered and to be able to do that the algorithm |
44 | * would have log_base(n) number of steps. Each step the node reduces |
45 | * totalNumElems_ / (base^step) amount of elements. At the end of reduce-scatter |
46 | * stage each node would have reduced a chunk of elements. Now, in all-gather |
47 | * we follow a reverse process of reduce-scatter to communicate the reduced data |
48 | * with other nodes. |
49 | */ |
50 | template <typename T, typename W = CudaHostWorkspace<T>> |
51 | class CudaAllreduceBcube : public Algorithm { |
52 | public: |
53 | CudaAllreduceBcube( |
54 | const std::shared_ptr<Context>& context, |
55 | const std::vector<T*>& ptrs, |
56 | const int count, |
57 | const std::vector<cudaStream_t>& streams = std::vector<cudaStream_t>(), |
58 | const CudaReductionFunction<T>* fn = CudaReductionFunction<T>::sum); |
59 | |
60 | virtual ~CudaAllreduceBcube() = default; |
61 | |
62 | virtual void run() override; |
63 | |
64 | private: |
65 | /** |
66 | * Number of words to be printed per section by printElems |
67 | */ |
68 | static constexpr int wordsPerSection = 4; |
69 | /** |
70 | * Number of words to be printed per line by printElems |
71 | */ |
72 | static constexpr int wordsPerLine = 4 * wordsPerSection; |
73 | /** |
74 | * Just a reference to current nodes rank |
75 | */ |
76 | const int myRank_{0}; |
77 | /** |
78 | * Number of nodes in a typical group |
79 | */ |
80 | const int base_{2}; |
81 | /** |
82 | * Total number of nodes |
83 | */ |
84 | const int nodes_{0}; |
85 | /** |
86 | * Pointer to the elements for this device |
87 | */ |
88 | std::vector<CudaDevicePointer<T>> devicePtrs_; |
89 | /** |
90 | * Streams to allow async copy operations with all the device pointers |
91 | */ |
92 | std::vector<CudaStream> streams_; |
93 | /** |
94 | * Since reduction is executed on the CPU, the scratch space is where they are |
95 | * accumulated is a new host side buffer. |
96 | */ |
97 | typename W::Pointer scratch_; |
98 | /** |
99 | * Streams for async copy operations for this device pointers |
100 | */ |
101 | CudaStream* scratchStream_; |
102 | /** |
103 | * Total number of elements to process |
104 | */ |
105 | const int totalNumElems_{0}; |
106 | /** |
107 | * Total number of bytes to process |
108 | */ |
109 | const int bytes_{0}; |
110 | /** |
111 | * Total number of steps |
112 | */ |
113 | const size_t steps_{0}; |
114 | /** |
115 | * The reduce operation function |
116 | */ |
117 | const CudaReductionFunction<T>* fn_{nullptr}; |
118 | /** |
119 | * List of actual buffers for incoming data |
120 | */ |
121 | std::vector<typename W::Pointer> recvBufs_; |
122 | /** |
123 | * Map of rank to incoming buffer index in recvBufs |
124 | */ |
125 | std::unordered_map<int, int> recvBufIdx_; |
126 | /** |
127 | * Map of rank to Buffer which will be used for outgoing data |
128 | */ |
129 | std::unordered_map<int, std::unique_ptr<transport::Buffer>> sendDataBufs_; |
130 | /** |
131 | * Map of rank to Buffer which will be used for incoming data |
132 | */ |
133 | std::unordered_map<int, std::unique_ptr<transport::Buffer>> recvDataBufs_; |
134 | /** |
135 | * Helps with multiple local pointers, local reduce operations |
136 | */ |
137 | std::unique_ptr<LocalOp<T>> localReduceOp_; |
138 | /** |
139 | * Helps with multiple local pointers, local broadcast operations |
140 | */ |
141 | std::unique_ptr<LocalOp<T>> localBroadcastOp_; |
142 | /** |
143 | * Dummy data used to signal end of one setup |
144 | */ |
145 | int dummy_; |
146 | /** |
147 | * Map of rank to Buffer which will be used for outgoing synchronization data |
148 | * at end of reduce-scatter and all-gather |
149 | */ |
150 | std::unordered_map<int, std::unique_ptr<transport::Buffer>> |
151 | sendNotificationBufs_; |
152 | /** |
153 | * Map of rank to Buffer which will be used for incoming synchronization data |
154 | * at end of reduce-scatter and all-gather |
155 | */ |
156 | std::unordered_map<int, std::unique_ptr<transport::Buffer>> |
157 | recvNotificationBufs_; |
158 | /** |
159 | * List of all the nodes |
160 | */ |
161 | std::vector<cuda::bcube::Node> allNodes_; |
162 | |
163 | /** |
164 | * Compute number of steps required in reduce-scatter and all-gather (each) |
165 | * @param nodes The total number of nodes |
166 | * @para peers The maximum number of peers in a group |
167 | */ |
168 | static int computeSteps(int nodes, int peers); |
169 | |
170 | /** |
171 | * Basically a gate to make sure only the right node(s) print logs |
172 | * @param rank Rank of the current node |
173 | */ |
174 | static bool printCheck(int /*rank*/); |
175 | |
176 | /** |
177 | * Prints a break given the offset of an element about to be printed |
178 | * @param p Pointer to the elements |
179 | * @param x The current offset to the pointer to words |
180 | */ |
181 | static void printBreak(T* p, int x); |
182 | /** |
183 | * Pretty prints a list of elements |
184 | * @param p Pointer to the elements |
185 | * @param count The number of elements to be printed |
186 | * @param start The offset from which to print |
187 | */ |
188 | static void printElems(T* p, int count, int start = 0); |
189 | /** |
190 | * Prints contents in the ptrs array at a particular stage |
191 | * @param msg Custom message to be printed |
192 | */ |
193 | void printStageBuffer(const std::string& msg); |
194 | /** |
195 | * Prints specified buffer during a step |
196 | * @param step The step when the buffer is being printed |
197 | * @param srcRank The sender of the data |
198 | * @param destRank The receiver of data |
199 | * @param p Poniter to the buffer to be printed |
200 | * @param count Number of elements to be printed |
201 | * @param start The offset from which to print |
202 | */ |
203 | void printStepBuffer( |
204 | const std::string& stage, |
205 | int step, |
206 | int srcRank, |
207 | int destRank, |
208 | T* p, |
209 | int count, |
210 | int start = 0); |
211 | /** |
212 | * Get all the peers of node with specified rank |
213 | * @param rank Rank of the node for which peers are needed |
214 | * @param step The step for which we need to get peers |
215 | * @return List of ranks of all peer nodes |
216 | */ |
217 | const std::vector<int>& getPeersPerStep(int rank, int step); |
218 | /** |
219 | * Get count of elements specified node needs to process in specified the step |
220 | * @param rank Rank of the node for which count is requested |
221 | * @param step The step for which we are querying count |
222 | */ |
223 | int getNumElemsPerStep(int rank, int step); |
224 | /** |
225 | * Get offset to ptrs array specified node needs to start processing from in |
226 | * the specified step |
227 | * @param rank Rank of the node for which offset is requested |
228 | * @param step The step for which we are querying offset |
229 | */ |
230 | int getPtrOffsetPerStep(int rank, int step); |
231 | /** |
232 | * Creates all the nodes with sequential ranks |
233 | */ |
234 | void createNodes(); |
235 | /** |
236 | * Updates the peer, count and offset values for all the nodes in a group |
237 | * @param step The step for which we are updating the values |
238 | * @param groups The group object with all peer, count and offset data |
239 | */ |
240 | void updateGroupNodes(int step, const cuda::bcube::Group& group); |
241 | /** |
242 | * Setup all the nodes |
243 | * Here are the things we do in this function |
244 | * - Create nodes |
245 | * - Compute and store elements per group in each step |
246 | * - Step up all the nodes |
247 | */ |
248 | void setupNodes(); |
249 | |
250 | template <typename U = W> |
251 | void init( |
252 | typename std::enable_if< |
253 | std::is_same<U, CudaHostWorkspace<T>>::value, |
254 | typename U::Pointer>::type* = 0); |
255 | |
256 | template <typename U = W> |
257 | void init( |
258 | typename std::enable_if< |
259 | std::is_same<U, CudaDeviceWorkspace<T>>::value, |
260 | typename U::Pointer>::type* = 0); |
261 | }; |
262 | |
263 | namespace cuda { |
264 | namespace bcube { |
265 | |
266 | /** |
267 | * This is a helper class. We create one object for each node |
268 | * participating in allreduce operation with respective rank. It enacapsulates |
269 | * information related to processing of elements. That is, how many elements |
270 | * need to be sent from what offset or received by a particular node and be |
271 | * reduced at what offset etc. |
272 | */ |
273 | class Node { |
274 | public: |
275 | explicit Node(int rank, int steps); |
276 | /** |
277 | * Get the rank of this node |
278 | */ |
279 | int getRank() const; |
280 | /** |
281 | * Used to record all the peer nodes, the number of elements to process and |
282 | * the offset from which data in the original ptr buffer will be processed by |
283 | * this node in a particular step. This is to be done as part of setup() |
284 | * function only. |
285 | * @param step The step for which we are recording attributes |
286 | * @param peerRanks All peer ranks. This would contain self too so need to |
287 | * @param numElems The number of elements this node will be processing in the |
288 | * @param offset The offset in the ptrs array |
289 | * filter that out. |
290 | */ |
291 | void setPerStepAttributes( |
292 | int step, |
293 | const std::vector<int>& peerRanks, |
294 | int numElems, |
295 | int offset); |
296 | /** |
297 | * Get all the nodes this node peers with in a particular step |
298 | * @param step The step for which we need to get peers |
299 | * @return List of ranks of all peer nodes |
300 | */ |
301 | const std::vector<int>& getPeersPerStep(int step) const; |
302 | /** |
303 | * Get count of elements this node needs to process in a specified the step |
304 | * @param step The step for which we are querying count |
305 | */ |
306 | int getNumElemsPerStep(int step) const; |
307 | /** |
308 | * Get offset to ptrs array this node needs to start processing from in the |
309 | * specified step |
310 | * @param step The step for which we are querying offset |
311 | */ |
312 | int getPtrOffsetPerStep(int step) const; |
313 | |
314 | private: |
315 | /** |
316 | * Rank of this node |
317 | */ |
318 | const int rank_; |
319 | /** |
320 | * A vector of a list of ranks (value) of nodes this node would peer with in a |
321 | * step (index) |
322 | */ |
323 | std::vector<std::vector<int>> peersPerStep_; |
324 | /** |
325 | * A vector of number of elements (value) this node needs to process in a step |
326 | * (index). This could be the number of elements to be received and reduced by |
327 | * a node and correspondingly sent by its peers during a step of |
328 | * reduce-scatter stage, or, similarly, the number of elements received and |
329 | * copied in the ptrs_ array by a node and correspondingly sent by it's peer |
330 | * during a step of all-gather stage. |
331 | */ |
332 | std::vector<int> numElemsPerStep_; |
333 | /** |
334 | * A vector of offset (value) within the ptrs_ array from which data needs to |
335 | * be processed by this node in a step (index). This would be used by peers to |
336 | * send data from ptrs_ array to this node and used with reduce function |
337 | * during reduce-scatter phase or during all-gather to send elements to peers |
338 | * from ptrs_ array. |
339 | */ |
340 | std::vector<int> ptrOffsetPerStep_; |
341 | }; |
342 | |
343 | /** |
344 | * This is another helper class. As part of each step of processing we divide |
345 | * nodes into multiple groups. This class helps track properties of that group. |
346 | * Such as, which nodes are part of the group, how many elements collectively |
347 | * all nodes need to process and at what offset etc. |
348 | */ |
349 | class Group { |
350 | public: |
351 | Group( |
352 | int step, |
353 | const Node& firstNode, |
354 | int peerDistance, |
355 | int base, |
356 | int nodes, |
357 | int totalNumElems); |
358 | /** |
359 | * Simple getter for all the nodes in the group |
360 | * @return List of ranks of nodes in the group |
361 | */ |
362 | const std::vector<int>& getNodeRanks() const; |
363 | /** |
364 | * Get the offset from which the group should process data |
365 | * @return Offset in the ptrs array |
366 | */ |
367 | int getPtrOffset() const; |
368 | /** |
369 | * Get the number of elements this group is supposed to process |
370 | * @return Count of elements (in ptr or receive buffers) |
371 | */ |
372 | int getNumElems() const; |
373 | |
374 | private: |
375 | const std::vector<int> nodeRanks_; |
376 | const int ptrOffset_; |
377 | const int numElems_; |
378 | /** |
379 | * Computes the number of elements this group needs to process. If this is the |
380 | * first step we start with all elements. For subsequent steps it's number of |
381 | * elements processed by single node in previous step. If this value is |
382 | * smaller than number of peers in the group simply use number of peers as the |
383 | * count so that at least one element is exchanged. Also, note that in this |
384 | * case some nodes may end up duplicating the work as the ptrOffset wraps |
385 | * around the totalNumElems_ in updateGroupNodes() function. |
386 | * @param step The current step |
387 | * @param firstNode The first node in the group |
388 | * @param peers The total number of peers in the group |
389 | * @count The total number of elements to be processed by this node |
390 | * @return The number of elements to be processed by this group |
391 | */ |
392 | static int |
393 | computeNumElems(int step, const Node& firstNode, int peers, int count); |
394 | /** |
395 | * Determines all the nodes in a group in a particular step |
396 | * @param peerDistance This is the distance between rank of each peer in the |
397 | * group |
398 | * @return List of ranks of nodes in the group |
399 | */ |
400 | std::vector<int> |
401 | getNodeRanks(int firstNodeRank, int peerDistance, int base, int nodes) const; |
402 | }; |
403 | |
404 | } // namespace bcube |
405 | } // namespace cuda |
406 | |
407 | } // namespace gloo |
408 | |