1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20/*!
21 * \file ring_buffer.h
22 * \brief this file aims to provide a wrapper of sockets
23 */
24#ifndef TVM_SUPPORT_RING_BUFFER_H_
25#define TVM_SUPPORT_RING_BUFFER_H_
26
27#include <algorithm>
28#include <cstring>
29#include <vector>
30
31namespace tvm {
32namespace support {
33/*!
34 * \brief Ring buffer class for data buffering in IO.
35 * Enables easy usage for sync and async mode.
36 */
37class RingBuffer {
38 public:
39 /*! \brief Initial capacity of ring buffer. */
40 static const int kInitCapacity = 4 << 10;
41 /*! \brief constructor */
42 RingBuffer() : ring_(kInitCapacity) {}
43 /*! \return number of bytes available in buffer. */
44 size_t bytes_available() const { return bytes_available_; }
45 /*! \return Current capacity of buffer. */
46 size_t capacity() const { return ring_.size(); }
47 /*!
48 * Reserve capacity to be at least n.
49 * Will only increase capacity if n is bigger than current capacity.
50 *
51 * The effect of Reserve only lasts before the next call to Reserve.
52 * Other functions in the ring buffer can also call into the reserve.
53 *
54 * \param n The size of capacity.
55 */
56 void Reserve(size_t n) {
57 if (ring_.size() < n) {
58 size_t old_size = ring_.size();
59 size_t new_size = static_cast<size_t>(n * 1.2);
60 ring_.resize(new_size);
61 if (head_ptr_ + bytes_available_ > old_size) {
62 // copy the ring overflow part into the tail.
63 size_t ncopy = head_ptr_ + bytes_available_ - old_size;
64 memcpy(&ring_[0] + old_size, &ring_[0], ncopy);
65 }
66 } else if (ring_.size() > n * 8 && ring_.size() > kInitCapacity) {
67 // shrink too large temporary buffer to
68 // avoid out of memory on some embedded devices
69 if (bytes_available_ != 0) {
70 // move existing bytes to the head.
71 size_t old_bytes = bytes_available_;
72 std::vector<char> tmp(old_bytes);
73 Read(&tmp[0], old_bytes);
74
75 memcpy(&ring_[0], &tmp[0], old_bytes);
76 bytes_available_ = old_bytes;
77 }
78 // shrink the ring.
79 size_t new_size = kInitCapacity;
80 new_size = std::max(new_size, n);
81 new_size = std::max(new_size, bytes_available_);
82
83 ring_.resize(new_size);
84 ring_.shrink_to_fit();
85 head_ptr_ = 0;
86 }
87 }
88
89 /*!
90 * \brief Perform a non-blocking read from buffer
91 * size must be smaller than this->bytes_available()
92 * \param data the data pointer.
93 * \param size The number of bytes to read.
94 */
95 void Read(void* data, size_t size) {
96 ICHECK_GE(bytes_available_, size);
97 size_t ncopy = std::min(size, ring_.size() - head_ptr_);
98 memcpy(data, &ring_[0] + head_ptr_, ncopy);
99 if (ncopy < size) {
100 memcpy(reinterpret_cast<char*>(data) + ncopy, &ring_[0], size - ncopy);
101 }
102 head_ptr_ = (head_ptr_ + size) % ring_.size();
103 bytes_available_ -= size;
104 }
105 /*!
106 * \brief Read data from buffer with and put them to non-blocking send function.
107 *
108 * \param fsend A send function handle to put the data to.
109 * \param max_nbytes Maximum number of bytes can to read.
110 * \tparam FSend A non-blocking function with signature size_t (const void* data, size_t size);
111 */
112 template <typename FSend>
113 size_t ReadWithCallback(FSend fsend, size_t max_nbytes) {
114 size_t size = std::min(max_nbytes, bytes_available_);
115 ICHECK_NE(size, 0U);
116 size_t ncopy = std::min(size, ring_.size() - head_ptr_);
117 size_t nsend = fsend(&ring_[0] + head_ptr_, ncopy);
118 bytes_available_ -= nsend;
119 if (ncopy == nsend && ncopy < size) {
120 size_t nsend2 = fsend(&ring_[0], size - ncopy);
121 bytes_available_ -= nsend2;
122 nsend += nsend2;
123 }
124 return nsend;
125 }
126 /*!
127 * \brief Write data into buffer, always ensures all data is written.
128 * \param data The data pointer
129 * \param size The size of data to be written.
130 */
131 void Write(const void* data, size_t size) {
132 this->Reserve(bytes_available_ + size);
133 size_t tail = head_ptr_ + bytes_available_;
134 if (tail >= ring_.size()) {
135 memcpy(&ring_[0] + (tail - ring_.size()), data, size);
136 } else {
137 size_t ncopy = std::min(ring_.size() - tail, size);
138 memcpy(&ring_[0] + tail, data, ncopy);
139 if (ncopy < size) {
140 memcpy(&ring_[0], reinterpret_cast<const char*>(data) + ncopy, size - ncopy);
141 }
142 }
143 bytes_available_ += size;
144 }
145 /*!
146 * \brief Written data into the buffer by give it a non-blocking callback function.
147 *
148 * \param frecv A receive function handle
149 * \param max_nbytes Maximum number of bytes can write.
150 * \tparam FRecv A non-blocking function with signature size_t (void* data, size_t size);
151 */
152 template <typename FRecv>
153 size_t WriteWithCallback(FRecv frecv, size_t max_nbytes) {
154 this->Reserve(bytes_available_ + max_nbytes);
155 size_t nbytes = max_nbytes;
156 size_t tail = head_ptr_ + bytes_available_;
157 if (tail >= ring_.size()) {
158 size_t nrecv = frecv(&ring_[0] + (tail - ring_.size()), nbytes);
159 bytes_available_ += nrecv;
160 return nrecv;
161 } else {
162 size_t ncopy = std::min(ring_.size() - tail, nbytes);
163 size_t nrecv = frecv(&ring_[0] + tail, ncopy);
164 bytes_available_ += nrecv;
165 if (nrecv == ncopy && ncopy < nbytes) {
166 size_t nrecv2 = frecv(&ring_[0], nbytes - ncopy);
167 bytes_available_ += nrecv2;
168 nrecv += nrecv2;
169 }
170 return nrecv;
171 }
172 }
173
174 private:
175 // buffer head
176 size_t head_ptr_{0};
177 // number of bytes occupied in the buffer.
178 size_t bytes_available_{0};
179 // The internal data ring.
180 std::vector<char> ring_;
181};
182} // namespace support
183} // namespace tvm
184#endif // TVM_SUPPORT_RING_BUFFER_H_
185