#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>

#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

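// Constructs N test fixtures that share the same FileStore path, then calls
// start(rank, size) on each fixture from its own thread so that all ranks
// rendezvous concurrently. Returns the fixtures once every rank has joined.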
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    tests.push_back(T(path, std::forward<Args>(args)...));
  }

  std::vector<std::thread> threads;
  for (const auto i : c10::irange(N)) {
    threads.push_back(std::thread([i, N, &tests] { tests[i].start(i, N); }));
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

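// Base fixture: owns a ProcessGroupGloo instance that rendezvouses through a
// FileStore at the given path. start() must be called on every rank before
// the process group can be used.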
class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::make_unique<::c10d::ProcessGroupGloo>(
        store, rank, size, options);
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

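// Fixture that allocates numTensors CUDA tensors, spread across the available
// devices in a round-robin fashion, plus one dedicated stream per device. The
// collectives below run in place, so the inputs double as outputs.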
class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device, so we can't make
    // two tensors on the same device use different streams and pass this
    // along to the collective (since it uses the current-stream getters
    // to retrieve the stream to synchronize with).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }

  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(gpu_tensors.size())) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

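// Each rank fills its i-th tensor with (rank * numTensors + i) and launches
// an asynchronous allreduce over all of its tensors. A spin kernel
// (cudaSleep) is queued on every stream first, so the fills and the
// collective are enqueued behind still-running GPU work and the asynchronous
// completion path is actually exercised.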
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

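// Same setup as the allreduce test, but broadcasts tensor rootTensor from
// rank rootRank to every tensor on every rank.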
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

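// With numProcesses ranks and numTensors tensors per rank, the values
// 0, 1, ..., numProcesses * numTensors - 1 are each contributed exactly once
// (every tensor on every rank participates in the reduction), so every
// element of every output should equal size * (size - 1) / 2 with
// size = numProcesses * numTensors.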
void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
  for (const auto i : c10::irange(numProcesses)) {
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}

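// After broadcasting from (rootRank, rootTensor), every tensor on every rank
// should hold the root's fill value, i.e. rootRank * numTensors + rootTensor.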
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results
      const auto expected = (rootRank * numTensors + rootTensor);
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif