1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | |
19 | #include <inttypes.h> |
20 | #include <gflags/gflags.h> |
21 | #include "butil/fd_guard.h" // fd_guard |
22 | #include "butil/fd_utility.h" // make_close_on_exec |
23 | #include "butil/time.h" // gettimeofday_us |
24 | #include "brpc/acceptor.h" |
25 | |
26 | |
27 | namespace brpc { |
28 | |
// Capacity handed to `_socket_map.init()' by Acceptor::Initialize() (64Ki).
static const int INITIAL_CONNECTION_CAP = 1 << 16;
30 | |
31 | Acceptor::Acceptor(bthread_keytable_pool_t* pool) |
32 | : InputMessenger() |
33 | , _keytable_pool(pool) |
34 | , _status(UNINITIALIZED) |
35 | , _idle_timeout_sec(-1) |
36 | , _close_idle_tid(INVALID_BTHREAD) |
37 | , _listened_fd(-1) |
38 | , _acception_id(0) |
39 | , _empty_cond(&_map_mutex) |
40 | , _ssl_ctx(NULL) { |
41 | } |
42 | |
// Stops accepting (StopAccept() is a no-op unless the acceptor is RUNNING)
// and then waits in Join() until the listening socket and every accepted
// connection have been recycled, so that no BeforeRecycle() callback can
// reach this object after it is destroyed.
Acceptor::~Acceptor() {
    StopAccept(0);
    Join();
}
47 | |
48 | int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec, |
49 | const std::shared_ptr<SocketSSLContext>& ssl_ctx) { |
50 | if (listened_fd < 0) { |
51 | LOG(FATAL) << "Invalid listened_fd=" << listened_fd; |
52 | return -1; |
53 | } |
54 | |
55 | BAIDU_SCOPED_LOCK(_map_mutex); |
56 | if (_status == UNINITIALIZED) { |
57 | if (Initialize() != 0) { |
58 | LOG(FATAL) << "Fail to initialize Acceptor" ; |
59 | return -1; |
60 | } |
61 | _status = READY; |
62 | } |
63 | if (_status != READY) { |
64 | LOG(FATAL) << "Acceptor hasn't stopped yet: status=" << status(); |
65 | return -1; |
66 | } |
67 | if (idle_timeout_sec > 0) { |
68 | if (bthread_start_background(&_close_idle_tid, NULL, |
69 | CloseIdleConnections, this) != 0) { |
70 | LOG(FATAL) << "Fail to start bthread" ; |
71 | return -1; |
72 | } |
73 | } |
74 | _idle_timeout_sec = idle_timeout_sec; |
75 | _ssl_ctx = ssl_ctx; |
76 | |
77 | // Creation of _acception_id is inside lock so that OnNewConnections |
78 | // (which may run immediately) should see sane fields set below. |
79 | SocketOptions options; |
80 | options.fd = listened_fd; |
81 | options.user = this; |
82 | options.on_edge_triggered_events = OnNewConnections; |
83 | if (Socket::Create(options, &_acception_id) != 0) { |
84 | // Close-idle-socket thread will be stopped inside destructor |
85 | LOG(FATAL) << "Fail to create _acception_id" ; |
86 | return -1; |
87 | } |
88 | |
89 | _listened_fd = listened_fd; |
90 | _status = RUNNING; |
91 | return 0; |
92 | } |
93 | |
94 | void* Acceptor::CloseIdleConnections(void* arg) { |
95 | Acceptor* am = static_cast<Acceptor*>(arg); |
96 | std::vector<SocketId> checking_fds; |
97 | const uint64_t CHECK_INTERVAL_US = 1000000UL; |
98 | while (bthread_usleep(CHECK_INTERVAL_US) == 0) { |
99 | // TODO: this is not efficient for a lot of connections(>100K) |
100 | am->ListConnections(&checking_fds); |
101 | for (size_t i = 0; i < checking_fds.size(); ++i) { |
102 | SocketUniquePtr s; |
103 | if (Socket::Address(checking_fds[i], &s) == 0) { |
104 | s->ReleaseReferenceIfIdle(am->_idle_timeout_sec); |
105 | } |
106 | } |
107 | } |
108 | return NULL; |
109 | } |
110 | |
111 | void Acceptor::StopAccept(int /*closewait_ms*/) { |
112 | // Currently `closewait_ms' is useless since we have to wait until |
113 | // existing requests are finished. Otherwise, contexts depended by |
114 | // the requests may be deleted and invalid. |
115 | |
116 | { |
117 | BAIDU_SCOPED_LOCK(_map_mutex); |
118 | if (_status != RUNNING) { |
119 | return; |
120 | } |
121 | _status = STOPPING; |
122 | } |
123 | |
124 | // Don't set _acception_id to 0 because BeforeRecycle needs it. |
125 | Socket::SetFailed(_acception_id); |
126 | |
127 | // SetFailed all existing connections. Connections added after this piece |
128 | // of code will be SetFailed directly in OnNewConnectionsUntilEAGAIN |
129 | std::vector<SocketId> erasing_ids; |
130 | ListConnections(&erasing_ids); |
131 | |
132 | for (size_t i = 0; i < erasing_ids.size(); ++i) { |
133 | SocketUniquePtr socket; |
134 | if (Socket::Address(erasing_ids[i], &socket) == 0) { |
135 | if (socket->shall_fail_me_at_server_stop()) { |
136 | // Mainly streaming connections, should be SetFailed() to |
137 | // trigger callbacks to NotifyOnFailed() to remove references, |
138 | // otherwise the sockets are often referenced by corresponding |
139 | // objects and delay server's stopping which requires all |
140 | // existing sockets to be recycled. |
141 | socket->SetFailed(ELOGOFF, "Server is stopping" ); |
142 | } else { |
143 | // Message-oriented RPC connections. Just release the addtional |
144 | // reference in the socket, which will be recycled when current |
145 | // requests have been processed. |
146 | socket->ReleaseAdditionalReference(); |
147 | } |
148 | } // else: This socket already called `SetFailed' before |
149 | } |
150 | } |
151 | |
152 | int Acceptor::Initialize() { |
153 | if (_socket_map.init(INITIAL_CONNECTION_CAP) != 0) { |
154 | LOG(FATAL) << "Fail to initialize FlatMap, size=" |
155 | << INITIAL_CONNECTION_CAP; |
156 | return -1; |
157 | } |
158 | return 0; |
159 | } |
160 | |
161 | // NOTE: Join() can happen before StopAccept() |
162 | void Acceptor::Join() { |
163 | std::unique_lock<butil::Mutex> mu(_map_mutex); |
164 | if (_status != STOPPING && _status != RUNNING) { // no need to join. |
165 | return; |
166 | } |
167 | // `_listened_fd' will be set to -1 once it has been recycled |
168 | while (_listened_fd > 0 || !_socket_map.empty()) { |
169 | _empty_cond.Wait(); |
170 | } |
171 | const int saved_idle_timeout_sec = _idle_timeout_sec; |
172 | _idle_timeout_sec = 0; |
173 | const bthread_t saved_close_idle_tid = _close_idle_tid; |
174 | mu.unlock(); |
175 | |
176 | // Join the bthread outside lock. |
177 | if (saved_idle_timeout_sec > 0) { |
178 | bthread_stop(saved_close_idle_tid); |
179 | bthread_join(saved_close_idle_tid, NULL); |
180 | } |
181 | |
182 | { |
183 | BAIDU_SCOPED_LOCK(_map_mutex); |
184 | _status = READY; |
185 | } |
186 | } |
187 | |
188 | size_t Acceptor::ConnectionCount() const { |
189 | // Notice that _socket_map may be modified concurrently. This actually |
190 | // assumes that size() is safe to call concurrently. |
191 | return _socket_map.size(); |
192 | } |
193 | |
194 | void Acceptor::ListConnections(std::vector<SocketId>* conn_list, |
195 | size_t max_copied) { |
196 | if (conn_list == NULL) { |
197 | LOG(FATAL) << "Param[conn_list] is NULL" ; |
198 | return; |
199 | } |
200 | conn_list->clear(); |
201 | // Add additional 10(randomly small number) so that even if |
202 | // ConnectionCount is inaccurate, enough space is reserved |
203 | conn_list->reserve(ConnectionCount() + 10); |
204 | |
205 | std::unique_lock<butil::Mutex> mu(_map_mutex); |
206 | if (!_socket_map.initialized()) { |
207 | // Optional. Uninitialized FlatMap should be iteratable. |
208 | return; |
209 | } |
210 | // Copy all the SocketId (protected by mutex) into a temporary |
211 | // container to avoid dealing with sockets inside the mutex. |
212 | size_t ntotal = 0; |
213 | size_t n = 0; |
214 | for (SocketMap::const_iterator it = _socket_map.begin(); |
215 | it != _socket_map.end(); ++it, ++ntotal) { |
216 | if (ntotal >= max_copied) { |
217 | return; |
218 | } |
219 | if (++n >= 256/*max iterated one pass*/) { |
220 | SocketMap::PositionHint hint; |
221 | _socket_map.save_iterator(it, &hint); |
222 | n = 0; |
223 | mu.unlock(); // yield |
224 | mu.lock(); |
225 | it = _socket_map.restore_iterator(hint); |
226 | if (it == _socket_map.begin()) { // resized |
227 | conn_list->clear(); |
228 | } |
229 | if (it == _socket_map.end()) { |
230 | break; |
231 | } |
232 | } |
233 | conn_list->push_back(it->first); |
234 | } |
235 | } |
236 | |
237 | void Acceptor::ListConnections(std::vector<SocketId>* conn_list) { |
238 | return ListConnections(conn_list, std::numeric_limits<size_t>::max()); |
239 | } |
240 | |
241 | void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) { |
242 | while (1) { |
243 | struct sockaddr in_addr; |
244 | socklen_t in_len = sizeof(in_addr); |
245 | butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len)); |
246 | if (in_fd < 0) { |
247 | // no EINTR because listened fd is non-blocking. |
248 | if (errno == EAGAIN) { |
249 | return; |
250 | } |
251 | // Do NOT return -1 when `accept' failed, otherwise `_listened_fd' |
252 | // will be closed. Continue to consume all the events until EAGAIN |
253 | // instead. |
254 | // If the accept was failed, the error may repeat constantly, |
255 | // limit frequency of logging. |
256 | PLOG_EVERY_SECOND(ERROR) |
257 | << "Fail to accept from listened_fd=" << acception->fd(); |
258 | continue; |
259 | } |
260 | |
261 | Acceptor* am = dynamic_cast<Acceptor*>(acception->user()); |
262 | if (NULL == am) { |
263 | LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor" ; |
264 | acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor" ); |
265 | return; |
266 | } |
267 | |
268 | SocketId socket_id; |
269 | SocketOptions options; |
270 | options.keytable_pool = am->_keytable_pool; |
271 | options.fd = in_fd; |
272 | options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr); |
273 | options.user = acception->user(); |
274 | options.on_edge_triggered_events = InputMessenger::OnNewMessages; |
275 | options.initial_ssl_ctx = am->_ssl_ctx; |
276 | if (Socket::Create(options, &socket_id) != 0) { |
277 | LOG(ERROR) << "Fail to create Socket" ; |
278 | continue; |
279 | } |
280 | in_fd.release(); // transfer ownership to socket_id |
281 | |
282 | // There's a funny race condition here. After Socket::Create, messages |
283 | // from the socket are already handled and a RPC is possibly done |
284 | // before the socket is added into _socket_map below. This is found in |
285 | // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running |
286 | // on machines with few cores) where the _messenger.ConnectionCount() |
287 | // may surprisingly be 0 even if the RPC is already done. |
288 | |
289 | SocketUniquePtr sock; |
290 | if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) { |
291 | bool is_running = true; |
292 | { |
293 | BAIDU_SCOPED_LOCK(am->_map_mutex); |
294 | is_running = (am->status() == RUNNING); |
295 | // Always add this socket into `_socket_map' whether it |
296 | // has been `SetFailed' or not, whether `Acceptor' is |
297 | // running or not. Otherwise, `Acceptor::BeforeRecycle' |
298 | // may be called (inside Socket::OnRecycle) after `Acceptor' |
299 | // has been destroyed |
300 | am->_socket_map.insert(socket_id, ConnectStatistics()); |
301 | } |
302 | if (!is_running) { |
303 | LOG(WARNING) << "Acceptor on fd=" << acception->fd() |
304 | << " has been stopped, discard newly created " << *sock; |
305 | sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, " |
306 | "discard newly created %s" , acception->fd(), |
307 | sock->description().c_str()); |
308 | return; |
309 | } |
310 | } // else: The socket has already been destroyed, Don't add its id |
311 | // into _socket_map |
312 | } |
313 | } |
314 | |
315 | void Acceptor::OnNewConnections(Socket* acception) { |
316 | int progress = Socket::PROGRESS_INIT; |
317 | do { |
318 | OnNewConnectionsUntilEAGAIN(acception); |
319 | if (acception->Failed()) { |
320 | return; |
321 | } |
322 | } while (acception->MoreReadEvents(&progress)); |
323 | } |
324 | |
325 | void Acceptor::BeforeRecycle(Socket* sock) { |
326 | BAIDU_SCOPED_LOCK(_map_mutex); |
327 | if (sock->id() == _acception_id) { |
328 | // Set _listened_fd to -1 when acception socket has been recycled |
329 | // so that we are ensured no more events will arrive (and `Join' |
330 | // will return to its caller) |
331 | _listened_fd = -1; |
332 | _empty_cond.Broadcast(); |
333 | return; |
334 | } |
335 | // If a Socket could not be addressed shortly after its creation, it |
336 | // was not added into `_socket_map'. |
337 | _socket_map.erase(sock->id()); |
338 | if (_socket_map.empty()) { |
339 | _empty_cond.Broadcast(); |
340 | } |
341 | } |
342 | |
343 | } // namespace brpc |
344 | |