1 | /* |
2 | * Copyright (c) 2019, Redis Labs |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "connhelpers.h" |
32 | |
33 | /* The connections module provides a lean abstraction of network connections |
34 | * to avoid direct socket and async event management across the Redis code base. |
35 | * |
36 | * It does NOT provide advanced connection features commonly found in similar |
37 | * libraries such as complete in/out buffer management, throttling, etc. These |
38 | * functions remain in networking.c. |
39 | * |
40 | * The primary goal is to allow transparent handling of TCP and TLS based |
41 | * connections. To do so, connections have the following properties: |
42 | * |
43 | * 1. A connection may live before its corresponding socket exists. This |
 * allows various context and configuration settings to be handled before
45 | * establishing the actual connection. |
46 | * 2. The caller may register/unregister logical read/write handlers to be |
47 | * called when the connection has data to read from/can accept writes. |
48 | * These logical handlers may or may not correspond to actual AE events, |
49 | * depending on the implementation (for TCP they are; for TLS they aren't). |
50 | */ |
51 | |
/* Tentative definition so the functions below can take &CT_Socket (see
 * connCreateSocket()); the initialized definition holding the socket method
 * table appears further down in this file. */
ConnectionType CT_Socket;
53 | |
54 | /* When a connection is created we must know its type already, but the |
55 | * underlying socket may or may not exist: |
56 | * |
 * - For accepted connections, the socket already exists, as we do not model
 *   the listen/accept part; the caller calls connCreateAcceptedSocket()
 *   followed by connAccept().
59 | * - For outgoing connections, the socket is created by the connection module |
60 | * itself; So caller calls connCreateSocket() followed by connConnect(), |
61 | * which registers a connect callback that fires on connected/error state |
62 | * (and after any transport level handshake was done). |
63 | * |
64 | * NOTE: An earlier version relied on connections being part of other structs |
65 | * and not independently allocated. This could lead to further optimizations |
66 | * like using container_of(), etc. However it was discontinued in favor of |
67 | * this approach for these reasons: |
68 | * |
69 | * 1. In some cases conns are created/handled outside the context of the |
70 | * containing struct, in which case it gets a bit awkward to copy them. |
71 | * 2. Future implementations may wish to allocate arbitrary data for the |
72 | * connection. |
73 | * 3. The container_of() approach is anyway risky because connections may |
74 | * be embedded in different structs, not just client. |
75 | */ |
76 | |
77 | connection *connCreateSocket() { |
78 | connection *conn = zcalloc(sizeof(connection)); |
79 | conn->type = &CT_Socket; |
80 | conn->fd = -1; |
81 | |
82 | return conn; |
83 | } |
84 | |
85 | /* Create a new socket-type connection that is already associated with |
86 | * an accepted connection. |
87 | * |
88 | * The socket is not ready for I/O until connAccept() was called and |
89 | * invoked the connection-level accept handler. |
90 | * |
91 | * Callers should use connGetState() and verify the created connection |
92 | * is not in an error state (which is not possible for a socket connection, |
93 | * but could but possible with other protocols). |
94 | */ |
95 | connection *connCreateAcceptedSocket(int fd) { |
96 | connection *conn = connCreateSocket(); |
97 | conn->fd = fd; |
98 | conn->state = CONN_STATE_ACCEPTING; |
99 | return conn; |
100 | } |
101 | |
102 | static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr, |
103 | ConnectionCallbackFunc connect_handler) { |
104 | int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr); |
105 | if (fd == -1) { |
106 | conn->state = CONN_STATE_ERROR; |
107 | conn->last_errno = errno; |
108 | return C_ERR; |
109 | } |
110 | |
111 | conn->fd = fd; |
112 | conn->state = CONN_STATE_CONNECTING; |
113 | |
114 | conn->conn_handler = connect_handler; |
115 | aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, |
116 | conn->type->ae_handler, conn); |
117 | |
118 | return C_OK; |
119 | } |
120 | |
121 | /* Returns true if a write handler is registered */ |
122 | int connHasWriteHandler(connection *conn) { |
123 | return conn->write_handler != NULL; |
124 | } |
125 | |
126 | /* Returns true if a read handler is registered */ |
127 | int connHasReadHandler(connection *conn) { |
128 | return conn->read_handler != NULL; |
129 | } |
130 | |
131 | /* Associate a private data pointer with the connection */ |
132 | void connSetPrivateData(connection *conn, void *data) { |
133 | conn->private_data = data; |
134 | } |
135 | |
136 | /* Get the associated private data pointer */ |
137 | void *connGetPrivateData(connection *conn) { |
138 | return conn->private_data; |
139 | } |
140 | |
141 | /* ------ Pure socket connections ------- */ |
142 | |
143 | /* A very incomplete list of implementation-specific calls. Much of the above shall |
144 | * move here as we implement additional connection types. |
145 | */ |
146 | |
147 | /* Close the connection and free resources. */ |
148 | static void connSocketClose(connection *conn) { |
149 | if (conn->fd != -1) { |
150 | aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE); |
151 | close(conn->fd); |
152 | conn->fd = -1; |
153 | } |
154 | |
155 | /* If called from within a handler, schedule the close but |
156 | * keep the connection until the handler returns. |
157 | */ |
158 | if (connHasRefs(conn)) { |
159 | conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; |
160 | return; |
161 | } |
162 | |
163 | zfree(conn); |
164 | } |
165 | |
166 | static int connSocketWrite(connection *conn, const void *data, size_t data_len) { |
167 | int ret = write(conn->fd, data, data_len); |
168 | if (ret < 0 && errno != EAGAIN) { |
169 | conn->last_errno = errno; |
170 | |
171 | /* Don't overwrite the state of a connection that is not already |
172 | * connected, not to mess with handler callbacks. |
173 | */ |
174 | if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) |
175 | conn->state = CONN_STATE_ERROR; |
176 | } |
177 | |
178 | return ret; |
179 | } |
180 | |
181 | static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) { |
182 | int ret = writev(conn->fd, iov, iovcnt); |
183 | if (ret < 0 && errno != EAGAIN) { |
184 | conn->last_errno = errno; |
185 | |
186 | /* Don't overwrite the state of a connection that is not already |
187 | * connected, not to mess with handler callbacks. |
188 | */ |
189 | if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) |
190 | conn->state = CONN_STATE_ERROR; |
191 | } |
192 | |
193 | return ret; |
194 | } |
195 | |
196 | static int connSocketRead(connection *conn, void *buf, size_t buf_len) { |
197 | int ret = read(conn->fd, buf, buf_len); |
198 | if (!ret) { |
199 | conn->state = CONN_STATE_CLOSED; |
200 | } else if (ret < 0 && errno != EAGAIN) { |
201 | conn->last_errno = errno; |
202 | |
203 | /* Don't overwrite the state of a connection that is not already |
204 | * connected, not to mess with handler callbacks. |
205 | */ |
206 | if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) |
207 | conn->state = CONN_STATE_ERROR; |
208 | } |
209 | |
210 | return ret; |
211 | } |
212 | |
/* Complete acceptance of a connection created by connCreateAcceptedSocket().
 *
 * Moves the connection from CONN_STATE_ACCEPTING to CONN_STATE_CONNECTED and
 * invokes accept_handler immediately, without waiting for any event.
 *
 * Returns C_ERR if the connection was not in CONN_STATE_ACCEPTING, or if
 * callHandler() returned 0; C_OK otherwise.
 */
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    int ret = C_OK;

    if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
    conn->state = CONN_STATE_CONNECTED;

    /* Hold an extra reference across the handler call.
     * NOTE(review): presumably so a close requested by the handler is only
     * scheduled (see connSocketClose()) and conn stays valid for the caller
     * to close after a C_ERR return — confirm against callHandler(). */
    connIncrRefs(conn);
    if (!callHandler(conn, accept_handler)) ret = C_ERR;
    connDecrRefs(conn);

    return ret;
}
225 | |
226 | /* Register a write handler, to be called when the connection is writable. |
227 | * If NULL, the existing handler is removed. |
228 | * |
229 | * The barrier flag indicates a write barrier is requested, resulting with |
230 | * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is |
231 | * always called before and not after the read handler in a single event |
232 | * loop. |
233 | */ |
234 | static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { |
235 | if (func == conn->write_handler) return C_OK; |
236 | |
237 | conn->write_handler = func; |
238 | if (barrier) |
239 | conn->flags |= CONN_FLAG_WRITE_BARRIER; |
240 | else |
241 | conn->flags &= ~CONN_FLAG_WRITE_BARRIER; |
242 | if (!conn->write_handler) |
243 | aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); |
244 | else |
245 | if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE, |
246 | conn->type->ae_handler,conn) == AE_ERR) return C_ERR; |
247 | return C_OK; |
248 | } |
249 | |
250 | /* Register a read handler, to be called when the connection is readable. |
251 | * If NULL, the existing handler is removed. |
252 | */ |
253 | static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { |
254 | if (func == conn->read_handler) return C_OK; |
255 | |
256 | conn->read_handler = func; |
257 | if (!conn->read_handler) |
258 | aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); |
259 | else |
260 | if (aeCreateFileEvent(server.el,conn->fd, |
261 | AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; |
262 | return C_OK; |
263 | } |
264 | |
265 | static const char *connSocketGetLastError(connection *conn) { |
266 | return strerror(conn->last_errno); |
267 | } |
268 | |
/* Event-loop callback installed as CT_Socket.ae_handler; it is the handler
 * registered for a socket connection's fd by connSocketConnect() and the
 * set_read/write_handler methods.
 *
 * Dispatch order:
 *  1. If an outgoing connect is pending (CONN_STATE_CONNECTING), the fd is
 *     writable and a connect handler exists: read SO_ERROR, set the state to
 *     CONNECTED or ERROR, and fire conn_handler once.
 *  2. Fire the logical read/write handlers for the ready events: read first
 *     by default, write first when CONN_FLAG_WRITE_BARRIER is set.
 *
 * Whenever callHandler() returns 0, dispatch stops immediately.
 * NOTE(review): this appears to mean the connection was closed inside the
 * handler and conn must not be touched afterwards — confirm in connhelpers.h.
 */
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el);
    UNUSED(fd);
    connection *conn = clientData;

    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {

        /* Writability means the non-blocking connect finished; SO_ERROR
         * tells whether it succeeded. */
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        /* The writable event was registered by connSocketConnect() to detect
         * connect completion; drop it unless a logical write handler needs it. */
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        /* One-shot: the connect handler must not fire again. */
        conn->conn_handler = NULL;
    }

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if WRITE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    /* Decide up front which handlers are due, from the ready mask and the
     * handlers installed at this point. */
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
322 | |
323 | static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { |
324 | int fd = anetTcpNonBlockConnect(NULL,addr,port); |
325 | if (fd == -1) { |
326 | conn->state = CONN_STATE_ERROR; |
327 | conn->last_errno = errno; |
328 | return C_ERR; |
329 | } |
330 | |
331 | if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) { |
332 | conn->state = CONN_STATE_ERROR; |
333 | conn->last_errno = ETIMEDOUT; |
334 | } |
335 | |
336 | conn->fd = fd; |
337 | conn->state = CONN_STATE_CONNECTED; |
338 | return C_OK; |
339 | } |
340 | |
341 | /* Connection-based versions of syncio.c functions. |
342 | * NOTE: This should ideally be refactored out in favor of pure async work. |
343 | */ |
344 | |
345 | static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { |
346 | return syncWrite(conn->fd, ptr, size, timeout); |
347 | } |
348 | |
349 | static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { |
350 | return syncRead(conn->fd, ptr, size, timeout); |
351 | } |
352 | |
353 | static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { |
354 | return syncReadLine(conn->fd, ptr, size, timeout); |
355 | } |
356 | |
357 | static int connSocketGetType(connection *conn) { |
358 | (void) conn; |
359 | |
360 | return CONN_TYPE_SOCKET; |
361 | } |
362 | |
363 | ConnectionType CT_Socket = { |
364 | .ae_handler = connSocketEventHandler, |
365 | .close = connSocketClose, |
366 | .write = connSocketWrite, |
367 | .writev = connSocketWritev, |
368 | .read = connSocketRead, |
369 | .accept = connSocketAccept, |
370 | .connect = connSocketConnect, |
371 | .set_write_handler = connSocketSetWriteHandler, |
372 | .set_read_handler = connSocketSetReadHandler, |
373 | .get_last_error = connSocketGetLastError, |
374 | .blocking_connect = connSocketBlockingConnect, |
375 | .sync_write = connSocketSyncWrite, |
376 | .sync_read = connSocketSyncRead, |
377 | .sync_readline = connSocketSyncReadLine, |
378 | .get_type = connSocketGetType |
379 | }; |
380 | |
381 | |
382 | int connGetSocketError(connection *conn) { |
383 | int sockerr = 0; |
384 | socklen_t errlen = sizeof(sockerr); |
385 | |
386 | if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) |
387 | sockerr = errno; |
388 | return sockerr; |
389 | } |
390 | |
391 | int connPeerToString(connection *conn, char *ip, size_t ip_len, int *port) { |
392 | return anetFdToString(conn ? conn->fd : -1, ip, ip_len, port, FD_TO_PEER_NAME); |
393 | } |
394 | |
395 | int connSockName(connection *conn, char *ip, size_t ip_len, int *port) { |
396 | return anetFdToString(conn->fd, ip, ip_len, port, FD_TO_SOCK_NAME); |
397 | } |
398 | |
399 | int connFormatFdAddr(connection *conn, char *buf, size_t buf_len, int fd_to_str_type) { |
400 | return anetFormatFdAddr(conn ? conn->fd : -1, buf, buf_len, fd_to_str_type); |
401 | } |
402 | |
403 | int connBlock(connection *conn) { |
404 | if (conn->fd == -1) return C_ERR; |
405 | return anetBlock(NULL, conn->fd); |
406 | } |
407 | |
408 | int connNonBlock(connection *conn) { |
409 | if (conn->fd == -1) return C_ERR; |
410 | return anetNonBlock(NULL, conn->fd); |
411 | } |
412 | |
413 | int connEnableTcpNoDelay(connection *conn) { |
414 | if (conn->fd == -1) return C_ERR; |
415 | return anetEnableTcpNoDelay(NULL, conn->fd); |
416 | } |
417 | |
418 | int connDisableTcpNoDelay(connection *conn) { |
419 | if (conn->fd == -1) return C_ERR; |
420 | return anetDisableTcpNoDelay(NULL, conn->fd); |
421 | } |
422 | |
423 | int connKeepAlive(connection *conn, int interval) { |
424 | if (conn->fd == -1) return C_ERR; |
425 | return anetKeepAlive(NULL, conn->fd, interval); |
426 | } |
427 | |
428 | int connSendTimeout(connection *conn, long long ms) { |
429 | return anetSendTimeout(NULL, conn->fd, ms); |
430 | } |
431 | |
432 | int connRecvTimeout(connection *conn, long long ms) { |
433 | return anetRecvTimeout(NULL, conn->fd, ms); |
434 | } |
435 | |
436 | int connGetState(connection *conn) { |
437 | return conn->state; |
438 | } |
439 | |
440 | /* Return a text that describes the connection, suitable for inclusion |
441 | * in CLIENT LIST and similar outputs. |
442 | * |
443 | * For sockets, we always return "fd=<fdnum>" to maintain compatibility. |
444 | */ |
445 | const char *connGetInfo(connection *conn, char *buf, size_t buf_len) { |
446 | snprintf(buf, buf_len-1, "fd=%i" , conn == NULL ? -1 : conn->fd); |
447 | return buf; |
448 | } |
449 | |
450 | |