#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif

#include <sys/types.h>

#include <condition_variable>
#include <cstring>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_set>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>
#include <torch/cuda.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "TestUtils.hpp"

using namespace c10d::test;
using namespace torch::autograd::profiler;

constexpr auto kSendDelay = std::chrono::milliseconds(100);
constexpr auto kWaitTimeout = std::chrono::milliseconds(1);

#ifndef _WIN32
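// SignalTest runs allreduce in a loop, posting a semaphore after every
// successful iteration. arm() uses that semaphore to deliver a signal to a
// peer process once the first collective has completed.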
class SignalTest {
 public:
  SignalTest(const std::string& path) : path_(path) {}

  ~SignalTest() {
    if (arm_.joinable()) {
      arm_.join();
    }
  }

  // Arms test to send signal to PID when the semaphore unlocks. This
  // happens as soon as the first collective completes successfully.
  void arm(int pid, int signal) {
    arm_ = std::thread([=] {
      sem_.wait();
      kill(pid, signal);
    });
  }

  c10::intrusive_ptr<::c10d::Work> run(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    auto options = ::c10d::ProcessGroupGloo::Options::create();
    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo
    // constructor.
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    ::c10d::ProcessGroupGloo pg(store, rank, size, options);

    // Initialize tensor list
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    // Loop until an exception happens
    c10::intrusive_ptr<::c10d::Work> work;
    while (true) {
      work = pg.allreduce(tensors);
      try {
        work->wait();
      } catch (const std::exception& e) {
        break;
      }
      sem_.post();
    }

    return work;
  }

 protected:
  std::string path_;
  std::thread arm_;
  Semaphore sem_;
};

c10::intrusive_ptr<::c10d::Work> testSignal(
    const std::string& path,
    int signal) {
  Fork fork;
  if (fork.isChild()) {
    SignalTest test(path);
    test.run(1, 2);
    exit(1);
  }

  SignalTest test(path);
  test.arm(fork.pid, signal);
  return test.run(0, 2);
}
#endif

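// ProcessGroupGloo variant that delays every send() by kSendDelay; used to
// exercise wait() timeouts in testWaitDelay().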
class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo {
 public:
  ProcessGroupGlooDelayed(
      const c10::intrusive_ptr<::c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<Options> options)
      : ProcessGroupGloo(store, rank, size, options) {}

  c10::intrusive_ptr<::c10d::Work> send(
      std::vector<at::Tensor>& tensors,
      int dstRank,
      int tag) override {
    std::this_thread::sleep_for(kSendDelay);
    return ::c10d::ProcessGroupGloo::send(tensors, dstRank, tag);
  }
};

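// Spins up one ProcessGroupGloo instance per rank; all ranks rendezvous
// through a FileStore backed by the given path.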
class CollectiveTest {
 public:
  static std::vector<CollectiveTest> initialize(
      const std::string& path,
      int num,
      bool delayed = false) {
    std::vector<CollectiveTest> tests;
    for (C10_UNUSED const auto i : c10::irange(num)) {
      tests.emplace_back(CollectiveTest(path));
    }

    std::vector<std::thread> threads;
    for (const auto i : c10::irange(num)) {
      threads.emplace_back(std::thread(
          [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
    }
    for (auto& thread : threads) {
      thread.join();
    }

    return tests;
  }

  CollectiveTest(std::string path) : path_(std::move(path)) {}

  CollectiveTest(CollectiveTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size, bool delayed) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo
    // constructor.
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    if (!delayed) {
      pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
          new ::c10d::ProcessGroupGloo(store, rank, size, options));
    } else {
      pg_ = std::unique_ptr<ProcessGroupGlooDelayed>(
          new ProcessGroupGlooDelayed(store, rank, size, options));
    }
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

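// Copies every tensor to CPU so results can be inspected uniformly,
// regardless of the device the collective ran on.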
std::vector<std::vector<at::Tensor>> copyTensors(
    const std::vector<std::vector<at::Tensor>>& inputs) {
  std::vector<std::vector<at::Tensor>> outputs(inputs.size());
  for (const auto i : c10::irange(inputs.size())) {
    const auto& input = inputs[i];
    std::vector<at::Tensor> output(input.size());
    for (const auto j : c10::irange(input.size())) {
      output[j] = input[j].cpu();
    }
    outputs[i] = output;
  }
  return outputs;
}

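// Waits on each Work handle and collects its result tensors (work API path).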
std::vector<std::vector<at::Tensor>> waitWork(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    outputTensors.emplace_back(work->result());
  }
  return copyTensors(outputTensors);
}

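// Waits on the future associated with each Work handle and collects the
// resulting tensors (future API path).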
std::vector<std::vector<at::Tensor>> waitFuture(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return copyTensors(outputTensors);
}

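// Verifies that the legacy profiler recorded `expected_count` events named
// `expected_profile_str` and, when verify_shapes is set, that their input
// shapes match `expected_shapes`.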
void checkProfiledEvents(
    const thread_event_lists& event_lists,
    const char* expected_profile_str,
    int expected_count,
    std::vector<std::vector<int64_t>> expected_shapes,
    bool verify_shapes = true) {
  if (verify_shapes) {
    EXPECT_EQ(expected_count, expected_shapes.size());
  }
  std::vector<bool> matched_shapes(expected_count);
  for (const auto& li : event_lists) {
    for (const auto& evt : li) {
      auto match = !strcmp(evt.name(), expected_profile_str);
      if (verify_shapes && match) {
        auto shapesVec = evt.shapes();
        for (const auto i : c10::irange(expected_count)) {
          // Assumptions: no two expected shapes are the same
          if (shapesVec[0] == expected_shapes[i]) {
            matched_shapes[i] = true;
          }
        }
      }
    }
  }
  if (verify_shapes) {
    for (bool match : matched_shapes) {
      EXPECT_TRUE(match);
    }
  }
}

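// Allreduces rank-valued tensors across all ranks and checks that every
// element equals the sum 0 + 1 + ... + (size - 1).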
void testAllreduce(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, b) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto& tensor = outputs[i][0];
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

// The UsingWorkAPI tests make sure we still properly support the work API.
// They should go away as we deprecate it.
void testAllreduceUsingWorkAPI(
    const std::string& path,
    const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, b) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitWork(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto& tensor = outputs[i][0];
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

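// Broadcasts from every (root rank, root tensor) combination and checks that
// all ranks receive the root's value.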
void testBroadcast(const std::string& path, const at::DeviceType b) {
  const auto size = 2;
  const auto stride = 2;
  auto tests = CollectiveTest::initialize(path, size);

  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<int64_t> shapes = {16, 16};
  // Try every permutation of root rank and root tensor
  for (const auto i : c10::irange(size)) {
    for (const auto j : c10::irange(stride)) {
      std::vector<std::vector<int64_t>> allShapes;
      // Initialize inputs
      for (const auto k : c10::irange(size)) {
        std::vector<int64_t> shapesVec = shapes;
        allShapes.emplace_back(std::move(shapesVec));
        inputs[k].resize(stride);
        // This won't work if we ever support sparse CUDA
        at::OptionalDeviceGuard deviceGuard;
        for (const auto l : c10::irange(stride)) {
          if (b == at::DeviceType::CUDA) {
            deviceGuard.reset_device(at::Device(at::kCUDA, l));
          }
          inputs[k][l] = at::ones(shapes, b) * (k * stride + l);
        }
      }

      ::c10d::BroadcastOptions options;
      options.rootRank = i;
      options.rootTensor = j;

      // Kick off work
      const char* GLOO_BROADCAST_STR = "gloo:broadcast";
      enableProfilerLegacy(ProfilerConfig(
          ProfilerState::CPU, /* report_input_shapes */ true, false));
      std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);

      for (const auto i : c10::irange(size)) {
        work[i] = tests[i].getProcessGroup().broadcast(inputs[i], options);
      }

      // Wait for work to complete
      auto outputs = waitFuture(work);

      auto event_lists = disableProfilerLegacy();
      checkProfiledEvents(
          std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes);

      // Verify outputs
      const auto expected = (i * stride + j);
      for (const auto k : c10::irange(size)) {
        for (const auto l : c10::irange(stride)) {
          auto& tensor = outputs[k][l];
          auto data = tensor.data_ptr<float>();
          for (const auto n : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[n], expected);
          }
        }
      }
    }
  }
}

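// Exchanges unevenly split 1-D int32 tensors between all ranks via
// alltoall_base and checks the reassembled outputs.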
void testAlltoall(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<at::Tensor> inputs(size);
  std::vector<std::vector<int32_t>> blobs = {
      {0, 1, 2, 3, 4, 5},
      {10, 11, 12, 13, 14, 15, 16, 17, 18},
      {20, 21, 22, 23, 24},
      {30, 31, 32, 33, 34, 35, 36},
  };
  for (const auto rank : c10::irange(size)) {
    const std::vector<int32_t>& blob = blobs[rank];
    // Wrap the int32 data with an explicit kInt dtype; from_blob defaults to
    // float otherwise, which would not match the kInt outputs below.
    inputs[rank] = at::from_blob(
                       (int32_t*)(blob.data()),
                       {static_cast<int64_t>(blob.size())},
                       c10::TensorOptions(at::kInt))
                       .to(b);
  }

  // Allocate outputs
  std::vector<at::Tensor> outputs(size);
  std::vector<int> outputLengths = {9, 7, 6, 5};
  for (const auto rank : c10::irange(size)) {
    outputs[rank] =
        at::empty(outputLengths[rank], c10::TensorOptions(at::kInt).device(b));
  }

  // Generate splits
  std::vector<std::vector<int64_t>> inputSplits = {
      {2, 2, 1, 1},
      {3, 2, 2, 2},
      {2, 1, 1, 1},
      {2, 2, 2, 1},
  };
  std::vector<std::vector<int64_t>> outputSplits = {
      {2, 3, 2, 2},
      {2, 2, 1, 2},
      {1, 2, 1, 2},
      {1, 2, 1, 1},
  };

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_A2A_STR = "gloo:all_to_all";
  std::vector<std::vector<int64_t>> allShapes;
  for (const auto& vec : inputSplits) {
    // Due to concatenation of tensors, shape will actually be the sum
    int64_t sum = 0;
    for (const auto& s : vec) {
      sum += s;
    }
    allShapes.push_back({sum});
  }
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto rank : c10::irange(size)) {
    work[rank] = tests[rank].getProcessGroup().alltoall_base(
        outputs[rank], inputs[rank], outputSplits[rank], inputSplits[rank]);
  }

  // Wait for work to complete
  for (const auto i : c10::irange(size)) {
    work[i]->wait();
  }

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes);
  // Verify outputs
  std::vector<std::vector<int32_t>> expected = {
      {0, 1, 10, 11, 12, 20, 21, 30, 31},
      {2, 3, 13, 14, 22, 32, 33},
      {4, 15, 16, 23, 34, 35},
      {5, 17, 18, 24, 36},
  };
  for (const auto rank : c10::irange(size)) {
    at::Tensor tensor = outputs[rank].cpu();
    EXPECT_EQ(tensor.numel(), expected[rank].size());
    auto data = tensor.data_ptr<int32_t>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected[rank][j]);
    }
  }
}

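// Runs a barrier on every rank and checks that the profiler recorded it
// (barrier takes no tensors, so shape verification is skipped).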
void testBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  // Kick off work
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().barrier();
  }

  // Wait for work to complete
  waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  const char* GLOO_STR = "gloo:barrier";
  std::vector<std::vector<int64_t>> allShapes;
  // Barrier does not use tensors, so skip shape checking.
  checkProfiledEvents(
      std::move(event_lists),
      GLOO_STR,
      size,
      allShapes,
      /* verify_shapes */ false);
}

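// Checks both the success path (all ranks join the monitored barrier) and the
// failure path (only rank 0 joins, which must surface an error naming the
// missing rank).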
void testMonitoredBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // Non-failure case: all ranks pass the blocking monitored barrier.
  auto runMonitoredBarrier = [&](int i) {
    tests[i].getProcessGroup().monitoredBarrier();
  };
  std::vector<std::thread> threads;
  threads.reserve(size);
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
  // Failure case: only rank 0 calls into the monitored barrier, which should
  // result in an error naming the missing rank.
  auto runMonitoredBarrierWithException = [&](int i) {
    if (i != 0) {
      return;
    }

    try {
      tests[i].getProcessGroup().monitoredBarrier();
      FAIL() << "Exception should have been thrown.";
    } catch (const std::exception& e) {
      auto pos = std::string(e.what()).find("Rank 1");
      EXPECT_TRUE(pos != std::string::npos);
    }
  };
  threads.clear();
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(
        std::thread([=]() { runMonitoredBarrierWithException(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
}

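// All ranks set a sequence number for the group and must then agree on it.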
void testSequenceNumInit(const std::string& path) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);
  for (const auto i : c10::irange(size)) {
    tests[i].getProcessGroup().setSequenceNumberForGroup();
  }

  std::unordered_set<uint64_t> nums;
  for (const auto i : c10::irange(size)) {
    auto seqNum = tests[i].getProcessGroup().getSequenceNumberForGroup();
    nums.insert(seqNum);
  }
  EXPECT_EQ(nums.size(), 1);
}

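// With sends delayed by kSendDelay, waiting with the much shorter
// kWaitTimeout must throw.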
void testWaitDelay(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size, /* delayed */ true);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting for a delayed send times out when the wait timeout is
  // shorter than the send delay.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<at::Tensor> tensors = {
      at::ones({16, 16}),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  auto sendWork = pg.send(tensors, dstRank, tag);
  EXPECT_THROW(sendWork->wait(kWaitTimeout), std::exception);
}

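// A pending send can be aborted (wait() returns false); a later matching recv
// then lets a subsequent wait() on the same work complete successfully.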
void testSend(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be sent can be aborted successfully.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<int64_t> shapes{16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  const char* GLOO_SEND_STR = "gloo:send";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto sendWork = pg.send(tensors, dstRank, tag);
  bool sendCompleted = false;
  std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); });
  sendWork->abort();
  // Block until the sendWork gets successfully aborted
  waitSendThreadAbort.join();
  EXPECT_FALSE(sendCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes);

  // Now pair the send with a real receiver so that a subsequent wait() on the
  // same work can complete successfully.

  // Helper receiver to simulate a real recv/send pair
  std::thread recvThread([&]() {
    auto selfRank = 1;
    auto srcRank = 0;
    auto& pg = tests[selfRank].getProcessGroup();
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    auto recvWork = pg.recv(tensors, srcRank, tag);
    recvWork->wait();
  });

  // Sender thread
  std::thread sendThread([&]() { sendCompleted = sendWork->wait(); });
  sendThread.join();
  recvThread.join();
  EXPECT_TRUE(sendCompleted);
}

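// Mirror of testSend: a pending recv can be aborted, and a later matching
// send lets a subsequent wait() complete successfully.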
void testRecv(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be received can be aborted successfully.
  auto selfRank = 0;
  auto srcRank = 1;
  std::vector<int64_t> shapes = {16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  const char* GLOO_RECV_STR = "gloo:recv";
  auto& pg = tests[selfRank].getProcessGroup();
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto recvWork = pg.recv(tensors, srcRank, tag);
  bool recvCompleted = false;
  std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); });
  recvWork->abort();
  // Block until the first recv gets successfully aborted
  waitRecvThreadAbort.join();
  EXPECT_FALSE(recvCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes);

  // Now pair the recv with a real sender so that a subsequent wait() on the
  // same work can complete successfully.

  // Helper sender thread to simulate a real recv/send pair.
  std::thread senderThread([&]() {
    auto selfRank = 1;
    auto destRank = 0;

    auto& pg = tests[selfRank].getProcessGroup();

    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };
    auto sendWork = pg.send(tensors, destRank, tag);
    sendWork->wait();
  });
  // Receiver thread.
  std::thread receiverThread([&]() { recvCompleted = recvWork->wait(); });
  senderThread.join();
  receiverThread.join();
  EXPECT_TRUE(recvCompleted);
}

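// setUint()/getUint() on the underlying GlooStore must round-trip a value
// set by one rank and read by another.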
void testStoreSetGet(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // test that get() gets the same value as the one that was set()
  std::vector<uint8_t> testVector = {1, 1, 1, 1};
  // Cast to ProcessGroupGloo::GlooStore to test specific GlooStore APIs.
  auto rank_0_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[0].getProcessGroup()._getStore().get());
  auto rank_1_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[1].getProcessGroup()._getStore().get());

  rank_0_glooStore->setUint("testKey", testVector);
  auto value = rank_1_glooStore->getUint("testKey");
  EXPECT_TRUE(value == testVector);
}

#ifndef _WIN32
TEST(ProcessGroupGlooTest, testSIGSTOPException) {
  // test SIGSTOP
  // Fork() and TSAN don't play well together, so skip the test if we're
  // testing with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGSTOP);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}

TEST(ProcessGroupGlooTest, testSIGKILLException) {
  // test SIGKILL
  // Fork() and TSAN don't play well together, so skip the test if we're
  // testing with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGKILL);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}
#endif

TEST(ProcessGroupGlooTest, testAllReduceCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testAllToAllCPU) {
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBarrier) {
  {
    TemporaryFile file;
    testBarrier(file.path);
  }
}

TEST(ProcessGroupGlooTest, testMonitoredBarrier) {
  TemporaryFile file;
  testMonitoredBarrier(file.path);
}

TEST(ProcessGroupGlooTest, testSequenceNumInit) {
  TemporaryFile file;
  testSequenceNumInit(file.path);
}

TEST(ProcessGroupGlooTest, testSend) {
  {
    TemporaryFile file;
    testSend(file.path);
  }
}

TEST(ProcessGroupGlooTest, testRecv) {
  {
    TemporaryFile file;
    testRecv(file.path);
  }
}

TEST(ProcessGroupGlooTest, testStoreSetGet) {
  TemporaryFile file;
  testStoreSetGet(file.path);
}

TEST(ProcessGroupGlooTest, testWaitDelay) {
  {
    TemporaryFile file;
    testWaitDelay(file.path);
  }
}

#ifdef USE_CUDA
// CUDA-only tests
TEST(ProcessGroupGlooTest, testAllReduceCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CUDA);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCUDA) {
  if (torch::cuda::device_count() <= 1) {
    LOG(INFO) << "Skipping test - requires multiple CUDA devices";
    return;
  }
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testAlltoallCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBackendName) {
  {
    TemporaryFile file;
    const auto size = 2;
    auto tests = CollectiveTest::initialize(file.path, size);

    for (const auto i : c10::irange(size)) {
      EXPECT_EQ(
          tests[i].getProcessGroup().getBackendName(),
          std::string(c10d::GLOO_BACKEND_NAME));
    }
  }
}

#endif