#include <gtest/gtest.h>

#include <c10/util/irange.h>
#include <tensorpipe/common/cpu_buffer.h>
#include <tensorpipe/core/message.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>
#include <torch/torch.h>

#include <cstring>
#include <memory>
#include <string>
#include <vector>
12
13TEST(TensorpipeSerialize, Base) {
14 // Sender serializes
15 at::Tensor t1 = torch::ones({1024}, at::ScalarType::Int);
16 at::Tensor t2 = torch::ones({1024}, at::ScalarType::Float);
17 std::vector<at::Tensor> tensors{t1, t2};
18 std::vector<char> payload = {'1', '2', '3'};
19 std::vector<char> payloadCopy = payload; // for testing
20 torch::distributed::rpc::MessageType mtype =
21 torch::distributed::rpc::MessageType::UNKNOWN;
22 int64_t mId = 100;
23 auto sendingRpcMessage =
24 c10::make_intrusive<torch::distributed::rpc::Message>(
25 std::move(payload), std::move(tensors), mtype);
26 sendingRpcMessage->setId(mId);
27 tensorpipe::Message sendingTpMessage;
28 torch::distributed::rpc::TensorpipeWriteBuffers sendingTpBuffers;
29 std::tie(sendingTpMessage, sendingTpBuffers) =
30 torch::distributed::rpc::tensorpipeSerialize(
31 std::move(sendingRpcMessage), {}, {});
32
33 // Mimic receiving message descriptor: recvingTpDescriptor is a copy of
34 // sendingTpMessage except for the data pointers which are left null.
35 tensorpipe::Descriptor recvingTpDescriptor;
36 recvingTpDescriptor.metadata = sendingTpMessage.metadata;
37 recvingTpDescriptor.payloads.reserve(sendingTpMessage.payloads.size());
38 for (auto& tpPayload : sendingTpMessage.payloads) {
39 tensorpipe::Descriptor::Payload p;
40 p.length = tpPayload.length;
41 p.metadata = tpPayload.metadata;
42 recvingTpDescriptor.payloads.push_back(std::move(p));
43 }
44 EXPECT_EQ(
45 recvingTpDescriptor.payloads.size(), sendingTpMessage.payloads.size());
46 recvingTpDescriptor.tensors.reserve(sendingTpMessage.tensors.size());
47 for (auto& tpTensor : sendingTpMessage.tensors) {
48 tensorpipe::Descriptor::Tensor t;
49 t.length = tpTensor.length;
50 t.sourceDevice = tpTensor.buffer.device();
51 t.targetDevice = tpTensor.targetDevice;
52 t.metadata = tpTensor.metadata;
53 recvingTpDescriptor.tensors.push_back(std::move(t));
54 }
55 EXPECT_EQ(
56 recvingTpDescriptor.tensors.size(), sendingTpMessage.tensors.size());
57
58 // Mimic readDescriptor() callback:
59 // - Allocate buffers
60 // - Fill pointers in tensorpipe message
61 tensorpipe::Allocation recvingTpAllocation;
62 torch::distributed::rpc::TensorpipeReadBuffers recvingTpBuffers;
63 std::tie(recvingTpAllocation, recvingTpBuffers) =
64 torch::distributed::rpc::tensorpipeAllocate(recvingTpDescriptor, {});
65
66 // Mimic tensorpipe data transfer
67 EXPECT_EQ(
68 recvingTpAllocation.payloads.size(), sendingTpMessage.payloads.size());
69 for (const auto i : c10::irange(recvingTpAllocation.payloads.size())) {
70 tensorpipe::Message::Payload& srcPayload = sendingTpMessage.payloads[i];
71 tensorpipe::Allocation::Payload& dstPayload =
72 recvingTpAllocation.payloads[i];
73 if (srcPayload.length) {
74 // Empty vector's data() can return nullptr, use the length to avoid
75 // coying into nullptr
76 memcpy(dstPayload.data, srcPayload.data, srcPayload.length);
77 }
78 }
79 EXPECT_EQ(
80 recvingTpAllocation.tensors.size(), sendingTpMessage.tensors.size());
81 for (const auto i : c10::irange(recvingTpAllocation.tensors.size())) {
82 tensorpipe::Message::Tensor& srcTensor = sendingTpMessage.tensors[i];
83 tensorpipe::Allocation::Tensor& dstTensor = recvingTpAllocation.tensors[i];
84 memcpy(
85 dstTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
86 srcTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
87 srcTensor.length);
88 }
89
90 // Mimic read() callback:
91 // - Unpickle
92 c10::intrusive_ptr<torch::distributed::rpc::Message> recvingRpcMessage =
93 torch::distributed::rpc::tensorpipeDeserialize(
94 std::move(recvingTpDescriptor), std::move(recvingTpBuffers));
95
96 // Data is ready
97 EXPECT_EQ(mtype, recvingRpcMessage->type());
98 EXPECT_EQ(payloadCopy, recvingRpcMessage->payload());
99 EXPECT_EQ(mId, recvingRpcMessage->id());
100 EXPECT_TRUE(torch::equal(t1, recvingRpcMessage->tensors()[0]));
101 EXPECT_TRUE(torch::equal(t2, recvingRpcMessage->tensors()[1]));
102}
103
104TEST(TensorpipeSerialize, RecopySparseTensors) {
105 // Take a 1K row of a 1M tensors, and make sure we don't send across 1M rows.
106 constexpr size_t k1K = 1024;
107 at::Tensor main = torch::randn({k1K, k1K});
108 at::Tensor tiny = main.select(0, 2); // Select a row in the middle
109 EXPECT_EQ(tiny.numel(), k1K);
110 EXPECT_EQ(tiny.storage().nbytes() / tiny.itemsize(), k1K * k1K);
111
112 std::vector<at::Tensor> tensors{main, tiny};
113 std::vector<char> payload = {'1', '2', '3'};
114 torch::distributed::rpc::MessageType mtype =
115 torch::distributed::rpc::MessageType::UNKNOWN;
116 auto sendingRpcMessage =
117 c10::make_intrusive<torch::distributed::rpc::Message>(
118 std::move(payload), std::move(tensors), mtype);
119
120 tensorpipe::Message sendingTpMessage;
121 torch::distributed::rpc::TensorpipeWriteBuffers tpBuffers;
122 std::tie(sendingTpMessage, tpBuffers) =
123 torch::distributed::rpc::tensorpipeSerialize(
124 std::move(sendingRpcMessage), {}, {});
125
126 EXPECT_EQ(tpBuffers.tensors.size(), 2);
127 EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
128 EXPECT_TRUE(torch::equal(main, tpBuffers.tensors[0]));
129 EXPECT_TRUE(torch::equal(tiny, tpBuffers.tensors[1]));
130 // Test cloned storage
131 EXPECT_EQ(
132 main.storage().data(),
133 sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
134 EXPECT_NE(
135 tiny.storage().data(),
136 sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
137 EXPECT_EQ(tiny.element_size() * k1K, sendingTpMessage.tensors[1].length);
138}
139
140TEST(TensorpipeSerialize, NoDeleterTensors) {
141 std::vector<float> blob1{.8, .2};
142 std::vector<float> blob2{.7, .5, .9};
143 at::Tensor t1 = torch::from_blob((float*)(blob1.data()), blob1.size());
144 at::Tensor t2 = torch::from_blob((float*)(blob2.data()), blob2.size());
145 std::vector<at::Tensor> tensors{t1, t2};
146 std::vector<char> payload = {'1', '2', '3'};
147 torch::distributed::rpc::MessageType mtype =
148 torch::distributed::rpc::MessageType::UNKNOWN;
149 auto sendingRpcMessage =
150 c10::make_intrusive<torch::distributed::rpc::Message>(
151 std::move(payload), std::move(tensors), mtype);
152
153 tensorpipe::Message sendingTpMessage;
154 torch::distributed::rpc::TensorpipeWriteBuffers tpBuffers;
155 std::tie(sendingTpMessage, tpBuffers) =
156 torch::distributed::rpc::tensorpipeSerialize(
157 std::move(sendingRpcMessage), {}, {});
158
159 EXPECT_EQ(tpBuffers.copiedTensors.size(), 2);
160 EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
161 EXPECT_EQ(
162 tpBuffers.copiedTensors[0].size(), sendingTpMessage.tensors[0].length);
163 EXPECT_EQ(
164 tpBuffers.copiedTensors[1].size(), sendingTpMessage.tensors[1].length);
165 EXPECT_EQ(
166 tpBuffers.copiedTensors[0].data(),
167 sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
168 EXPECT_EQ(
169 tpBuffers.copiedTensors[1].data(),
170 sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
171 EXPECT_TRUE(
172 memcmp(
173 tpBuffers.copiedTensors[0].data(),
174 t1.storage().data(),
175 sendingTpMessage.tensors[0].length) == 0);
176 EXPECT_TRUE(
177 memcmp(
178 tpBuffers.copiedTensors[1].data(),
179 t2.storage().data(),
180 sendingTpMessage.tensors[1].length) == 0);
181}
182