#pragma once
#ifdef USE_CUDA
#include <c10/core/Allocator.h>
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAException.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/Logging.h>
#include <cuda_runtime_api.h>
#include <torch/csrc/Export.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
namespace torch {

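// Reclaims sent blocks whose shared reference counters have dropped to zero
// (see CudaIPCSentDataLimbo below). Presumably triggered through the
// FreeMemoryCallback that the CUDA caching allocator invokes when it runs low
// on device memory.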
TORCH_CUDA_CU_API bool CudaIPCCollect();

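// Receiver-side wrapper: owns the shared_ptr produced when an IPC handle is
// opened in this process, so the underlying mapping stays alive for as long
// as this object does.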
struct CudaIPCReceivedData final {
  CudaIPCReceivedData() = default;
  explicit CudaIPCReceivedData(std::shared_ptr<void> shared_ptr)
      : shared_ptr_(std::move(shared_ptr)) {}
  std::shared_ptr<void> shared_ptr_;
};

struct CudaIPCSentData final {
  std::string handle_;
  int64_t offset_;
  int64_t* counter_ptr_; // Reference counter shared memory block
  at::DataPtr original_ptr_; // Original memory allocation
  cudaEvent_t event_; // Event synchronized before cudaEventDestroy
  bool event_sync_required_;
  at::Device device_;

  CudaIPCSentData(
      std::string handle,
      int64_t offset,
      int64_t* counter_ptr,
      at::Device device);
  ~CudaIPCSentData();

  int64_t counter_value();
  std::string handle() {
    return handle_;
  }
  int64_t offset() {
    return offset_;
  }
  void set_original_ptr(at::DataPtr data_ptr) {
    original_ptr_ = std::move(data_ptr);
  }
};

TORCH_CUDA_CU_API at::DataPtr GetNewRefCountedSentData(
    void* data,
    at::Device device);
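// Illustrative producer-side sketch (not the actual call site; the real one
// lives in the storage sharing path):
//
//   // `ptr` is assumed to be a device pointer owned by a CUDA storage.
//   at::DataPtr sent = GetNewRefCountedSentData(ptr, device);
//   // `sent` wraps a CudaIPCSentData whose deleter defers freeing the block
//   // until the receiving process releases its shared reference count.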

namespace {

constexpr int64_t CUDA_IPC_REF_COUNTER_FILE_SIZE = 10000;
constexpr int64_t CUDA_IPC_WARN_AFTER_X_BLOCKS_IN_LIMBO = 1000;
// It was determined empirically that CUDA (v10.1 and below) has a limit on the
// number of recorded blocking interprocess events, around 22,000. To give
// ourselves plenty of leeway, we picked 1000, which is still enough events to
// share tensors effectively.
constexpr int64_t CUDA_IPC_MAXIMUM_EVENTS_TO_USE = 1000;

// All to-be-deleted data blocks with a non-zero reference counter go here.
struct CudaIPCSentDataLimbo final {
  ~CudaIPCSentDataLimbo();
  bool collect();
  void add(std::unique_ptr<CudaIPCSentData> shared_block);
  uint64_t size();

 private:
  // TODO: Could be changed to a FIFO in order to avoid a full traversal on
  // every collect().
  std::vector<std::unique_ptr<CudaIPCSentData>> shared_blocks_;
  std::mutex limbo_mutex_;
};
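// collect() presumably walks shared_blocks_ and destroys every entry whose
// counter has reached zero; blocks still referenced by a receiver stay in
// limbo, with a warning expected once more than
// CUDA_IPC_WARN_AFTER_X_BLOCKS_IN_LIMBO of them accumulate.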

struct CudaIPCRefCountersFile final {
  CudaIPCRefCountersFile(
      std::string handle,
      uint64_t size,
      at::DataPtr data_ptr)
      : next_offset_(0),
        size_(size),
        used_slots_(0),
        handle_(std::move(handle)),
        refcounted_shared_mem_(std::move(data_ptr)) {}

  int64_t* counter_ptr() {
    return static_cast<int64_t*>(refcounted_shared_mem_.get()) + next_offset_;
  }

  void set_counter(uint64_t value) {
    *counter_ptr() = value;
  }

  bool have_offsets() {
    return next_offset_ < size_;
  }

  bool offsets_in_use() {
    return used_slots_ > 0;
  }

  int64_t get_offset() {
    return next_offset_;
  }

  void rotate_offset() {
    next_offset_++;
    used_slots_++;
  }

  void return_offset(uint64_t offset /* unused */) {
    used_slots_--;
  }

  std::string handle() {
    return handle_;
  }

 private:
  uint64_t next_offset_;
  uint64_t size_;
  uint64_t used_slots_;
  std::string handle_;
  at::DataPtr refcounted_shared_mem_;
};
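// Offset bookkeeping implied by the accessors above: each newly shared block
// claims the slot at next_offset_ (counter_ptr() / set_counter(1)), then
// rotate_offset() advances to the next slot. Slots are not reused within one
// file, so once have_offsets() is false a fresh ref-counter file is presumably
// allocated, and this one can be retired when offsets_in_use() returns false.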

} // namespace
} // namespace torch

namespace c10 {
namespace {
class CudaIPCCollectCallback : public FreeMemoryCallback {
 public:
  bool Execute() override {
    return torch::CudaIPCCollect();
  }
};
} // namespace

} // namespace c10
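// Note: the callback above only takes effect once it is registered with the
// allocator's free-memory callback registry (presumably via
// REGISTER_FREE_MEMORY_CALLBACK in the accompanying .cpp); CudaIPCCollect()
// then runs whenever the CUDA caching allocator needs to free device memory.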

#endif