1/*
2 * Copyright (c) 2019, Redis Labs
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
#include "server.h"
#include "connhelpers.h"

#include <limits.h>
32
33/* The connections module provides a lean abstraction of network connections
34 * to avoid direct socket and async event management across the Redis code base.
35 *
36 * It does NOT provide advanced connection features commonly found in similar
37 * libraries such as complete in/out buffer management, throttling, etc. These
38 * functions remain in networking.c.
39 *
40 * The primary goal is to allow transparent handling of TCP and TLS based
41 * connections. To do so, connections have the following properties:
42 *
 * 1. A connection may exist before its corresponding socket does. This
 *    allows various context and configuration settings to be handled before
 *    establishing the actual connection.
46 * 2. The caller may register/unregister logical read/write handlers to be
47 * called when the connection has data to read from/can accept writes.
48 * These logical handlers may or may not correspond to actual AE events,
49 * depending on the implementation (for TCP they are; for TLS they aren't).
50 */
51
52ConnectionType CT_Socket;
53
54/* When a connection is created we must know its type already, but the
55 * underlying socket may or may not exist:
56 *
 * - For accepted connections, the socket already exists, as we do not model
 *   the listen/accept part; so the caller calls connCreateAcceptedSocket()
 *   followed by connAccept().
 * - For outgoing connections, the socket is created by the connection module
 *   itself; so the caller calls connCreateSocket() followed by connConnect(),
 *   which registers a connect callback that fires on connected/error state
 *   (and after any transport-level handshake has been done).
63 *
64 * NOTE: An earlier version relied on connections being part of other structs
65 * and not independently allocated. This could lead to further optimizations
66 * like using container_of(), etc. However it was discontinued in favor of
67 * this approach for these reasons:
68 *
69 * 1. In some cases conns are created/handled outside the context of the
70 * containing struct, in which case it gets a bit awkward to copy them.
71 * 2. Future implementations may wish to allocate arbitrary data for the
72 * connection.
73 * 3. The container_of() approach is anyway risky because connections may
74 * be embedded in different structs, not just client.
75 */
76
77connection *connCreateSocket() {
78 connection *conn = zcalloc(sizeof(connection));
79 conn->type = &CT_Socket;
80 conn->fd = -1;
81
82 return conn;
83}
84
85/* Create a new socket-type connection that is already associated with
86 * an accepted connection.
87 *
88 * The socket is not ready for I/O until connAccept() was called and
89 * invoked the connection-level accept handler.
90 *
91 * Callers should use connGetState() and verify the created connection
92 * is not in an error state (which is not possible for a socket connection,
93 * but could but possible with other protocols).
94 */
95connection *connCreateAcceptedSocket(int fd) {
96 connection *conn = connCreateSocket();
97 conn->fd = fd;
98 conn->state = CONN_STATE_ACCEPTING;
99 return conn;
100}
101
102static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr,
103 ConnectionCallbackFunc connect_handler) {
104 int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr);
105 if (fd == -1) {
106 conn->state = CONN_STATE_ERROR;
107 conn->last_errno = errno;
108 return C_ERR;
109 }
110
111 conn->fd = fd;
112 conn->state = CONN_STATE_CONNECTING;
113
114 conn->conn_handler = connect_handler;
115 aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE,
116 conn->type->ae_handler, conn);
117
118 return C_OK;
119}
120
121/* Returns true if a write handler is registered */
122int connHasWriteHandler(connection *conn) {
123 return conn->write_handler != NULL;
124}
125
126/* Returns true if a read handler is registered */
127int connHasReadHandler(connection *conn) {
128 return conn->read_handler != NULL;
129}
130
131/* Associate a private data pointer with the connection */
132void connSetPrivateData(connection *conn, void *data) {
133 conn->private_data = data;
134}
135
136/* Get the associated private data pointer */
137void *connGetPrivateData(connection *conn) {
138 return conn->private_data;
139}
140
141/* ------ Pure socket connections ------- */
142
143/* A very incomplete list of implementation-specific calls. Much of the above shall
144 * move here as we implement additional connection types.
145 */
146
147/* Close the connection and free resources. */
148static void connSocketClose(connection *conn) {
149 if (conn->fd != -1) {
150 aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE);
151 close(conn->fd);
152 conn->fd = -1;
153 }
154
155 /* If called from within a handler, schedule the close but
156 * keep the connection until the handler returns.
157 */
158 if (connHasRefs(conn)) {
159 conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;
160 return;
161 }
162
163 zfree(conn);
164}
165
166static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
167 int ret = write(conn->fd, data, data_len);
168 if (ret < 0 && errno != EAGAIN) {
169 conn->last_errno = errno;
170
171 /* Don't overwrite the state of a connection that is not already
172 * connected, not to mess with handler callbacks.
173 */
174 if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
175 conn->state = CONN_STATE_ERROR;
176 }
177
178 return ret;
179}
180
181static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) {
182 int ret = writev(conn->fd, iov, iovcnt);
183 if (ret < 0 && errno != EAGAIN) {
184 conn->last_errno = errno;
185
186 /* Don't overwrite the state of a connection that is not already
187 * connected, not to mess with handler callbacks.
188 */
189 if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
190 conn->state = CONN_STATE_ERROR;
191 }
192
193 return ret;
194}
195
196static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
197 int ret = read(conn->fd, buf, buf_len);
198 if (!ret) {
199 conn->state = CONN_STATE_CLOSED;
200 } else if (ret < 0 && errno != EAGAIN) {
201 conn->last_errno = errno;
202
203 /* Don't overwrite the state of a connection that is not already
204 * connected, not to mess with handler callbacks.
205 */
206 if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
207 conn->state = CONN_STATE_ERROR;
208 }
209
210 return ret;
211}
212
213static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
214 int ret = C_OK;
215
216 if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
217 conn->state = CONN_STATE_CONNECTED;
218
219 connIncrRefs(conn);
220 if (!callHandler(conn, accept_handler)) ret = C_ERR;
221 connDecrRefs(conn);
222
223 return ret;
224}
225
226/* Register a write handler, to be called when the connection is writable.
227 * If NULL, the existing handler is removed.
228 *
229 * The barrier flag indicates a write barrier is requested, resulting with
230 * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is
231 * always called before and not after the read handler in a single event
232 * loop.
233 */
234static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
235 if (func == conn->write_handler) return C_OK;
236
237 conn->write_handler = func;
238 if (barrier)
239 conn->flags |= CONN_FLAG_WRITE_BARRIER;
240 else
241 conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
242 if (!conn->write_handler)
243 aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
244 else
245 if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
246 conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
247 return C_OK;
248}
249
250/* Register a read handler, to be called when the connection is readable.
251 * If NULL, the existing handler is removed.
252 */
253static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
254 if (func == conn->read_handler) return C_OK;
255
256 conn->read_handler = func;
257 if (!conn->read_handler)
258 aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
259 else
260 if (aeCreateFileEvent(server.el,conn->fd,
261 AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
262 return C_OK;
263}
264
265static const char *connSocketGetLastError(connection *conn) {
266 return strerror(conn->last_errno);
267}
268
/* Event loop callback for socket connections. It is installed as
 * CT_Socket.ae_handler and registered through aeCreateFileEvent().
 *
 * It first completes a pending outgoing connect. When the connection is in
 * CONN_STATE_CONNECTING, the fd is writable and a connect handler is set,
 * SO_ERROR decides between CONN_STATE_CONNECTED and CONN_STATE_ERROR. The
 * connect handler is then invoked and cleared.
 *
 * It then dispatches the logical read/write handlers for the events in
 * `mask`, honoring CONN_FLAG_WRITE_BARRIER.
 *
 * Whenever callHandler() returns 0, the function returns without touching
 * conn again. NOTE(review): presumably because the handler closed (and
 * possibly freed) the connection — confirm in connhelpers.h.
 */
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el);
    UNUSED(fd);
    connection *conn = clientData;

    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {

        /* A non-blocking connect() signals completion through writability;
         * whether it actually succeeded is read from SO_ERROR. */
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        /* The writable event was registered by connSocketConnect() only to
         * detect connect completion; keep it only if a write handler needs it. */
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if CONN_FLAG_WRITE_BARRIER is set in conn->flags, our
     * application is asking us to do the reverse: never fire the writable
     * event after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    /* Which handlers to call is decided here, before any handler runs; the
     * handler pointers themselves are re-read at call time below. */
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
322
323static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
324 int fd = anetTcpNonBlockConnect(NULL,addr,port);
325 if (fd == -1) {
326 conn->state = CONN_STATE_ERROR;
327 conn->last_errno = errno;
328 return C_ERR;
329 }
330
331 if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) {
332 conn->state = CONN_STATE_ERROR;
333 conn->last_errno = ETIMEDOUT;
334 }
335
336 conn->fd = fd;
337 conn->state = CONN_STATE_CONNECTED;
338 return C_OK;
339}
340
341/* Connection-based versions of syncio.c functions.
342 * NOTE: This should ideally be refactored out in favor of pure async work.
343 */
344
345static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
346 return syncWrite(conn->fd, ptr, size, timeout);
347}
348
349static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
350 return syncRead(conn->fd, ptr, size, timeout);
351}
352
353static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
354 return syncReadLine(conn->fd, ptr, size, timeout);
355}
356
357static int connSocketGetType(connection *conn) {
358 (void) conn;
359
360 return CONN_TYPE_SOCKET;
361}
362
363ConnectionType CT_Socket = {
364 .ae_handler = connSocketEventHandler,
365 .close = connSocketClose,
366 .write = connSocketWrite,
367 .writev = connSocketWritev,
368 .read = connSocketRead,
369 .accept = connSocketAccept,
370 .connect = connSocketConnect,
371 .set_write_handler = connSocketSetWriteHandler,
372 .set_read_handler = connSocketSetReadHandler,
373 .get_last_error = connSocketGetLastError,
374 .blocking_connect = connSocketBlockingConnect,
375 .sync_write = connSocketSyncWrite,
376 .sync_read = connSocketSyncRead,
377 .sync_readline = connSocketSyncReadLine,
378 .get_type = connSocketGetType
379};
380
381
382int connGetSocketError(connection *conn) {
383 int sockerr = 0;
384 socklen_t errlen = sizeof(sockerr);
385
386 if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
387 sockerr = errno;
388 return sockerr;
389}
390
391int connPeerToString(connection *conn, char *ip, size_t ip_len, int *port) {
392 return anetFdToString(conn ? conn->fd : -1, ip, ip_len, port, FD_TO_PEER_NAME);
393}
394
395int connSockName(connection *conn, char *ip, size_t ip_len, int *port) {
396 return anetFdToString(conn->fd, ip, ip_len, port, FD_TO_SOCK_NAME);
397}
398
399int connFormatFdAddr(connection *conn, char *buf, size_t buf_len, int fd_to_str_type) {
400 return anetFormatFdAddr(conn ? conn->fd : -1, buf, buf_len, fd_to_str_type);
401}
402
403int connBlock(connection *conn) {
404 if (conn->fd == -1) return C_ERR;
405 return anetBlock(NULL, conn->fd);
406}
407
408int connNonBlock(connection *conn) {
409 if (conn->fd == -1) return C_ERR;
410 return anetNonBlock(NULL, conn->fd);
411}
412
413int connEnableTcpNoDelay(connection *conn) {
414 if (conn->fd == -1) return C_ERR;
415 return anetEnableTcpNoDelay(NULL, conn->fd);
416}
417
418int connDisableTcpNoDelay(connection *conn) {
419 if (conn->fd == -1) return C_ERR;
420 return anetDisableTcpNoDelay(NULL, conn->fd);
421}
422
423int connKeepAlive(connection *conn, int interval) {
424 if (conn->fd == -1) return C_ERR;
425 return anetKeepAlive(NULL, conn->fd, interval);
426}
427
428int connSendTimeout(connection *conn, long long ms) {
429 return anetSendTimeout(NULL, conn->fd, ms);
430}
431
432int connRecvTimeout(connection *conn, long long ms) {
433 return anetRecvTimeout(NULL, conn->fd, ms);
434}
435
436int connGetState(connection *conn) {
437 return conn->state;
438}
439
440/* Return a text that describes the connection, suitable for inclusion
441 * in CLIENT LIST and similar outputs.
442 *
443 * For sockets, we always return "fd=<fdnum>" to maintain compatibility.
444 */
445const char *connGetInfo(connection *conn, char *buf, size_t buf_len) {
446 snprintf(buf, buf_len-1, "fd=%i", conn == NULL ? -1 : conn->fd);
447 return buf;
448}
449
450