#include <torch/csrc/distributed/rpc/tensorpipe_agent.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>

#if defined(USE_TENSORPIPE) && !defined(USE_ROCM)

#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>

#include <tensorpipe/tensorpipe.h>
#include <tensorpipe/tensorpipe_cuda.h>

namespace torch {
namespace distributed {
namespace rpc {
namespace {

#if TENSORPIPE_HAS_CUDA_IPC_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaIpcChannel() {
  auto context = tensorpipe::channel::cuda_ipc::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaIpcChannelPriority});
}

// The cuda_ipc channel uses cudaMemcpy to transmit CUDA tensors between
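// processes on the same machine; under the hood TensorPipe maps the peer's
// buffer into the local address space via CUDA IPC memory handles before
// issuing the copy, which requires the devices to be peer-accessible.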
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_ipc, makeCudaIpcChannel);

#endif

#if TENSORPIPE_HAS_CUDA_GDR_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaGdrChannel() {
  auto context = tensorpipe::channel::cuda_gdr::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaGdrChannelPriority});
}

// The cuda_gdr channel sends CUDA memory over InfiniBand using GPUDirect RDMA.
// It directly registers the user-provided tensor with libibverbs, an operation
// which is expensive the first time, but it then caches the registration in
// order to amortize the cost and get low latency for subsequent transfers. A
// ready-to-send/ready-to-receive handshake is still needed before the transfer
// in order to ensure readiness and to agree on the device indices and thus the
// queue pair to use. It automatically pairs each GPU to the "closest" NIC if
// there are multiple of them (closest = longest prefix match in PCI tree).
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_gdr, makeCudaGdrChannel);

#endif

std::unique_ptr<ChannelRegistration> makeCudaXthChannel() {
  auto context = tensorpipe::channel::cuda_xth::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaXthChannelPriority});
}

// The cuda_xth channel supports same-process GPU-to-GPU communication.
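// Since sender and receiver share one address space, the receiver can read
// from the sender's device pointer directly; presumably this boils down to a
// single device-to-device copy synchronized across the two CUDA streams.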
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_xth, makeCudaXthChannel);

std::unique_ptr<ChannelRegistration> makeCudaBasicChannel() {
  auto context = tensorpipe::channel::cuda_basic::create(
      tensorpipe::channel::basic::create());
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaBasicChannelPriority});
}

// The cuda_basic channel is the fallback for GPU-to-GPU communication: it
// wraps a CPU channel (here, basic) and stages the data through host memory.
C10_REGISTER_CREATOR(
    TensorPipeChannelRegistry,
    cuda_basic,
    makeCudaBasicChannel);
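
// Channels register with a static priority (the kCuda*ChannelPriority
// constants above); when a pipe is established, TensorPipe picks the
// highest-priority channel that is viable for that pair of endpoints. A
// rough sketch of how the agent consumes this registry (member names as in
// the ChannelRegistration struct from tensorpipe_agent.h; details may
// differ):
//
//   for (const std::string& key : TensorPipeChannelRegistry()->Keys()) {
//     std::unique_ptr<ChannelRegistration> reg =
//         TensorPipeChannelRegistry()->Create(key);
//     context->registerChannel(reg->priority, key, std::move(reg->channel));
//   }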

class TensorpipeCudaConverter : public TensorpipeDeviceTypeConverter {
 public:
  c10::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Message& message) const override {
    auto stream =
        at::cuda::CUDAStream(getStreamForDevice(streams, storage.device()));
    // Record the tensor's data pointer on the TensorPipe stream so the
    // caching allocator won't free or reuse the memory before TensorPipe
    // finishes sending it.
    c10::cuda::CUDACachingAllocator::recordStream(storage.data_ptr(), stream);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = storage.data<char>();
    buffer.stream = stream.stream();

    tensorpipe::Message::Tensor tensor;
    tensor.buffer = buffer;
    tensor.length = storage.nbytes();

    message.tensors.push_back(std::move(tensor));

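    // Returning nullopt means no staging buffer was created: the channel
    // reads directly from the CUDA storage recorded above. (The optional
    // return value presumably exists for converters that must hand back a
    // temporary copy and keep it alive until the send completes.)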
    return c10::nullopt;
  }

  at::DataPtr allocateTensorForReceiving(
      int deviceIndex,
      size_t length,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Allocation& allocation) const override {
    c10::Device device(c10::kCUDA, deviceIndex);
    at::cuda::CUDAStream stream(getStreamForDevice(streams, device));
    // Make `stream` current while allocating, so the caching allocator
    // associates the block with that stream (i.e. it effectively performs
    // the recordStream bookkeeping for us on the allocation stream).
    at::cuda::CUDAStreamGuard guard(stream);
    at::DataPtr dataPtr =
        c10::cuda::CUDACachingAllocator::get()->allocate(length);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = dataPtr.get();
    buffer.stream = stream.stream();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(tensor);

    return dataPtr;
  }
};

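// Register the converter so that the serialization helpers in
// tensorpipe_utils.cpp dispatch CUDA storages to it (the macro fills the
// per-DeviceType converter table declared in tensorpipe_utils.h).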
C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CUDA, TensorpipeCudaConverter);

} // namespace
} // namespace rpc
} // namespace distributed
} // namespace torch

#endif