#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>

#ifdef USE_TENSORPIPE

#include <c10/util/irange.h>

#include <tensorpipe/tensorpipe.h>

namespace torch {
namespace distributed {
namespace rpc {
namespace {

// The TensorPipe agent splits the RPC message's information across multiple
// payloads. This allows the agent to provide the data to TensorPipe without
// performing a copy into a single contiguous buffer, and without storing it as
// metadata, which is less efficient.

// First come the rpc::Message::type() and ::id().
constexpr int kTpMessageTypeIdx = 0;
constexpr int kTpMessageIdIdx = 1;
// Then comes the rpc::Message::payload().
constexpr int kTpMessagePayloadIdx = 2;
// Last comes the pickle of rpc::Message::tensors() (with the tensors themselves
// stored as, well, tensors in the tensorpipe::Message).
constexpr int kTpMessagePickleIdx = 3;
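
// In other words, a complete message is laid out across a tensorpipe::Message
// as:
//   payloads[kTpMessageTypeIdx]    -> the MessageType enum value
//   payloads[kTpMessageIdIdx]      -> the int64_t message id
//   payloads[kTpMessagePayloadIdx] -> the raw payload bytes
//   payloads[kTpMessagePickleIdx]  -> the pickled tensor table
// while the tensor data itself travels in tensorpipe::Message::tensors.

// A device index of -1 denotes the CPU; any non-negative index refers to a
// CUDA device.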
inline c10::Device indexToDevice(c10::DeviceIndex index) {
  if (index == -1) {
    return c10::Device(at::kCPU);
  } else {
    return c10::Device(at::kCUDA, index);
  }
}

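// Converter for CPU tensors: it hands the tensor's storage bytes to TensorPipe
// directly whenever the storage owns its memory, and falls back to a copy
// otherwise.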
class TensorpipeCpuConverter : public TensorpipeDeviceTypeConverter {
 public:
  c10::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& /* streams */,
      tensorpipe::Message& message) const override {
    // Enforce a memory copy if the tensor was created from torch::from_blob,
    // which means it doesn't own its memory.
    bool storageHasDeleter = storage.data_ptr().get_context() != nullptr;
    if (!storageHasDeleter) {
      std::vector<char> storageData(
          storage.data<char>(), storage.data<char>() + storage.nbytes());

      tensorpipe::CpuBuffer buffer;
      buffer.ptr = storageData.data();

      tensorpipe::Message::Tensor tensor;
      tensor.buffer = buffer;
      tensor.length = storageData.size();

      message.tensors.push_back(std::move(tensor));

      // Hand the copy back so the caller can keep it alive until the write has
      // completed.
      return c10::make_optional(std::move(storageData));
    } else {
      tensorpipe::CpuBuffer buffer;
      buffer.ptr = storage.data<char>();

      tensorpipe::Message::Tensor tensor;
      tensor.buffer = buffer;
      tensor.length = storage.nbytes();

      message.tensors.push_back(std::move(tensor));

      return c10::nullopt;
    }
  }

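  // Allocates CPU memory into which TensorPipe will write an incoming tensor,
  // and records a matching entry in the allocation.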
  at::DataPtr allocateTensorForReceiving(
      int /* deviceIndex */,
      size_t length,
      const std::vector<c10::Stream>& /* streams */,
      tensorpipe::Allocation& allocation) const override {
    at::DataPtr dataPtr = at::getCPUAllocator()->allocate(length);

    tensorpipe::CpuBuffer buffer;
    buffer.ptr = dataPtr.get();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(std::move(tensor));

    return dataPtr;
  }
};

C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CPU, TensorpipeCpuConverter);
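// Converters for other device types (e.g. CUDA) live in their own translation
// units and are registered through the same macro.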

c10::DeviceType convertDeviceType(const std::string& tpDeviceType) {
  if (tpDeviceType == tensorpipe::kCpuDeviceType) {
    return c10::kCPU;
  } else if (tpDeviceType == tensorpipe::kCudaDeviceType) {
    return c10::kCUDA;
  } else {
    TORCH_INTERNAL_ASSERT(false, "Unrecognized TensorPipe buffer type.");
  }
}

} // namespace

// As the vector of streams will typically be very small (1-8 items) we expect
// a linear search to be at least as fast as a hashmap lookup.
const c10::Stream& getStreamForDevice(
    const std::vector<c10::Stream>& streams,
    const c10::Device& device) {
  for (const c10::Stream& stream : streams) {
    if (stream.device() == device) {
      return stream;
    }
  }
  TORCH_INTERNAL_ASSERT(false, "No stream found for device ", device);
}

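// Registry of converters, indexed by c10::DeviceType. Entries are installed by
// the registrar below (typically during static initialization) and only read
// afterwards, so plain atomic loads and stores suffice.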
std::array<
    std::atomic<const TensorpipeDeviceTypeConverter*>,
    static_cast<size_t>(DeviceType::COMPILE_TIME_MAX_DEVICE_TYPES)>
    device_type_converter_registry;

TensorpipeDeviceTypeConverterRegistrar::TensorpipeDeviceTypeConverterRegistrar(
    DeviceType type,
    const TensorpipeDeviceTypeConverter* impl) {
  device_type_converter_registry[static_cast<size_t>(type)].store(impl);
}

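// Builds a tensorpipe::Message whose payloads and tensors point into the data
// owned by rpcMessage (or into copies of it). The returned
// TensorpipeWriteBuffers keep all that data alive and must therefore outlive
// the write operation.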
std::tuple<tensorpipe::Message, TensorpipeWriteBuffers> tensorpipeSerialize(
    c10::intrusive_ptr<Message> rpcMessage,
    std::vector<c10::Device> devices,
    const std::vector<c10::Stream>& streams) {
  tensorpipe::Message tpMessage;
  TensorpipeWriteBuffers buffers;

  // Metadata
  buffers.type = std::make_unique<MessageType>(rpcMessage->type());
  buffers.id = std::make_unique<int64_t>(rpcMessage->id());
  // kTpMessageTypeIdx = 0
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{buffers.type.get(), sizeof(MessageType)});
  // kTpMessageIdIdx = 1
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{buffers.id.get(), sizeof(int64_t)});

  // Payload
  buffers.payload = std::move(rpcMessage->payload());
  // TensorPipe uses the same Message class for both reading and writing, thus
  // it uses non-const pointers even though it doesn't modify them when writing.
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
  char* payloadPtr = const_cast<char*>(buffers.payload.data());
  // kTpMessagePayloadIdx = 2
  tpMessage.payloads.push_back(
      tensorpipe::Message::Payload{payloadPtr, buffers.payload.size()});

  {
    // The function below might allocate new tensors if there are Tensor views.
    // Apply the stream guard here so that those allocations are performed on
    // the given streams.
    c10::MultiStreamGuard guard(streams);
    // Tensors
    buffers.tensors = cloneSparseTensors(rpcMessage->tensors()).vec();
  }

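  // Pickle the (possibly cloned) tensor list. The pickle bytes become the
  // fourth payload, while the Pickler collects the actual tensor data
  // separately in tensorData().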
  torch::jit::Pickler pickler([&](const void* buf, size_t sz) -> size_t {
    buffers.pickle.insert(
        buffers.pickle.end(),
        static_cast<const char*>(buf),
        static_cast<const char*>(buf) + sz);
    return sz;
  });
  pickler.protocol();
  pickler.pushIValue(buffers.tensors);
  pickler.stop();
  // kTpMessagePickleIdx = 3
  tpMessage.payloads.push_back(tensorpipe::Message::Payload{
      buffers.pickle.data(), buffers.pickle.size()});
  const std::vector<torch::Tensor>& tensorDataVec = pickler.tensorData();
  tpMessage.tensors.reserve(tensorDataVec.size());
  for (const auto i : c10::irange(tensorDataVec.size())) {
    const torch::Tensor& tensor = tensorDataVec[i];

    const TensorpipeDeviceTypeConverter* converter =
        getDeviceTypeConverter(tensor.device().type());
    TORCH_CHECK(
        converter != nullptr,
        "Attempting to send a Tensor with unexpected device type ",
        tensor.device());

    TORCH_INTERNAL_ASSERT(tpMessage.tensors.size() == i);
    c10::optional<std::vector<char>> maybeCopiedTensor =
        converter->prepareTensorForSending(
            tensor.storage(), streams, tpMessage);
    TORCH_INTERNAL_ASSERT(tpMessage.tensors.size() == i + 1);

    // Default to the CPU when no target device was specified for this tensor.
    tensorpipe::Device targetDevice = devices.empty() || devices[i].is_cpu()
        ? tensorpipe::Device{tensorpipe::kCpuDeviceType, 0}
        : tensorpipe::Device{tensorpipe::kCudaDeviceType, devices[i].index()};
    tpMessage.tensors.back().targetDevice = std::move(targetDevice);

    // If the converter had to copy the data, keep that copy alive until the
    // write has completed.
    if (maybeCopiedTensor.has_value()) {
      buffers.copiedTensors.push_back(std::move(maybeCopiedTensor).value());
    }
  }

  return std::make_tuple(std::move(tpMessage), std::move(buffers));
}

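// Validates the descriptor received from TensorPipe and prepares the buffers
// into which the payloads and tensors will be written. The returned
// TensorpipeReadBuffers own those buffers and must outlive the read operation.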
std::pair<tensorpipe::Allocation, TensorpipeReadBuffers> tensorpipeAllocate(
    const tensorpipe::Descriptor& tpDescriptor,
    const std::vector<c10::Stream>& streams) {
  tensorpipe::Allocation tpAllocation;
  TensorpipeReadBuffers buffers;

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads.size() == 4,
      "message expected to contain 4 payloads, whereas it contained ",
      tpDescriptor.payloads.size(),
      " payloads");
  tpAllocation.payloads.resize(tpDescriptor.payloads.size());

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads[kTpMessageTypeIdx].length == sizeof(MessageType),
      "first payload expected to contain ",
      sizeof(MessageType),
      " bytes, whereas it contained ",
      tpDescriptor.payloads[kTpMessageTypeIdx].length,
      " bytes");
  buffers.type = std::make_unique<MessageType>();
  tpAllocation.payloads[kTpMessageTypeIdx].data = buffers.type.get();

  TORCH_INTERNAL_ASSERT(
      tpDescriptor.payloads[kTpMessageIdIdx].length == sizeof(int64_t),
      "second payload expected to contain ",
      sizeof(int64_t),
      " bytes, whereas it contained ",
      tpDescriptor.payloads[kTpMessageIdIdx].length,
      " bytes");
  buffers.id = std::make_unique<int64_t>();
  tpAllocation.payloads[kTpMessageIdIdx].data = buffers.id.get();

  // FIXME The two resizes below zero out the vectors, which is not needed.
  buffers.payload.resize(tpDescriptor.payloads[kTpMessagePayloadIdx].length);
  tpAllocation.payloads[kTpMessagePayloadIdx].data = buffers.payload.data();

  buffers.pickle.resize(tpDescriptor.payloads[kTpMessagePickleIdx].length);
  tpAllocation.payloads[kTpMessagePickleIdx].data = buffers.pickle.data();

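  // For each incoming tensor, let the converter for its target device allocate
  // the receive buffer; the converter also appends a matching entry to
  // tpAllocation.tensors.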
  size_t numTensors = tpDescriptor.tensors.size();
  tpAllocation.tensors.reserve(numTensors);
  for (const auto tensorIdx : c10::irange(numTensors)) {
    const tensorpipe::Descriptor::Tensor& tensor =
        tpDescriptor.tensors[tensorIdx];
    TORCH_INTERNAL_ASSERT(tensor.targetDevice.has_value());
    c10::DeviceType targetDeviceType =
        convertDeviceType(tensor.targetDevice->type);

    const TensorpipeDeviceTypeConverter* converter =
        getDeviceTypeConverter(targetDeviceType);
    TORCH_INTERNAL_ASSERT(
        converter != nullptr,
        "Attempting to receive a Tensor with unexpected device type ",
        targetDeviceType);

    TORCH_INTERNAL_ASSERT(tpAllocation.tensors.size() == tensorIdx);
    at::DataPtr dataPtr = converter->allocateTensorForReceiving(
        tensor.targetDevice->index, tensor.length, streams, tpAllocation);
    TORCH_INTERNAL_ASSERT(tpAllocation.tensors.size() == tensorIdx + 1);

    buffers.tensors.push_back(std::move(dataPtr));
  }

  return {std::move(tpAllocation), std::move(buffers)};
}

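// Reassembles an rpc::Message from the buffers that tensorpipeAllocate
// prepared, once TensorPipe has finished writing into them.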
c10::intrusive_ptr<Message> tensorpipeDeserialize(
    tensorpipe::Descriptor&& tpDescriptor,
    TensorpipeReadBuffers&& buffers) {
  // Tensors
  std::vector<at::Tensor> tensors;
  const char* pickleData = buffers.pickle.data();
  size_t pickleLen = buffers.pickle.size();
  size_t picklePos = 0;
  // Reads up to n bytes of the pickle, like a stream read would.
  auto pickleReadFunc = [&](char* buf, size_t n) -> size_t {
    if (picklePos >= pickleLen || n == 0) {
      return 0;
    }
    size_t toCopy = std::min(n, pickleLen - picklePos);
    memcpy(buf, pickleData + picklePos, toCopy);
    picklePos += toCopy;
    return toCopy;
  };
  // The pickle refers to each tensor through its index in buffers.tensors,
  // encoded as a string key.
  auto tensorReadFunc = [&](const std::string& ename) -> at::DataPtr {
    unsigned long index = std::stoul(ename);
    return std::move(buffers.tensors.at(index));
  };

  // No need to pass a typeResolver here, as it only processes strings and
  // tensors.
  torch::jit::Unpickler unpickler(
      pickleReadFunc,
      nullptr,
      nullptr,
      tensorReadFunc,
      {},
      /* use_storage_device */ true);

  auto ival = unpickler.parse_ivalue();
  for (auto&& t : ival.toTensorList()) {
    tensors.emplace_back(std::move(t));
  }

  // Verify that CUDA tensors were received on the device requested by the
  // sender.
  for (const auto i : c10::irange(tpDescriptor.tensors.size())) {
    auto& tensor = tpDescriptor.tensors[i];
    if (tensor.targetDevice.has_value() &&
        tensor.targetDevice->type == tensorpipe::kCudaDeviceType) {
      TORCH_INTERNAL_ASSERT(
          tensors[i].device() == indexToDevice(tensor.targetDevice->index),
          "Tensor ",
          i,
          " in message ",
          *buffers.id,
          " was expected to be received on device ",
          tensor.targetDevice->index,
          ", but got it on ",
          tensors[i].device());
    }
  }

  return c10::make_intrusive<Message>(
      std::move(buffers.payload),
      std::move(tensors),
      *buffers.type,
      *buffers.id);
}

} // namespace rpc
} // namespace distributed
} // namespace torch

#endif // USE_TENSORPIPE
342 | |