#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif

#include <sys/types.h>

#include <condition_variable>
#include <cstring>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_set>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>
#include <torch/cuda.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "TestUtils.hpp"

using namespace c10d::test;
using namespace torch::autograd::profiler;

constexpr auto kSendDelay = std::chrono::milliseconds(100);
constexpr auto kWaitTimeout = std::chrono::milliseconds(1);

#ifndef _WIN32
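// Runs allreduce in a loop on one rank and signals a peer process after each
// successful iteration. Used to verify that a peer dying mid-collective
// surfaces as an exception on the surviving rank.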
class SignalTest {
 public:
  SignalTest(const std::string& path) : path_(path) {}

  ~SignalTest() {
    if (arm_.joinable()) {
      arm_.join();
    }
  }

  // Arms test to send signal to PID when the semaphore unlocks. This
  // happens as soon as the first collective completes successfully.
  void arm(int pid, int signal) {
    arm_ = std::thread([=] {
      sem_.wait();
      kill(pid, signal);
    });
  }

  c10::intrusive_ptr<::c10d::Work> run(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    auto options = ::c10d::ProcessGroupGloo::Options::create();
    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo constructor.
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    ::c10d::ProcessGroupGloo pg(store, rank, size, options);

    // Initialize tensor list
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    // Loop until an exception happens
    c10::intrusive_ptr<::c10d::Work> work;
    while (true) {
      work = pg.allreduce(tensors);
      try {
        work->wait();
      } catch (const std::exception& e) {
        break;
      }
      sem_.post();
    }

    return work;
  }

 protected:
  std::string path_;
  std::thread arm_;
  Semaphore sem_;
};

c10::intrusive_ptr<::c10d::Work> testSignal(
    const std::string& path,
    int signal) {
  Fork fork;
  if (fork.isChild()) {
    SignalTest test(path);
    test.run(1, 2);
    exit(1);
  }

  SignalTest test(path);
  test.arm(fork.pid, signal);
  return test.run(0, 2);
}
#endif

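// ProcessGroupGloo variant that sleeps for kSendDelay before every send().
// Used by testWaitDelay to exercise wait() with a short timeout.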
class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo {
 public:
  ProcessGroupGlooDelayed(
      const c10::intrusive_ptr<::c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<Options> options)
      : ProcessGroupGloo(store, rank, size, options) {}

  c10::intrusive_ptr<::c10d::Work> send(
      std::vector<at::Tensor>& tensors,
      int dstRank,
      int tag) override {
    std::this_thread::sleep_for(kSendDelay);
    return ::c10d::ProcessGroupGloo::send(tensors, dstRank, tag);
  }
};

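// Test fixture that creates one ProcessGroupGloo instance per rank, all backed
// by the same FileStore, and initializes them concurrently from one process.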
class CollectiveTest {
 public:
  static std::vector<CollectiveTest> initialize(
      const std::string& path,
      int num,
      bool delayed = false) {
    std::vector<CollectiveTest> tests;
    for (C10_UNUSED const auto i : c10::irange(num)) {
      tests.emplace_back(CollectiveTest(path));
    }

    std::vector<std::thread> threads;
    for (const auto i : c10::irange(num)) {
      threads.emplace_back(std::thread(
          [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
    }
    for (auto& thread : threads) {
      thread.join();
    }

    return tests;
  }

  CollectiveTest(std::string path) : path_(std::move(path)) {}

  CollectiveTest(CollectiveTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size, bool delayed) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo constructor.
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    if (!delayed) {
      pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
          new ::c10d::ProcessGroupGloo(store, rank, size, options));
    } else {
      pg_ = std::unique_ptr<ProcessGroupGlooDelayed>(
          new ProcessGroupGlooDelayed(store, rank, size, options));
    }
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

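// Copies every (possibly CUDA) output tensor to CPU so the values can be
// inspected with data_ptr<T>().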
std::vector<std::vector<at::Tensor>> copyTensors(
    const std::vector<std::vector<at::Tensor>>& inputs) {
  std::vector<std::vector<at::Tensor>> outputs(inputs.size());
  for (const auto i : c10::irange(inputs.size())) {
    const auto& input = inputs[i];
    std::vector<at::Tensor> output(input.size());
    for (const auto j : c10::irange(input.size())) {
      output[j] = input[j].cpu();
    }
    outputs[i] = output;
  }
  return outputs;
}

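// Waits on each Work handle and collects its results via the legacy work API.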
std::vector<std::vector<at::Tensor>> waitWork(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    outputTensors.emplace_back(work->result());
  }
  return copyTensors(outputTensors);
}

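// Waits on each Work's future and collects the resulting tensors (or an empty
// list if the future holds no value).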
std::vector<std::vector<at::Tensor>> waitFuture(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return copyTensors(outputTensors);
}

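// Checks that the legacy profiler recorded events with the expected name and,
// optionally, that each expected input shape shows up in some recorded event.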
void checkProfiledEvents(
    const thread_event_lists& event_lists,
    const char* expected_profile_str,
    int expected_count,
    std::vector<std::vector<int64_t>> expected_shapes,
    bool verify_shapes = true) {
  if (verify_shapes) {
    EXPECT_EQ(expected_count, expected_shapes.size());
  }
  std::vector<bool> matched_shapes(expected_count);
  for (const auto& li : event_lists) {
    for (const auto& evt : li) {
      auto match = !strcmp(evt.name(), expected_profile_str);
      if (verify_shapes && match) {
        auto shapesVec = evt.shapes();
        for (const auto i : c10::irange(expected_count)) {
          // Assumptions: no two expected shapes are the same
          if (shapesVec[0] == expected_shapes[i]) {
            matched_shapes[i] = true;
          }
        }
      }
    }
  }
  if (verify_shapes) {
    for (bool match : matched_shapes) {
      EXPECT_TRUE(match);
    }
  }
}

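// Runs allreduce across `size` ranks and verifies both the recorded profiler
// events and the reduced values (every element equals the sum of all ranks).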
void testAllreduce(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, b) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto& tensor = outputs[i][0];
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

// UsingWorkAPI tests are to make sure we still properly support the work API.
// This should go away as we deprecate it.
void testAllreduceUsingWorkAPI(
    const std::string& path,
    const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, b) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitWork(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto& tensor = outputs[i][0];
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

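// Broadcasts from every (root rank, root tensor) combination and verifies
// that all ranks end up with the root's values.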
void testBroadcast(const std::string& path, const at::DeviceType b) {
  const auto size = 2;
  const auto stride = 2;
  auto tests = CollectiveTest::initialize(path, size);

  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<int64_t> shapes = {16, 16};
  // Try every permutation of root rank and root tensor
  for (const auto i : c10::irange(size)) {
    for (const auto j : c10::irange(stride)) {
      std::vector<std::vector<int64_t>> allShapes;
      // Initialize inputs
      for (const auto k : c10::irange(size)) {
        std::vector<int64_t> shapesVec = shapes;
        allShapes.emplace_back(std::move(shapesVec));
        inputs[k].resize(stride);
        // This won't work if we ever support sparse CUDA
        at::OptionalDeviceGuard deviceGuard;
        for (const auto l : c10::irange(stride)) {
          if (b == at::DeviceType::CUDA) {
            deviceGuard.reset_device(at::Device(at::kCUDA, l));
          }
          inputs[k][l] = at::ones(shapes, b) * (k * stride + l);
        }
      }

      ::c10d::BroadcastOptions options;
      options.rootRank = i;
      options.rootTensor = j;

      // Kick off work
      const char* GLOO_BROADCAST_STR = "gloo:broadcast";
      enableProfilerLegacy(ProfilerConfig(
          ProfilerState::CPU, /* report_input_shapes */ true, false));
      std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);

      for (const auto i : c10::irange(size)) {
        work[i] = tests[i].getProcessGroup().broadcast(inputs[i], options);
      }

      // Wait for work to complete
      auto outputs = waitFuture(work);

      auto event_lists = disableProfilerLegacy();
      checkProfiledEvents(
          std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes);

      // Verify outputs
      const auto expected = (i * stride + j);
      for (const auto k : c10::irange(size)) {
        for (const auto l : c10::irange(stride)) {
          auto& tensor = outputs[k][l];
          auto data = tensor.data_ptr<float>();
          for (const auto n : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[n], expected);
          }
        }
      }
    }
  }
}

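// Exchanges variable-length int32 chunks between all ranks via alltoall_base
// and verifies the received values against precomputed expectations.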
void testAlltoall(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<at::Tensor> inputs(size);
  std::vector<std::vector<int32_t>> blobs = {
      {0, 1, 2, 3, 4, 5},
      {10, 11, 12, 13, 14, 15, 16, 17, 18},
      {20, 21, 22, 23, 24},
      {30, 31, 32, 33, 34, 35, 36},
  };
  for (const auto rank : c10::irange(size)) {
    const std::vector<int32_t>& blob = blobs[rank];
    // Build an int32 tensor over the blob so the dtype matches the int32
    // outputs allocated below.
    inputs[rank] =
        at::from_blob((int32_t*)(blob.data()), blob.size(), at::kInt).to(b);
  }

  // Allocate outputs
  std::vector<at::Tensor> outputs(size);
  std::vector<int> outputLengths = {9, 7, 6, 5};
  for (const auto rank : c10::irange(size)) {
    outputs[rank] =
        at::empty(outputLengths[rank], c10::TensorOptions(at::kInt).device(b));
  }

  // Generate splits
  std::vector<std::vector<int64_t>> inputSplits = {
      {2, 2, 1, 1},
      {3, 2, 2, 2},
      {2, 1, 1, 1},
      {2, 2, 2, 1},
  };
  std::vector<std::vector<int64_t>> outputSplits = {
      {2, 3, 2, 2},
      {2, 2, 1, 2},
      {1, 2, 1, 2},
      {1, 2, 1, 1},
  };

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_A2A_STR = "gloo:all_to_all";
  std::vector<std::vector<int64_t>> allShapes;
  for (const auto& vec : inputSplits) {
    // Due to concatenation of tensors, shape will actually be the sum
    int64_t sum = 0;
    for (const auto& s : vec) {
      sum += s;
    }
    allShapes.push_back({sum});
  }
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto rank : c10::irange(size)) {
    work[rank] = tests[rank].getProcessGroup().alltoall_base(
        outputs[rank], inputs[rank], outputSplits[rank], inputSplits[rank]);
  }

  // Wait for work to complete
  for (const auto i : c10::irange(size)) {
    work[i]->wait();
  }

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes);
  // Verify outputs
  std::vector<std::vector<int32_t>> expected = {
      {0, 1, 10, 11, 12, 20, 21, 30, 31},
      {2, 3, 13, 14, 22, 32, 33},
      {4, 15, 16, 23, 34, 35},
      {5, 17, 18, 24, 36},
  };
  for (const auto rank : c10::irange(size)) {
    at::Tensor tensor = outputs[rank].cpu();
    EXPECT_EQ(tensor.numel(), expected[rank].size());
    auto data = tensor.data_ptr<int32_t>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected[rank][j]);
    }
  }
}

void testBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  // Kick off work
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().barrier();
  }

  // Wait for work to complete
  waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  const char* GLOO_STR = "gloo:barrier";
  std::vector<std::vector<int64_t>> allShapes;
  // Barrier does not use tensors, so skip shape checking.
  checkProfiledEvents(
      std::move(event_lists),
      GLOO_STR,
      size,
      allShapes,
      /* verify_shapes */ false);
}

void testMonitoredBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // Non-failure case: all ranks pass the blocking monitored barrier.
  auto runMonitoredBarrier = [&](int i) {
    tests[i].getProcessGroup().monitoredBarrier();
  };
  std::vector<std::thread> threads;
  threads.reserve(size);
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
  // Failure case: only rank 0 calls into the monitored barrier, which should
  // result in an error.
  auto runMonitoredBarrierWithException = [&](int i) {
    if (i != 0) {
      return;
    }

    try {
      tests[i].getProcessGroup().monitoredBarrier();
      FAIL() << "Exception should have been thrown.";
    } catch (const std::exception& e) {
      auto pos = std::string(e.what()).find("Rank 1");
      EXPECT_TRUE(pos != std::string::npos);
    }
  };
  threads.clear();
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(
        std::thread([=]() { runMonitoredBarrierWithException(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
}

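// After setSequenceNumberForGroup(), all ranks should report the same
// sequence number.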
void testSequenceNumInit(const std::string& path) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);
  for (const auto i : c10::irange(size)) {
    tests[i].getProcessGroup().setSequenceNumberForGroup();
  }

  std::unordered_set<uint64_t> nums;
  for (const auto i : c10::irange(size)) {
    auto seqNum = tests[i].getProcessGroup().getSequenceNumberForGroup();
    nums.insert(seqNum);
  }
  EXPECT_EQ(nums.size(), 1);
}

void testWaitDelay(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size, /* delayed */ true);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting on a delayed send times out.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<at::Tensor> tensors = {
      at::ones({16, 16}),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  auto sendWork = pg.send(tensors, dstRank, tag);
  EXPECT_THROW(sendWork->wait(kWaitTimeout), std::exception);
}

void testSend(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be sent can be aborted successfully.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<int64_t> shapes{16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  const char* GLOO_SEND_STR = "gloo:send";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto sendWork = pg.send(tensors, dstRank, tag);
  bool sendCompleted;
  std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); });
  sendWork->abort();
  // Block until the sendWork gets successfully aborted
  waitSendThreadAbort.join();
  EXPECT_FALSE(sendCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes);

  // Now create a separate sender thread to ensure that future send waits can
  // complete successfully.

  // Helper receiver to simulate a real recv/send pair
  std::thread recvThread([&]() {
    auto selfRank = 1;
    auto srcRank = 0;
    auto& pg = tests[selfRank].getProcessGroup();
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    auto recvWork = pg.recv(tensors, srcRank, tag);
    recvWork->wait();
  });

  // Sender thread
  std::thread sendThread([&]() { sendCompleted = sendWork->wait(); });
  sendThread.join();
  recvThread.join();
  EXPECT_TRUE(sendCompleted);
}

void testRecv(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  constexpr uint64_t tag = 0x1337;
  // test that waiting for work to be received can be aborted successfully.
  auto selfRank = 0;
  auto srcRank = 1;
  std::vector<int64_t> shapes = {16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  const char* GLOO_RECV_STR = "gloo:recv";
  auto& pg = tests[selfRank].getProcessGroup();
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto recvWork = pg.recv(tensors, srcRank, tag);
  bool recvCompleted;
  std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); });
  recvWork->abort();
  // Block until the first recv gets successfully aborted
  waitRecvThreadAbort.join();
  EXPECT_FALSE(recvCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes);

  // Now create a separate receiver thread to ensure that future waits can
  // complete successfully.

  // Helper sender thread to simulate a real recv/send pair.
  std::thread senderThread([&]() {
    auto selfRank = 1;
    auto destRank = 0;

    auto& pg = tests[selfRank].getProcessGroup();

    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };
    auto sendWork = pg.send(tensors, destRank, tag);
    sendWork->wait();
  });
  // Receiver thread.
  std::thread receiverThread([&]() { recvCompleted = recvWork->wait(); });
  senderThread.join();
  receiverThread.join();
  EXPECT_TRUE(recvCompleted);
}

void testStoreSetGet(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // test that get() gets the same value as the one that was set()
  std::vector<uint8_t> testVector = {1, 1, 1, 1};
  // Cast to ProcessGroupGloo::GlooStore to test specific GlooStore APIs.
  auto rank_0_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[0].getProcessGroup()._getStore().get());
  auto rank_1_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[1].getProcessGroup()._getStore().get());

  rank_0_glooStore->setUint("testKey", testVector);
  auto value = rank_1_glooStore->getUint("testKey");
  EXPECT_TRUE(value == testVector);
}

#ifndef _WIN32
TEST(ProcessGroupGlooTest, testSIGSTOPException) {
  // test SIGSTOP
  // Fork() and TSAN don't play well together, so skip the test if we're testing
  // with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGSTOP);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}

TEST(ProcessGroupGlooTest, testSIGKILLException) {
  // test SIGKILL
  // Fork() and TSAN don't play well together, so skip the test if we're testing
  // with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGKILL);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}
#endif

TEST(ProcessGroupGlooTest, testAllReduceCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testAllToAllCPU) {
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBarrier) {
  {
    TemporaryFile file;
    testBarrier(file.path);
  }
}

TEST(ProcessGroupGlooTest, testMonitoredBarrier) {
  TemporaryFile file;
  testMonitoredBarrier(file.path);
}

TEST(ProcessGroupGlooTest, testSequenceNumInit) {
  TemporaryFile file;
  testSequenceNumInit(file.path);
}

TEST(ProcessGroupGlooTest, testSend) {
  {
    TemporaryFile file;
    testSend(file.path);
  }
}

TEST(ProcessGroupGlooTest, testRecv) {
  {
    TemporaryFile file;
    testRecv(file.path);
  }
}

TEST(ProcessGroupGlooTest, testStoreSetGet) {
  TemporaryFile file;
  testStoreSetGet(file.path);
}

TEST(ProcessGroupGlooTest, testWaitDelay) {
  {
    TemporaryFile file;
    testWaitDelay(file.path);
  }
}

#ifdef USE_CUDA
// CUDA-only tests
TEST(ProcessGroupGlooTest, testAllReduceCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CUDA);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCUDA) {
  if (torch::cuda::device_count() <= 1) {
    LOG(INFO) << "Skipping test - requires multiple CUDA devices";
    return;
  }
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testAlltoallCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CUDA);
  }
}

#endif

TEST(ProcessGroupGlooTest, testBackendName) {
  {
    TemporaryFile file;
    const auto size = 2;
    auto tests = CollectiveTest::initialize(file.path, size);

    for (const auto i : c10::irange(size)) {
      EXPECT_EQ(
          tests[i].getProcessGroup().getBackendName(),
          std::string(c10d::GLOO_BACKEND_NAME));
    }
  }
}