1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18
19#include <inttypes.h>
20#include <gflags/gflags.h>
21#include "butil/fd_guard.h" // fd_guard
22#include "butil/fd_utility.h" // make_close_on_exec
23#include "butil/time.h" // gettimeofday_us
24#include "brpc/acceptor.h"
25
26
27namespace brpc {
28
// Capacity passed to `_socket_map.init()' by Acceptor::Initialize().
static const int INITIAL_CONNECTION_CAP = 64 * 1024;
30
31Acceptor::Acceptor(bthread_keytable_pool_t* pool)
32 : InputMessenger()
33 , _keytable_pool(pool)
34 , _status(UNINITIALIZED)
35 , _idle_timeout_sec(-1)
36 , _close_idle_tid(INVALID_BTHREAD)
37 , _listened_fd(-1)
38 , _acception_id(0)
39 , _empty_cond(&_map_mutex)
40 , _ssl_ctx(NULL) {
41}
42
43Acceptor::~Acceptor() {
44 StopAccept(0);
45 Join();
46}
47
48int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
49 const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
50 if (listened_fd < 0) {
51 LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
52 return -1;
53 }
54
55 BAIDU_SCOPED_LOCK(_map_mutex);
56 if (_status == UNINITIALIZED) {
57 if (Initialize() != 0) {
58 LOG(FATAL) << "Fail to initialize Acceptor";
59 return -1;
60 }
61 _status = READY;
62 }
63 if (_status != READY) {
64 LOG(FATAL) << "Acceptor hasn't stopped yet: status=" << status();
65 return -1;
66 }
67 if (idle_timeout_sec > 0) {
68 if (bthread_start_background(&_close_idle_tid, NULL,
69 CloseIdleConnections, this) != 0) {
70 LOG(FATAL) << "Fail to start bthread";
71 return -1;
72 }
73 }
74 _idle_timeout_sec = idle_timeout_sec;
75 _ssl_ctx = ssl_ctx;
76
77 // Creation of _acception_id is inside lock so that OnNewConnections
78 // (which may run immediately) should see sane fields set below.
79 SocketOptions options;
80 options.fd = listened_fd;
81 options.user = this;
82 options.on_edge_triggered_events = OnNewConnections;
83 if (Socket::Create(options, &_acception_id) != 0) {
84 // Close-idle-socket thread will be stopped inside destructor
85 LOG(FATAL) << "Fail to create _acception_id";
86 return -1;
87 }
88
89 _listened_fd = listened_fd;
90 _status = RUNNING;
91 return 0;
92}
93
94void* Acceptor::CloseIdleConnections(void* arg) {
95 Acceptor* am = static_cast<Acceptor*>(arg);
96 std::vector<SocketId> checking_fds;
97 const uint64_t CHECK_INTERVAL_US = 1000000UL;
98 while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
99 // TODO: this is not efficient for a lot of connections(>100K)
100 am->ListConnections(&checking_fds);
101 for (size_t i = 0; i < checking_fds.size(); ++i) {
102 SocketUniquePtr s;
103 if (Socket::Address(checking_fds[i], &s) == 0) {
104 s->ReleaseReferenceIfIdle(am->_idle_timeout_sec);
105 }
106 }
107 }
108 return NULL;
109}
110
111void Acceptor::StopAccept(int /*closewait_ms*/) {
112 // Currently `closewait_ms' is useless since we have to wait until
113 // existing requests are finished. Otherwise, contexts depended by
114 // the requests may be deleted and invalid.
115
116 {
117 BAIDU_SCOPED_LOCK(_map_mutex);
118 if (_status != RUNNING) {
119 return;
120 }
121 _status = STOPPING;
122 }
123
124 // Don't set _acception_id to 0 because BeforeRecycle needs it.
125 Socket::SetFailed(_acception_id);
126
127 // SetFailed all existing connections. Connections added after this piece
128 // of code will be SetFailed directly in OnNewConnectionsUntilEAGAIN
129 std::vector<SocketId> erasing_ids;
130 ListConnections(&erasing_ids);
131
132 for (size_t i = 0; i < erasing_ids.size(); ++i) {
133 SocketUniquePtr socket;
134 if (Socket::Address(erasing_ids[i], &socket) == 0) {
135 if (socket->shall_fail_me_at_server_stop()) {
136 // Mainly streaming connections, should be SetFailed() to
137 // trigger callbacks to NotifyOnFailed() to remove references,
138 // otherwise the sockets are often referenced by corresponding
139 // objects and delay server's stopping which requires all
140 // existing sockets to be recycled.
141 socket->SetFailed(ELOGOFF, "Server is stopping");
142 } else {
143 // Message-oriented RPC connections. Just release the addtional
144 // reference in the socket, which will be recycled when current
145 // requests have been processed.
146 socket->ReleaseAdditionalReference();
147 }
148 } // else: This socket already called `SetFailed' before
149 }
150}
151
152int Acceptor::Initialize() {
153 if (_socket_map.init(INITIAL_CONNECTION_CAP) != 0) {
154 LOG(FATAL) << "Fail to initialize FlatMap, size="
155 << INITIAL_CONNECTION_CAP;
156 return -1;
157 }
158 return 0;
159}
160
161// NOTE: Join() can happen before StopAccept()
162void Acceptor::Join() {
163 std::unique_lock<butil::Mutex> mu(_map_mutex);
164 if (_status != STOPPING && _status != RUNNING) { // no need to join.
165 return;
166 }
167 // `_listened_fd' will be set to -1 once it has been recycled
168 while (_listened_fd > 0 || !_socket_map.empty()) {
169 _empty_cond.Wait();
170 }
171 const int saved_idle_timeout_sec = _idle_timeout_sec;
172 _idle_timeout_sec = 0;
173 const bthread_t saved_close_idle_tid = _close_idle_tid;
174 mu.unlock();
175
176 // Join the bthread outside lock.
177 if (saved_idle_timeout_sec > 0) {
178 bthread_stop(saved_close_idle_tid);
179 bthread_join(saved_close_idle_tid, NULL);
180 }
181
182 {
183 BAIDU_SCOPED_LOCK(_map_mutex);
184 _status = READY;
185 }
186}
187
188size_t Acceptor::ConnectionCount() const {
189 // Notice that _socket_map may be modified concurrently. This actually
190 // assumes that size() is safe to call concurrently.
191 return _socket_map.size();
192}
193
194void Acceptor::ListConnections(std::vector<SocketId>* conn_list,
195 size_t max_copied) {
196 if (conn_list == NULL) {
197 LOG(FATAL) << "Param[conn_list] is NULL";
198 return;
199 }
200 conn_list->clear();
201 // Add additional 10(randomly small number) so that even if
202 // ConnectionCount is inaccurate, enough space is reserved
203 conn_list->reserve(ConnectionCount() + 10);
204
205 std::unique_lock<butil::Mutex> mu(_map_mutex);
206 if (!_socket_map.initialized()) {
207 // Optional. Uninitialized FlatMap should be iteratable.
208 return;
209 }
210 // Copy all the SocketId (protected by mutex) into a temporary
211 // container to avoid dealing with sockets inside the mutex.
212 size_t ntotal = 0;
213 size_t n = 0;
214 for (SocketMap::const_iterator it = _socket_map.begin();
215 it != _socket_map.end(); ++it, ++ntotal) {
216 if (ntotal >= max_copied) {
217 return;
218 }
219 if (++n >= 256/*max iterated one pass*/) {
220 SocketMap::PositionHint hint;
221 _socket_map.save_iterator(it, &hint);
222 n = 0;
223 mu.unlock(); // yield
224 mu.lock();
225 it = _socket_map.restore_iterator(hint);
226 if (it == _socket_map.begin()) { // resized
227 conn_list->clear();
228 }
229 if (it == _socket_map.end()) {
230 break;
231 }
232 }
233 conn_list->push_back(it->first);
234 }
235}
236
237void Acceptor::ListConnections(std::vector<SocketId>* conn_list) {
238 return ListConnections(conn_list, std::numeric_limits<size_t>::max());
239}
240
241void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
242 while (1) {
243 struct sockaddr in_addr;
244 socklen_t in_len = sizeof(in_addr);
245 butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
246 if (in_fd < 0) {
247 // no EINTR because listened fd is non-blocking.
248 if (errno == EAGAIN) {
249 return;
250 }
251 // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
252 // will be closed. Continue to consume all the events until EAGAIN
253 // instead.
254 // If the accept was failed, the error may repeat constantly,
255 // limit frequency of logging.
256 PLOG_EVERY_SECOND(ERROR)
257 << "Fail to accept from listened_fd=" << acception->fd();
258 continue;
259 }
260
261 Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
262 if (NULL == am) {
263 LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
264 acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
265 return;
266 }
267
268 SocketId socket_id;
269 SocketOptions options;
270 options.keytable_pool = am->_keytable_pool;
271 options.fd = in_fd;
272 options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
273 options.user = acception->user();
274 options.on_edge_triggered_events = InputMessenger::OnNewMessages;
275 options.initial_ssl_ctx = am->_ssl_ctx;
276 if (Socket::Create(options, &socket_id) != 0) {
277 LOG(ERROR) << "Fail to create Socket";
278 continue;
279 }
280 in_fd.release(); // transfer ownership to socket_id
281
282 // There's a funny race condition here. After Socket::Create, messages
283 // from the socket are already handled and a RPC is possibly done
284 // before the socket is added into _socket_map below. This is found in
285 // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
286 // on machines with few cores) where the _messenger.ConnectionCount()
287 // may surprisingly be 0 even if the RPC is already done.
288
289 SocketUniquePtr sock;
290 if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
291 bool is_running = true;
292 {
293 BAIDU_SCOPED_LOCK(am->_map_mutex);
294 is_running = (am->status() == RUNNING);
295 // Always add this socket into `_socket_map' whether it
296 // has been `SetFailed' or not, whether `Acceptor' is
297 // running or not. Otherwise, `Acceptor::BeforeRecycle'
298 // may be called (inside Socket::OnRecycle) after `Acceptor'
299 // has been destroyed
300 am->_socket_map.insert(socket_id, ConnectStatistics());
301 }
302 if (!is_running) {
303 LOG(WARNING) << "Acceptor on fd=" << acception->fd()
304 << " has been stopped, discard newly created " << *sock;
305 sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
306 "discard newly created %s", acception->fd(),
307 sock->description().c_str());
308 return;
309 }
310 } // else: The socket has already been destroyed, Don't add its id
311 // into _socket_map
312 }
313}
314
315void Acceptor::OnNewConnections(Socket* acception) {
316 int progress = Socket::PROGRESS_INIT;
317 do {
318 OnNewConnectionsUntilEAGAIN(acception);
319 if (acception->Failed()) {
320 return;
321 }
322 } while (acception->MoreReadEvents(&progress));
323}
324
325void Acceptor::BeforeRecycle(Socket* sock) {
326 BAIDU_SCOPED_LOCK(_map_mutex);
327 if (sock->id() == _acception_id) {
328 // Set _listened_fd to -1 when acception socket has been recycled
329 // so that we are ensured no more events will arrive (and `Join'
330 // will return to its caller)
331 _listened_fd = -1;
332 _empty_cond.Broadcast();
333 return;
334 }
335 // If a Socket could not be addressed shortly after its creation, it
336 // was not added into `_socket_map'.
337 _socket_map.erase(sock->id());
338 if (_socket_map.empty()) {
339 _empty_cond.Broadcast();
340 }
341}
342
343} // namespace brpc
344