#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>

#ifdef USE_TENSORPIPE

#include <c10/util/irange.h>

#include <tensorpipe/tensorpipe.h>

namespace torch {
namespace distributed {
namespace rpc {
namespace {

// The TensorPipe agent splits the RPC message's information across multiple
// payloads. This allows the agent to provide the data to TensorPipe without
// performing a copy into a single contiguous buffer, and without storing it as
// metadata, which is less efficient.

// First come the rpc::Message::type() and ::id().
constexpr int kTpMessageTypeIdx = 0;
constexpr int kTpMessageIdIdx = 1;
// Then comes the rpc::Message::payload().
constexpr int kTpMessagePayloadIdx = 2;
// Last comes the pickle of rpc::Message::tensors() (with the tensors themselves
// stored as, well, tensors in the tensorpipe::Message).
constexpr int kTpMessagePickleIdx = 3;
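// In other words, a serialized message ends up laid out roughly as follows
// (a sketch of what tensorpipeSerialize() below produces):
//
//   tensorpipe::Message
//     payloads[kTpMessageTypeIdx]    -> MessageType
//     payloads[kTpMessageIdIdx]      -> int64_t message id
//     payloads[kTpMessagePayloadIdx] -> rpc::Message::payload()
//     payloads[kTpMessagePickleIdx]  -> pickle of rpc::Message::tensors()
//     tensors[...]                   -> the tensors' raw storage data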

inline c10::Device indexToDevice(c10::DeviceIndex index) {
  if (index == -1) {
    return c10::Device(at::kCPU);
  } else {
    return c10::Device(at::kCUDA, index);
  }
}

class TensorpipeCpuConverter : public TensorpipeDeviceTypeConverter {
 public:
  c10::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& /* streams */,
      tensorpipe::Message& message) const override {
    // Enforce a memory copy if the tensor was created via torch::from_blob,
    // which means the tensor doesn't own its memory.
    bool storageHasDeleter = storage.data_ptr().get_context() != nullptr;
    if (!storageHasDeleter) {
      std::vector<char> storageData(
          storage.data<char>(), storage.data<char>() + storage.nbytes());

      tensorpipe::CpuBuffer buffer;
      buffer.ptr = storageData.data();

      tensorpipe::Message::Tensor tensor;
      tensor.buffer = buffer;
      tensor.length = storageData.size();

      message.tensors.push_back(std::move(tensor));

      return c10::make_optional(std::move(storageData));
    } else {
      tensorpipe::CpuBuffer buffer;
      buffer.ptr = storage.data<char>();

      tensorpipe::Message::Tensor tensor;
      tensor.buffer = buffer;
      tensor.length = storage.nbytes();

      message.tensors.push_back(std::move(tensor));

      return c10::nullopt;
    }
  }

  at::DataPtr allocateTensorForReceiving(
      int /* deviceIndex */,
      size_t length,
      const std::vector<c10::Stream>& /* streams */,
      tensorpipe::Allocation& allocation) const override {
    at::DataPtr dataPtr = at::getCPUAllocator()->allocate(length);

    tensorpipe::CpuBuffer buffer;
    buffer.ptr = dataPtr.get();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(std::move(tensor));

    return dataPtr;
  }
};

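// Register the CPU converter in the global converter registry. Other backends
// are expected to register their own converters with this same macro from
// their own translation units; for instance, a CUDA build would plausibly do
// something like the following (hypothetical sketch, name assumed):
//
//   C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CUDA, TensorpipeCudaConverter);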
C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CPU, TensorpipeCpuConverter);

c10::DeviceType convertDeviceType(const std::string& tpDeviceType) {
  if (tpDeviceType == tensorpipe::kCpuDeviceType) {
    return c10::kCPU;
  } else if (tpDeviceType == tensorpipe::kCudaDeviceType) {
    return c10::kCUDA;
  } else {
    TORCH_INTERNAL_ASSERT(false, "Unrecognized TensorPipe buffer type.");
  }
}

} // namespace

// As the vector of streams will typically be very small (1-8 items), we expect
// a linear search to be as fast as (or faster than) a hashmap lookup.
const c10::Stream& getStreamForDevice(
    const std::vector<c10::Stream>& streams,
    const c10::Device& device) {
  for (const c10::Stream& stream : streams) {
    if (stream.device() == device) {
      return stream;
    }
  }
  TORCH_INTERNAL_ASSERT(false, "No stream found for device ", device);
}

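// Global registry mapping each c10::DeviceType to its TensorPipe converter.
// Entries are installed at static-initialization time by the registrar below
// and looked up later via getDeviceTypeConverter().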
std::array<
    std::atomic<const TensorpipeDeviceTypeConverter*>,
    static_cast<size_t>(DeviceType::COMPILE_TIME_MAX_DEVICE_TYPES)>
    device_type_converter_registry;

TensorpipeDeviceTypeConverterRegistrar::TensorpipeDeviceTypeConverterRegistrar(
    DeviceType type,
    const TensorpipeDeviceTypeConverter* impl) {
  device_type_converter_registry[static_cast<size_t>(type)].store(impl);
}

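// Converts an rpc::Message into a tensorpipe::Message (which only holds
// non-owning pointers), together with a TensorpipeWriteBuffers object that
// owns the underlying data and must be kept alive until the write completes.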
std::tuple<tensorpipe::Message, TensorpipeWriteBuffers> tensorpipeSerialize(
    c10::intrusive_ptr<Message> rpcMessage,
    std::vector<c10::Device> devices,
    const std::vector<c10::Stream>& streams) {
  tensorpipe::Message tpMessage;
  TensorpipeWriteBuffers buffers;

  // Metadata
  buffers.type = std::make_unique<MessageType>(rpcMessage->type());
  buffers.id = std::make_unique<int64_t>(rpcMessage->id());
  // kTpMessageTypeIdx = 0
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{buffers.type.get(), sizeof(MessageType)});
  // kTpMessageIdIdx = 1
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{buffers.id.get(), sizeof(int64_t)});

  // Payload
  buffers.payload = std::move(rpcMessage->payload());
  // TensorPipe uses the same Message class for both reading and writing, so it
  // exposes non-const pointers even though it doesn't modify the data when
  // writing.
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
  char* payloadPtr = const_cast<char*>(buffers.payload.data());
  // kTpMessagePayloadIdx = 2
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{payloadPtr, buffers.payload.size()});

  {
    // The function below might allocate new tensors if there are Tensor views.
    // Apply the stream guard here so that those tensor allocations are
    // enqueued on the given streams.
    c10::MultiStreamGuard guard(streams);
    // Tensors
    buffers.tensors = cloneSparseTensors(rpcMessage->tensors()).vec();
  }

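  // Pickle the (possibly cloned) tensor list. The pickle itself only carries
  // the tensors' metadata; the actual storages are collected afterwards via
  // pickler.tensorData() and attached to the tensorpipe::Message as separate
  // tensors, so that they are transferred out-of-band rather than embedded in
  // the pickle.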
  torch::jit::Pickler pickler([&](const void* buf, size_t sz) -> size_t {
    buffers.pickle.insert(
        buffers.pickle.end(),
        static_cast<const char*>(buf),
        static_cast<const char*>(buf) + sz);
    return sz;
  });
  pickler.protocol();
  pickler.pushIValue(buffers.tensors);
  pickler.stop();
  // kTpMessagePickleIdx = 3
  tpMessage.payloads.push_back(tensorpipe::Message::Payload{
      buffers.pickle.data(), buffers.pickle.size()});
  const std::vector<torch::Tensor>& tensorDataVec = pickler.tensorData();
  tpMessage.tensors.reserve(tensorDataVec.size());
  for (const auto i : c10::irange(tensorDataVec.size())) {
    const torch::Tensor& tensor = tensorDataVec[i];

    const TensorpipeDeviceTypeConverter* converter =
        getDeviceTypeConverter(tensor.device().type());
    TORCH_CHECK(
        converter != nullptr,
        "Attempting to send a Tensor with unexpected device type ",
        tensor.device());

    TORCH_INTERNAL_ASSERT(tpMessage.tensors.size() == i);
    c10::optional<std::vector<char>> maybeCopiedTensor =
        converter->prepareTensorForSending(
            tensor.storage(), streams, tpMessage);
    TORCH_INTERNAL_ASSERT(tpMessage.tensors.size() == i + 1);

    tensorpipe::Device targetDevice = devices.empty() || devices[i].is_cpu()
        ? tensorpipe::Device{tensorpipe::kCpuDeviceType, 0}
        : tensorpipe::Device{tensorpipe::kCudaDeviceType, devices[i].index()};
    tpMessage.tensors.back().targetDevice = std::move(targetDevice);

    if (maybeCopiedTensor.has_value()) {
      buffers.copiedTensors.push_back(std::move(maybeCopiedTensor).value());
    }
  }

  return std::make_tuple(std::move(tpMessage), std::move(buffers));
}

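// Given the descriptor of an incoming message, allocates the memory that
// TensorPipe should write the payloads and tensors into. The returned
// tensorpipe::Allocation holds non-owning pointers into the returned
// TensorpipeReadBuffers, which owns the memory.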
std::pair<tensorpipe::Allocation, TensorpipeReadBuffers> tensorpipeAllocate(
    const tensorpipe::Descriptor& tpDescriptor,
    const std::vector<c10::Stream>& streams) {
  tensorpipe::Allocation tpAllocation;
  TensorpipeReadBuffers buffers;

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads.size() == 4,
      "message expected to contain 4 payloads, whereas it contained ",
      tpDescriptor.payloads.size(),
      " payloads");
  tpAllocation.payloads.resize(tpDescriptor.payloads.size());

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads[kTpMessageTypeIdx].length == sizeof(MessageType),
      "first payload expected to contain ",
      sizeof(MessageType),
      " bytes, whereas it contained ",
      tpDescriptor.payloads[kTpMessageTypeIdx].length,
      " bytes");
  buffers.type = std::make_unique<MessageType>();
  tpAllocation.payloads[kTpMessageTypeIdx].data = buffers.type.get();

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads[kTpMessageIdIdx].length == sizeof(int64_t),
      "second payload expected to contain ",
      sizeof(int64_t),
      " bytes, whereas it contained ",
      tpDescriptor.payloads[kTpMessageIdIdx].length,
      " bytes");
  buffers.id = std::make_unique<int64_t>();
  tpAllocation.payloads[kTpMessageIdIdx].data = buffers.id.get();

  // FIXME The two resizes below zero out the vectors, which is not needed.
  buffers.payload.resize(tpDescriptor.payloads[kTpMessagePayloadIdx].length);
  tpAllocation.payloads[kTpMessagePayloadIdx].data = buffers.payload.data();

  buffers.pickle.resize(tpDescriptor.payloads[kTpMessagePickleIdx].length);
  tpAllocation.payloads[kTpMessagePickleIdx].data = buffers.pickle.data();

  size_t numTensors = tpDescriptor.tensors.size();
  tpAllocation.tensors.reserve(numTensors);
  for (const auto tensorIdx : c10::irange(numTensors)) {
    const tensorpipe::Descriptor::Tensor& tensor =
        tpDescriptor.tensors[tensorIdx];
    TORCH_INTERNAL_ASSERT(tensor.targetDevice.has_value());
    c10::DeviceType targetDeviceType =
        convertDeviceType(tensor.targetDevice->type);

    const TensorpipeDeviceTypeConverter* converter =
        getDeviceTypeConverter(targetDeviceType);
    TORCH_INTERNAL_ASSERT(
        converter != nullptr,
        "Attempting to receive a Tensor with unexpected device type ",
        targetDeviceType);

    TORCH_INTERNAL_ASSERT(tpAllocation.tensors.size() == tensorIdx);
    at::DataPtr dataPtr = converter->allocateTensorForReceiving(
        tensor.targetDevice->index, tensor.length, streams, tpAllocation);
    TORCH_INTERNAL_ASSERT(tpAllocation.tensors.size() == tensorIdx + 1);

    buffers.tensors.push_back(std::move(dataPtr));
  }

  return {std::move(tpAllocation), std::move(buffers)};
}

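// Reassembles an rpc::Message from the descriptor and from the read buffers
// that tensorpipeAllocate() set up and TensorPipe has since filled in.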
c10::intrusive_ptr<Message> tensorpipeDeserialize(
    tensorpipe::Descriptor&& tpDescriptor,
    TensorpipeReadBuffers&& buffers) {
  // Tensors
  std::vector<at::Tensor> tensors;
  const char* pickleData = buffers.pickle.data();
  size_t pickleLen = buffers.pickle.size();
  size_t picklePos = 0;
  auto pickleReadFunc = [&](char* buf, size_t n) -> size_t {
    if (picklePos >= pickleLen || n == 0) {
      return 0;
    }
    size_t toCopy = std::min(picklePos + n, pickleLen) - picklePos;
    memcpy(buf, pickleData + picklePos, toCopy);
    picklePos += toCopy;
    return toCopy;
  };
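  // The pickle records each tensor's data under its index (as a string key),
  // so resolving that key here hands the unpickler the DataPtr that TensorPipe
  // wrote the corresponding tensor into.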
  auto tensorReadFunc = [&](const std::string& ename) -> at::DataPtr {
    unsigned long index = std::stoul(ename);
    return std::move(buffers.tensors.at(index));
  };

  // No need to pass a typeResolver here, as this only processes strings and
  // tensors.
  torch::jit::Unpickler unpickler(
      pickleReadFunc,
      nullptr,
      nullptr,
      tensorReadFunc,
      {},
      /* use_storage_device */ true);

  auto ival = unpickler.parse_ivalue();
  for (auto&& t : ival.toTensorList()) {
    tensors.emplace_back(std::move(t));
  }

  for (const auto i : c10::irange(tpDescriptor.tensors.size())) {
    auto& tensor = tpDescriptor.tensors[i];
    if (tensor.targetDevice.has_value() &&
        tensor.targetDevice->type == tensorpipe::kCudaDeviceType) {
      TORCH_INTERNAL_ASSERT(
          tensors[i].device() == indexToDevice(tensor.targetDevice->index),
          "Tensor ",
          i,
          " in message ",
          *buffers.id,
          " was expected to be received on device ",
          tensor.targetDevice->index,
          ", but got it on ",
          tensors[i].device());
    }
  }

  return c10::make_intrusive<Message>(
      std::move(buffers.payload),
      std::move(tensors),
      *buffers.type,
      *buffers.id);
}
} // namespace rpc
} // namespace distributed
} // namespace torch

#endif // USE_TENSORPIPE