#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>

#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

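// Constructs N test fixtures that rendezvous through the same FileStore path
// and calls start() for each rank on its own thread, so that all ranks can
// complete the Gloo rendezvous concurrently.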
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    tests.emplace_back(path, std::forward<Args>(args)...);
  }

  std::vector<std::thread> threads;
  for (const auto i : c10::irange(N)) {
    threads.emplace_back([i, N, &tests] { tests[i].start(i, N); });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

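// Base fixture for the async tests below: owns the FileStore path and, once
// start(rank, size) has been called, a ProcessGroupGloo instance for that rank.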
class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) noexcept = default;

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::make_unique<::c10d::ProcessGroupGloo>(
        store, rank, size, options);
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

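// Fixture that allocates numTensors CUDA tensors round-robin across the
// available devices, plus one non-default stream per device, so that the
// collectives below run against user-provided streams.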
class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device, so we can't make
    // two tensors on the same device use different streams and pass this
    // along to the collective (it retrieves the current stream for each
    // device).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }

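  // Make our streams current while waiting so that the collective's
  // completion is synchronized against the streams this test queues work on.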
  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

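  // Copies each GPU tensor to the CPU. The copies are queued on the same
  // streams the collective kernels ran on, so they observe the collective's
  // results.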
  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(gpu_tensors.size())) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

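// Each rank fills tensor j with (rank * numTensors + j) and launches an
// asynchronous allreduce. A sleep kernel is queued on every stream first so
// that the collective genuinely overlaps with pending device work.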
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

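// Same setup as the allreduce test, but broadcasts from a single
// (rootRank, rootTensor) source so every tensor on every rank ends up with
// that source's value.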
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

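// Runs allreduce across numProcesses ranks with numTensors tensors each. The
// inputs are the distinct values 0 .. (size - 1), where
// size = numProcesses * numTensors, so after the sum-allreduce every element
// should equal size * (size - 1) / 2.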
void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
  for (const auto i : c10::irange(numProcesses)) {
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}

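// Runs broadcast across numProcesses ranks for every (rootRank, rootTensor)
// pair. After each broadcast, every tensor on every rank should hold the
// value the root rank wrote to the root tensor:
// rootRank * numTensors + rootTensor.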
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results
      const auto expected = (rootRank * numTensors + rootTensor);
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

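// The tests below require a CUDA build and at least one visible device; when
// no device is available they log a message and return early.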
#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif
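// Note: these TEST() cases assume the binary provides a gtest entry point
// (for example by linking against gtest_main). If that is not the case, a
// standard main along these lines would be needed:
//
//   int main(int argc, char** argv) {
//     ::testing::InitGoogleTest(&argc, argv);
//     return RUN_ALL_TESTS();
//   }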