#include <torch/csrc/distributed/rpc/tensorpipe_agent.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>

#if defined(USE_TENSORPIPE) && !defined(USE_ROCM)

#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>

#include <tensorpipe/tensorpipe.h>
#include <tensorpipe/tensorpipe_cuda.h>

namespace torch {
namespace distributed {
namespace rpc {
namespace {

#if TENSORPIPE_HAS_CUDA_IPC_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaIpcChannel() {
  auto context = tensorpipe::channel::cuda_ipc::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaIpcChannelPriority});
}

// The cuda_ipc channel uses cudaMemcpy to transmit CUDA tensors across
// processes.
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_ipc, makeCudaIpcChannel);
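// A minimal sketch (not part of this file) of the CUDA IPC mechanism the
// channel builds on, assuming a single buffer and an out-of-band control
// connection for shipping the handle:
//
//   // Sender: export an IPC handle for the device pointer.
//   cudaIpcMemHandle_t handle;
//   cudaIpcGetMemHandle(&handle, srcDevPtr);
//   // ... ship `handle` to the receiver over a control connection ...
//
//   // Receiver: map the sender's allocation into this process and copy.
//   void* mappedPtr = nullptr;
//   cudaIpcOpenMemHandle(&mappedPtr, handle, cudaIpcMemLazyEnablePeerAccess);
//   cudaMemcpyAsync(
//       dstDevPtr, mappedPtr, nbytes, cudaMemcpyDeviceToDevice, stream);
//   cudaStreamSynchronize(stream);
//   cudaIpcCloseMemHandle(mappedPtr);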

#endif

#if TENSORPIPE_HAS_CUDA_GDR_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaGdrChannel() {
  auto context = tensorpipe::channel::cuda_gdr::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaGdrChannelPriority});
}

// The cuda_gdr channel sends CUDA memory over InfiniBand using GPUDirect RDMA.
// It directly registers the user-provided tensor with libibverbs, an operation
// which is expensive the first time, but it then caches the registration in
// order to amortize the cost and get low latency for subsequent transfers. A
// ready-to-send/ready-to-receive handshake is still needed before the transfer
// in order to ensure readiness and to agree on the device indices and thus the
// queue pair to use. It automatically pairs each GPU to the "closest" NIC if
// there are multiple of them (closest = longest prefix match in PCI tree).
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_gdr, makeCudaGdrChannel);
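// Rough sketch of the registration cache described above (`regCache` and `pd`
// are hypothetical names; the real logic lives in tensorpipe's cuda_gdr
// channel):
//
//   ibv_mr* mr = regCache.find(devPtr, nbytes);
//   if (mr == nullptr) {
//     // First transfer from this buffer: pay the one-time ibv_reg_mr cost.
//     mr = ibv_reg_mr(
//         pd, devPtr, nbytes, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
//     regCache.insert(devPtr, nbytes, mr);
//   }
//   // Subsequent transfers hit the cache and go straight to posting
//   // work requests on the agreed-upon queue pair.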

#endif

std::unique_ptr<ChannelRegistration> makeCudaXthChannel() {
  auto context = tensorpipe::channel::cuda_xth::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaXthChannelPriority});
}

// The cuda_xth channel supports same-process GPU-to-GPU communication.
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_xth, makeCudaXthChannel);
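// Conceptually (sketch only; `sendStream`/`recvStream` are illustrative), a
// same-process transfer reduces to ordering the receiver's stream after the
// sender's and copying directly, since both sides share one address space:
//
//   cudaEvent_t event;
//   cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
//   cudaEventRecord(event, sendStream); // order after the producer's work
//   cudaStreamWaitEvent(recvStream, event, 0);
//   cudaMemcpyAsync(
//       dstPtr, srcPtr, nbytes, cudaMemcpyDeviceToDevice, recvStream);
//   cudaEventDestroy(event); // safe: destruction is deferred until it fires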

std::unique_ptr<ChannelRegistration> makeCudaBasicChannel() {
  auto context = tensorpipe::channel::cuda_basic::create(
      tensorpipe::channel::basic::create());
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaBasicChannelPriority});
}

// The cuda_basic channel is the fallback for GPU-to-GPU communication: it
// wraps the CPU-only basic channel.
C10_REGISTER_CREATOR(
    TensorPipeChannelRegistry,
    cuda_basic,
    makeCudaBasicChannel);
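// Sketch of the fallback path (the staging details are an assumption; see
// tensorpipe's cuda_basic channel for the real implementation): the device
// buffer is staged through host memory and shipped with the wrapped CPU
// channel:
//
//   cudaMemcpyAsync(
//       hostStaging, srcDevPtr, nbytes, cudaMemcpyDeviceToHost, sendStream);
//   cudaStreamSynchronize(sendStream);
//   // ... transmit `hostStaging` via the wrapped basic channel ...
//   cudaMemcpyAsync(
//       dstDevPtr, hostStaging, nbytes, cudaMemcpyHostToDevice, recvStream);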

class TensorpipeCudaConverter : public TensorpipeDeviceTypeConverter {
 public:
  c10::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Message& message) const override {
    auto stream =
        at::cuda::CUDAStream(getStreamForDevice(streams, storage.device()));
    // Record the tensor's data pointer on the TensorPipe stream so that the
    // tensor won't be destructed before TensorPipe finishes sending it.
    c10::cuda::CUDACachingAllocator::recordStream(storage.data_ptr(), stream);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = storage.data<char>();
    buffer.stream = stream.stream();

    tensorpipe::Message::Tensor tensor;
    tensor.buffer = buffer;
    tensor.length = storage.nbytes();

    message.tensors.push_back(std::move(tensor));

    return c10::nullopt;
  }
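  // Why the recordStream call above matters (illustrative; `agent.send` stands
  // in for the RPC send path):
  //
  //   {
  //     auto t = torch::ones({1 << 20}, torch::kCUDA);
  //     agent.send(t); // enqueues an async copy on the TensorPipe stream
  //   } // last reference dropped here, possibly before the copy finishes
  //
  // Without recordStream, the caching allocator could hand the tensor's memory
  // to a new allocation while the copy is still in flight; with it, reuse is
  // deferred until the work enqueued on that stream at free time completes.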

  at::DataPtr allocateTensorForReceiving(
      int deviceIndex,
      size_t length,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Allocation& allocation) const override {
    c10::Device device(c10::kCUDA, deviceIndex);
    at::cuda::CUDAStream stream(getStreamForDevice(streams, device));
    // Allocate while the TensorPipe stream is current: the caching allocator
    // records the current stream for each allocation, so no explicit
    // recordStream call is needed for the receive copy.
    at::cuda::CUDAStreamGuard guard(stream);
    at::DataPtr dataPtr =
        c10::cuda::CUDACachingAllocator::get()->allocate(length);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = dataPtr.get();
    buffer.stream = stream.stream();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(tensor);

    return dataPtr;
  }
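  // The guard above is equivalent in effect to swapping streams by hand
  // (sketch only):
  //
  //   at::cuda::CUDAStream prev =
  //       at::cuda::getCurrentCUDAStream(device.index());
  //   at::cuda::setCurrentCUDAStream(stream);
  //   at::DataPtr ptr =
  //       c10::cuda::CUDACachingAllocator::get()->allocate(length);
  //   at::cuda::setCurrentCUDAStream(prev);
  //
  // Because `stream` is current at allocation time, the allocator treats it as
  // the buffer's home stream and orders any later reuse of the block after the
  // receive copy enqueued on it.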
};

C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CUDA, TensorpipeCudaConverter);

} // namespace
} // namespace rpc
} // namespace distributed
} // namespace torch

#endif