1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "atomicvar.h"
32#include "cluster.h"
33#include "script.h"
34#include <sys/socket.h>
35#include <sys/uio.h>
36#include <math.h>
37#include <ctype.h>
38
/* Forward declarations for functions used before their definitions. */
static void setProtocolError(const char *errstr, client *c);
int postponeClientRead(client *c);
/* Global flag consulted elsewhere in this file; how it is set is documented
 * at processEventsWhileBlocked(). */
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
42
43/* Return the size consumed from the allocator, for the specified SDS string,
44 * including internal fragmentation. This function is used in order to compute
45 * the client output buffer size. */
46size_t sdsZmallocSize(sds s) {
47 void *sh = sdsAllocPtr(s);
48 return zmalloc_size(sh);
49}
50
51/* Return the amount of memory used by the sds string at object->ptr
52 * for a string object. This includes internal fragmentation. */
53size_t getStringObjectSdsUsedMemory(robj *o) {
54 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
55 switch(o->encoding) {
56 case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
57 case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
58 default: return 0; /* Just integer encoding for now. */
59 }
60}
61
62/* Return the length of a string object.
63 * This does NOT includes internal fragmentation or sds unused space. */
64size_t getStringObjectLen(robj *o) {
65 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
66 switch(o->encoding) {
67 case OBJ_ENCODING_RAW: return sdslen(o->ptr);
68 case OBJ_ENCODING_EMBSTR: return sdslen(o->ptr);
69 default: return 0; /* Just integer encoding for now. */
70 }
71}
72
73/* Client.reply list dup and free methods. */
74void *dupClientReplyValue(void *o) {
75 clientReplyBlock *old = o;
76 clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
77 memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
78 return buf;
79}
80
81void freeClientReplyValue(void *o) {
82 zfree(o);
83}
84
85int listMatchObjects(void *a, void *b) {
86 return equalStringObjects(a,b);
87}
88
89/* This function links the client to the global linked list of clients.
90 * unlinkClient() does the opposite, among other things. */
91void linkClient(client *c) {
92 listAddNodeTail(server.clients,c);
93 /* Note that we remember the linked list node where the client is stored,
94 * this way removing the client in unlinkClient() will not require
95 * a linear scan, but just a constant time operation. */
96 c->client_list_node = listLast(server.clients);
97 uint64_t id = htonu64(c->id);
98 raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
99}
100
101/* Initialize client authentication state.
102 */
103static void clientSetDefaultAuth(client *c) {
104 /* If the default user does not require authentication, the user is
105 * directly authenticated. */
106 c->user = DefaultUser;
107 c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
108 !(c->user->flags & USER_FLAG_DISABLED);
109}
110
111int authRequired(client *c) {
112 /* Check if the user is authenticated. This check is skipped in case
113 * the default user is flagged as "nopass" and is active. */
114 int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
115 (DefaultUser->flags & USER_FLAG_DISABLED)) &&
116 !c->authenticated;
117 return auth_required;
118}
119
/* Allocate and initialize a new client structure and return it.
 *
 * 'conn' may be NULL. When non-NULL, the connection gets TCP_NODELAY,
 * keepalive (if server.tcpkeepalive is set), readQueryFromClient as its read
 * handler, and the new client as its private data; the client is then added
 * to the global client list/index via linkClient(). A NULL 'conn' yields a
 * non-connected client that is NOT linked. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* By passing NULL as conn it is possible to create a non-connected
     * client. This is useful since all the commands need to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non-connected client. */
    if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    /* Static reply buffer; its actual usable size is recorded below. */
    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
    selectDb(c,0);
    /* Take the next value of the global client ID counter. */
    uint64_t client_id;
    atomicGetIncr(server.next_client_id, client_id, 1);
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    /* Static output buffer bookkeeping. */
    c->bufpos = 0;
    c->buf_usable_size = zmalloc_usable_size(c->buf);
    c->buf_peak = c->buf_usable_size;
    c->buf_peak_last_reset_time = server.unixtime;
    c->ref_repl_buf_node = NULL;
    c->ref_block_pos = 0;
    /* Query buffer and argument parsing state. */
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->argv_len = 0;
    c->argv_len_sum = 0;
    c->original_argc = 0;
    c->original_argv = NULL;
    c->cmd = c->lastcmd = c->realcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->slot = -1;
    c->ctime = c->lastinteraction = server.unixtime;
    clientSetDefaultAuth(c);
    /* Replication state. */
    c->replstate = REPL_STATE_NONE;
    c->repl_start_cmd_stream_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_applied = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->repl_last_partial_write = 0;
    c->slave_listening_port = 0;
    c->slave_addr = NULL;
    c->slave_capa = SLAVE_CAPA_NONE;
    c->slave_req = SLAVE_REQ_NONE;
    /* Reply list: values are clientReplyBlock, freed/duplicated by
     * freeClientReplyValue / dupClientReplyValue. */
    c->reply = listCreate();
    c->deferred_reply_errors = NULL;
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* Blocking-operation state. */
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    /* Pub/Sub state. */
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
    c->pubsub_patterns = listCreate();
    c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
    c->peerid = NULL;
    c->sockname = NULL;
    c->client_list_node = NULL;
    c->postponed_list_node = NULL;
    c->pending_read_list_node = NULL;
    c->client_tracking_redirection = 0;
    c->client_tracking_prefixes = NULL;
    c->last_memory_usage = 0;
    c->last_memory_type = CLIENT_TYPE_NORMAL;
    c->auth_callback = NULL;
    c->auth_callback_privdata = NULL;
    c->auth_module = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    c->mem_usage_bucket = NULL;
    c->mem_usage_bucket_node = NULL;
    /* Only connected clients are registered in the global client list. */
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
218
219void installClientWriteHandler(client *c) {
220 int ae_barrier = 0;
221 /* For the fsync=always policy, we want that a given FD is never
222 * served for reading and writing in the same event loop iteration,
223 * so that in the middle of receiving the query, and serving it
224 * to the client, we'll call beforeSleep() that will do the
225 * actual fsync of AOF to disk. the write barrier ensures that. */
226 if (server.aof_state == AOF_ON &&
227 server.aof_fsync == AOF_FSYNC_ALWAYS)
228 {
229 ae_barrier = 1;
230 }
231 if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
232 freeClientAsync(c);
233 }
234}
235
236/* This function puts the client in the queue of clients that should write
237 * their output buffers to the socket. Note that it does not *yet* install
238 * the write handler, to start clients are put in a queue of clients that need
239 * to write, so we try to do that before returning in the event loop (see the
240 * handleClientsWithPendingWrites() function).
241 * If we fail and there is more data to write, compared to what the socket
242 * buffers can hold, then we'll really install the handler. */
243void putClientInPendingWriteQueue(client *c) {
244 /* Schedule the client to write the output buffers to the socket only
245 * if not already done and, for slaves, if the slave can actually receive
246 * writes at this stage. */
247 if (!(c->flags & CLIENT_PENDING_WRITE) &&
248 (c->replstate == REPL_STATE_NONE ||
249 (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
250 {
251 /* Here instead of installing the write handler, we just flag the
252 * client and put it into a list of clients that have something
253 * to write to the socket. This way before re-entering the event
254 * loop, we can try to directly write to the client sockets avoiding
255 * a system call. We'll only really install the write handler if
256 * we'll not be able to write the whole reply at once. */
257 c->flags |= CLIENT_PENDING_WRITE;
258 listAddNodeHead(server.clients_pending_write,c);
259 }
260}
261
262/* This function is called every time we are going to transmit new data
263 * to the client. The behavior is the following:
264 *
265 * If the client should receive new data (normal clients will) the function
266 * returns C_OK, and make sure to install the write handler in our event
267 * loop so that when the socket is writable new data gets written.
268 *
269 * If the client should not receive new data, because it is a fake client
270 * (used to load AOF in memory), a master or because the setup of the write
271 * handler failed, the function returns C_ERR.
272 *
273 * The function may return C_OK without actually installing the write
274 * event handler in the following cases:
275 *
276 * 1) The event handler should already be installed since the output buffer
277 * already contains something.
278 * 2) The client is a slave but not yet online, so we want to just accumulate
279 * writes in the buffer but not actually sending them yet.
280 *
281 * Typically gets called every time a reply is built, before adding more
282 * data to the clients output buffers. If the function returns C_ERR no
283 * data should be appended to the output buffers. */
284int prepareClientToWrite(client *c) {
285 /* If it's the Lua client we always return ok without installing any
286 * handler since there is no socket at all. */
287 if (c->flags & (CLIENT_SCRIPT|CLIENT_MODULE)) return C_OK;
288
289 /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
290 if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
291
292 /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
293 if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
294
295 /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
296 * is set. */
297 if ((c->flags & CLIENT_MASTER) &&
298 !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
299
300 if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
301
302 /* Schedule the client to write the output buffers to the socket, unless
303 * it should already be setup to do so (it has already pending data).
304 *
305 * If CLIENT_PENDING_READ is set, we're in an IO thread and should
306 * not put the client in pending write queue. Instead, it will be
307 * done by handleClientsWithPendingReadsUsingThreads() upon return.
308 */
309 if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
310 putClientInPendingWriteQueue(c);
311
312 /* Authorize the caller to queue in the output buffer of this client. */
313 return C_OK;
314}
315
316/* -----------------------------------------------------------------------------
317 * Low level functions to add more data to output buffers.
318 * -------------------------------------------------------------------------- */
319
320/* Attempts to add the reply to the static buffer in the client struct.
321 * Returns the length of data that is added to the reply buffer.
322 *
323 * Sanitizer suppression: client->buf_usable_size determined by
324 * zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses
325 * sanitizer and generates a false positive out-of-bounds error */
326REDIS_NO_SANITIZE("bounds")
327size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
328 size_t available = c->buf_usable_size - c->bufpos;
329
330 /* If there already are entries in the reply list, we cannot
331 * add anything more to the static buffer. */
332 if (listLength(c->reply) > 0) return 0;
333
334 size_t reply_len = len > available ? available : len;
335 memcpy(c->buf+c->bufpos,s,reply_len);
336 c->bufpos+=reply_len;
337 /* We update the buffer peak after appending the reply to the buffer */
338 if(c->buf_peak < (size_t)c->bufpos)
339 c->buf_peak = (size_t)c->bufpos;
340 return reply_len;
341}
342
343/* Adds the reply to the reply linked list.
344 * Note: some edits to this function need to be relayed to AddReplyFromClient. */
345void _addReplyProtoToList(client *c, const char *s, size_t len) {
346 listNode *ln = listLast(c->reply);
347 clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
348
349 /* Note that 'tail' may be NULL even if we have a tail node, because when
350 * addReplyDeferredLen() is used, it sets a dummy node to NULL just
351 * to fill it later, when the size of the bulk length is set. */
352
353 /* Append to tail string when possible. */
354 if (tail) {
355 /* Copy the part we can fit into the tail, and leave the rest for a
356 * new node */
357 size_t avail = tail->size - tail->used;
358 size_t copy = avail >= len? len: avail;
359 memcpy(tail->buf + tail->used, s, copy);
360 tail->used += copy;
361 s += copy;
362 len -= copy;
363 }
364 if (len) {
365 /* Create a new node, make sure it is allocated to at
366 * least PROTO_REPLY_CHUNK_BYTES */
367 size_t usable_size;
368 size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
369 tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
370 /* take over the allocation's internal fragmentation */
371 tail->size = usable_size - sizeof(clientReplyBlock);
372 tail->used = len;
373 memcpy(tail->buf, s, len);
374 listAddNodeTail(c->reply, tail);
375 c->reply_bytes += tail->size;
376
377 closeClientOnOutputBufferLimitReached(c, 1);
378 }
379}
380
381void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
382 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
383
384 /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
385 * replication link that caused a reply to be generated we'll simply disconnect it.
386 * Note this is the simplest way to check a command added a response. Replication links are used to write data but
387 * not for responses, so we should normally never get here on a replica client. */
388 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
389 sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
390 logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'",
391 cmdname ? cmdname : "<unknown>");
392 return;
393 }
394
395 size_t reply_len = _addReplyToBuffer(c,s,len);
396 if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len);
397}
398
399/* -----------------------------------------------------------------------------
400 * Higher level functions to queue data on the client output buffer.
401 * The following functions are the ones that commands implementations will call.
402 * -------------------------------------------------------------------------- */
403
404/* Add the object 'obj' string representation to the client output buffer. */
405void addReply(client *c, robj *obj) {
406 if (prepareClientToWrite(c) != C_OK) return;
407
408 if (sdsEncodedObject(obj)) {
409 _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
410 } else if (obj->encoding == OBJ_ENCODING_INT) {
411 /* For integer encoded strings we just convert it into a string
412 * using our optimized function, and attach the resulting string
413 * to the output buffer. */
414 char buf[32];
415 size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
416 _addReplyToBufferOrList(c,buf,len);
417 } else {
418 serverPanic("Wrong obj->encoding in addReply()");
419 }
420}
421
422/* Add the SDS 's' string to the client output buffer, as a side effect
423 * the SDS string is freed. */
424void addReplySds(client *c, sds s) {
425 if (prepareClientToWrite(c) != C_OK) {
426 /* The caller expects the sds to be free'd. */
427 sdsfree(s);
428 return;
429 }
430 _addReplyToBufferOrList(c,s,sdslen(s));
431 sdsfree(s);
432}
433
434/* This low level function just adds whatever protocol you send it to the
435 * client buffer, trying the static buffer initially, and using the string
436 * of objects if not possible.
437 *
438 * It is efficient because does not create an SDS object nor an Redis object
439 * if not needed. The object will only be created by calling
440 * _addReplyProtoToList() if we fail to extend the existing tail object
441 * in the list of objects. */
442void addReplyProto(client *c, const char *s, size_t len) {
443 if (prepareClientToWrite(c) != C_OK) return;
444 _addReplyToBufferOrList(c,s,len);
445}
446
447/* Low level function called by the addReplyError...() functions.
448 * It emits the protocol for a Redis error, in the form:
449 *
450 * -ERRORCODE Error Message<CR><LF>
451 *
452 * If the error code is already passed in the string 's', the error
453 * code provided is used, otherwise the string "-ERR " for the generic
454 * error code is automatically added.
455 * Note that 's' must NOT end with \r\n. */
456void addReplyErrorLength(client *c, const char *s, size_t len) {
457 /* If the string already starts with "-..." then the error code
458 * is provided by the caller. Otherwise we use "-ERR". */
459 if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
460 addReplyProto(c,s,len);
461 addReplyProto(c,"\r\n",2);
462}
463
/* Bookkeeping performed after an error reply 's'/'len' (without its trailing
 * \r\n) has been emitted to client 'c'.
 *
 *  - Module clients: a copy of the error text is appended to
 *    c->deferred_reply_errors and nothing else happens.
 *  - Otherwise, stats are updated: stat_total_error_replies and a per-code
 *    counter. The code is the text between a leading '-' and the first space
 *    within the first 32 bytes; "ERR" is used when there is no '-' prefix or
 *    no such space.
 *  - For master, replica and AOF-loading clients the error is logged (text
 *    truncated to 4096 bytes), stat_unexpected_error_replies is incremented,
 *    and depending on propagation_error_behavior the server may panic.
 *
 * Possible flags:
 * * ERR_REPLY_FLAG_NO_STATS_UPDATE - do not update any error stats; only
 *   c->realcmd->failed_calls is incremented in that case. */
void afterErrorReply(client *c, const char *s, size_t len, int flags) {
    /* Module clients fall into two categories:
     * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
     * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */
    if (c->flags & CLIENT_MODULE) {
        if (!c->deferred_reply_errors) {
            c->deferred_reply_errors = listCreate();
            listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree);
        }
        listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len));
        return;
    }

    if (!(flags & ERR_REPLY_FLAG_NO_STATS_UPDATE)) {
        /* Increment the global error counter */
        server.stat_total_error_replies++;
        /* Increment the error stats
         * If the string already starts with "-..." then the error prefix
         * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
        if (s[0] != '-') {
            incrementErrorCount("ERR", 3);
        } else {
            char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
            if (spaceloc) {
                /* Error code spans s[1] .. s[errEndPos-1] (skip the '-'). */
                const size_t errEndPos = (size_t)(spaceloc - s);
                incrementErrorCount(s+1, errEndPos-1);
            } else {
                /* Fallback to ERR if we can't retrieve the error prefix */
                incrementErrorCount("ERR", 3);
            }
        }
    } else {
        /* stat_total_error_replies will not be updated, which means that
         * the cmd stats will not be updated as well, we still want this command
         * to be counted as failed so we update it here. We update c->realcmd in
         * case c->cmd was changed (like in GEOADD). */
        c->realcmd->failed_calls++;
    }

    /* Sometimes it could be normal that a slave replies to a master with
     * an error and this function gets called. Actually the error will never
     * be sent because addReply*() against master clients has no effect...
     * A notable example is:
     *
     *   EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
     *
     * Where the master must propagate the first change even if the second
     * will produce an error. However it is useful to log such events since
     * they are rare and may hint at errors in a script or a bug in Redis. */
    int ctype = getClientType(c);
    if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
        char *to, *from;

        /* 'c' is the receiving side; 'from' names this server's role. */
        if (c->id == CLIENT_ID_AOF) {
            to = "AOF-loading-client";
            from = "server";
        } else if (ctype == CLIENT_TYPE_MASTER) {
            to = "master";
            from = "replica";
        } else {
            to = "replica";
            from = "master";
        }

        /* Bound the amount of error text written to the log. */
        if (len > 4096) len = 4096;
        sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
        serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
                             "to its %s: '%.*s' after processing the command "
                             "'%s'", from, to, (int)len, s, cmdname ? cmdname : "<unknown>");
        if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
            server.repl_backlog->histlen > 0)
        {
            showLatestBacklog();
        }
        server.stat_unexpected_error_replies++;

        /* Based off the propagation error behavior, check if we need to panic here. There
         * are currently two checked cases:
         * * If this command was from our master and we are not a writable replica.
         * * We are reading from an AOF file. */
        int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro)
            && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC ||
            server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS);
        int panic_in_aof = c->id == CLIENT_ID_AOF
            && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC;
        if (panic_in_replicas || panic_in_aof) {
            serverPanic("This %s panicked sending an error to its %s"
                " after processing the command '%s'",
                from, to, cmdname ? cmdname : "<unknown>");
        }
    }
}
559
560/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
561 * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
562void addReplyErrorObject(client *c, robj *err) {
563 addReply(c, err);
564 afterErrorReply(c, err->ptr, sdslen(err->ptr)-2, 0); /* Ignore trailing \r\n */
565}
566
567/* Sends either a reply or an error reply by checking the first char.
568 * If the first char is '-' the reply is considered an error.
569 * In any case the given reply is sent, if the reply is also recognize
570 * as an error we also perform some post reply operations such as
571 * logging and stats update. */
572void addReplyOrErrorObject(client *c, robj *reply) {
573 serverAssert(sdsEncodedObject(reply));
574 sds rep = reply->ptr;
575 if (sdslen(rep) > 1 && rep[0] == '-') {
576 addReplyErrorObject(c, reply);
577 } else {
578 addReply(c, reply);
579 }
580}
581
582/* See addReplyErrorLength for expectations from the input string. */
583void addReplyError(client *c, const char *err) {
584 addReplyErrorLength(c,err,strlen(err));
585 afterErrorReply(c,err,strlen(err),0);
586}
587
588/* Add error reply to the given client.
589 * Supported flags:
590 * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to perform any error stats updates */
591void addReplyErrorSdsEx(client *c, sds err, int flags) {
592 addReplyErrorLength(c,err,sdslen(err));
593 afterErrorReply(c,err,sdslen(err),flags);
594 sdsfree(err);
595}
596
597/* See addReplyErrorLength for expectations from the input string. */
598/* As a side effect the SDS string is freed. */
599void addReplyErrorSds(client *c, sds err) {
600 addReplyErrorSdsEx(c, err, 0);
601}
602
603/* Internal function used by addReplyErrorFormat and addReplyErrorFormatEx.
604 * Refer to afterErrorReply for more information about the flags. */
605static void addReplyErrorFormatInternal(client *c, int flags, const char *fmt, va_list ap) {
606 va_list cpy;
607 va_copy(cpy,ap);
608 sds s = sdscatvprintf(sdsempty(),fmt,cpy);
609 va_end(cpy);
610 /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
611 s = sdstrim(s, "\r\n");
612 /* Make sure there are no newlines in the middle of the string, otherwise
613 * invalid protocol is emitted. */
614 s = sdsmapchars(s, "\r\n", " ", 2);
615 addReplyErrorLength(c,s,sdslen(s));
616 afterErrorReply(c,s,sdslen(s),flags);
617 sdsfree(s);
618}
619
620void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) {
621 va_list ap;
622 va_start(ap,fmt);
623 addReplyErrorFormatInternal(c, flags, fmt, ap);
624 va_end(ap);
625}
626
627/* See addReplyErrorLength for expectations from the formatted string.
628 * The formatted string is safe to contain \r and \n anywhere. */
629void addReplyErrorFormat(client *c, const char *fmt, ...) {
630 va_list ap;
631 va_start(ap,fmt);
632 addReplyErrorFormatInternal(c, 0, fmt, ap);
633 va_end(ap);
634}
635
636void addReplyErrorArity(client *c) {
637 addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
638 c->cmd->fullname);
639}
640
641void addReplyErrorExpireTime(client *c) {
642 addReplyErrorFormat(c, "invalid expire time in '%s' command",
643 c->cmd->fullname);
644}
645
646void addReplyStatusLength(client *c, const char *s, size_t len) {
647 addReplyProto(c,"+",1);
648 addReplyProto(c,s,len);
649 addReplyProto(c,"\r\n",2);
650}
651
652void addReplyStatus(client *c, const char *status) {
653 addReplyStatusLength(c,status,strlen(status));
654}
655
656void addReplyStatusFormat(client *c, const char *fmt, ...) {
657 va_list ap;
658 va_start(ap,fmt);
659 sds s = sdscatvprintf(sdsempty(),fmt,ap);
660 va_end(ap);
661 addReplyStatusLength(c,s,sdslen(s));
662 sdsfree(s);
663}
664
665/* Sometimes we are forced to create a new reply node, and we can't append to
666 * the previous one, when that happens, we wanna try to trim the unused space
667 * at the end of the last reply node which we won't use anymore. */
668void trimReplyUnusedTailSpace(client *c) {
669 listNode *ln = listLast(c->reply);
670 clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
671
672 /* Note that 'tail' may be NULL even if we have a tail node, because when
673 * addReplyDeferredLen() is used */
674 if (!tail) return;
675
676 /* We only try to trim the space is relatively high (more than a 1/4 of the
677 * allocation), otherwise there's a high chance realloc will NOP.
678 * Also, to avoid large memmove which happens as part of realloc, we only do
679 * that if the used part is small. */
680 if (tail->size - tail->used > tail->size / 4 &&
681 tail->used < PROTO_REPLY_CHUNK_BYTES)
682 {
683 size_t old_size = tail->size;
684 tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
685 /* take over the allocation's internal fragmentation (at least for
686 * memory usage tracking) */
687 tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
688 c->reply_bytes = c->reply_bytes + tail->size - old_size;
689 listNodeValue(ln) = tail;
690 }
691}
692
693/* Adds an empty object to the reply list that will contain the multi bulk
694 * length, which is not known when this function is called. */
695void *addReplyDeferredLen(client *c) {
696 /* Note that we install the write event here even if the object is not
697 * ready to be sent, since we are sure that before returning to the
698 * event loop setDeferredAggregateLen() will be called. */
699 if (prepareClientToWrite(c) != C_OK) return NULL;
700
701 /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
702 * replication link that caused a reply to be generated we'll simply disconnect it.
703 * Note this is the simplest way to check a command added a response. Replication links are used to write data but
704 * not for responses, so we should normally never get here on a replica client. */
705 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
706 sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
707 logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'",
708 cmdname ? cmdname : "<unknown>");
709 return NULL;
710 }
711
712 trimReplyUnusedTailSpace(c);
713 listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
714 return listLast(c->reply);
715}
716
/* Fill the placeholder 'node' (returned by addReplyDeferredLen()) with the
 * 'length' bytes at 's'.
 *
 * A NULL 'node' is a no-op. Otherwise the node's value must still be NULL
 * (asserted). The bytes are appended to the previous block as far as they
 * fit. Any remainder is prepended to the next block when it has room and is
 * not too large. In both cases a placeholder that is no longer needed is
 * deleted. Failing that, a new block replaces the placeholder, and reply
 * accounting and output-buffer limits are updated. */
void setDeferredReply(client *c, void *node, const char *s, size_t length) {
    listNode *ln = (listNode*)node;
    clientReplyBlock *next, *prev;

    /* Abort when *node is NULL: when the client should not accept writes
     * we return NULL in addReplyDeferredLen() */
    if (node == NULL) return;
    serverAssert(!listNodeValue(ln));

    /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(),
     * with a new buffer structure containing the protocol needed to specify
     * the length of the array following. However sometimes there might be room
     * in the previous/next node so we can instead remove this NULL node, and
     * suffix/prefix our data in the node immediately before/after it, in order
     * to save a write(2) syscall later. Conditions needed to do it:
     *
     * - The prev node is non-NULL and has space in it or
     * - The next node is non-NULL,
     * - It has enough room already allocated
     * - And not too large (avoid large memmove) */
    if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) &&
        prev->size - prev->used > 0)
    {
        /* Append as much as fits to the end of the previous block. */
        size_t len_to_copy = prev->size - prev->used;
        if (len_to_copy > length)
            len_to_copy = length;
        memcpy(prev->buf + prev->used, s, len_to_copy);
        prev->used += len_to_copy;
        length -= len_to_copy;
        if (length == 0) {
            /* Everything fit: drop the now-unneeded placeholder. */
            listDelNode(c->reply, ln);
            return;
        }
        s += len_to_copy;
    }

    /* "Not too large" means the next block holds fewer than
     * PROTO_REPLY_CHUNK_BYTES * 4 used bytes. */
    if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
        next->size - next->used >= length &&
        next->used < PROTO_REPLY_CHUNK_BYTES * 4)
    {
        /* Shift the next block's data right and prepend the remainder. */
        memmove(next->buf + length, next->buf, next->used);
        memcpy(next->buf, s, length);
        next->used += length;
        listDelNode(c->reply,ln);
    } else {
        /* Create a new node */
        clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
        /* Take over the allocation's internal fragmentation */
        buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
        buf->used = length;
        memcpy(buf->buf, s, length);
        listNodeValue(ln) = buf;
        c->reply_bytes += buf->size;

        closeClientOnOutputBufferLimitReached(c, 1);
    }
}
774
775/* Populate the length object and try gluing it to the next chunk. */
776void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
777 serverAssert(length >= 0);
778
779 /* Abort when *node is NULL: when the client should not accept writes
780 * we return NULL in addReplyDeferredLen() */
781 if (node == NULL) return;
782
783 /* Things like *2\r\n, %3\r\n or ~4\r\n are emitted very often by the protocol
784 * so we have a few shared objects to use if the integer is small
785 * like it is most of the times. */
786 const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(length);
787 const int opt_hdr = length < OBJ_SHARED_BULKHDR_LEN;
788 if (prefix == '*' && opt_hdr) {
789 setDeferredReply(c, node, shared.mbulkhdr[length]->ptr, hdr_len);
790 return;
791 }
792 if (prefix == '%' && opt_hdr) {
793 setDeferredReply(c, node, shared.maphdr[length]->ptr, hdr_len);
794 return;
795 }
796 if (prefix == '~' && opt_hdr) {
797 setDeferredReply(c, node, shared.sethdr[length]->ptr, hdr_len);
798 return;
799 }
800
801 char lenstr[128];
802 size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
803 setDeferredReply(c, node, lenstr, lenstr_len);
804}
805
806void setDeferredArrayLen(client *c, void *node, long length) {
807 setDeferredAggregateLen(c,node,length,'*');
808}
809
810void setDeferredMapLen(client *c, void *node, long length) {
811 int prefix = c->resp == 2 ? '*' : '%';
812 if (c->resp == 2) length *= 2;
813 setDeferredAggregateLen(c,node,length,prefix);
814}
815
816void setDeferredSetLen(client *c, void *node, long length) {
817 int prefix = c->resp == 2 ? '*' : '~';
818 setDeferredAggregateLen(c,node,length,prefix);
819}
820
821void setDeferredAttributeLen(client *c, void *node, long length) {
822 serverAssert(c->resp >= 3);
823 setDeferredAggregateLen(c,node,length,'|');
824}
825
826void setDeferredPushLen(client *c, void *node, long length) {
827 serverAssert(c->resp >= 3);
828 setDeferredAggregateLen(c,node,length,'>');
829}
830
831/* Add a double as a bulk reply */
832void addReplyDouble(client *c, double d) {
833 if (isinf(d)) {
834 /* Libc in odd systems (Hi Solaris!) will format infinite in a
835 * different way, so better to handle it in an explicit way. */
836 if (c->resp == 2) {
837 addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
838 } else {
839 addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
840 d > 0 ? 6 : 7);
841 }
842 } else {
843 char dbuf[MAX_LONG_DOUBLE_CHARS+3],
844 sbuf[MAX_LONG_DOUBLE_CHARS+32];
845 int dlen, slen;
846 if (c->resp == 2) {
847 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
848 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
849 addReplyProto(c,sbuf,slen);
850 } else {
851 dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
852 addReplyProto(c,dbuf,dlen);
853 }
854 }
855}
856
857void addReplyBigNum(client *c, const char* num, size_t len) {
858 if (c->resp == 2) {
859 addReplyBulkCBuffer(c, num, len);
860 } else {
861 addReplyProto(c,"(",1);
862 addReplyProto(c,num,len);
863 addReply(c,shared.crlf);
864 }
865}
866
867/* Add a long double as a bulk reply, but uses a human readable formatting
868 * of the double instead of exposing the crude behavior of doubles to the
869 * dear user. */
870void addReplyHumanLongDouble(client *c, long double d) {
871 if (c->resp == 2) {
872 robj *o = createStringObjectFromLongDouble(d,1);
873 addReplyBulk(c,o);
874 decrRefCount(o);
875 } else {
876 char buf[MAX_LONG_DOUBLE_CHARS];
877 int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
878 addReplyProto(c,",",1);
879 addReplyProto(c,buf,len);
880 addReplyProto(c,"\r\n",2);
881 }
882}
883
884/* Add a long long as integer reply or bulk len / multi bulk count.
885 * Basically this is used to output <prefix><long long><crlf>. */
886void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
887 char buf[128];
888 int len;
889
890 /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
891 * so we have a few shared objects to use if the integer is small
892 * like it is most of the times. */
893 const int opt_hdr = ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0;
894 const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(ll);
895 if (prefix == '*' && opt_hdr) {
896 addReplyProto(c,shared.mbulkhdr[ll]->ptr,hdr_len);
897 return;
898 } else if (prefix == '$' && opt_hdr) {
899 addReplyProto(c,shared.bulkhdr[ll]->ptr,hdr_len);
900 return;
901 } else if (prefix == '%' && opt_hdr) {
902 addReplyProto(c,shared.maphdr[ll]->ptr,hdr_len);
903 return;
904 } else if (prefix == '~' && opt_hdr) {
905 addReplyProto(c,shared.sethdr[ll]->ptr,hdr_len);
906 return;
907 }
908
909 buf[0] = prefix;
910 len = ll2string(buf+1,sizeof(buf)-1,ll);
911 buf[len+1] = '\r';
912 buf[len+2] = '\n';
913 addReplyProto(c,buf,len+3);
914}
915
916void addReplyLongLong(client *c, long long ll) {
917 if (ll == 0)
918 addReply(c,shared.czero);
919 else if (ll == 1)
920 addReply(c,shared.cone);
921 else
922 addReplyLongLongWithPrefix(c,ll,':');
923}
924
925void addReplyAggregateLen(client *c, long length, int prefix) {
926 serverAssert(length >= 0);
927 addReplyLongLongWithPrefix(c,length,prefix);
928}
929
930void addReplyArrayLen(client *c, long length) {
931 addReplyAggregateLen(c,length,'*');
932}
933
934void addReplyMapLen(client *c, long length) {
935 int prefix = c->resp == 2 ? '*' : '%';
936 if (c->resp == 2) length *= 2;
937 addReplyAggregateLen(c,length,prefix);
938}
939
940void addReplySetLen(client *c, long length) {
941 int prefix = c->resp == 2 ? '*' : '~';
942 addReplyAggregateLen(c,length,prefix);
943}
944
945void addReplyAttributeLen(client *c, long length) {
946 serverAssert(c->resp >= 3);
947 addReplyAggregateLen(c,length,'|');
948}
949
950void addReplyPushLen(client *c, long length) {
951 serverAssert(c->resp >= 3);
952 addReplyAggregateLen(c,length,'>');
953}
954
955void addReplyNull(client *c) {
956 if (c->resp == 2) {
957 addReplyProto(c,"$-1\r\n",5);
958 } else {
959 addReplyProto(c,"_\r\n",3);
960 }
961}
962
963void addReplyBool(client *c, int b) {
964 if (c->resp == 2) {
965 addReply(c, b ? shared.cone : shared.czero);
966 } else {
967 addReplyProto(c, b ? "#t\r\n" : "#f\r\n",4);
968 }
969}
970
971/* A null array is a concept that no longer exists in RESP3. However
972 * RESP2 had it, so API-wise we have this call, that will emit the correct
973 * RESP2 protocol, however for RESP3 the reply will always be just the
974 * Null type "_\r\n". */
975void addReplyNullArray(client *c) {
976 if (c->resp == 2) {
977 addReplyProto(c,"*-1\r\n",5);
978 } else {
979 addReplyProto(c,"_\r\n",3);
980 }
981}
982
983/* Create the length prefix of a bulk reply, example: $2234 */
984void addReplyBulkLen(client *c, robj *obj) {
985 size_t len = stringObjectLen(obj);
986
987 addReplyLongLongWithPrefix(c,len,'$');
988}
989
990/* Add a Redis Object as a bulk reply */
991void addReplyBulk(client *c, robj *obj) {
992 addReplyBulkLen(c,obj);
993 addReply(c,obj);
994 addReply(c,shared.crlf);
995}
996
997/* Add a C buffer as bulk reply */
998void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
999 addReplyLongLongWithPrefix(c,len,'$');
1000 addReplyProto(c,p,len);
1001 addReply(c,shared.crlf);
1002}
1003
1004/* Add sds to reply (takes ownership of sds and frees it) */
1005void addReplyBulkSds(client *c, sds s) {
1006 addReplyLongLongWithPrefix(c,sdslen(s),'$');
1007 addReplySds(c,s);
1008 addReply(c,shared.crlf);
1009}
1010
1011/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
1012void setDeferredReplyBulkSds(client *c, void *node, sds s) {
1013 sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
1014 setDeferredReply(c, node, reply, sdslen(reply));
1015 sdsfree(reply);
1016 sdsfree(s);
1017}
1018
1019/* Add a C null term string as bulk reply */
1020void addReplyBulkCString(client *c, const char *s) {
1021 if (s == NULL) {
1022 addReplyNull(c);
1023 } else {
1024 addReplyBulkCBuffer(c,s,strlen(s));
1025 }
1026}
1027
1028/* Add a long long as a bulk reply */
1029void addReplyBulkLongLong(client *c, long long ll) {
1030 char buf[64];
1031 int len;
1032
1033 len = ll2string(buf,64,ll);
1034 addReplyBulkCBuffer(c,buf,len);
1035}
1036
1037/* Reply with a verbatim type having the specified extension.
1038 *
1039 * The 'ext' is the "extension" of the file, actually just a three
1040 * character type that describes the format of the verbatim string.
1041 * For instance "txt" means it should be interpreted as a text only
1042 * file by the receiver, "md " as markdown, and so forth. Only the
1043 * three first characters of the extension are used, and if the
1044 * provided one is shorter than that, the remaining is filled with
1045 * spaces. */
1046void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
1047 if (c->resp == 2) {
1048 addReplyBulkCBuffer(c,s,len);
1049 } else {
1050 char buf[32];
1051 size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
1052 char *p = buf+preflen-4;
1053 for (int i = 0; i < 3; i++) {
1054 if (*ext == '\0') {
1055 p[i] = ' ';
1056 } else {
1057 p[i] = *ext++;
1058 }
1059 }
1060 addReplyProto(c,buf,preflen);
1061 addReplyProto(c,s,len);
1062 addReplyProto(c,"\r\n",2);
1063 }
1064}
1065
1066/* Add an array of C strings as status replies with a heading.
1067 * This function is typically invoked by from commands that support
1068 * subcommands in response to the 'help' subcommand. The help array
1069 * is terminated by NULL sentinel. */
1070void addReplyHelp(client *c, const char **help) {
1071 sds cmd = sdsnew((char*) c->argv[0]->ptr);
1072 void *blenp = addReplyDeferredLen(c);
1073 int blen = 0;
1074
1075 sdstoupper(cmd);
1076 addReplyStatusFormat(c,
1077 "%s <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",cmd);
1078 sdsfree(cmd);
1079
1080 while (help[blen]) addReplyStatus(c,help[blen++]);
1081
1082 addReplyStatus(c,"HELP");
1083 addReplyStatus(c," Prints this help.");
1084
1085 blen += 1; /* Account for the header. */
1086 blen += 2; /* Account for the footer. */
1087 setDeferredArrayLen(c,blenp,blen);
1088}
1089
1090/* Add a suggestive error reply.
1091 * This function is typically invoked by from commands that support
1092 * subcommands in response to an unknown subcommand or argument error. */
1093void addReplySubcommandSyntaxError(client *c) {
1094 sds cmd = sdsnew((char*) c->argv[0]->ptr);
1095 sdstoupper(cmd);
1096 addReplyErrorFormat(c,
1097 "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.",
1098 (char*)c->argv[1]->ptr,cmd);
1099 sdsfree(cmd);
1100}
1101
/* Append 'src' client output buffers into 'dst' client output buffers.
 * This function clears the output buffers of 'src'.
 *
 * Steps:
 * - If 'src' is flagged CLIENT_CLOSE_ASAP, 'dst' is scheduled for async
 *   freeing (with a warning log) and nothing is copied.
 * - The static buffer of 'src' is copied with addReplyProto().
 * - The reply list of 'src' is moved (listJoin) into 'dst', its byte
 *   accounting is transferred and any deferred error stats are replayed
 *   on 'dst'.
 *
 * NOTE(review): on the two early-return paths after addReplyProto() (dst
 * not writable, or dst flagged CLIENT_CLOSE_AFTER_REPLY) the reply list,
 * reply_bytes, bufpos and deferred_reply_errors of 'src' are left as they
 * were, so 'src' is NOT fully cleared in those cases. */
void AddReplyFromClient(client *dst, client *src) {
    /* If the source client contains a partial response due to client output
     * buffer limits, propagate that to the dest rather than copy a partial
     * reply. We don't wanna run the risk of copying partial response in case
     * for some reason the output limits don't reach the same decision (maybe
     * they changed) */
    if (src->flags & CLIENT_CLOSE_ASAP) {
        sds client = catClientInfoString(sdsempty(),dst);
        freeClientAsync(dst);
        serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
        return;
    }

    /* First add the static buffer (either into the static buffer or reply list) */
    addReplyProto(dst,src->buf, src->bufpos);

    /* We need to check with prepareClientToWrite again (after addReplyProto)
     * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
    if (prepareClientToWrite(dst) != C_OK)
        return;

    /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
     * checks in it. */
    if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Concatenate the reply list into the dest */
    if (listLength(src->reply))
        listJoin(dst->reply,src->reply);
    /* The moved nodes now belong to 'dst': transfer their byte accounting
     * and mark the static buffer of 'src' as consumed. */
    dst->reply_bytes += src->reply_bytes;
    src->reply_bytes = 0;
    src->bufpos = 0;

    /* Replay the error statistics 'src' collected on 'dst', then drop them. */
    if (src->deferred_reply_errors) {
        deferredAfterErrorReply(dst, src->deferred_reply_errors);
        listRelease(src->deferred_reply_errors);
        src->deferred_reply_errors = NULL;
    }

    /* Check output buffer limits */
    closeClientOnOutputBufferLimitReached(dst, 1);
}
1146
1147/* Append the listed errors to the server error statistics. the input
1148 * list is not modified and remains the responsibility of the caller. */
1149void deferredAfterErrorReply(client *c, list *errors) {
1150 listIter li;
1151 listNode *ln;
1152 listRewind(errors,&li);
1153 while((ln = listNext(&li))) {
1154 sds err = ln->value;
1155 afterErrorReply(c, err, sdslen(err), 0);
1156 }
1157}
1158
1159/* Logically copy 'src' replica client buffers info to 'dst' replica.
1160 * Basically increase referenced buffer block node reference count. */
1161void copyReplicaOutputBuffer(client *dst, client *src) {
1162 serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
1163
1164 if (src->ref_repl_buf_node == NULL) return;
1165 dst->ref_repl_buf_node = src->ref_repl_buf_node;
1166 dst->ref_block_pos = src->ref_block_pos;
1167 ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
1168}
1169
1170/* Return true if the specified client has pending reply buffers to write to
1171 * the socket. */
1172int clientHasPendingReplies(client *c) {
1173 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
1174 /* Replicas use global shared replication buffer instead of
1175 * private output buffer. */
1176 serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
1177 if (c->ref_repl_buf_node == NULL) return 0;
1178
1179 /* If the last replication buffer block content is totally sent,
1180 * we have nothing to send. */
1181 listNode *ln = listLast(server.repl_buffer_blocks);
1182 replBufBlock *tail = listNodeValue(ln);
1183 if (ln == c->ref_repl_buf_node &&
1184 c->ref_block_pos == tail->used) return 0;
1185
1186 return 1;
1187 } else {
1188 return c->bufpos || listLength(c->reply);
1189 }
1190}
1191
1192/* Return true if client connected from loopback interface */
1193int islocalClient(client *c) {
1194 /* unix-socket */
1195 if (c->flags & CLIENT_UNIX_SOCKET) return 1;
1196
1197 /* tcp */
1198 char cip[NET_IP_STR_LEN+1] = { 0 };
1199 connPeerToString(c->conn, cip, sizeof(cip)-1, NULL);
1200
1201 return !strcmp(cip,"127.0.0.1") || !strcmp(cip,"::1");
1202}
1203
1204void clientAcceptHandler(connection *conn) {
1205 client *c = connGetPrivateData(conn);
1206
1207 if (connGetState(conn) != CONN_STATE_CONNECTED) {
1208 serverLog(LL_WARNING,
1209 "Error accepting a client connection: %s",
1210 connGetLastError(conn));
1211 freeClientAsync(c);
1212 return;
1213 }
1214
1215 /* If the server is running in protected mode (the default) and there
1216 * is no password set, nor a specific interface is bound, we don't accept
1217 * requests from non loopback interfaces. Instead we try to explain the
1218 * user what to do to fix it if needed. */
1219 if (server.protected_mode &&
1220 DefaultUser->flags & USER_FLAG_NOPASS)
1221 {
1222 if (!islocalClient(c)) {
1223 char *err =
1224 "-DENIED Redis is running in protected mode because protected "
1225 "mode is enabled and no password is set for the default user. "
1226 "In this mode connections are only accepted from the loopback interface. "
1227 "If you want to connect from external computers to Redis you "
1228 "may adopt one of the following solutions: "
1229 "1) Just disable protected mode sending the command "
1230 "'CONFIG SET protected-mode no' from the loopback interface "
1231 "by connecting to Redis from the same host the server is "
1232 "running, however MAKE SURE Redis is not publicly accessible "
1233 "from internet if you do so. Use CONFIG REWRITE to make this "
1234 "change permanent. "
1235 "2) Alternatively you can just disable the protected mode by "
1236 "editing the Redis configuration file, and setting the protected "
1237 "mode option to 'no', and then restarting the server. "
1238 "3) If you started the server manually just for testing, restart "
1239 "it with the '--protected-mode no' option. "
1240 "4) Setup a an authentication password for the default user. "
1241 "NOTE: You only need to do one of the above things in order for "
1242 "the server to start accepting connections from the outside.\r\n";
1243 if (connWrite(c->conn,err,strlen(err)) == -1) {
1244 /* Nothing to do, Just to avoid the warning... */
1245 }
1246 server.stat_rejected_conn++;
1247 freeClientAsync(c);
1248 return;
1249 }
1250 }
1251
1252 server.stat_numconnections++;
1253 moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
1254 REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
1255 c);
1256}
1257
1258#define MAX_ACCEPTS_PER_CALL 1000
1259static void acceptCommonHandler(connection *conn, int flags, char *ip) {
1260 client *c;
1261 char conninfo[100];
1262 UNUSED(ip);
1263
1264 if (connGetState(conn) != CONN_STATE_ACCEPTING) {
1265 serverLog(LL_VERBOSE,
1266 "Accepted client connection in error state: %s (conn: %s)",
1267 connGetLastError(conn),
1268 connGetInfo(conn, conninfo, sizeof(conninfo)));
1269 connClose(conn);
1270 return;
1271 }
1272
1273 /* Limit the number of connections we take at the same time.
1274 *
1275 * Admission control will happen before a client is created and connAccept()
1276 * called, because we don't want to even start transport-level negotiation
1277 * if rejected. */
1278 if (listLength(server.clients) + getClusterConnectionsCount()
1279 >= server.maxclients)
1280 {
1281 char *err;
1282 if (server.cluster_enabled)
1283 err = "-ERR max number of clients + cluster "
1284 "connections reached\r\n";
1285 else
1286 err = "-ERR max number of clients reached\r\n";
1287
1288 /* That's a best effort error message, don't check write errors.
1289 * Note that for TLS connections, no handshake was done yet so nothing
1290 * is written and the connection will just drop. */
1291 if (connWrite(conn,err,strlen(err)) == -1) {
1292 /* Nothing to do, Just to avoid the warning... */
1293 }
1294 server.stat_rejected_conn++;
1295 connClose(conn);
1296 return;
1297 }
1298
1299 /* Create connection and client */
1300 if ((c = createClient(conn)) == NULL) {
1301 serverLog(LL_WARNING,
1302 "Error registering fd event for the new client: %s (conn: %s)",
1303 connGetLastError(conn),
1304 connGetInfo(conn, conninfo, sizeof(conninfo)));
1305 connClose(conn); /* May be already closed, just ignore errors */
1306 return;
1307 }
1308
1309 /* Last chance to keep flags */
1310 c->flags |= flags;
1311
1312 /* Initiate accept.
1313 *
1314 * Note that connAccept() is free to do two things here:
1315 * 1. Call clientAcceptHandler() immediately;
1316 * 2. Schedule a future call to clientAcceptHandler().
1317 *
1318 * Because of that, we must do nothing else afterwards.
1319 */
1320 if (connAccept(conn, clientAcceptHandler) == C_ERR) {
1321 char conninfo[100];
1322 if (connGetState(conn) == CONN_STATE_ERROR)
1323 serverLog(LL_WARNING,
1324 "Error accepting a client connection: %s (conn: %s)",
1325 connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
1326 freeClient(connGetPrivateData(conn));
1327 return;
1328 }
1329}
1330
1331void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1332 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
1333 char cip[NET_IP_STR_LEN];
1334 UNUSED(el);
1335 UNUSED(mask);
1336 UNUSED(privdata);
1337
1338 while(max--) {
1339 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
1340 if (cfd == ANET_ERR) {
1341 if (errno != EWOULDBLOCK)
1342 serverLog(LL_WARNING,
1343 "Accepting client connection: %s", server.neterr);
1344 return;
1345 }
1346 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
1347 acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
1348 }
1349}
1350
1351void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1352 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
1353 char cip[NET_IP_STR_LEN];
1354 UNUSED(el);
1355 UNUSED(mask);
1356 UNUSED(privdata);
1357
1358 while(max--) {
1359 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
1360 if (cfd == ANET_ERR) {
1361 if (errno != EWOULDBLOCK)
1362 serverLog(LL_WARNING,
1363 "Accepting client connection: %s", server.neterr);
1364 return;
1365 }
1366 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
1367 acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip);
1368 }
1369}
1370
1371void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1372 int cfd, max = MAX_ACCEPTS_PER_CALL;
1373 UNUSED(el);
1374 UNUSED(mask);
1375 UNUSED(privdata);
1376
1377 while(max--) {
1378 cfd = anetUnixAccept(server.neterr, fd);
1379 if (cfd == ANET_ERR) {
1380 if (errno != EWOULDBLOCK)
1381 serverLog(LL_WARNING,
1382 "Accepting client connection: %s", server.neterr);
1383 return;
1384 }
1385 serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
1386 acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
1387 }
1388}
1389
1390void freeClientOriginalArgv(client *c) {
1391 /* We didn't rewrite this client */
1392 if (!c->original_argv) return;
1393
1394 for (int j = 0; j < c->original_argc; j++)
1395 decrRefCount(c->original_argv[j]);
1396 zfree(c->original_argv);
1397 c->original_argv = NULL;
1398 c->original_argc = 0;
1399}
1400
1401void freeClientArgv(client *c) {
1402 int j;
1403 for (j = 0; j < c->argc; j++)
1404 decrRefCount(c->argv[j]);
1405 c->argc = 0;
1406 c->cmd = NULL;
1407 c->argv_len_sum = 0;
1408 c->argv_len = 0;
1409 zfree(c->argv);
1410 c->argv = NULL;
1411}
1412
1413/* Close all the slaves connections. This is useful in chained replication
1414 * when we resync with our own master and want to force all our slaves to
1415 * resync with us as well. */
1416void disconnectSlaves(void) {
1417 listIter li;
1418 listNode *ln;
1419 listRewind(server.slaves,&li);
1420 while((ln = listNext(&li))) {
1421 freeClient((client*)ln->value);
1422 }
1423}
1424
1425/* Check if there is any other slave waiting dumping RDB finished expect me.
1426 * This function is useful to judge current dumping RDB can be used for full
1427 * synchronization or not. */
1428int anyOtherSlaveWaitRdb(client *except_me) {
1429 listIter li;
1430 listNode *ln;
1431
1432 listRewind(server.slaves, &li);
1433 while((ln = listNext(&li))) {
1434 client *slave = ln->value;
1435 if (slave != except_me &&
1436 slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
1437 {
1438 return 1;
1439 }
1440 }
1441 return 0;
1442}
1443
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster().
 *
 * In order it: clears server.current_client if it points to 'c'; if the
 * client still has a connection, removes it from server.clients and
 * server.clients_index, detaches it from the diskless-replication rdb pipe
 * list, closes the connection and sets c->conn to NULL; then removes it
 * from the pending-write, pending-read and unblocked lists; finally calls
 * disableTracking() when CLIENT_TRACKING is set. The client structure
 * itself is not freed. */
void unlinkClient(client *c) {
    listNode *ln;

    /* If this is marked as current client unset it. */
    if (server.current_client == c) server.current_client = NULL;

    /* Certain operations must be done only if the client has an active connection.
     * If the client was already unlinked or if it's a "fake client" the
     * conn is already set to NULL. */
    if (c->conn) {
        /* Remove from the list of active clients. */
        if (c->client_list_node) {
            /* clients_index is keyed by the client id in network byte order. */
            uint64_t id = htonu64(c->id);
            raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
            listDelNode(server.clients,c->client_list_node);
            c->client_list_node = NULL;
        }

        /* Check if this is a replica waiting for diskless replication (rdb pipe),
         * in which case it needs to be cleaned from that list */
        if (c->flags & CLIENT_SLAVE &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.rdb_pipe_conns)
        {
            int i;
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                if (server.rdb_pipe_conns[i] == c->conn) {
                    rdbPipeWriteHandlerConnRemoved(c->conn);
                    /* The slot is cleared rather than compacted. */
                    server.rdb_pipe_conns[i] = NULL;
                    break;
                }
            }
        }
        connClose(c->conn);
        c->conn = NULL;
    }

    /* Remove from the list of pending writes if needed. */
    if (c->flags & CLIENT_PENDING_WRITE) {
        ln = listSearchKey(server.clients_pending_write,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_pending_write,ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
    }

    /* Remove from the list of pending reads if needed. */
    /* Asserted: this list is only modified here while I/O threads are idle. */
    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
    if (c->pending_read_list_node != NULL) {
        listDelNode(server.clients_pending_read,c->pending_read_list_node);
        c->pending_read_list_node = NULL;
    }


    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients. */
    if (c->flags & CLIENT_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;
    }

    /* Clear the tracking status. */
    if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
1512
/* Clear the client state to resemble a newly connected client.
 *
 * Removes a MONITOR client from server.monitors, then (asserting the client
 * is neither a replica nor a master) disables tracking, selects DB 0,
 * switches back to RESP2, restores default authentication (notifying
 * modules), calls discardTransaction(), unsubscribes from all channels,
 * shard channels and patterns, drops the client name and clears the
 * ASKING/READONLY/PUBSUB/REPLY_OFF/REPLY_SKIP_NEXT flags. The connection
 * itself is kept. */
void clearClientConnectionState(client *c) {
    listNode *ln;

    /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
     * distinguish between the two.
     */
    if (c->flags & CLIENT_MONITOR) {
        ln = listSearchKey(server.monitors,c);
        serverAssert(ln != NULL);
        listDelNode(server.monitors,ln);

        c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
    }

    /* Real replicas and masters cannot be reset this way. */
    serverAssert(!(c->flags &(CLIENT_SLAVE|CLIENT_MASTER)));

    if (c->flags & CLIENT_TRACKING) disableTracking(c);
    selectDb(c,0);
    c->resp = 2;

    clientSetDefaultAuth(c);
    moduleNotifyUserChanged(c);
    discardTransaction(c);

    /* The 0 argument is passed as the 'notify' flag of each call -
     * presumably suppressing the unsubscribe replies; confirm in pubsub.c. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeShardAllChannels(c, 0);
    pubsubUnsubscribeAllPatterns(c,0);

    if (c->name) {
        decrRefCount(c->name);
        c->name = NULL;
    }

    /* Selectively clear state flags not covered above */
    c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
                  CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
}
1551
/* Release a client: free every resource it owns, unlink it from the global
 * server structures and finally zfree() the client structure itself.
 *
 * Two paths return without releasing the client:
 * - CLIENT_PROTECTED clients are handed to freeClientAsync() instead.
 * - Our master (server.master set and CLIENT_MASTER flagged), unless it is
 *   flagged CLIENT_PROTOCOL_ERROR or CLIENT_BLOCKED, is cached through
 *   replicationCacheMaster() so a partial resynchronization can be tried.
 * On every other path 'c' is freed and must not be used afterwards. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* For connected clients, call the disconnection event of modules hooks. */
    if (c->conn) {
        moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                              REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED,
                              c);
    }

    /* Notify module system that this client auth status changed. */
    moduleNotifyUserChanged(c);

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. Note that we need to do this here, because later
     * we may call replicationCacheMaster() and the client should already
     * be removed from the list of clients to free. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
            c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeShardAllChannels(c, 0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    dictRelease(c->pubsubshard_channels);

    /* Free data structures. */
    listRelease(c->reply);
    zfree(c->buf);
    freeReplicaReferencedReplBuffer(c);
    freeClientArgv(c);
    freeClientOriginalArgv(c);
    if (c->deferred_reply_errors)
        listRelease(c->deferred_reply_errors);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        /* If there is no any other slave waiting dumping RDB finished, the
         * current child process need not continue to dump RDB, then we kill it.
         * So child process won't use more memory, and we also can fork a new
         * child process asap to dump rdb for next full synchronization or bgsave.
         * But we also need to check if users enable 'save' RDB, if enable, we
         * should not remove directly since that means RDB is important for users
         * to keep data safe and we may delay configured 'save' for full sync. */
        if (server.saveparamslen == 0 &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.child_type == CHILD_TYPE_RDB &&
            server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
            anyOtherSlaveWaitRdb(c) == 0)
        {
            killRDBChild();
        }
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* MONITOR clients also carry CLIENT_SLAVE but are kept in
         * server.monitors rather than server.slaves. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
        /* Fire the replica change modules event. */
        if (c->replstate == SLAVE_STATE_ONLINE)
            moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                                  REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
                                  NULL);
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* Remove the contribution that this client gave to our
     * incrementally computed memory usage. */
    server.stat_clients_type_memory[c->last_memory_type] -=
        c->last_memory_usage;
    /* Remove client from memory usage buckets */
    if (c->mem_usage_bucket) {
        c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
        listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    sdsfree(c->sockname);
    sdsfree(c->slave_addr);
    zfree(c);
}
1698
/* Schedule a client to free it at a safe time in the serverCron() function.
 * This function is useful when we need to terminate a client but we are in
 * a context where calling freeClient() is not possible, because the client
 * should be valid for the continuation of the flow of the program.
 *
 * The client is flagged CLIENT_CLOSE_ASAP and appended to
 * server.clients_to_close. Calling this again for an already-flagged client
 * is a no-op, and clients flagged CLIENT_SCRIPT are never scheduled. */
void freeClientAsync(client *c) {
    /* We need to handle concurrent access to the server.clients_to_close list
     * only in the freeClientAsync() function, since it's the only function that
     * may access the list while Redis uses I/O threads. All the other accesses
     * are in the context of the main thread while the other threads are
     * idle. */
    if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_SCRIPT) return;
    c->flags |= CLIENT_CLOSE_ASAP;
    if (server.io_threads_num == 1) {
        /* no need to bother with locking if there's just one thread (the main thread) */
        listAddNodeTail(server.clients_to_close,c);
        return;
    }
    /* Function-local static mutex, shared by every caller of this function,
     * serializing appends to server.clients_to_close. */
    static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_lock(&async_free_queue_mutex);
    listAddNodeTail(server.clients_to_close,c);
    pthread_mutex_unlock(&async_free_queue_mutex);
}
1721
1722/* Log errors for invalid use and free the client in async way.
1723 * We will add additional information about the client to the message. */
1724void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...) {
1725 va_list ap;
1726 va_start(ap, fmt);
1727 sds info = sdscatvprintf(sdsempty(), fmt, ap);
1728 va_end(ap);
1729
1730 sds client = catClientInfoString(sdsempty(), c);
1731 serverLog(LL_WARNING, "%s, disconnecting it: %s", info, client);
1732
1733 sdsfree(info);
1734 sdsfree(client);
1735 freeClientAsync(c);
1736}
1737
1738/* Perform processing of the client before moving on to processing the next client
1739 * this is useful for performing operations that affect the global state but can't
1740 * wait until we're done with all clients. In other words can't wait until beforeSleep()
1741 * return C_ERR in case client is no longer valid after call.
1742 * The input client argument: c, may be NULL in case the previous client was
1743 * freed before the call. */
1744int beforeNextClient(client *c) {
1745 /* Skip the client processing if we're in an IO thread, in that case we'll perform
1746 this operation later (this function is called again) in the fan-in stage of the threading mechanism */
1747 if (io_threads_op != IO_THREADS_OP_IDLE)
1748 return C_OK;
1749 /* Handle async frees */
1750 /* Note: this doesn't make the server.clients_to_close list redundant because of
1751 * cases where we want an async free of a client other than myself. For example
1752 * in ACL modifications we disconnect clients authenticated to non-existent
1753 * users (see ACL LOAD). */
1754 if (c && (c->flags & CLIENT_CLOSE_ASAP)) {
1755 freeClient(c);
1756 return C_ERR;
1757 }
1758 return C_OK;
1759}
1760
1761/* Free the clients marked as CLOSE_ASAP, return the number of clients
1762 * freed. */
1763int freeClientsInAsyncFreeQueue(void) {
1764 int freed = 0;
1765 listIter li;
1766 listNode *ln;
1767
1768 listRewind(server.clients_to_close,&li);
1769 while ((ln = listNext(&li)) != NULL) {
1770 client *c = listNodeValue(ln);
1771
1772 if (c->flags & CLIENT_PROTECTED) continue;
1773
1774 c->flags &= ~CLIENT_CLOSE_ASAP;
1775 freeClient(c);
1776 listDelNode(server.clients_to_close,ln);
1777 freed++;
1778 }
1779 return freed;
1780}
1781
1782/* Return a client by ID, or NULL if the client ID is not in the set
1783 * of registered clients. Note that "fake clients", created with -1 as FD,
1784 * are not registered clients. */
1785client *lookupClientByID(uint64_t id) {
1786 id = htonu64(id);
1787 client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id));
1788 return (c == raxNotFound) ? NULL : c;
1789}
1790
/* Gather the pending output of client 'c' into an iovec array and send it
 * with a single connWritev() call. Called from _writeToClient() when the
 * reply list is not empty.
 *
 * The iovec array contains, in order:
 * - the unsent part of the static reply buffer c->buf (if bufpos > 0);
 * - the reply-list nodes.
 * It holds at most IOV_MAX entries, and no further nodes are added once at
 * least NET_MAX_WRITES_PER_EVENT bytes have been collected. Empty reply
 * nodes met while gathering are released on the spot.
 *
 * c->sentlen counts the bytes already sent from the first pending chunk:
 * c->buf when bufpos > 0, otherwise the head node of c->reply.
 *
 * 'nwritten' is an output parameter that receives the connWritev() result.
 * If nothing was gathered, it returns C_OK without touching *nwritten.
 * It returns C_ERR when connWritev() returns a value <= 0.
 * After a successful write, fully sent data is dropped (c->buf reset, fully
 * sent reply nodes deleted and their size subtracted from c->reply_bytes).
 * c->sentlen is then left pointing inside the partially sent chunk, if any. */
static int _writevToClient(client *c, ssize_t *nwritten) {
    struct iovec iov[IOV_MAX];
    int iovcnt = 0;
    size_t iov_bytes_len = 0;
    /* If the static reply buffer is not empty,
     * add it to the iov array for writev() as well. */
    if (c->bufpos > 0) {
        iov[iovcnt].iov_base = c->buf + c->sentlen;
        iov[iovcnt].iov_len = c->bufpos - c->sentlen;
        iov_bytes_len += iov[iovcnt++].iov_len;
    }
    /* The first node of reply list might be incomplete from the last call,
     * thus it needs to be calibrated to get the actual data address and length.
     * c->sentlen only refers to that node when c->buf has nothing pending. */
    size_t offset = c->bufpos > 0 ? 0 : c->sentlen;
    listIter iter;
    listNode *next;
    clientReplyBlock *o;
    listRewind(c->reply, &iter);
    while ((next = listNext(&iter)) && iovcnt < IOV_MAX && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) {
        o = listNodeValue(next);
        if (o->used == 0) { /* empty node, just release it and skip. */
            c->reply_bytes -= o->size;
            listDelNode(c->reply, next);
            offset = 0;
            continue;
        }

        iov[iovcnt].iov_base = o->buf + offset;
        iov[iovcnt].iov_len = o->used - offset;
        iov_bytes_len += iov[iovcnt++].iov_len;
        /* Only the first non-empty node can carry a partial-send offset. */
        offset = 0;
    }
    if (iovcnt == 0) return C_OK;
    *nwritten = connWritev(c->conn, iov, iovcnt);
    if (*nwritten <= 0) return C_ERR;

    /* Locate the new node which has leftover data and
     * release all nodes in front of it. */
    ssize_t remaining = *nwritten;
    if (c->bufpos > 0) { /* deal with static reply buffer first. */
        int buf_len = c->bufpos - c->sentlen;
        c->sentlen += remaining;
        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if (remaining >= buf_len) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
        /* Goes negative when c->buf was only partially sent; the loop
         * below is then skipped and c->reply is left untouched. */
        remaining -= buf_len;
    }
    /* Walk the nodes that were queued above. Assumes connWritev() never
     * reports more bytes than were queued in iov, so the iterator does not
     * run past the gathered nodes. Here c->sentlen refers to the current
     * head node. */
    listRewind(c->reply, &iter);
    while (remaining > 0) {
        next = listNext(&iter);
        o = listNodeValue(next);
        if (remaining < (ssize_t)(o->used - c->sentlen)) {
            /* Partially sent node: remember how far we got. */
            c->sentlen += remaining;
            break;
        }
        /* Node fully sent: release it. */
        remaining -= (ssize_t)(o->used - c->sentlen);
        c->reply_bytes -= o->size;
        listDelNode(c->reply, next);
        c->sentlen = 0;
    }

    return C_OK;
}
1862
/* Write one chunk of the pending output of client 'c' to its connection.
 * Called by writeToClient(), possibly several times per event.
 *
 * Replica clients are served straight from the shared replication buffer.
 * The client references one block (c->ref_repl_buf_node) and a position in
 * it (c->ref_block_pos), and is asserted to have nothing in c->buf or
 * c->reply.
 *
 * Other clients use writev (_writevToClient()) when the reply list is not
 * empty. Otherwise they use a plain write of the static buffer c->buf.
 *
 * 'nwritten' is an output parameter. It is set to 0 first, then to the
 * byte count returned by the connection write, which may be <= 0 on
 * failure. Returns C_OK on success or when there was nothing to write.
 * Returns C_ERR when the connection write returned a value <= 0. */
int _writeToClient(client *c, ssize_t *nwritten) {
    *nwritten = 0;
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

        replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
        serverAssert(o->used >= c->ref_block_pos);
        /* Send current block if it is not fully sent. */
        if (o->used > c->ref_block_pos) {
            *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
                                  o->used-c->ref_block_pos);
            if (*nwritten <= 0) return C_ERR;
            c->ref_block_pos += *nwritten;
        }

        /* If we fully sent the object on head, go to the next one. This
         * moves this client's reference (refcount) from the current block to
         * the next one, then incrementally trims the replication backlog.
         * When no next block exists yet, the client keeps referencing the
         * current, fully sent block. */
        listNode *next = listNextNode(c->ref_repl_buf_node);
        if (next && c->ref_block_pos == o->used) {
            o->refcount--;
            ((replBufBlock *)(listNodeValue(next)))->refcount++;
            c->ref_repl_buf_node = next;
            c->ref_block_pos = 0;
            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
        }
        return C_OK;
    }

    /* When the reply list is not empty, it's better to use writev to save us some
     * system calls and TCP packets. */
    if (listLength(c->reply) > 0) {
        int ret = _writevToClient(c, nwritten);
        if (ret != C_OK) return ret;

        /* If there are no longer objects in the list, we expect
         * the count of reply bytes to be exactly zero. */
        if (listLength(c->reply) == 0)
            serverAssert(c->reply_bytes == 0);
    } else if (c->bufpos > 0) {
        *nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
    }

    return C_OK;
}
1920
/* Write data in output buffers to client. Return C_OK if the client
 * is still valid after the call, C_ERR if it was freed because of some
 * error. In practice, C_ERR means freeClientAsync() was called: either
 * on a write error with a disconnected connection, or because the whole
 * reply was sent to a CLIENT_CLOSE_AFTER_REPLY client. If handler_installed
 * is set, it will attempt to clear the write event once no replies are
 * pending.
 *
 * This function is called by threads, but always with handler_installed
 * set to 0. So when handler_installed is set to 0 the function must be
 * thread safe. */
int writeToClient(client *c, int handler_installed) {
    /* Update total number of writes on server */
    atomicIncr(server.stat_total_writes_processed, 1);

    /* nwritten holds the result of the last _writeToClient() call;
     * totwritten accumulates all bytes written by this call. */
    ssize_t nwritten = 0, totwritten = 0;

    while(clientHasPendingReplies(c)) {
        int ret = _writeToClient(c, &nwritten);
        if (ret == C_ERR) break;
        totwritten += nwritten;
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely). Only the
         * CLIENT_SLAVE flag is tested here. */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }

    /* Replica traffic and regular client traffic are accounted separately. */
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        atomicIncr(server.stat_net_repl_output_bytes, totwritten);
    } else {
        atomicIncr(server.stat_net_output_bytes, totwritten);
    }

    /* A -1 result is only fatal when the connection is no longer in the
     * connected state. A -1 on a still-connected connection (presumably a
     * would-block condition; NOTE(review): confirm in the connection
     * layer) just ends this round of writes. */
    if (nwritten == -1) {
        if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", connGetLastError(c->conn));
            freeClientAsync(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        /* Note that writeToClient() is called in a threaded way, but
         * aeDeleteFileEvent() is not thread safe: however writeToClient()
         * is always called with handler_installed set to 0 from threads
         * so we are fine. */
        if (handler_installed) {
            serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
            connSetWriteHandler(c->conn, NULL);
        }

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClientAsync(c);
            return C_ERR;
        }
    }
    /* Update client's memory usage after writing.
     * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
     * handleClientsWithPendingWritesUsingThreads(). */
    if (io_threads_op == IO_THREADS_OP_IDLE)
        updateClientMemUsage(c);
    return C_OK;
}
2002
2003/* Write event handler. Just send data to the client. */
2004void sendReplyToClient(connection *conn) {
2005 client *c = connGetPrivateData(conn);
2006 writeToClient(c,1);
2007}
2008
2009/* This function is called just before entering the event loop, in the hope
2010 * we can just write the replies to the client output buffer without any
2011 * need to use a syscall in order to install the writable event handler,
2012 * get it called, and so forth. */
2013int handleClientsWithPendingWrites(void) {
2014 listIter li;
2015 listNode *ln;
2016 int processed = listLength(server.clients_pending_write);
2017
2018 listRewind(server.clients_pending_write,&li);
2019 while((ln = listNext(&li))) {
2020 client *c = listNodeValue(ln);
2021 c->flags &= ~CLIENT_PENDING_WRITE;
2022 listDelNode(server.clients_pending_write,ln);
2023
2024 /* If a client is protected, don't do anything,
2025 * that may trigger write error or recreate handler. */
2026 if (c->flags & CLIENT_PROTECTED) continue;
2027
2028 /* Don't write to clients that are going to be closed anyway. */
2029 if (c->flags & CLIENT_CLOSE_ASAP) continue;
2030
2031 /* Try to write buffers to the client socket. */
2032 if (writeToClient(c,0) == C_ERR) continue;
2033
2034 /* If after the synchronous writes above we still have data to
2035 * output to the client, we need to install the writable handler. */
2036 if (clientHasPendingReplies(c)) {
2037 installClientWriteHandler(c);
2038 }
2039 }
2040 return processed;
2041}
2042
2043/* resetClient prepare the client to process the next command */
2044void resetClient(client *c) {
2045 redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
2046
2047 freeClientArgv(c);
2048 c->reqtype = 0;
2049 c->multibulklen = 0;
2050 c->bulklen = -1;
2051 c->slot = -1;
2052
2053 if (c->deferred_reply_errors)
2054 listRelease(c->deferred_reply_errors);
2055 c->deferred_reply_errors = NULL;
2056
2057 /* We clear the ASKING flag as well if we are not inside a MULTI, and
2058 * if what we just executed is not the ASKING command itself. */
2059 if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
2060 c->flags &= ~CLIENT_ASKING;
2061
2062 /* We do the same for the CACHING command as well. It also affects
2063 * the next command or transaction executed, in a way very similar
2064 * to ASKING. */
2065 if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
2066 c->flags &= ~CLIENT_TRACKING_CACHING;
2067
2068 /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
2069 * to the next command will be sent, but set the flag if the command
2070 * we just processed was "CLIENT REPLY SKIP". */
2071 c->flags &= ~CLIENT_REPLY_SKIP;
2072 if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
2073 c->flags |= CLIENT_REPLY_SKIP;
2074 c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
2075 }
2076}
2077
2078/* This function is used when we want to re-enter the event loop but there
2079 * is the risk that the client we are dealing with will be freed in some
2080 * way. This happens for instance in:
2081 *
2082 * * DEBUG RELOAD and similar.
2083 * * When a Lua script is in -BUSY state.
2084 *
2085 * So the function will protect the client by doing two things:
2086 *
2087 * 1) It removes the file events. This way it is not possible that an
2088 * error is signaled on the socket, freeing the client.
2089 * 2) Moreover it makes sure that if the client is freed in a different code
2090 * path, it is not really released, but only marked for later release. */
2091void protectClient(client *c) {
2092 c->flags |= CLIENT_PROTECTED;
2093 if (c->conn) {
2094 connSetReadHandler(c->conn,NULL);
2095 connSetWriteHandler(c->conn,NULL);
2096 }
2097}
2098
2099/* This will undo the client protection done by protectClient() */
2100void unprotectClient(client *c) {
2101 if (c->flags & CLIENT_PROTECTED) {
2102 c->flags &= ~CLIENT_PROTECTED;
2103 if (c->conn) {
2104 connSetReadHandler(c->conn,readQueryFromClient);
2105 if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
2106 }
2107 }
2108}
2109
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
 * this function consumes the client query buffer and creates a command ready
 * to be executed inside the client structure. Returns C_OK if the command
 * is ready to be executed, or C_ERR if there is still protocol to read to
 * have a well formed command. The function also returns C_ERR when there is
 * a protocol error: in such a case the client structure is setup to reply
 * with the error and close the connection.
 *
 * The line starting at c->qb_pos is split with sdssplitargs(), so quoting
 * rules apply. An empty line yields C_OK with c->argc == 0. */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do until a whole line (terminated by '\n') is buffered.
     * Refuse to keep buffering past PROTO_INLINE_MAX_SIZE bytes. */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        sdsfreesplitres(argv,argc);
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
        setProtocolError("Master using the inline protocol. Desync?",c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv_len = argc;
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    /* Create redis objects for all arguments. The sds strings returned by
     * sdssplitargs() are handed over to the objects, so only the array
     * itself is freed afterwards. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
        c->argv_len_sum += sdslen(argv[j]);
    }
    zfree(argv);
    return C_OK;
}
2190
2191/* Helper function. Record protocol error details in server log,
2192 * and set the client as CLIENT_CLOSE_AFTER_REPLY and
2193 * CLIENT_PROTOCOL_ERROR. */
2194#define PROTO_DUMP_LEN 128
2195static void setProtocolError(const char *errstr, client *c) {
2196 if (server.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
2197 sds client = catClientInfoString(sdsempty(),c);
2198
2199 /* Sample some protocol to given an idea about what was inside. */
2200 char buf[256];
2201 if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
2202 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
2203 } else {
2204 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
2205 }
2206
2207 /* Remove non printable chars. */
2208 char *p = buf;
2209 while (*p != '\0') {
2210 if (!isprint(*p)) *p = '.';
2211 p++;
2212 }
2213
2214 /* Log all the client and protocol info. */
2215 int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
2216 LL_VERBOSE;
2217 serverLog(loglevel,
2218 "Protocol error (%s) from client: %s. %s", errstr, client, buf);
2219 sdsfree(client);
2220 }
2221 c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
2222}
2223
/* Process the query buffer for client 'c', setting up the client argument
 * vector for command execution. Returns C_OK if after running the function
 * the client has a well-formed ready to be processed command, otherwise
 * C_ERR if there is still to read more buffer to get the full command.
 * The function also returns C_ERR when there is a protocol error: in such a
 * case the client structure is setup to reply with the error and close
 * the connection.
 *
 * This function is called if processInputBuffer() detects that the next
 * command is in RESP format, so the first byte in the command is found
 * to be '*'. Otherwise for inline commands processInlineBuffer() is called.
 *
 * Parsing is incremental across calls. Two fields carry the state:
 * - c->multibulklen: number of bulk arguments still to read. 0 means the
 *   "*<count>\r\n" header has not been parsed yet.
 * - c->bulklen: length of the bulk being read. -1 means its "$<len>\r\n"
 *   header has not been parsed yet.
 * A multibulk count <= 0 yields C_OK with c->argc == 0. Clients for which
 * authRequired() is true are limited to 10 arguments and 16384-byte bulks. */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > INT_MAX) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        /* Skip past the "\r\n" that ends the header line. */
        c->qb_pos = (newline-c->querybuf)+2;

        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure. The initial allocation is
         * capped at 1024 slots and grown below as arguments arrive, so a
         * huge declared count does not allocate up front. */
        if (c->argv) zfree(c->argv);
        c->argv_len = min(c->multibulklen, 1024);
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            /* proto_max_bulk_len is not enforced for master clients. */
            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            c->qb_pos = newline-c->querybuf+2;
            if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
                /* When the client is not a master client (because master
                 * client's querybuf can only be trimmed after data applied
                 * and sent to replicas).
                 *
                 * If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Check if we have space in argv, grow if needed. The new size
             * doubles (saturating at INT_MAX) but never exceeds the number
             * of arguments actually left to read. */
            if (c->argc >= c->argv_len) {
                c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen);
                c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
            }

            /* Optimization: if a non-master client's buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (!(c->flags & CLIENT_MASTER) &&
                c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulklen == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
2393
2394/* Perform necessary tasks after a command was executed:
2395 *
2396 * 1. The client is reset unless there are reasons to avoid doing it.
2397 * 2. In the case of master clients, the replication offset is updated.
2398 * 3. Propagate commands we got from our master to replicas down the line. */
2399void commandProcessed(client *c) {
2400 /* If client is blocked(including paused), just return avoid reset and replicate.
2401 *
2402 * 1. Don't reset the client structure for blocked clients, so that the reply
2403 * callback will still be able to access the client argv and argc fields.
2404 * The client will be reset in unblockClient().
2405 * 2. Don't update replication offset or propagate commands to replicas,
2406 * since we have not applied the command. */
2407 if (c->flags & CLIENT_BLOCKED) return;
2408
2409 resetClient(c);
2410
2411 long long prev_offset = c->reploff;
2412 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
2413 /* Update the applied replication offset of our master. */
2414 c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
2415 }
2416
2417 /* If the client is a master we need to compute the difference
2418 * between the applied offset before and after processing the buffer,
2419 * to understand how much of the replication stream was actually
2420 * applied to the master state: this quantity, and its corresponding
2421 * part of the replication stream, will be propagated to the
2422 * sub-replicas and to the replication backlog. */
2423 if (c->flags & CLIENT_MASTER) {
2424 long long applied = c->reploff - prev_offset;
2425 if (applied) {
2426 replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);
2427 c->repl_applied += applied;
2428 }
2429 }
2430}
2431
2432/* This function calls processCommand(), but also performs a few sub tasks
2433 * for the client that are useful in that context:
2434 *
2435 * 1. It sets the current client to the client 'c'.
2436 * 2. calls commandProcessed() if the command was handled.
2437 *
2438 * The function returns C_ERR in case the client was freed as a side effect
2439 * of processing the command, otherwise C_OK is returned. */
2440int processCommandAndResetClient(client *c) {
2441 int deadclient = 0;
2442 client *old_client = server.current_client;
2443 server.current_client = c;
2444 if (processCommand(c) == C_OK) {
2445 commandProcessed(c);
2446 /* Update the client's memory to include output buffer growth following the
2447 * processed command. */
2448 updateClientMemUsage(c);
2449 }
2450
2451 if (server.current_client == NULL) deadclient = 1;
2452 /*
2453 * Restore the old client, this is needed because when a script
2454 * times out, we will get into this code from processEventsWhileBlocked.
2455 * Which will cause to set the server.current_client. If not restored
2456 * we will return 1 to our caller which will falsely indicate the client
2457 * is dead and will stop reading from its buffer.
2458 */
2459 server.current_client = old_client;
2460 /* performEvictions may flush slave output buffers. This may
2461 * result in a slave, that may be the active client, to be
2462 * freed. */
2463 return deadclient ? C_ERR : C_OK;
2464}
2465
2466
2467/* This function will execute any fully parsed commands pending on
2468 * the client. Returns C_ERR if the client is no longer valid after executing
2469 * the command, and C_OK for all other cases. */
2470int processPendingCommandAndInputBuffer(client *c) {
2471 if (c->flags & CLIENT_PENDING_COMMAND) {
2472 c->flags &= ~CLIENT_PENDING_COMMAND;
2473 if (processCommandAndResetClient(c) == C_ERR) {
2474 return C_ERR;
2475 }
2476 }
2477
2478 /* Now process client if it has more data in it's buffer.
2479 *
2480 * Note: when a master client steps into this function,
2481 * it can always satisfy this condition, because its querbuf
2482 * contains data not applied. */
2483 if (c->querybuf && sdslen(c->querybuf) > 0) {
2484 return processInputBuffer(c);
2485 }
2486 return C_OK;
2487}
2488
2489/* This function is called every time, in the client structure 'c', there is
2490 * more query buffer to process, because we read more data from the socket
2491 * or because a client was blocked and later reactivated, so there could be
2492 * pending query buffer, already representing a full command, to process.
2493 * return C_ERR in case the client was freed during the processing */
2494int processInputBuffer(client *c) {
2495 /* Keep processing while there is something in the input buffer */
2496 while(c->qb_pos < sdslen(c->querybuf)) {
2497 /* Immediately abort if the client is in the middle of something. */
2498 if (c->flags & CLIENT_BLOCKED) break;
2499
2500 /* Don't process more buffers from clients that have already pending
2501 * commands to execute in c->argv. */
2502 if (c->flags & CLIENT_PENDING_COMMAND) break;
2503
2504 /* Don't process input from the master while there is a busy script
2505 * condition on the slave. We want just to accumulate the replication
2506 * stream (instead of replying -BUSY like we do with other clients) and
2507 * later resume the processing. */
2508 if (isInsideYieldingLongCommand() && c->flags & CLIENT_MASTER) break;
2509
2510 /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
2511 * written to the client. Make sure to not let the reply grow after
2512 * this flag has been set (i.e. don't process more commands).
2513 *
2514 * The same applies for clients we want to terminate ASAP. */
2515 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
2516
2517 /* Determine request type when unknown. */
2518 if (!c->reqtype) {
2519 if (c->querybuf[c->qb_pos] == '*') {
2520 c->reqtype = PROTO_REQ_MULTIBULK;
2521 } else {
2522 c->reqtype = PROTO_REQ_INLINE;
2523 }
2524 }
2525
2526 if (c->reqtype == PROTO_REQ_INLINE) {
2527 if (processInlineBuffer(c) != C_OK) break;
2528 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
2529 if (processMultibulkBuffer(c) != C_OK) break;
2530 } else {
2531 serverPanic("Unknown request type");
2532 }
2533
2534 /* Multibulk processing could see a <= 0 length. */
2535 if (c->argc == 0) {
2536 resetClient(c);
2537 } else {
2538 /* If we are in the context of an I/O thread, we can't really
2539 * execute the command here. All we can do is to flag the client
2540 * as one that needs to process the command. */
2541 if (io_threads_op != IO_THREADS_OP_IDLE) {
2542 serverAssert(io_threads_op == IO_THREADS_OP_READ);
2543 c->flags |= CLIENT_PENDING_COMMAND;
2544 break;
2545 }
2546
2547 /* We are finally ready to execute the command. */
2548 if (processCommandAndResetClient(c) == C_ERR) {
2549 /* If the client is no longer valid, we avoid exiting this
2550 * loop and trimming the client buffer later. So we return
2551 * ASAP in that case. */
2552 return C_ERR;
2553 }
2554 }
2555 }
2556
2557 if (c->flags & CLIENT_MASTER) {
2558 /* If the client is a master, trim the querybuf to repl_applied,
2559 * since master client is very special, its querybuf not only
2560 * used to parse command, but also proxy to sub-replicas.
2561 *
2562 * Here are some scenarios we cannot trim to qb_pos:
2563 * 1. we don't receive complete command from master
2564 * 2. master client blocked cause of client pause
2565 * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
2566 *
2567 * In these scenarios, qb_pos points to the part of the current command
2568 * or the beginning of next command, and the current command is not applied yet,
2569 * so the repl_applied is not equal to qb_pos. */
2570 if (c->repl_applied) {
2571 sdsrange(c->querybuf,c->repl_applied,-1);
2572 c->qb_pos -= c->repl_applied;
2573 c->repl_applied = 0;
2574 }
2575 } else if (c->qb_pos) {
2576 /* Trim to pos */
2577 sdsrange(c->querybuf,c->qb_pos,-1);
2578 c->qb_pos = 0;
2579 }
2580
2581 /* Update client memory usage after processing the query buffer, this is
2582 * important in case the query buffer is big and wasn't drained during
2583 * the above loop (because of partially sent big commands). */
2584 if (io_threads_op == IO_THREADS_OP_IDLE)
2585 updateClientMemUsage(c);
2586
2587 return C_OK;
2588}
2589
2590void readQueryFromClient(connection *conn) {
2591 client *c = connGetPrivateData(conn);
2592 int nread, big_arg = 0;
2593 size_t qblen, readlen;
2594
2595 /* Check if we want to read from the client later when exiting from
2596 * the event loop. This is the case if threaded I/O is enabled. */
2597 if (postponeClientRead(c)) return;
2598
2599 /* Update total number of reads on server */
2600 atomicIncr(server.stat_total_reads_processed, 1);
2601
2602 readlen = PROTO_IOBUF_LEN;
2603 /* If this is a multi bulk request, and we are processing a bulk reply
2604 * that is large enough, try to maximize the probability that the query
2605 * buffer contains exactly the SDS string representing the object, even
2606 * at the risk of requiring more read(2) calls. This way the function
2607 * processMultiBulkBuffer() can avoid copying buffers to create the
2608 * Redis Object representing the argument. */
2609 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
2610 && c->bulklen >= PROTO_MBULK_BIG_ARG)
2611 {
2612 ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
2613 big_arg = 1;
2614
2615 /* Note that the 'remaining' variable may be zero in some edge case,
2616 * for example once we resume a blocked client after CLIENT PAUSE. */
2617 if (remaining > 0) readlen = remaining;
2618
2619 /* Master client needs expand the readlen when meet BIG_ARG(see #9100),
2620 * but doesn't need align to the next arg, we can read more data. */
2621 if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
2622 readlen = PROTO_IOBUF_LEN;
2623 }
2624
2625 qblen = sdslen(c->querybuf);
2626 if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
2627 (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
2628 /* When reading a BIG_ARG we won't be reading more than that one arg
2629 * into the query buffer, so we don't need to pre-allocate more than we
2630 * need, so using the non-greedy growing. For an initial allocation of
2631 * the query buffer, we also don't wanna use the greedy growth, in order
2632 * to avoid collision with the RESIZE_THRESHOLD mechanism. */
2633 c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
2634 } else {
2635 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
2636
2637 /* Read as much as possible from the socket to save read(2) system calls. */
2638 readlen = sdsavail(c->querybuf);
2639 }
2640 nread = connRead(c->conn, c->querybuf+qblen, readlen);
2641 if (nread == -1) {
2642 if (connGetState(conn) == CONN_STATE_CONNECTED) {
2643 return;
2644 } else {
2645 serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
2646 freeClientAsync(c);
2647 goto done;
2648 }
2649 } else if (nread == 0) {
2650 if (server.verbosity <= LL_VERBOSE) {
2651 sds info = catClientInfoString(sdsempty(), c);
2652 serverLog(LL_VERBOSE, "Client closed connection %s", info);
2653 sdsfree(info);
2654 }
2655 freeClientAsync(c);
2656 goto done;
2657 }
2658
2659 sdsIncrLen(c->querybuf,nread);
2660 qblen = sdslen(c->querybuf);
2661 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
2662
2663 c->lastinteraction = server.unixtime;
2664 if (c->flags & CLIENT_MASTER) {
2665 c->read_reploff += nread;
2666 atomicIncr(server.stat_net_repl_input_bytes, nread);
2667 } else {
2668 atomicIncr(server.stat_net_input_bytes, nread);
2669 }
2670
2671 if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
2672 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
2673
2674 bytes = sdscatrepr(bytes,c->querybuf,64);
2675 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
2676 sdsfree(ci);
2677 sdsfree(bytes);
2678 freeClientAsync(c);
2679 goto done;
2680 }
2681
2682 /* There is more data in the client input buffer, continue parsing it
2683 * and check if there is a full command to execute. */
2684 if (processInputBuffer(c) == C_ERR)
2685 c = NULL;
2686
2687done:
2688 beforeNextClient(c);
2689}
2690
2691/* A Redis "Address String" is a colon separated ip:port pair.
2692 * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
2693 * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
2694 * For Unix sockets we use path:0, like in "/tmp/redis:0".
2695 *
2696 * An Address String always fits inside a buffer of NET_ADDR_STR_LEN bytes,
2697 * including the null term.
2698 *
2699 * On failure the function still populates 'addr' with the "?:0" string in case
2700 * you want to relax error checking or need to display something anyway (see
2701 * anetFdToString implementation for more info). */
2702void genClientAddrString(client *client, char *addr,
2703 size_t addr_len, int fd_to_str_type) {
2704 if (client->flags & CLIENT_UNIX_SOCKET) {
2705 /* Unix socket client. */
2706 snprintf(addr,addr_len,"%s:0",server.unixsocket);
2707 } else {
2708 /* TCP client. */
2709 connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type);
2710 }
2711}
2712
2713/* This function returns the client peer id, by creating and caching it
2714 * if client->peerid is NULL, otherwise returning the cached value.
2715 * The Peer ID never changes during the life of the client, however it
2716 * is expensive to compute. */
2717char *getClientPeerId(client *c) {
2718 char peerid[NET_ADDR_STR_LEN];
2719
2720 if (c->peerid == NULL) {
2721 genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME);
2722 c->peerid = sdsnew(peerid);
2723 }
2724 return c->peerid;
2725}
2726
2727/* This function returns the client bound socket name, by creating and caching
2728 * it if client->sockname is NULL, otherwise returning the cached value.
2729 * The Socket Name never changes during the life of the client, however it
2730 * is expensive to compute. */
2731char *getClientSockname(client *c) {
2732 char sockname[NET_ADDR_STR_LEN];
2733
2734 if (c->sockname == NULL) {
2735 genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME);
2736 c->sockname = sdsnew(sockname);
2737 }
2738 return c->sockname;
2739}
2740
2741/* Concatenate a string representing the state of a client in a human
2742 * readable format, into the sds string 's'. */
2743sds catClientInfoString(sds s, client *client) {
2744 char flags[16], events[3], conninfo[CONN_INFO_LEN], *p;
2745
2746 p = flags;
2747 if (client->flags & CLIENT_SLAVE) {
2748 if (client->flags & CLIENT_MONITOR)
2749 *p++ = 'O';
2750 else
2751 *p++ = 'S';
2752 }
2753 if (client->flags & CLIENT_MASTER) *p++ = 'M';
2754 if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
2755 if (client->flags & CLIENT_MULTI) *p++ = 'x';
2756 if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
2757 if (client->flags & CLIENT_TRACKING) *p++ = 't';
2758 if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
2759 if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
2760 if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
2761 if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
2762 if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
2763 if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
2764 if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
2765 if (client->flags & CLIENT_READONLY) *p++ = 'r';
2766 if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
2767 if (p == flags) *p++ = 'N';
2768 *p++ = '\0';
2769
2770 p = events;
2771 if (client->conn) {
2772 if (connHasReadHandler(client->conn)) *p++ = 'r';
2773 if (connHasWriteHandler(client->conn)) *p++ = 'w';
2774 }
2775 *p = '\0';
2776
2777 /* Compute the total memory consumed by this client. */
2778 size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
2779
2780 size_t used_blocks_of_repl_buf = 0;
2781 if (client->ref_repl_buf_node) {
2782 replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
2783 replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
2784 used_blocks_of_repl_buf = last->id - cur->id + 1;
2785 }
2786
2787 sds ret = sdscatfmt(s,
2788 "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i ssub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
2789 (unsigned long long) client->id,
2790 getClientPeerId(client),
2791 getClientSockname(client),
2792 connGetInfo(client->conn, conninfo, sizeof(conninfo)),
2793 client->name ? (char*)client->name->ptr : "",
2794 (long long)(server.unixtime - client->ctime),
2795 (long long)(server.unixtime - client->lastinteraction),
2796 flags,
2797 client->db->id,
2798 (int) dictSize(client->pubsub_channels),
2799 (int) listLength(client->pubsub_patterns),
2800 (int) dictSize(client->pubsubshard_channels),
2801 (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
2802 (unsigned long long) sdslen(client->querybuf),
2803 (unsigned long long) sdsavail(client->querybuf),
2804 (unsigned long long) client->argv_len_sum,
2805 (unsigned long long) client->mstate.argv_len_sums,
2806 (unsigned long long) client->buf_usable_size,
2807 (unsigned long long) client->buf_peak,
2808 (unsigned long long) client->bufpos,
2809 (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
2810 (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
2811 (unsigned long long) total_mem,
2812 events,
2813 client->lastcmd ? client->lastcmd->fullname : "NULL",
2814 client->user ? client->user->name : "(superuser)",
2815 (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
2816 client->resp);
2817 return ret;
2818}
2819
2820sds getAllClientsInfoString(int type) {
2821 listNode *ln;
2822 listIter li;
2823 client *client;
2824 sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients));
2825 sdsclear(o);
2826 listRewind(server.clients,&li);
2827 while ((ln = listNext(&li)) != NULL) {
2828 client = listNodeValue(ln);
2829 if (type != -1 && getClientType(client) != type) continue;
2830 o = catClientInfoString(o,client);
2831 o = sdscatlen(o,"\n",1);
2832 }
2833 return o;
2834}
2835
2836/* Returns C_OK if the name has been set or C_ERR if the name is invalid. */
2837int clientSetName(client *c, robj *name) {
2838 int len = (name != NULL) ? sdslen(name->ptr) : 0;
2839
2840 /* Setting the client name to an empty string actually removes
2841 * the current name. */
2842 if (len == 0) {
2843 if (c->name) decrRefCount(c->name);
2844 c->name = NULL;
2845 return C_OK;
2846 }
2847
2848 /* Otherwise check if the charset is ok. We need to do this otherwise
2849 * CLIENT LIST format will break. You should always be able to
2850 * split by space to get the different fields. */
2851 char *p = name->ptr;
2852 for (int j = 0; j < len; j++) {
2853 if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
2854 return C_ERR;
2855 }
2856 }
2857 if (c->name) decrRefCount(c->name);
2858 c->name = name;
2859 incrRefCount(name);
2860 return C_OK;
2861}
2862
2863/* This function implements CLIENT SETNAME, including replying to the
2864 * user with an error if the charset is wrong (in that case C_ERR is
2865 * returned). If the function succeeded C_OK is returned, and it's up
2866 * to the caller to send a reply if needed.
2867 *
2868 * Setting an empty string as name has the effect of unsetting the
2869 * currently set name: the client will remain unnamed.
2870 *
2871 * This function is also used to implement the HELLO SETNAME option. */
2872int clientSetNameOrReply(client *c, robj *name) {
2873 int result = clientSetName(c, name);
2874 if (result == C_ERR) {
2875 addReplyError(c,
2876 "Client names cannot contain spaces, "
2877 "newlines or special characters.");
2878 }
2879 return result;
2880}
2881
2882/* Reset the client state to resemble a newly connected client.
2883 */
2884void resetCommand(client *c) {
2885 /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
2886 * distinguish between the two.
2887 */
2888 uint64_t flags = c->flags;
2889 if (flags & CLIENT_MONITOR) flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
2890
2891 if (flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
2892 addReplyError(c,"can only reset normal client connections");
2893 return;
2894 }
2895
2896 clearClientConnectionState(c);
2897 addReplyStatus(c,"RESET");
2898}
2899
2900/* Disconnect the current client */
2901void quitCommand(client *c) {
2902 addReply(c,shared.ok);
2903 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2904}
2905
2906void clientCommand(client *c) {
2907 listNode *ln;
2908 listIter li;
2909
2910 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
2911 const char *help[] = {
2912"CACHING (YES|NO)",
2913" Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.",
2914"GETREDIR",
2915" Return the client ID we are redirecting to when tracking is enabled.",
2916"GETNAME",
2917" Return the name of the current connection.",
2918"ID",
2919" Return the ID of the current connection.",
2920"INFO",
2921" Return information about the current client connection.",
2922"KILL <ip:port>",
2923" Kill connection made from <ip:port>.",
2924"KILL <option> <value> [<option> <value> [...]]",
2925" Kill connections. Options are:",
2926" * ADDR (<ip:port>|<unixsocket>:0)",
2927" Kill connections made from the specified address",
2928" * LADDR (<ip:port>|<unixsocket>:0)",
2929" Kill connections made to specified local address",
2930" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
2931" Kill connections by type.",
2932" * USER <username>",
2933" Kill connections authenticated by <username>.",
2934" * SKIPME (YES|NO)",
2935" Skip killing current connection (default: yes).",
2936"LIST [options ...]",
2937" Return information about client connections. Options:",
2938" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
2939" Return clients of specified type.",
2940"UNPAUSE",
2941" Stop the current client pause, resuming traffic.",
2942"PAUSE <timeout> [WRITE|ALL]",
2943" Suspend all, or just write, clients for <timeout> milliseconds.",
2944"REPLY (ON|OFF|SKIP)",
2945" Control the replies sent to the current connection.",
2946"SETNAME <name>",
2947" Assign the name <name> to the current connection.",
2948"UNBLOCK <clientid> [TIMEOUT|ERROR]",
2949" Unblock the specified blocked client.",
2950"TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]",
2951" [OPTIN] [OPTOUT] [NOLOOP]",
2952" Control server assisted client side caching.",
2953"TRACKINGINFO",
2954" Report tracking status for the current connection.",
2955"NO-EVICT (ON|OFF)",
2956" Protect current client connection from eviction.",
2957NULL
2958 };
2959 addReplyHelp(c, help);
2960 } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) {
2961 /* CLIENT ID */
2962 addReplyLongLong(c,c->id);
2963 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
2964 /* CLIENT INFO */
2965 sds o = catClientInfoString(sdsempty(), c);
2966 o = sdscatlen(o,"\n",1);
2967 addReplyVerbatim(c,o,sdslen(o),"txt");
2968 sdsfree(o);
2969 } else if (!strcasecmp(c->argv[1]->ptr,"list")) {
2970 /* CLIENT LIST */
2971 int type = -1;
2972 sds o = NULL;
2973 if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) {
2974 type = getClientTypeByName(c->argv[3]->ptr);
2975 if (type == -1) {
2976 addReplyErrorFormat(c,"Unknown client type '%s'",
2977 (char*) c->argv[3]->ptr);
2978 return;
2979 }
2980 } else if (c->argc > 3 && !strcasecmp(c->argv[2]->ptr,"id")) {
2981 int j;
2982 o = sdsempty();
2983 for (j = 3; j < c->argc; j++) {
2984 long long cid;
2985 if (getLongLongFromObjectOrReply(c, c->argv[j], &cid,
2986 "Invalid client ID")) {
2987 sdsfree(o);
2988 return;
2989 }
2990 client *cl = lookupClientByID(cid);
2991 if (cl) {
2992 o = catClientInfoString(o, cl);
2993 o = sdscatlen(o, "\n", 1);
2994 }
2995 }
2996 } else if (c->argc != 2) {
2997 addReplyErrorObject(c,shared.syntaxerr);
2998 return;
2999 }
3000
3001 if (!o)
3002 o = getAllClientsInfoString(type);
3003 addReplyVerbatim(c,o,sdslen(o),"txt");
3004 sdsfree(o);
3005 } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
3006 /* CLIENT REPLY ON|OFF|SKIP */
3007 if (!strcasecmp(c->argv[2]->ptr,"on")) {
3008 c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
3009 addReply(c,shared.ok);
3010 } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
3011 c->flags |= CLIENT_REPLY_OFF;
3012 } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
3013 if (!(c->flags & CLIENT_REPLY_OFF))
3014 c->flags |= CLIENT_REPLY_SKIP_NEXT;
3015 } else {
3016 addReplyErrorObject(c,shared.syntaxerr);
3017 return;
3018 }
3019 } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
3020 /* CLIENT NO-EVICT ON|OFF */
3021 if (!strcasecmp(c->argv[2]->ptr,"on")) {
3022 c->flags |= CLIENT_NO_EVICT;
3023 addReply(c,shared.ok);
3024 } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
3025 c->flags &= ~CLIENT_NO_EVICT;
3026 addReply(c,shared.ok);
3027 } else {
3028 addReplyErrorObject(c,shared.syntaxerr);
3029 return;
3030 }
3031 } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
3032 /* CLIENT KILL <ip:port>
3033 * CLIENT KILL <option> [value] ... <option> [value] */
3034 char *addr = NULL;
3035 char *laddr = NULL;
3036 user *user = NULL;
3037 int type = -1;
3038 uint64_t id = 0;
3039 int skipme = 1;
3040 int killed = 0, close_this_client = 0;
3041
3042 if (c->argc == 3) {
3043 /* Old style syntax: CLIENT KILL <addr> */
3044 addr = c->argv[2]->ptr;
3045 skipme = 0; /* With the old form, you can kill yourself. */
3046 } else if (c->argc > 3) {
3047 int i = 2; /* Next option index. */
3048
3049 /* New style syntax: parse options. */
3050 while(i < c->argc) {
3051 int moreargs = c->argc > i+1;
3052
3053 if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
3054 long tmp;
3055
3056 if (getRangeLongFromObjectOrReply(c, c->argv[i+1], 1, LONG_MAX, &tmp,
3057 "client-id should be greater than 0") != C_OK)
3058 return;
3059 id = tmp;
3060 } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
3061 type = getClientTypeByName(c->argv[i+1]->ptr);
3062 if (type == -1) {
3063 addReplyErrorFormat(c,"Unknown client type '%s'",
3064 (char*) c->argv[i+1]->ptr);
3065 return;
3066 }
3067 } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
3068 addr = c->argv[i+1]->ptr;
3069 } else if (!strcasecmp(c->argv[i]->ptr,"laddr") && moreargs) {
3070 laddr = c->argv[i+1]->ptr;
3071 } else if (!strcasecmp(c->argv[i]->ptr,"user") && moreargs) {
3072 user = ACLGetUserByName(c->argv[i+1]->ptr,
3073 sdslen(c->argv[i+1]->ptr));
3074 if (user == NULL) {
3075 addReplyErrorFormat(c,"No such user '%s'",
3076 (char*) c->argv[i+1]->ptr);
3077 return;
3078 }
3079 } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
3080 if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
3081 skipme = 1;
3082 } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
3083 skipme = 0;
3084 } else {
3085 addReplyErrorObject(c,shared.syntaxerr);
3086 return;
3087 }
3088 } else {
3089 addReplyErrorObject(c,shared.syntaxerr);
3090 return;
3091 }
3092 i += 2;
3093 }
3094 } else {
3095 addReplyErrorObject(c,shared.syntaxerr);
3096 return;
3097 }
3098
3099 /* Iterate clients killing all the matching clients. */
3100 listRewind(server.clients,&li);
3101 while ((ln = listNext(&li)) != NULL) {
3102 client *client = listNodeValue(ln);
3103 if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
3104 if (laddr && strcmp(getClientSockname(client),laddr) != 0) continue;
3105 if (type != -1 && getClientType(client) != type) continue;
3106 if (id != 0 && client->id != id) continue;
3107 if (user && client->user != user) continue;
3108 if (c == client && skipme) continue;
3109
3110 /* Kill it. */
3111 if (c == client) {
3112 close_this_client = 1;
3113 } else {
3114 freeClient(client);
3115 }
3116 killed++;
3117 }
3118
3119 /* Reply according to old/new format. */
3120 if (c->argc == 3) {
3121 if (killed == 0)
3122 addReplyError(c,"No such client");
3123 else
3124 addReply(c,shared.ok);
3125 } else {
3126 addReplyLongLong(c,killed);
3127 }
3128
3129 /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
3130 * only after we queued the reply to its output buffers. */
3131 if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
3132 } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 ||
3133 c->argc == 4))
3134 {
3135 /* CLIENT UNBLOCK <id> [timeout|error] */
3136 long long id;
3137 int unblock_error = 0;
3138
3139 if (c->argc == 4) {
3140 if (!strcasecmp(c->argv[3]->ptr,"timeout")) {
3141 unblock_error = 0;
3142 } else if (!strcasecmp(c->argv[3]->ptr,"error")) {
3143 unblock_error = 1;
3144 } else {
3145 addReplyError(c,
3146 "CLIENT UNBLOCK reason should be TIMEOUT or ERROR");
3147 return;
3148 }
3149 }
3150 if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
3151 != C_OK) return;
3152 struct client *target = lookupClientByID(id);
3153 /* Note that we never try to unblock a client blocked on a module command, which
3154 * doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
3155 * The reason is that we assume that if a command doesn't expect to be timedout,
3156 * it also doesn't expect to be unblocked by CLIENT UNBLOCK */
3157 if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
3158 if (unblock_error)
3159 addReplyError(target,
3160 "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
3161 else
3162 replyToBlockedClientTimedOut(target);
3163 unblockClient(target);
3164 updateStatsOnUnblock(target, 0, 0, 1);
3165 addReply(c,shared.cone);
3166 } else {
3167 addReply(c,shared.czero);
3168 }
3169 } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
3170 /* CLIENT SETNAME */
3171 if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
3172 addReply(c,shared.ok);
3173 } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
3174 /* CLIENT GETNAME */
3175 if (c->name)
3176 addReplyBulk(c,c->name);
3177 else
3178 addReplyNull(c);
3179 } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
3180 /* CLIENT UNPAUSE */
3181 unpauseClients(PAUSE_BY_CLIENT_COMMAND);
3182 addReply(c,shared.ok);
3183 } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
3184 c->argc == 4))
3185 {
3186 /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
3187 mstime_t end;
3188 int type = CLIENT_PAUSE_ALL;
3189 if (c->argc == 4) {
3190 if (!strcasecmp(c->argv[3]->ptr,"write")) {
3191 type = CLIENT_PAUSE_WRITE;
3192 } else if (!strcasecmp(c->argv[3]->ptr,"all")) {
3193 type = CLIENT_PAUSE_ALL;
3194 } else {
3195 addReplyError(c,
3196 "CLIENT PAUSE mode must be WRITE or ALL");
3197 return;
3198 }
3199 }
3200
3201 if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
3202 UNIT_MILLISECONDS) != C_OK) return;
3203 pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type);
3204 addReply(c,shared.ok);
3205 } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
3206 /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
3207 * [PREFIX second] [OPTIN] [OPTOUT] [NOLOOP]... */
3208 long long redir = 0;
3209 uint64_t options = 0;
3210 robj **prefix = NULL;
3211 size_t numprefix = 0;
3212
3213 /* Parse the options. */
3214 for (int j = 3; j < c->argc; j++) {
3215 int moreargs = (c->argc-1) - j;
3216
3217 if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
3218 j++;
3219 if (redir != 0) {
3220 addReplyError(c,"A client can only redirect to a single "
3221 "other client");
3222 zfree(prefix);
3223 return;
3224 }
3225
3226 if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
3227 C_OK)
3228 {
3229 zfree(prefix);
3230 return;
3231 }
3232 /* We will require the client with the specified ID to exist
3233 * right now, even if it is possible that it gets disconnected
3234 * later. Still a valid sanity check. */
3235 if (lookupClientByID(redir) == NULL) {
3236 addReplyError(c,"The client ID you want redirect to "
3237 "does not exist");
3238 zfree(prefix);
3239 return;
3240 }
3241 } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
3242 options |= CLIENT_TRACKING_BCAST;
3243 } else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
3244 options |= CLIENT_TRACKING_OPTIN;
3245 } else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
3246 options |= CLIENT_TRACKING_OPTOUT;
3247 } else if (!strcasecmp(c->argv[j]->ptr,"noloop")) {
3248 options |= CLIENT_TRACKING_NOLOOP;
3249 } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
3250 j++;
3251 prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
3252 prefix[numprefix++] = c->argv[j];
3253 } else {
3254 zfree(prefix);
3255 addReplyErrorObject(c,shared.syntaxerr);
3256 return;
3257 }
3258 }
3259
3260 /* Options are ok: enable or disable the tracking for this client. */
3261 if (!strcasecmp(c->argv[2]->ptr,"on")) {
3262 /* Before enabling tracking, make sure options are compatible
3263 * among each other and with the current state of the client. */
3264 if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
3265 addReplyError(c,
3266 "PREFIX option requires BCAST mode to be enabled");
3267 zfree(prefix);
3268 return;
3269 }
3270
3271 if (c->flags & CLIENT_TRACKING) {
3272 int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
3273 int newbcast = !!(options & CLIENT_TRACKING_BCAST);
3274 if (oldbcast != newbcast) {
3275 addReplyError(c,
3276 "You can't switch BCAST mode on/off before disabling "
3277 "tracking for this client, and then re-enabling it with "
3278 "a different mode.");
3279 zfree(prefix);
3280 return;
3281 }
3282 }
3283
3284 if (options & CLIENT_TRACKING_BCAST &&
3285 options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
3286 {
3287 addReplyError(c,
3288 "OPTIN and OPTOUT are not compatible with BCAST");
3289 zfree(prefix);
3290 return;
3291 }
3292
3293 if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT)
3294 {
3295 addReplyError(c,
3296 "You can't specify both OPTIN mode and OPTOUT mode");
3297 zfree(prefix);
3298 return;
3299 }
3300
3301 if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) ||
3302 (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN))
3303 {
3304 addReplyError(c,
3305 "You can't switch OPTIN/OPTOUT mode before disabling "
3306 "tracking for this client, and then re-enabling it with "
3307 "a different mode.");
3308 zfree(prefix);
3309 return;
3310 }
3311
3312 if (options & CLIENT_TRACKING_BCAST) {
3313 if (!checkPrefixCollisionsOrReply(c,prefix,numprefix)) {
3314 zfree(prefix);
3315 return;
3316 }
3317 }
3318
3319 enableTracking(c,redir,options,prefix,numprefix);
3320 } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
3321 disableTracking(c);
3322 } else {
3323 zfree(prefix);
3324 addReplyErrorObject(c,shared.syntaxerr);
3325 return;
3326 }
3327 zfree(prefix);
3328 addReply(c,shared.ok);
3329 } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
3330 if (!(c->flags & CLIENT_TRACKING)) {
3331 addReplyError(c,"CLIENT CACHING can be called only when the "
3332 "client is in tracking mode with OPTIN or "
3333 "OPTOUT mode enabled");
3334 return;
3335 }
3336
3337 char *opt = c->argv[2]->ptr;
3338 if (!strcasecmp(opt,"yes")) {
3339 if (c->flags & CLIENT_TRACKING_OPTIN) {
3340 c->flags |= CLIENT_TRACKING_CACHING;
3341 } else {
3342 addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
3343 return;
3344 }
3345 } else if (!strcasecmp(opt,"no")) {
3346 if (c->flags & CLIENT_TRACKING_OPTOUT) {
3347 c->flags |= CLIENT_TRACKING_CACHING;
3348 } else {
3349 addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
3350 return;
3351 }
3352 } else {
3353 addReplyErrorObject(c,shared.syntaxerr);
3354 return;
3355 }
3356
3357 /* Common reply for when we succeeded. */
3358 addReply(c,shared.ok);
3359 } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
3360 /* CLIENT GETREDIR */
3361 if (c->flags & CLIENT_TRACKING) {
3362 addReplyLongLong(c,c->client_tracking_redirection);
3363 } else {
3364 addReplyLongLong(c,-1);
3365 }
3366 } else if (!strcasecmp(c->argv[1]->ptr,"trackinginfo") && c->argc == 2) {
3367 addReplyMapLen(c,3);
3368
3369 /* Flags */
3370 addReplyBulkCString(c,"flags");
3371 void *arraylen_ptr = addReplyDeferredLen(c);
3372 int numflags = 0;
3373 addReplyBulkCString(c,c->flags & CLIENT_TRACKING ? "on" : "off");
3374 numflags++;
3375 if (c->flags & CLIENT_TRACKING_BCAST) {
3376 addReplyBulkCString(c,"bcast");
3377 numflags++;
3378 }
3379 if (c->flags & CLIENT_TRACKING_OPTIN) {
3380 addReplyBulkCString(c,"optin");
3381 numflags++;
3382 if (c->flags & CLIENT_TRACKING_CACHING) {
3383 addReplyBulkCString(c,"caching-yes");
3384 numflags++;
3385 }
3386 }
3387 if (c->flags & CLIENT_TRACKING_OPTOUT) {
3388 addReplyBulkCString(c,"optout");
3389 numflags++;
3390 if (c->flags & CLIENT_TRACKING_CACHING) {
3391 addReplyBulkCString(c,"caching-no");
3392 numflags++;
3393 }
3394 }
3395 if (c->flags & CLIENT_TRACKING_NOLOOP) {
3396 addReplyBulkCString(c,"noloop");
3397 numflags++;
3398 }
3399 if (c->flags & CLIENT_TRACKING_BROKEN_REDIR) {
3400 addReplyBulkCString(c,"broken_redirect");
3401 numflags++;
3402 }
3403 setDeferredSetLen(c,arraylen_ptr,numflags);
3404
3405 /* Redirect */
3406 addReplyBulkCString(c,"redirect");
3407 if (c->flags & CLIENT_TRACKING) {
3408 addReplyLongLong(c,c->client_tracking_redirection);
3409 } else {
3410 addReplyLongLong(c,-1);
3411 }
3412
3413 /* Prefixes */
3414 addReplyBulkCString(c,"prefixes");
3415 if (c->client_tracking_prefixes) {
3416 addReplyArrayLen(c,raxSize(c->client_tracking_prefixes));
3417 raxIterator ri;
3418 raxStart(&ri,c->client_tracking_prefixes);
3419 raxSeek(&ri,"^",NULL,0);
3420 while(raxNext(&ri)) {
3421 addReplyBulkCBuffer(c,ri.key,ri.key_len);
3422 }
3423 raxStop(&ri);
3424 } else {
3425 addReplyArrayLen(c,0);
3426 }
3427 } else {
3428 addReplySubcommandSyntaxError(c);
3429 }
3430}
3431
3432/* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */
3433void helloCommand(client *c) {
3434 long long ver = 0;
3435 int next_arg = 1;
3436
3437 if (c->argc >= 2) {
3438 if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver,
3439 "Protocol version is not an integer or out of range") != C_OK) {
3440 return;
3441 }
3442
3443 if (ver < 2 || ver > 3) {
3444 addReplyError(c,"-NOPROTO unsupported protocol version");
3445 return;
3446 }
3447 }
3448
3449 for (int j = next_arg; j < c->argc; j++) {
3450 int moreargs = (c->argc-1) - j;
3451 const char *opt = c->argv[j]->ptr;
3452 if (!strcasecmp(opt,"AUTH") && moreargs >= 2) {
3453 redactClientCommandArgument(c, j+1);
3454 redactClientCommandArgument(c, j+2);
3455 if (ACLAuthenticateUser(c, c->argv[j+1], c->argv[j+2]) == C_ERR) {
3456 addReplyError(c,"-WRONGPASS invalid username-password pair or user is disabled.");
3457 return;
3458 }
3459 j += 2;
3460 } else if (!strcasecmp(opt,"SETNAME") && moreargs) {
3461 if (clientSetNameOrReply(c, c->argv[j+1]) == C_ERR) return;
3462 j++;
3463 } else {
3464 addReplyErrorFormat(c,"Syntax error in HELLO option '%s'",opt);
3465 return;
3466 }
3467 }
3468
3469 /* At this point we need to be authenticated to continue. */
3470 if (!c->authenticated) {
3471 addReplyError(c,"-NOAUTH HELLO must be called with the client already "
3472 "authenticated, otherwise the HELLO AUTH <user> <pass> "
3473 "option can be used to authenticate the client and "
3474 "select the RESP protocol version at the same time");
3475 return;
3476 }
3477
3478 /* Let's switch to the specified RESP mode. */
3479 if (ver) c->resp = ver;
3480 addReplyMapLen(c,6 + !server.sentinel_mode);
3481
3482 addReplyBulkCString(c,"server");
3483 addReplyBulkCString(c,"redis");
3484
3485 addReplyBulkCString(c,"version");
3486 addReplyBulkCString(c,REDIS_VERSION);
3487
3488 addReplyBulkCString(c,"proto");
3489 addReplyLongLong(c,c->resp);
3490
3491 addReplyBulkCString(c,"id");
3492 addReplyLongLong(c,c->id);
3493
3494 addReplyBulkCString(c,"mode");
3495 if (server.sentinel_mode) addReplyBulkCString(c,"sentinel");
3496 else if (server.cluster_enabled) addReplyBulkCString(c,"cluster");
3497 else addReplyBulkCString(c,"standalone");
3498
3499 if (!server.sentinel_mode) {
3500 addReplyBulkCString(c,"role");
3501 addReplyBulkCString(c,server.masterhost ? "replica" : "master");
3502 }
3503
3504 addReplyBulkCString(c,"modules");
3505 addReplyLoadedModules(c);
3506}
3507
3508/* This callback is bound to POST and "Host:" command names. Those are not
3509 * really commands, but are used in security attacks in order to talk to
3510 * Redis instances via HTTP, with a technique called "cross protocol scripting"
3511 * which exploits the fact that services like Redis will discard invalid
3512 * HTTP headers and will process what follows.
3513 *
3514 * As a protection against this attack, Redis will terminate the connection
3515 * when a POST or "Host:" header is seen, and will log the event from
3516 * time to time (to avoid creating a DOS as a result of too many logs). */
3517void securityWarningCommand(client *c) {
3518 static time_t logged_time = 0;
3519 time_t now = time(NULL);
3520
3521 if (llabs(now-logged_time) > 60) {
3522 serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted.");
3523 logged_time = now;
3524 }
3525 freeClientAsync(c);
3526}
3527
3528/* Keep track of the original command arguments so that we can generate
3529 * an accurate slowlog entry after the command has been executed. */
3530static void retainOriginalCommandVector(client *c) {
3531 /* We already rewrote this command, so don't rewrite it again */
3532 if (c->original_argv) return;
3533 c->original_argc = c->argc;
3534 c->original_argv = zmalloc(sizeof(robj*)*(c->argc));
3535 for (int j = 0; j < c->argc; j++) {
3536 c->original_argv[j] = c->argv[j];
3537 incrRefCount(c->argv[j]);
3538 }
3539}
3540
3541/* Redact a given argument to prevent it from being shown
3542 * in the slowlog. This information is stored in the
3543 * original_argv array. */
3544void redactClientCommandArgument(client *c, int argc) {
3545 retainOriginalCommandVector(c);
3546 if (c->original_argv[argc] == shared.redacted) {
3547 /* This argument has already been redacted */
3548 return;
3549 }
3550 decrRefCount(c->original_argv[argc]);
3551 c->original_argv[argc] = shared.redacted;
3552}
3553
3554/* Rewrite the command vector of the client. All the new objects ref count
3555 * is incremented. The old command vector is freed, and the old objects
3556 * ref count is decremented. */
3557void rewriteClientCommandVector(client *c, int argc, ...) {
3558 va_list ap;
3559 int j;
3560 robj **argv; /* The new argument vector */
3561
3562 argv = zmalloc(sizeof(robj*)*argc);
3563 va_start(ap,argc);
3564 for (j = 0; j < argc; j++) {
3565 robj *a;
3566
3567 a = va_arg(ap, robj*);
3568 argv[j] = a;
3569 incrRefCount(a);
3570 }
3571 replaceClientCommandVector(c, argc, argv);
3572 va_end(ap);
3573}
3574
3575/* Completely replace the client command vector with the provided one. */
3576void replaceClientCommandVector(client *c, int argc, robj **argv) {
3577 int j;
3578 retainOriginalCommandVector(c);
3579 freeClientArgv(c);
3580 c->argv = argv;
3581 c->argc = argc;
3582 c->argv_len_sum = 0;
3583 for (j = 0; j < c->argc; j++)
3584 if (c->argv[j])
3585 c->argv_len_sum += getStringObjectLen(c->argv[j]);
3586 c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3587 serverAssertWithInfo(c,NULL,c->cmd != NULL);
3588}
3589
3590/* Rewrite a single item in the command vector.
3591 * The new val ref count is incremented, and the old decremented.
3592 *
3593 * It is possible to specify an argument over the current size of the
3594 * argument vector: in this case the array of objects gets reallocated
3595 * and c->argc set to the max value. However it's up to the caller to
3596 *
3597 * 1. Make sure there are no "holes" and all the arguments are set.
3598 * 2. If the original argument vector was longer than the one we
3599 * want to end with, it's up to the caller to set c->argc and
3600 * free the no longer used objects on c->argv. */
3601void rewriteClientCommandArgument(client *c, int i, robj *newval) {
3602 robj *oldval;
3603 retainOriginalCommandVector(c);
3604
3605 /* We need to handle both extending beyond argc (just update it and
3606 * initialize the new element) or beyond argv_len (realloc is needed).
3607 */
3608 if (i >= c->argc) {
3609 if (i >= c->argv_len) {
3610 c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
3611 c->argv_len = i+1;
3612 }
3613 c->argc = i+1;
3614 c->argv[i] = NULL;
3615 }
3616 oldval = c->argv[i];
3617 if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
3618 if (newval) c->argv_len_sum += getStringObjectLen(newval);
3619 c->argv[i] = newval;
3620 incrRefCount(newval);
3621 if (oldval) decrRefCount(oldval);
3622
3623 /* If this is the command name make sure to fix c->cmd. */
3624 if (i == 0) {
3625 c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3626 serverAssertWithInfo(c,NULL,c->cmd != NULL);
3627 }
3628}
3629
3630/* This function returns the number of bytes that Redis is
3631 * using to store the reply still not read by the client.
3632 *
3633 * Note: this function is very fast so can be called as many time as
3634 * the caller wishes. The main usage of this function currently is
3635 * enforcing the client output length limits. */
3636size_t getClientOutputBufferMemoryUsage(client *c) {
3637 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
3638 size_t repl_buf_size = 0;
3639 size_t repl_node_num = 0;
3640 size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
3641 if (c->ref_repl_buf_node) {
3642 replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
3643 replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
3644 repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
3645 repl_node_num = last->id - cur->id + 1;
3646 }
3647 return repl_buf_size + (repl_node_size*repl_node_num);
3648 } else {
3649 size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
3650 return c->reply_bytes + (list_item_size*listLength(c->reply));
3651 }
3652}
3653
3654/* Returns the total client's memory usage.
3655 * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
3656 * the client output buffer memory usage portion of the total. */
3657size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
3658 size_t mem = getClientOutputBufferMemoryUsage(c);
3659 if (output_buffer_mem_usage != NULL)
3660 *output_buffer_mem_usage = mem;
3661 mem += sdsZmallocSize(c->querybuf);
3662 mem += zmalloc_size(c);
3663 mem += c->buf_usable_size;
3664 /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
3665 * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
3666 * spot problematic clients. */
3667 mem += c->argv_len_sum + sizeof(robj*)*c->argc;
3668 mem += multiStateMemOverhead(c);
3669
3670 /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
3671 * to the strings themselves because they aren't stored per client. */
3672 mem += pubsubMemOverhead(c);
3673
3674 /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
3675 if (c->client_tracking_prefixes)
3676 mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
3677
3678 return mem;
3679}
3680
3681/* Get the class of a client, used in order to enforce limits to different
3682 * classes of clients.
3683 *
3684 * The function will return one of the following:
3685 * CLIENT_TYPE_NORMAL -> Normal client
3686 * CLIENT_TYPE_SLAVE -> Slave
3687 * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
3688 * CLIENT_TYPE_MASTER -> The client representing our replication master.
3689 */
3690int getClientType(client *c) {
3691 if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
3692 /* Even though MONITOR clients are marked as replicas, we
3693 * want the expose them as normal clients. */
3694 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
3695 return CLIENT_TYPE_SLAVE;
3696 if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
3697 return CLIENT_TYPE_NORMAL;
3698}
3699
3700int getClientTypeByName(char *name) {
3701 if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
3702 else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
3703 else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
3704 else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
3705 else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
3706 else return -1;
3707}
3708
3709char *getClientTypeName(int class) {
3710 switch(class) {
3711 case CLIENT_TYPE_NORMAL: return "normal";
3712 case CLIENT_TYPE_SLAVE: return "slave";
3713 case CLIENT_TYPE_PUBSUB: return "pubsub";
3714 case CLIENT_TYPE_MASTER: return "master";
3715 default: return NULL;
3716 }
3717}
3718
3719/* The function checks if the client reached output buffer soft or hard
3720 * limit, and also update the state needed to check the soft limit as
3721 * a side effect.
3722 *
3723 * Return value: non-zero if the client reached the soft or the hard limit.
3724 * Otherwise zero is returned. */
3725int checkClientOutputBufferLimits(client *c) {
3726 int soft = 0, hard = 0, class;
3727 unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
3728
3729 class = getClientType(c);
3730 /* For the purpose of output buffer limiting, masters are handled
3731 * like normal clients. */
3732 if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
3733
3734 /* Note that it doesn't make sense to set the replica clients output buffer
3735 * limit lower than the repl-backlog-size config (partial sync will succeed
3736 * and then replica will get disconnected).
3737 * Such a configuration is ignored (the size of repl-backlog-size will be used).
3738 * This doesn't have memory consumption implications since the replica client
3739 * will share the backlog buffers memory. */
3740 size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
3741 if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
3742 (long long)hard_limit_bytes < server.repl_backlog_size)
3743 hard_limit_bytes = server.repl_backlog_size;
3744 if (server.client_obuf_limits[class].hard_limit_bytes &&
3745 used_mem >= hard_limit_bytes)
3746 hard = 1;
3747 if (server.client_obuf_limits[class].soft_limit_bytes &&
3748 used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
3749 soft = 1;
3750
3751 /* We need to check if the soft limit is reached continuously for the
3752 * specified amount of seconds. */
3753 if (soft) {
3754 if (c->obuf_soft_limit_reached_time == 0) {
3755 c->obuf_soft_limit_reached_time = server.unixtime;
3756 soft = 0; /* First time we see the soft limit reached */
3757 } else {
3758 time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
3759
3760 if (elapsed <=
3761 server.client_obuf_limits[class].soft_limit_seconds) {
3762 soft = 0; /* The client still did not reached the max number of
3763 seconds for the soft limit to be considered
3764 reached. */
3765 }
3766 }
3767 } else {
3768 c->obuf_soft_limit_reached_time = 0;
3769 }
3770 return soft || hard;
3771}
3772
3773/* Asynchronously close a client if soft or hard limit is reached on the
3774 * output buffer size. The caller can check if the client will be closed
3775 * checking if the client CLIENT_CLOSE_ASAP flag is set.
3776 *
3777 * Note: we need to close the client asynchronously because this function is
3778 * called from contexts where the client can't be freed safely, i.e. from the
3779 * lower level functions pushing data inside the client output buffers.
3780 * When `async` is set to 0, we close the client immediately, this is
3781 * useful when called from cron.
3782 *
3783 * Returns 1 if client was (flagged) closed. */
3784int closeClientOnOutputBufferLimitReached(client *c, int async) {
3785 if (!c->conn) return 0; /* It is unsafe to free fake clients. */
3786 serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
3787 /* Note that c->reply_bytes is irrelevant for replica clients
3788 * (they use the global repl buffers). */
3789 if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
3790 c->flags & CLIENT_CLOSE_ASAP) return 0;
3791 if (checkClientOutputBufferLimits(c)) {
3792 sds client = catClientInfoString(sdsempty(),c);
3793
3794 if (async) {
3795 freeClientAsync(c);
3796 serverLog(LL_WARNING,
3797 "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
3798 client);
3799 } else {
3800 freeClient(c);
3801 serverLog(LL_WARNING,
3802 "Client %s closed for overcoming of output buffer limits.",
3803 client);
3804 }
3805 sdsfree(client);
3806 return 1;
3807 }
3808 return 0;
3809}
3810
3811/* Helper function used by performEvictions() in order to flush slaves
3812 * output buffers without returning control to the event loop.
3813 * This is also called by SHUTDOWN for a best-effort attempt to send
3814 * slaves the latest writes. */
3815void flushSlavesOutputBuffers(void) {
3816 listIter li;
3817 listNode *ln;
3818
3819 listRewind(server.slaves,&li);
3820 while((ln = listNext(&li))) {
3821 client *slave = listNodeValue(ln);
3822 int can_receive_writes = connHasWriteHandler(slave->conn) ||
3823 (slave->flags & CLIENT_PENDING_WRITE);
3824
3825 /* We don't want to send the pending data to the replica in a few
3826 * cases:
3827 *
3828 * 1. For some reason there is neither the write handler installed
3829 * nor the client is flagged as to have pending writes: for some
3830 * reason this replica may not be set to receive data. This is
3831 * just for the sake of defensive programming.
3832 *
3833 * 2. The put_online_on_ack flag is true. To know why we don't want
3834 * to send data to the replica in this case, please grep for the
3835 * flag for this flag.
3836 *
3837 * 3. Obviously if the slave is not ONLINE.
3838 */
3839 if (slave->replstate == SLAVE_STATE_ONLINE &&
3840 can_receive_writes &&
3841 !slave->repl_start_cmd_stream_on_ack &&
3842 clientHasPendingReplies(slave))
3843 {
3844 writeToClient(slave,0);
3845 }
3846 }
3847}
3848
3849/* Compute current most restrictive pause type and its end time, aggregated for
3850 * all pause purposes. */
3851static void updateClientPauseTypeAndEndTime(void) {
3852 pause_type old_type = server.client_pause_type;
3853 pause_type type = CLIENT_PAUSE_OFF;
3854 mstime_t end = 0;
3855 for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
3856 pause_event *p = server.client_pause_per_purpose[i];
3857 if (p == NULL) {
3858 /* Nothing to do. */
3859 } else if (p->end < server.mstime) {
3860 /* This one expired. */
3861 zfree(p);
3862 server.client_pause_per_purpose[i] = NULL;
3863 } else if (p->type > type) {
3864 /* This type is the most restrictive so far. */
3865 type = p->type;
3866 }
3867 }
3868
3869 /* Find the furthest end time among the pause purposes of the most
3870 * restrictive type */
3871 for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
3872 pause_event *p = server.client_pause_per_purpose[i];
3873 if (p != NULL && p->type == type && p->end > end) end = p->end;
3874 }
3875 server.client_pause_type = type;
3876 server.client_pause_end_time = end;
3877
3878 /* If the pause type is less restrictive than before, we unblock all clients
3879 * so they are reprocessed (may get re-paused). */
3880 if (type < old_type) {
3881 unblockPostponedClients();
3882 }
3883}
3884
3885/* Unblock all paused clients (ones that where blocked by BLOCKED_POSTPONE (possibly in processCommand).
3886 * This means they'll get re-processed in beforeSleep, and may get paused again if needed. */
3887void unblockPostponedClients() {
3888 listNode *ln;
3889 listIter li;
3890 listRewind(server.postponed_clients, &li);
3891 while ((ln = listNext(&li)) != NULL) {
3892 client *c = listNodeValue(ln);
3893 unblockClient(c);
3894 }
3895}
3896
3897/* Pause clients up to the specified unixtime (in ms) for a given type of
3898 * commands.
3899 *
3900 * A main use case of this function is to allow pausing replication traffic
3901 * so that a failover without data loss to occur. Replicas will continue to receive
3902 * traffic to facilitate this functionality.
3903 *
3904 * This function is also internally used by Redis Cluster for the manual
3905 * failover procedure implemented by CLUSTER FAILOVER.
3906 *
3907 * The function always succeed, even if there is already a pause in progress.
3908 * In such a case, the duration is set to the maximum and new end time and the
3909 * type is set to the more restrictive type of pause. */
3910void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) {
3911 /* Manage pause type and end time per pause purpose. */
3912 if (server.client_pause_per_purpose[purpose] == NULL) {
3913 server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event));
3914 server.client_pause_per_purpose[purpose]->type = type;
3915 server.client_pause_per_purpose[purpose]->end = end;
3916 } else {
3917 pause_event *p = server.client_pause_per_purpose[purpose];
3918 p->type = max(p->type, type);
3919 p->end = max(p->end, end);
3920 }
3921 updateClientPauseTypeAndEndTime();
3922
3923 /* We allow write commands that were queued
3924 * up before and after to execute. We need
3925 * to track this state so that we don't assert
3926 * in propagateNow(). */
3927 if (server.in_exec) {
3928 server.client_pause_in_transaction = 1;
3929 }
3930}
3931
3932/* Unpause clients and queue them for reprocessing. */
3933void unpauseClients(pause_purpose purpose) {
3934 if (server.client_pause_per_purpose[purpose] == NULL) return;
3935 zfree(server.client_pause_per_purpose[purpose]);
3936 server.client_pause_per_purpose[purpose] = NULL;
3937 updateClientPauseTypeAndEndTime();
3938}
3939
3940/* Returns true if clients are paused and false otherwise. */
3941int areClientsPaused(void) {
3942 return server.client_pause_type != CLIENT_PAUSE_OFF;
3943}
3944
3945/* Checks if the current client pause has elapsed and unpause clients
3946 * if it has. Also returns true if clients are now paused and false
3947 * otherwise. */
3948int checkClientPauseTimeoutAndReturnIfPaused(void) {
3949 if (!areClientsPaused())
3950 return 0;
3951 if (server.client_pause_end_time < server.mstime) {
3952 updateClientPauseTypeAndEndTime();
3953 }
3954 return areClientsPaused();
3955}
3956
3957/* This function is called by Redis in order to process a few events from
3958 * time to time while blocked into some not interruptible operation.
3959 * This allows to reply to clients with the -LOADING error while loading the
3960 * data set at startup or after a full resynchronization with the master
3961 * and so forth.
3962 *
3963 * It calls the event loop in order to process a few events. Specifically we
3964 * try to call the event loop 4 times as long as we receive acknowledge that
3965 * some event was processed, in order to go forward with the accept, read,
3966 * write, close sequence needed to serve a client.
3967 *
3968 * The function returns the total number of events processed. */
3969void processEventsWhileBlocked(void) {
3970 int iterations = 4; /* See the function top-comment. */
3971
3972 /* Update our cached time since it is used to create and update the last
3973 * interaction time with clients and for other important things. */
3974 updateCachedTime(0);
3975
3976 /* Note: when we are processing events while blocked (for instance during
3977 * busy Lua scripts), we set a global flag. When such flag is set, we
3978 * avoid handling the read part of clients using threaded I/O.
3979 * See https://github.com/redis/redis/issues/6988 for more info.
3980 * Note that there could be cases of nested calls to this function,
3981 * specifically on a busy script during async_loading rdb, and scripts
3982 * that came from AOF. */
3983 ProcessingEventsWhileBlocked++;
3984 while (iterations--) {
3985 long long startval = server.events_processed_while_blocked;
3986 long long ae_events = aeProcessEvents(server.el,
3987 AE_FILE_EVENTS|AE_DONT_WAIT|
3988 AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
3989 /* Note that server.events_processed_while_blocked will also get
3990 * incremented by callbacks called by the event loop handlers. */
3991 server.events_processed_while_blocked += ae_events;
3992 long long events = server.events_processed_while_blocked - startval;
3993 if (!events) break;
3994 }
3995
3996 whileBlockedCron();
3997
3998 ProcessingEventsWhileBlocked--;
3999 serverAssert(ProcessingEventsWhileBlocked >= 0);
4000}
4001
4002/* ==========================================================================
4003 * Threaded I/O
4004 * ========================================================================== */
4005
/* Upper bound on the configured number of I/O threads, checked in
 * initThreadedIO(); also sizes the per-thread arrays below. */
#define IO_THREADS_MAX_NUM 128
/* Assumed cache line size, used to align each per-thread pending counter. */
#define CACHE_LINE_SIZE 64

/* Per-thread count of clients handed to an I/O thread. Each counter gets
 * its own cache-line-aligned slot, presumably to avoid false sharing
 * between threads polling neighbouring counters. */
typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending {
    redisAtomic unsigned long value;
} threads_pending;

pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
threads_pending io_threads_pending[IO_THREADS_MAX_NUM];
/* Current operation of the I/O threads: IO_THREADS_OP_IDLE,
 * IO_THREADS_OP_READ or IO_THREADS_OP_WRITE.
 *
 * This is a plain int, not an atomic. In the write path, the main thread
 * sets it before publishing work with setIOPendingCount(). It resets it to
 * IDLE only after every pending count has dropped back to 0. IOThreadMain()
 * reads it only while its own pending count is non-zero.
 * NOTE(review): this relies on atomicSetWithSync()/atomicGetWithSync()
 * providing the needed ordering; confirm against atomicvar.h before
 * touching it. */
int io_threads_op;

/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
4022
4023static inline unsigned long getIOPendingCount(int i) {
4024 unsigned long count = 0;
4025 atomicGetWithSync(io_threads_pending[i].value, count);
4026 return count;
4027}
4028
4029static inline void setIOPendingCount(int i, unsigned long count) {
4030 atomicSetWithSync(io_threads_pending[i].value, count);
4031}
4032
/* Body of each I/O thread. `myid` carries the thread index, which selects
 * both its pending counter and its io_threads_list entry. Only indexes
 * 1..io_threads_num-1 run this function; index 0 is the main thread (see
 * initThreadedIO()). The loop never exits. Threads are stopped from the
 * outside with pthread_cancel() in killIOThreads(), after
 * makeThreadKillable() has been called here (presumably enabling
 * cancellation; TODO confirm). */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (1 to server.io_threads_num-1 for spawned
     * threads), and is used by the thread to just manipulate a single
     * sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        /* Wait for start: busy-poll the pending counter for a bounded
         * number of rounds before falling back to the mutex below. */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        /* Give the main thread a chance to stop this thread. The main thread
         * holds this mutex while threaded I/O is stopped (taken in
         * initThreadedIO() and stopThreadedIO(), released in
         * startThreadedIO()), so an idle thread parks here. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        /* Empty our list *before* signalling completion: once the count is
         * 0 the main thread may fill the list again. */
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}
4078
4079/* Initialize the data structures needed for threaded I/O. */
4080void initThreadedIO(void) {
4081 server.io_threads_active = 0; /* We start with threads not active. */
4082
4083 /* Indicate that io-threads are currently idle */
4084 io_threads_op = IO_THREADS_OP_IDLE;
4085
4086 /* Don't spawn any thread if the user selected a single thread:
4087 * we'll handle I/O directly from the main thread. */
4088 if (server.io_threads_num == 1) return;
4089
4090 if (server.io_threads_num > IO_THREADS_MAX_NUM) {
4091 serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
4092 "The maximum number is %d.", IO_THREADS_MAX_NUM);
4093 exit(1);
4094 }
4095
4096 /* Spawn and initialize the I/O threads. */
4097 for (int i = 0; i < server.io_threads_num; i++) {
4098 /* Things we do for all the threads including the main thread. */
4099 io_threads_list[i] = listCreate();
4100 if (i == 0) continue; /* Thread 0 is the main thread. */
4101
4102 /* Things we do only for the additional threads. */
4103 pthread_t tid;
4104 pthread_mutex_init(&io_threads_mutex[i],NULL);
4105 setIOPendingCount(i, 0);
4106 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
4107 if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
4108 serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
4109 exit(1);
4110 }
4111 io_threads[i] = tid;
4112 }
4113}
4114
4115void killIOThreads(void) {
4116 int err, j;
4117 for (j = 0; j < server.io_threads_num; j++) {
4118 if (io_threads[j] == pthread_self()) continue;
4119 if (io_threads[j] && pthread_cancel(io_threads[j]) == 0) {
4120 if ((err = pthread_join(io_threads[j],NULL)) != 0) {
4121 serverLog(LL_WARNING,
4122 "IO thread(tid:%lu) can not be joined: %s",
4123 (unsigned long)io_threads[j], strerror(err));
4124 } else {
4125 serverLog(LL_WARNING,
4126 "IO thread(tid:%lu) terminated",(unsigned long)io_threads[j]);
4127 }
4128 }
4129 }
4130}
4131
4132void startThreadedIO(void) {
4133 serverAssert(server.io_threads_active == 0);
4134 for (int j = 1; j < server.io_threads_num; j++)
4135 pthread_mutex_unlock(&io_threads_mutex[j]);
4136 server.io_threads_active = 1;
4137}
4138
4139void stopThreadedIO(void) {
4140 /* We may have still clients with pending reads when this function
4141 * is called: handle them before stopping the threads. */
4142 handleClientsWithPendingReadsUsingThreads();
4143 serverAssert(server.io_threads_active == 1);
4144 for (int j = 1; j < server.io_threads_num; j++)
4145 pthread_mutex_lock(&io_threads_mutex[j]);
4146 server.io_threads_active = 0;
4147}
4148
4149/* This function checks if there are not enough pending clients to justify
4150 * taking the I/O threads active: in that case I/O threads are stopped if
4151 * currently active. We track the pending writes as a measure of clients
4152 * we need to handle in parallel, however the I/O threading is disabled
4153 * globally for reads as well if we have too little pending clients.
4154 *
4155 * The function returns 0 if the I/O threading should be used because there
4156 * are enough active threads, otherwise 1 is returned and the I/O threads
4157 * could be possibly stopped (if already active) as a side effect. */
4158int stopThreadedIOIfNeeded(void) {
4159 int pending = listLength(server.clients_pending_write);
4160
4161 /* Return ASAP if IO threads are disabled (single threaded mode). */
4162 if (server.io_threads_num == 1) return 1;
4163
4164 if (pending < (server.io_threads_num*2)) {
4165 if (server.io_threads_active) stopThreadedIO();
4166 return 1;
4167 } else {
4168 return 0;
4169 }
4170}
4171
/* Write the output of every client in server.clients_pending_write, spreading
 * the work across the I/O threads when there are enough clients.
 *
 * This function achieves thread safety using a fan-out -> fan-in paradigm:
 * Fan out: The main thread fans out work to the io-threads, which wait until
 * setIOPendingCount() is called with a value larger than 0 by the main thread.
 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
 * it can safely perform post-processing and return to normal synchronous
 * work.
 *
 * Falls back to handleClientsWithPendingWrites() when I/O threads are
 * disabled or not worth it (see stopThreadedIOIfNeeded()). Otherwise it
 * returns the length of clients_pending_write at entry. That count includes
 * clients dropped below because they are flagged CLIENT_CLOSE_ASAP, and it
 * is also what gets added to stat_io_writes_processed. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. Deleting the node just returned
         * by listNext() is safe while iterating. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }

        /* Round-robin the remaining clients over all slots, the main
         * thread's slot 0 included. */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. io_threads_op must be set before any
     * pending count is published, since the threads read it once they see
     * a non-zero count. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work (busy-wait on the
     * pending counters). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Only now, with every counter back at 0, can the operation be reset. */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage after we're done processing it in the io-threads */
        updateClientMemUsage(c);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c)) {
            installClientWriteHandler(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
4269
4270/* Return 1 if we want to handle the client read later using threaded I/O.
4271 * This is called by the readable handler of the event loop.
4272 * As a side effect of calling this function the client is put in the
4273 * pending read clients and flagged as such. */
4274int postponeClientRead(client *c) {
4275 if (server.io_threads_active &&
4276 server.io_threads_do_reads &&
4277 !ProcessingEventsWhileBlocked &&
4278 !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
4279 io_threads_op == IO_THREADS_OP_IDLE)
4280 {
4281 listAddNodeHead(server.clients_pending_read,c);
4282 c->pending_read_list_node = listFirst(server.clients_pending_read);
4283 return 1;
4284 } else {
4285 return 0;
4286 }
4287}
4288
4289/* When threaded I/O is also enabled for the reading + parsing side, the
4290 * readable handler will just put normal clients into a queue of clients to
4291 * process (instead of serving them synchronously). This function runs
4292 * the queue using the I/O threads, and process them in order to accumulate
4293 * the reads in the buffers, and also parse the first command available
4294 * rendering it in the client structures.
4295 * This function achieves thread safety using a fan-out -> fan-in paradigm:
4296 * Fan out: The main thread fans out work to the io-threads which block until
4297 * setIOPendingCount() is called with a value larger than 0 by the main thread.
4298 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
4299 * it can safely perform post-processing and return to normal synchronous
4300 * work. */
4301int handleClientsWithPendingReadsUsingThreads(void) {
4302 if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
4303 int processed = listLength(server.clients_pending_read);
4304 if (processed == 0) return 0;
4305
4306 /* Distribute the clients across N different lists. */
4307 listIter li;
4308 listNode *ln;
4309 listRewind(server.clients_pending_read,&li);
4310 int item_id = 0;
4311 while((ln = listNext(&li))) {
4312 client *c = listNodeValue(ln);
4313 int target_id = item_id % server.io_threads_num;
4314 listAddNodeTail(io_threads_list[target_id],c);
4315 item_id++;
4316 }
4317
4318 /* Give the start condition to the waiting threads, by setting the
4319 * start condition atomic var. */
4320 io_threads_op = IO_THREADS_OP_READ;
4321 for (int j = 1; j < server.io_threads_num; j++) {
4322 int count = listLength(io_threads_list[j]);
4323 setIOPendingCount(j, count);
4324 }
4325
4326 /* Also use the main thread to process a slice of clients. */
4327 listRewind(io_threads_list[0],&li);
4328 while((ln = listNext(&li))) {
4329 client *c = listNodeValue(ln);
4330 readQueryFromClient(c->conn);
4331 }
4332 listEmpty(io_threads_list[0]);
4333
4334 /* Wait for all the other threads to end their work. */
4335 while(1) {
4336 unsigned long pending = 0;
4337 for (int j = 1; j < server.io_threads_num; j++)
4338 pending += getIOPendingCount(j);
4339 if (pending == 0) break;
4340 }
4341
4342 io_threads_op = IO_THREADS_OP_IDLE;
4343
4344 /* Run the list of clients again to process the new buffers. */
4345 while(listLength(server.clients_pending_read)) {
4346 ln = listFirst(server.clients_pending_read);
4347 client *c = listNodeValue(ln);
4348 listDelNode(server.clients_pending_read,ln);
4349 c->pending_read_list_node = NULL;
4350
4351 serverAssert(!(c->flags & CLIENT_BLOCKED));
4352
4353 if (beforeNextClient(c) == C_ERR) {
4354 /* If the client is no longer valid, we avoid
4355 * processing the client later. So we just go
4356 * to the next. */
4357 continue;
4358 }
4359
4360 /* Once io-threads are idle we can update the client in the mem usage */
4361 updateClientMemUsage(c);
4362
4363 if (processPendingCommandAndInputBuffer(c) == C_ERR) {
4364 /* If the client is no longer valid, we avoid
4365 * processing the client later. So we just go
4366 * to the next. */
4367 continue;
4368 }
4369
4370 /* We may have pending replies if a thread readQueryFromClient() produced
4371 * replies and did not put the client in pending write queue (it can't).
4372 */
4373 if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
4374 putClientInPendingWriteQueue(c);
4375 }
4376
4377 /* Update processed count on server */
4378 server.stat_io_reads_processed += processed;
4379
4380 return processed;
4381}
4382
4383/* Returns the actual client eviction limit based on current configuration or
4384 * 0 if no limit. */
4385size_t getClientEvictionLimit(void) {
4386 size_t maxmemory_clients_actual = SIZE_MAX;
4387
4388 /* Handle percentage of maxmemory*/
4389 if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
4390 unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
4391 if (maxmemory_clients_bytes <= SIZE_MAX)
4392 maxmemory_clients_actual = maxmemory_clients_bytes;
4393 }
4394 else if (server.maxmemory_clients > 0)
4395 maxmemory_clients_actual = server.maxmemory_clients;
4396 else
4397 return 0;
4398
4399 /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate
4400 * at all with the server because of bad configuration */
4401 if (maxmemory_clients_actual < 1024*128)
4402 maxmemory_clients_actual = 1024*128;
4403
4404 return maxmemory_clients_actual;
4405}
4406
4407void evictClients(void) {
4408 /* Start eviction from topmost bucket (largest clients) */
4409 int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
4410 listIter bucket_iter;
4411 listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4412 size_t client_eviction_limit = getClientEvictionLimit();
4413 if (client_eviction_limit == 0)
4414 return;
4415 while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
4416 server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
4417 listNode *ln = listNext(&bucket_iter);
4418 if (ln) {
4419 client *c = ln->value;
4420 sds ci = catClientInfoString(sdsempty(),c);
4421 serverLog(LL_NOTICE, "Evicting client: %s", ci);
4422 freeClient(c);
4423 sdsfree(ci);
4424 server.stat_evictedclients++;
4425 } else {
4426 curr_bucket--;
4427 if (curr_bucket < 0) {
4428 serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
4429 break;
4430 }
4431 listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4432 }
4433 }
4434}
4435