1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "atomicvar.h" |
32 | #include "cluster.h" |
33 | #include "script.h" |
34 | #include <sys/socket.h> |
35 | #include <sys/uio.h> |
36 | #include <math.h> |
37 | #include <ctype.h> |
38 | |
/* Prototype for a static helper defined later in this translation unit, so it
 * can be called before its definition. */
static void setProtocolError(const char *errstr, client *c);
/* Prototype for postponeClientRead(), declared here so it can be used before
 * its definition. */
int postponeClientRead(client *c);
/* Global flag, initially 0. Presumably maintained by
 * processEventsWhileBlocked() — confirm against that function. */
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
42 | |
43 | /* Return the size consumed from the allocator, for the specified SDS string, |
44 | * including internal fragmentation. This function is used in order to compute |
45 | * the client output buffer size. */ |
46 | size_t sdsZmallocSize(sds s) { |
47 | void *sh = sdsAllocPtr(s); |
48 | return zmalloc_size(sh); |
49 | } |
50 | |
51 | /* Return the amount of memory used by the sds string at object->ptr |
52 | * for a string object. This includes internal fragmentation. */ |
53 | size_t getStringObjectSdsUsedMemory(robj *o) { |
54 | serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); |
55 | switch(o->encoding) { |
56 | case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr); |
57 | case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); |
58 | default: return 0; /* Just integer encoding for now. */ |
59 | } |
60 | } |
61 | |
62 | /* Return the length of a string object. |
63 | * This does NOT includes internal fragmentation or sds unused space. */ |
64 | size_t getStringObjectLen(robj *o) { |
65 | serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); |
66 | switch(o->encoding) { |
67 | case OBJ_ENCODING_RAW: return sdslen(o->ptr); |
68 | case OBJ_ENCODING_EMBSTR: return sdslen(o->ptr); |
69 | default: return 0; /* Just integer encoding for now. */ |
70 | } |
71 | } |
72 | |
73 | /* Client.reply list dup and free methods. */ |
74 | void *dupClientReplyValue(void *o) { |
75 | clientReplyBlock *old = o; |
76 | clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); |
77 | memcpy(buf, o, sizeof(clientReplyBlock) + old->size); |
78 | return buf; |
79 | } |
80 | |
81 | void freeClientReplyValue(void *o) { |
82 | zfree(o); |
83 | } |
84 | |
85 | int listMatchObjects(void *a, void *b) { |
86 | return equalStringObjects(a,b); |
87 | } |
88 | |
89 | /* This function links the client to the global linked list of clients. |
90 | * unlinkClient() does the opposite, among other things. */ |
91 | void linkClient(client *c) { |
92 | listAddNodeTail(server.clients,c); |
93 | /* Note that we remember the linked list node where the client is stored, |
94 | * this way removing the client in unlinkClient() will not require |
95 | * a linear scan, but just a constant time operation. */ |
96 | c->client_list_node = listLast(server.clients); |
97 | uint64_t id = htonu64(c->id); |
98 | raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); |
99 | } |
100 | |
101 | /* Initialize client authentication state. |
102 | */ |
103 | static void clientSetDefaultAuth(client *c) { |
104 | /* If the default user does not require authentication, the user is |
105 | * directly authenticated. */ |
106 | c->user = DefaultUser; |
107 | c->authenticated = (c->user->flags & USER_FLAG_NOPASS) && |
108 | !(c->user->flags & USER_FLAG_DISABLED); |
109 | } |
110 | |
111 | int authRequired(client *c) { |
112 | /* Check if the user is authenticated. This check is skipped in case |
113 | * the default user is flagged as "nopass" and is active. */ |
114 | int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || |
115 | (DefaultUser->flags & USER_FLAG_DISABLED)) && |
116 | !c->authenticated; |
117 | return auth_required; |
118 | } |
119 | |
/* Allocate and initialize a new client structure and return it.
 *
 * 'conn' may be NULL to create a client that is not bound to any connection
 * (see the comment inside). When 'conn' is non-NULL, this function calls
 * connEnableTcpNoDelay(), connKeepAlive() when server.tcpkeepalive is set,
 * installs readQueryFromClient() as the read handler, stores the client as
 * the connection's private data, and finally links the client into
 * server.clients / server.clients_index through linkClient(). */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    /* Static reply buffer; the size actually usable (allocator slack
     * included) is recorded in buf_usable_size below. */
    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
    selectDb(c,0);
    /* Ids come from the server-wide counter, incremented atomically. */
    uint64_t client_id;
    atomicGetIncr(server.next_client_id, client_id, 1);
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    c->bufpos = 0;
    c->buf_usable_size = zmalloc_usable_size(c->buf);
    c->buf_peak = c->buf_usable_size;
    c->buf_peak_last_reset_time = server.unixtime;
    c->ref_repl_buf_node = NULL;
    c->ref_block_pos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->argv_len = 0;
    c->argv_len_sum = 0;
    c->original_argc = 0;
    c->original_argv = NULL;
    c->cmd = c->lastcmd = c->realcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->slot = -1;
    c->ctime = c->lastinteraction = server.unixtime;
    /* Binds the client to DefaultUser; may authenticate it immediately. */
    clientSetDefaultAuth(c);
    c->replstate = REPL_STATE_NONE;
    c->repl_start_cmd_stream_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_applied = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->repl_last_partial_write = 0;
    c->slave_listening_port = 0;
    c->slave_addr = NULL;
    c->slave_capa = SLAVE_CAPA_NONE;
    c->slave_req = SLAVE_REQ_NONE;
    c->reply = listCreate();
    c->deferred_reply_errors = NULL;
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    /* Reply list nodes are clientReplyBlock allocations handled by the
     * freeClientReplyValue()/dupClientReplyValue() callbacks. */
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
    c->pubsub_patterns = listCreate();
    c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
    c->peerid = NULL;
    c->sockname = NULL;
    c->client_list_node = NULL;
    c->postponed_list_node = NULL;
    c->pending_read_list_node = NULL;
    c->client_tracking_redirection = 0;
    c->client_tracking_prefixes = NULL;
    c->last_memory_usage = 0;
    c->last_memory_type = CLIENT_TYPE_NORMAL;
    c->auth_callback = NULL;
    c->auth_callback_privdata = NULL;
    c->auth_module = NULL;
    /* Pattern list entries are refcounted objects compared by value. */
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    c->mem_usage_bucket = NULL;
    c->mem_usage_bucket_node = NULL;
    /* Only connected clients are registered in the global list/index. */
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
218 | |
219 | void installClientWriteHandler(client *c) { |
220 | int ae_barrier = 0; |
221 | /* For the fsync=always policy, we want that a given FD is never |
222 | * served for reading and writing in the same event loop iteration, |
223 | * so that in the middle of receiving the query, and serving it |
224 | * to the client, we'll call beforeSleep() that will do the |
225 | * actual fsync of AOF to disk. the write barrier ensures that. */ |
226 | if (server.aof_state == AOF_ON && |
227 | server.aof_fsync == AOF_FSYNC_ALWAYS) |
228 | { |
229 | ae_barrier = 1; |
230 | } |
231 | if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { |
232 | freeClientAsync(c); |
233 | } |
234 | } |
235 | |
236 | /* This function puts the client in the queue of clients that should write |
237 | * their output buffers to the socket. Note that it does not *yet* install |
238 | * the write handler, to start clients are put in a queue of clients that need |
239 | * to write, so we try to do that before returning in the event loop (see the |
240 | * handleClientsWithPendingWrites() function). |
241 | * If we fail and there is more data to write, compared to what the socket |
242 | * buffers can hold, then we'll really install the handler. */ |
243 | void putClientInPendingWriteQueue(client *c) { |
244 | /* Schedule the client to write the output buffers to the socket only |
245 | * if not already done and, for slaves, if the slave can actually receive |
246 | * writes at this stage. */ |
247 | if (!(c->flags & CLIENT_PENDING_WRITE) && |
248 | (c->replstate == REPL_STATE_NONE || |
249 | (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) |
250 | { |
251 | /* Here instead of installing the write handler, we just flag the |
252 | * client and put it into a list of clients that have something |
253 | * to write to the socket. This way before re-entering the event |
254 | * loop, we can try to directly write to the client sockets avoiding |
255 | * a system call. We'll only really install the write handler if |
256 | * we'll not be able to write the whole reply at once. */ |
257 | c->flags |= CLIENT_PENDING_WRITE; |
258 | listAddNodeHead(server.clients_pending_write,c); |
259 | } |
260 | } |
261 | |
262 | /* This function is called every time we are going to transmit new data |
263 | * to the client. The behavior is the following: |
264 | * |
265 | * If the client should receive new data (normal clients will) the function |
266 | * returns C_OK, and make sure to install the write handler in our event |
267 | * loop so that when the socket is writable new data gets written. |
268 | * |
269 | * If the client should not receive new data, because it is a fake client |
270 | * (used to load AOF in memory), a master or because the setup of the write |
271 | * handler failed, the function returns C_ERR. |
272 | * |
273 | * The function may return C_OK without actually installing the write |
274 | * event handler in the following cases: |
275 | * |
276 | * 1) The event handler should already be installed since the output buffer |
277 | * already contains something. |
278 | * 2) The client is a slave but not yet online, so we want to just accumulate |
279 | * writes in the buffer but not actually sending them yet. |
280 | * |
281 | * Typically gets called every time a reply is built, before adding more |
282 | * data to the clients output buffers. If the function returns C_ERR no |
283 | * data should be appended to the output buffers. */ |
284 | int prepareClientToWrite(client *c) { |
285 | /* If it's the Lua client we always return ok without installing any |
286 | * handler since there is no socket at all. */ |
287 | if (c->flags & (CLIENT_SCRIPT|CLIENT_MODULE)) return C_OK; |
288 | |
289 | /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ |
290 | if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR; |
291 | |
292 | /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ |
293 | if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; |
294 | |
295 | /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag |
296 | * is set. */ |
297 | if ((c->flags & CLIENT_MASTER) && |
298 | !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR; |
299 | |
300 | if (!c->conn) return C_ERR; /* Fake client for AOF loading. */ |
301 | |
302 | /* Schedule the client to write the output buffers to the socket, unless |
303 | * it should already be setup to do so (it has already pending data). |
304 | * |
305 | * If CLIENT_PENDING_READ is set, we're in an IO thread and should |
306 | * not put the client in pending write queue. Instead, it will be |
307 | * done by handleClientsWithPendingReadsUsingThreads() upon return. |
308 | */ |
309 | if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) |
310 | putClientInPendingWriteQueue(c); |
311 | |
312 | /* Authorize the caller to queue in the output buffer of this client. */ |
313 | return C_OK; |
314 | } |
315 | |
316 | /* ----------------------------------------------------------------------------- |
317 | * Low level functions to add more data to output buffers. |
318 | * -------------------------------------------------------------------------- */ |
319 | |
320 | /* Attempts to add the reply to the static buffer in the client struct. |
321 | * Returns the length of data that is added to the reply buffer. |
322 | * |
323 | * Sanitizer suppression: client->buf_usable_size determined by |
324 | * zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses |
325 | * sanitizer and generates a false positive out-of-bounds error */ |
326 | REDIS_NO_SANITIZE("bounds" ) |
327 | size_t _addReplyToBuffer(client *c, const char *s, size_t len) { |
328 | size_t available = c->buf_usable_size - c->bufpos; |
329 | |
330 | /* If there already are entries in the reply list, we cannot |
331 | * add anything more to the static buffer. */ |
332 | if (listLength(c->reply) > 0) return 0; |
333 | |
334 | size_t reply_len = len > available ? available : len; |
335 | memcpy(c->buf+c->bufpos,s,reply_len); |
336 | c->bufpos+=reply_len; |
337 | /* We update the buffer peak after appending the reply to the buffer */ |
338 | if(c->buf_peak < (size_t)c->bufpos) |
339 | c->buf_peak = (size_t)c->bufpos; |
340 | return reply_len; |
341 | } |
342 | |
343 | /* Adds the reply to the reply linked list. |
344 | * Note: some edits to this function need to be relayed to AddReplyFromClient. */ |
345 | void _addReplyProtoToList(client *c, const char *s, size_t len) { |
346 | listNode *ln = listLast(c->reply); |
347 | clientReplyBlock *tail = ln? listNodeValue(ln): NULL; |
348 | |
349 | /* Note that 'tail' may be NULL even if we have a tail node, because when |
350 | * addReplyDeferredLen() is used, it sets a dummy node to NULL just |
351 | * to fill it later, when the size of the bulk length is set. */ |
352 | |
353 | /* Append to tail string when possible. */ |
354 | if (tail) { |
355 | /* Copy the part we can fit into the tail, and leave the rest for a |
356 | * new node */ |
357 | size_t avail = tail->size - tail->used; |
358 | size_t copy = avail >= len? len: avail; |
359 | memcpy(tail->buf + tail->used, s, copy); |
360 | tail->used += copy; |
361 | s += copy; |
362 | len -= copy; |
363 | } |
364 | if (len) { |
365 | /* Create a new node, make sure it is allocated to at |
366 | * least PROTO_REPLY_CHUNK_BYTES */ |
367 | size_t usable_size; |
368 | size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; |
369 | tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size); |
370 | /* take over the allocation's internal fragmentation */ |
371 | tail->size = usable_size - sizeof(clientReplyBlock); |
372 | tail->used = len; |
373 | memcpy(tail->buf, s, len); |
374 | listAddNodeTail(c->reply, tail); |
375 | c->reply_bytes += tail->size; |
376 | |
377 | closeClientOnOutputBufferLimitReached(c, 1); |
378 | } |
379 | } |
380 | |
381 | void _addReplyToBufferOrList(client *c, const char *s, size_t len) { |
382 | if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; |
383 | |
384 | /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the |
385 | * replication link that caused a reply to be generated we'll simply disconnect it. |
386 | * Note this is the simplest way to check a command added a response. Replication links are used to write data but |
387 | * not for responses, so we should normally never get here on a replica client. */ |
388 | if (getClientType(c) == CLIENT_TYPE_SLAVE) { |
389 | sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL; |
390 | logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'" , |
391 | cmdname ? cmdname : "<unknown>" ); |
392 | return; |
393 | } |
394 | |
395 | size_t reply_len = _addReplyToBuffer(c,s,len); |
396 | if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len); |
397 | } |
398 | |
/* -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that command implementations will call.
 * -------------------------------------------------------------------------- */
403 | |
404 | /* Add the object 'obj' string representation to the client output buffer. */ |
405 | void addReply(client *c, robj *obj) { |
406 | if (prepareClientToWrite(c) != C_OK) return; |
407 | |
408 | if (sdsEncodedObject(obj)) { |
409 | _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr)); |
410 | } else if (obj->encoding == OBJ_ENCODING_INT) { |
411 | /* For integer encoded strings we just convert it into a string |
412 | * using our optimized function, and attach the resulting string |
413 | * to the output buffer. */ |
414 | char buf[32]; |
415 | size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); |
416 | _addReplyToBufferOrList(c,buf,len); |
417 | } else { |
418 | serverPanic("Wrong obj->encoding in addReply()" ); |
419 | } |
420 | } |
421 | |
422 | /* Add the SDS 's' string to the client output buffer, as a side effect |
423 | * the SDS string is freed. */ |
424 | void addReplySds(client *c, sds s) { |
425 | if (prepareClientToWrite(c) != C_OK) { |
426 | /* The caller expects the sds to be free'd. */ |
427 | sdsfree(s); |
428 | return; |
429 | } |
430 | _addReplyToBufferOrList(c,s,sdslen(s)); |
431 | sdsfree(s); |
432 | } |
433 | |
434 | /* This low level function just adds whatever protocol you send it to the |
435 | * client buffer, trying the static buffer initially, and using the string |
436 | * of objects if not possible. |
437 | * |
438 | * It is efficient because does not create an SDS object nor an Redis object |
439 | * if not needed. The object will only be created by calling |
440 | * _addReplyProtoToList() if we fail to extend the existing tail object |
441 | * in the list of objects. */ |
442 | void addReplyProto(client *c, const char *s, size_t len) { |
443 | if (prepareClientToWrite(c) != C_OK) return; |
444 | _addReplyToBufferOrList(c,s,len); |
445 | } |
446 | |
447 | /* Low level function called by the addReplyError...() functions. |
448 | * It emits the protocol for a Redis error, in the form: |
449 | * |
450 | * -ERRORCODE Error Message<CR><LF> |
451 | * |
452 | * If the error code is already passed in the string 's', the error |
453 | * code provided is used, otherwise the string "-ERR " for the generic |
454 | * error code is automatically added. |
455 | * Note that 's' must NOT end with \r\n. */ |
456 | void addReplyErrorLength(client *c, const char *s, size_t len) { |
457 | /* If the string already starts with "-..." then the error code |
458 | * is provided by the caller. Otherwise we use "-ERR". */ |
459 | if (!len || s[0] != '-') addReplyProto(c,"-ERR " ,5); |
460 | addReplyProto(c,s,len); |
461 | addReplyProto(c,"\r\n" ,2); |
462 | } |
463 | |
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.)
 * Possible flags:
 * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to update any error stats.
 *
 * 's'/'len' is the error text without the trailing \r\n.
 * - Module clients: the text is only queued in c->deferred_reply_errors.
 * - Without the flag: server.stat_total_error_replies and the per-error-code
 *   counter (code taken from 's', or "ERR") are incremented.
 * - With the flag: c->realcmd->failed_calls is incremented instead.
 * Errors involving a master, a replica or the AOF loading client are also
 * logged (text capped at 4096 bytes), counted in
 * server.stat_unexpected_error_replies, and may trigger a panic depending on
 * server.propagation_error_behavior. */
void afterErrorReply(client *c, const char *s, size_t len, int flags) {
    /* Module clients fall into two categories:
     * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
     * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */
    if (c->flags & CLIENT_MODULE) {
        /* Lazily create the list; entries are SDS copies freed with sdsfree. */
        if (!c->deferred_reply_errors) {
            c->deferred_reply_errors = listCreate();
            listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree);
        }
        listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len));
        return;
    }

    if (!(flags & ERR_REPLY_FLAG_NO_STATS_UPDATE)) {
        /* Increment the global error counter */
        server.stat_total_error_replies++;
        /* Increment the error stats
         * If the string already starts with "-..." then the error prefix
         * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
        if (s[0] != '-') {
            incrementErrorCount("ERR", 3);
        } else {
            /* The error code is the text between the leading '-' and the
             * first space. NOTE(review): for input like "- msg" this passes a
             * zero-length code (errEndPos == 1) — confirm that is acceptable. */
            char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
            if (spaceloc) {
                const size_t errEndPos = (size_t)(spaceloc - s);
                incrementErrorCount(s+1, errEndPos-1);
            } else {
                /* Fallback to ERR if we can't retrieve the error prefix */
                incrementErrorCount("ERR", 3);
            }
        }
    } else {
        /* stat_total_error_replies will not be updated, which means that
         * the cmd stats will not be updated as well, we still want this command
         * to be counted as failed so we update it here. We update c->realcmd in
         * case c->cmd was changed (like in GEOADD). */
        /* NOTE(review): c->realcmd is dereferenced unconditionally; this
         * assumes callers passing ERR_REPLY_FLAG_NO_STATS_UPDATE always have
         * a command set on the client — confirm. */
        c->realcmd->failed_calls++;
    }

    /* Sometimes it could be normal that a slave replies to a master with
     * an error and this function gets called. Actually the error will never
     * be sent because addReply*() against master clients has no effect...
     * A notable example is:
     *
     *     EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
     *
     * Where the master must propagate the first change even if the second
     * will produce an error. However it is useful to log such events since
     * they are rare and may hint at errors in a script or a bug in Redis. */
    int ctype = getClientType(c);
    if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
        char *to, *from;

        /* Pick the log wording according to who sent the error to whom. */
        if (c->id == CLIENT_ID_AOF) {
            to = "AOF-loading-client";
            from = "server";
        } else if (ctype == CLIENT_TYPE_MASTER) {
            to = "master";
            from = "replica";
        } else {
            to = "replica";
            from = "master";
        }

        /* Cap how much of the error text is written to the log. */
        if (len > 4096) len = 4096;
        sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
        serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
                             "to its %s: '%.*s' after processing the command "
                             "'%s'", from, to, (int)len, s, cmdname ? cmdname : "<unknown>");
        if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
            server.repl_backlog->histlen > 0)
        {
            showLatestBacklog();
        }
        server.stat_unexpected_error_replies++;

        /* Based off the propagation error behavior, check if we need to panic here. There
         * are currently two checked cases:
         * * If this command was from our master and we are not a writable replica.
         * * We are reading from an AOF file. */
        int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro)
            && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC ||
            server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS);
        int panic_in_aof = c->id == CLIENT_ID_AOF
            && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC;
        if (panic_in_replicas || panic_in_aof) {
            serverPanic("This %s panicked sending an error to its %s"
                " after processing the command '%s'",
                from, to, cmdname ? cmdname : "<unknown>");
        }
    }
}
559 | |
560 | /* The 'err' object is expected to start with -ERRORCODE and end with \r\n. |
561 | * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */ |
562 | void addReplyErrorObject(client *c, robj *err) { |
563 | addReply(c, err); |
564 | afterErrorReply(c, err->ptr, sdslen(err->ptr)-2, 0); /* Ignore trailing \r\n */ |
565 | } |
566 | |
567 | /* Sends either a reply or an error reply by checking the first char. |
568 | Â * If the first char is '-' the reply is considered an error. |
569 | Â * In any case the given reply is sent, if the reply is also recognize |
570 | * as an error we also perform some post reply operations such as |
571 | * logging and stats update. */ |
572 | void addReplyOrErrorObject(client *c, robj *reply) { |
573 | serverAssert(sdsEncodedObject(reply)); |
574 | sds rep = reply->ptr; |
575 | if (sdslen(rep) > 1 && rep[0] == '-') { |
576 | addReplyErrorObject(c, reply); |
577 | } else { |
578 | addReply(c, reply); |
579 | } |
580 | } |
581 | |
582 | /* See addReplyErrorLength for expectations from the input string. */ |
583 | void addReplyError(client *c, const char *err) { |
584 | addReplyErrorLength(c,err,strlen(err)); |
585 | afterErrorReply(c,err,strlen(err),0); |
586 | } |
587 | |
588 | /* Add error reply to the given client. |
589 | * Supported flags: |
590 | * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to perform any error stats updates */ |
591 | void addReplyErrorSdsEx(client *c, sds err, int flags) { |
592 | addReplyErrorLength(c,err,sdslen(err)); |
593 | afterErrorReply(c,err,sdslen(err),flags); |
594 | sdsfree(err); |
595 | } |
596 | |
597 | /* See addReplyErrorLength for expectations from the input string. */ |
598 | /* As a side effect the SDS string is freed. */ |
599 | void addReplyErrorSds(client *c, sds err) { |
600 | addReplyErrorSdsEx(c, err, 0); |
601 | } |
602 | |
603 | /* Internal function used by addReplyErrorFormat and addReplyErrorFormatEx. |
604 | * Refer to afterErrorReply for more information about the flags. */ |
605 | static void addReplyErrorFormatInternal(client *c, int flags, const char *fmt, va_list ap) { |
606 | va_list cpy; |
607 | va_copy(cpy,ap); |
608 | sds s = sdscatvprintf(sdsempty(),fmt,cpy); |
609 | va_end(cpy); |
610 | /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */ |
611 | s = sdstrim(s, "\r\n" ); |
612 | /* Make sure there are no newlines in the middle of the string, otherwise |
613 | * invalid protocol is emitted. */ |
614 | s = sdsmapchars(s, "\r\n" , " " , 2); |
615 | addReplyErrorLength(c,s,sdslen(s)); |
616 | afterErrorReply(c,s,sdslen(s),flags); |
617 | sdsfree(s); |
618 | } |
619 | |
620 | void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) { |
621 | va_list ap; |
622 | va_start(ap,fmt); |
623 | addReplyErrorFormatInternal(c, flags, fmt, ap); |
624 | va_end(ap); |
625 | } |
626 | |
627 | /* See addReplyErrorLength for expectations from the formatted string. |
628 | * The formatted string is safe to contain \r and \n anywhere. */ |
629 | void addReplyErrorFormat(client *c, const char *fmt, ...) { |
630 | va_list ap; |
631 | va_start(ap,fmt); |
632 | addReplyErrorFormatInternal(c, 0, fmt, ap); |
633 | va_end(ap); |
634 | } |
635 | |
636 | void addReplyErrorArity(client *c) { |
637 | addReplyErrorFormat(c, "wrong number of arguments for '%s' command" , |
638 | c->cmd->fullname); |
639 | } |
640 | |
641 | void addReplyErrorExpireTime(client *c) { |
642 | addReplyErrorFormat(c, "invalid expire time in '%s' command" , |
643 | c->cmd->fullname); |
644 | } |
645 | |
646 | void addReplyStatusLength(client *c, const char *s, size_t len) { |
647 | addReplyProto(c,"+" ,1); |
648 | addReplyProto(c,s,len); |
649 | addReplyProto(c,"\r\n" ,2); |
650 | } |
651 | |
652 | void addReplyStatus(client *c, const char *status) { |
653 | addReplyStatusLength(c,status,strlen(status)); |
654 | } |
655 | |
656 | void addReplyStatusFormat(client *c, const char *fmt, ...) { |
657 | va_list ap; |
658 | va_start(ap,fmt); |
659 | sds s = sdscatvprintf(sdsempty(),fmt,ap); |
660 | va_end(ap); |
661 | addReplyStatusLength(c,s,sdslen(s)); |
662 | sdsfree(s); |
663 | } |
664 | |
665 | /* Sometimes we are forced to create a new reply node, and we can't append to |
666 | * the previous one, when that happens, we wanna try to trim the unused space |
667 | * at the end of the last reply node which we won't use anymore. */ |
668 | void trimReplyUnusedTailSpace(client *c) { |
669 | listNode *ln = listLast(c->reply); |
670 | clientReplyBlock *tail = ln? listNodeValue(ln): NULL; |
671 | |
672 | /* Note that 'tail' may be NULL even if we have a tail node, because when |
673 | * addReplyDeferredLen() is used */ |
674 | if (!tail) return; |
675 | |
676 | /* We only try to trim the space is relatively high (more than a 1/4 of the |
677 | * allocation), otherwise there's a high chance realloc will NOP. |
678 | * Also, to avoid large memmove which happens as part of realloc, we only do |
679 | * that if the used part is small. */ |
680 | if (tail->size - tail->used > tail->size / 4 && |
681 | tail->used < PROTO_REPLY_CHUNK_BYTES) |
682 | { |
683 | size_t old_size = tail->size; |
684 | tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); |
685 | /* take over the allocation's internal fragmentation (at least for |
686 | * memory usage tracking) */ |
687 | tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock); |
688 | c->reply_bytes = c->reply_bytes + tail->size - old_size; |
689 | listNodeValue(ln) = tail; |
690 | } |
691 | } |
692 | |
693 | /* Adds an empty object to the reply list that will contain the multi bulk |
694 | * length, which is not known when this function is called. */ |
695 | void *addReplyDeferredLen(client *c) { |
696 | /* Note that we install the write event here even if the object is not |
697 | * ready to be sent, since we are sure that before returning to the |
698 | * event loop setDeferredAggregateLen() will be called. */ |
699 | if (prepareClientToWrite(c) != C_OK) return NULL; |
700 | |
701 | /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the |
702 | * replication link that caused a reply to be generated we'll simply disconnect it. |
703 | * Note this is the simplest way to check a command added a response. Replication links are used to write data but |
704 | * not for responses, so we should normally never get here on a replica client. */ |
705 | if (getClientType(c) == CLIENT_TYPE_SLAVE) { |
706 | sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL; |
707 | logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'" , |
708 | cmdname ? cmdname : "<unknown>" ); |
709 | return NULL; |
710 | } |
711 | |
712 | trimReplyUnusedTailSpace(c); |
713 | listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ |
714 | return listLast(c->reply); |
715 | } |
716 | |
/* Fill a placeholder reply node with the 'length' bytes at 's'.
 *
 * 'node' is the handle returned by addReplyDeferredLen(). When it is NULL
 * (the client could not accept writes) the call does nothing. Otherwise the
 * node's value must still be NULL (asserted). The bytes at 's' are only
 * copied; 's' stays owned by the caller.
 *
 * To save a later write(2), the data may be glued to neighbouring blocks:
 * the head into the previous block's spare room, and the remainder either
 * prepended to the next block or stored in a freshly allocated block. */
void setDeferredReply(client *c, void *node, const char *s, size_t length) {
    listNode *ln = (listNode*)node;
    clientReplyBlock *next, *prev;

    /* Abort when *node is NULL: when the client should not accept writes
     * we return NULL in addReplyDeferredLen() */
    if (node == NULL) return;
    serverAssert(!listNodeValue(ln));

    /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(),
     * with a new buffer structure containing the protocol needed to specify
     * the length of the array following. However sometimes there might be room
     * in the previous/next node so we can instead remove this NULL node, and
     * suffix/prefix our data in the node immediately before/after it, in order
     * to save a write(2) syscall later. Conditions needed to do it:
     *
     * - The prev node is non-NULL and has space in it or
     * - The next node is non-NULL,
     * - It has enough room already allocated
     * - And not too large (avoid large memmove) */
    if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) &&
        prev->size - prev->used > 0)
    {
        /* Append as much as fits into prev's unused space. Any remainder
         * falls through to the next-block / new-block handling below. */
        size_t len_to_copy = prev->size - prev->used;
        if (len_to_copy > length)
            len_to_copy = length;
        memcpy(prev->buf + prev->used, s, len_to_copy);
        prev->used += len_to_copy;
        length -= len_to_copy;
        if (length == 0) {
            /* Everything fit: the placeholder node is no longer needed. */
            listDelNode(c->reply, ln);
            return;
        }
        s += len_to_copy;
    }

    if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
        next->size - next->used >= length &&
        next->used < PROTO_REPLY_CHUNK_BYTES * 4)
    {
        /* Prepend into next: shift its existing bytes right, then copy ours
         * in front. Source and destination overlap, hence memmove. */
        memmove(next->buf + length, next->buf, next->used);
        memcpy(next->buf, s, length);
        next->used += length;
        listDelNode(c->reply,ln);
    } else {
        /* Create a new node */
        clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
        /* Take over the allocation's internal fragmentation */
        buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
        buf->used = length;
        memcpy(buf->buf, s, length);
        listNodeValue(ln) = buf;
        /* Only this path grows c->reply_bytes: the merge paths above just
         * fill spare room in blocks that already exist. */
        c->reply_bytes += buf->size;

        closeClientOnOutputBufferLimitReached(c, 1);
    }
}
774 | |
775 | /* Populate the length object and try gluing it to the next chunk. */ |
776 | void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { |
777 | serverAssert(length >= 0); |
778 | |
779 | /* Abort when *node is NULL: when the client should not accept writes |
780 | * we return NULL in addReplyDeferredLen() */ |
781 | if (node == NULL) return; |
782 | |
783 | /* Things like *2\r\n, %3\r\n or ~4\r\n are emitted very often by the protocol |
784 | * so we have a few shared objects to use if the integer is small |
785 | * like it is most of the times. */ |
786 | const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(length); |
787 | const int opt_hdr = length < OBJ_SHARED_BULKHDR_LEN; |
788 | if (prefix == '*' && opt_hdr) { |
789 | setDeferredReply(c, node, shared.mbulkhdr[length]->ptr, hdr_len); |
790 | return; |
791 | } |
792 | if (prefix == '%' && opt_hdr) { |
793 | setDeferredReply(c, node, shared.maphdr[length]->ptr, hdr_len); |
794 | return; |
795 | } |
796 | if (prefix == '~' && opt_hdr) { |
797 | setDeferredReply(c, node, shared.sethdr[length]->ptr, hdr_len); |
798 | return; |
799 | } |
800 | |
801 | char lenstr[128]; |
802 | size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n" , prefix, length); |
803 | setDeferredReply(c, node, lenstr, lenstr_len); |
804 | } |
805 | |
806 | void setDeferredArrayLen(client *c, void *node, long length) { |
807 | setDeferredAggregateLen(c,node,length,'*'); |
808 | } |
809 | |
810 | void setDeferredMapLen(client *c, void *node, long length) { |
811 | int prefix = c->resp == 2 ? '*' : '%'; |
812 | if (c->resp == 2) length *= 2; |
813 | setDeferredAggregateLen(c,node,length,prefix); |
814 | } |
815 | |
816 | void setDeferredSetLen(client *c, void *node, long length) { |
817 | int prefix = c->resp == 2 ? '*' : '~'; |
818 | setDeferredAggregateLen(c,node,length,prefix); |
819 | } |
820 | |
821 | void setDeferredAttributeLen(client *c, void *node, long length) { |
822 | serverAssert(c->resp >= 3); |
823 | setDeferredAggregateLen(c,node,length,'|'); |
824 | } |
825 | |
826 | void setDeferredPushLen(client *c, void *node, long length) { |
827 | serverAssert(c->resp >= 3); |
828 | setDeferredAggregateLen(c,node,length,'>'); |
829 | } |
830 | |
831 | /* Add a double as a bulk reply */ |
832 | void addReplyDouble(client *c, double d) { |
833 | if (isinf(d)) { |
834 | /* Libc in odd systems (Hi Solaris!) will format infinite in a |
835 | * different way, so better to handle it in an explicit way. */ |
836 | if (c->resp == 2) { |
837 | addReplyBulkCString(c, d > 0 ? "inf" : "-inf" ); |
838 | } else { |
839 | addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n" , |
840 | d > 0 ? 6 : 7); |
841 | } |
842 | } else { |
843 | char dbuf[MAX_LONG_DOUBLE_CHARS+3], |
844 | sbuf[MAX_LONG_DOUBLE_CHARS+32]; |
845 | int dlen, slen; |
846 | if (c->resp == 2) { |
847 | dlen = snprintf(dbuf,sizeof(dbuf),"%.17g" ,d); |
848 | slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n" ,dlen,dbuf); |
849 | addReplyProto(c,sbuf,slen); |
850 | } else { |
851 | dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n" ,d); |
852 | addReplyProto(c,dbuf,dlen); |
853 | } |
854 | } |
855 | } |
856 | |
857 | void addReplyBigNum(client *c, const char* num, size_t len) { |
858 | if (c->resp == 2) { |
859 | addReplyBulkCBuffer(c, num, len); |
860 | } else { |
861 | addReplyProto(c,"(" ,1); |
862 | addReplyProto(c,num,len); |
863 | addReply(c,shared.crlf); |
864 | } |
865 | } |
866 | |
867 | /* Add a long double as a bulk reply, but uses a human readable formatting |
868 | * of the double instead of exposing the crude behavior of doubles to the |
869 | * dear user. */ |
870 | void addReplyHumanLongDouble(client *c, long double d) { |
871 | if (c->resp == 2) { |
872 | robj *o = createStringObjectFromLongDouble(d,1); |
873 | addReplyBulk(c,o); |
874 | decrRefCount(o); |
875 | } else { |
876 | char buf[MAX_LONG_DOUBLE_CHARS]; |
877 | int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); |
878 | addReplyProto(c,"," ,1); |
879 | addReplyProto(c,buf,len); |
880 | addReplyProto(c,"\r\n" ,2); |
881 | } |
882 | } |
883 | |
884 | /* Add a long long as integer reply or bulk len / multi bulk count. |
885 | * Basically this is used to output <prefix><long long><crlf>. */ |
886 | void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { |
887 | char buf[128]; |
888 | int len; |
889 | |
890 | /* Things like $3\r\n or *2\r\n are emitted very often by the protocol |
891 | * so we have a few shared objects to use if the integer is small |
892 | * like it is most of the times. */ |
893 | const int opt_hdr = ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0; |
894 | const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(ll); |
895 | if (prefix == '*' && opt_hdr) { |
896 | addReplyProto(c,shared.mbulkhdr[ll]->ptr,hdr_len); |
897 | return; |
898 | } else if (prefix == '$' && opt_hdr) { |
899 | addReplyProto(c,shared.bulkhdr[ll]->ptr,hdr_len); |
900 | return; |
901 | } else if (prefix == '%' && opt_hdr) { |
902 | addReplyProto(c,shared.maphdr[ll]->ptr,hdr_len); |
903 | return; |
904 | } else if (prefix == '~' && opt_hdr) { |
905 | addReplyProto(c,shared.sethdr[ll]->ptr,hdr_len); |
906 | return; |
907 | } |
908 | |
909 | buf[0] = prefix; |
910 | len = ll2string(buf+1,sizeof(buf)-1,ll); |
911 | buf[len+1] = '\r'; |
912 | buf[len+2] = '\n'; |
913 | addReplyProto(c,buf,len+3); |
914 | } |
915 | |
916 | void addReplyLongLong(client *c, long long ll) { |
917 | if (ll == 0) |
918 | addReply(c,shared.czero); |
919 | else if (ll == 1) |
920 | addReply(c,shared.cone); |
921 | else |
922 | addReplyLongLongWithPrefix(c,ll,':'); |
923 | } |
924 | |
925 | void addReplyAggregateLen(client *c, long length, int prefix) { |
926 | serverAssert(length >= 0); |
927 | addReplyLongLongWithPrefix(c,length,prefix); |
928 | } |
929 | |
930 | void addReplyArrayLen(client *c, long length) { |
931 | addReplyAggregateLen(c,length,'*'); |
932 | } |
933 | |
934 | void addReplyMapLen(client *c, long length) { |
935 | int prefix = c->resp == 2 ? '*' : '%'; |
936 | if (c->resp == 2) length *= 2; |
937 | addReplyAggregateLen(c,length,prefix); |
938 | } |
939 | |
940 | void addReplySetLen(client *c, long length) { |
941 | int prefix = c->resp == 2 ? '*' : '~'; |
942 | addReplyAggregateLen(c,length,prefix); |
943 | } |
944 | |
945 | void addReplyAttributeLen(client *c, long length) { |
946 | serverAssert(c->resp >= 3); |
947 | addReplyAggregateLen(c,length,'|'); |
948 | } |
949 | |
950 | void addReplyPushLen(client *c, long length) { |
951 | serverAssert(c->resp >= 3); |
952 | addReplyAggregateLen(c,length,'>'); |
953 | } |
954 | |
955 | void addReplyNull(client *c) { |
956 | if (c->resp == 2) { |
957 | addReplyProto(c,"$-1\r\n" ,5); |
958 | } else { |
959 | addReplyProto(c,"_\r\n" ,3); |
960 | } |
961 | } |
962 | |
963 | void addReplyBool(client *c, int b) { |
964 | if (c->resp == 2) { |
965 | addReply(c, b ? shared.cone : shared.czero); |
966 | } else { |
967 | addReplyProto(c, b ? "#t\r\n" : "#f\r\n" ,4); |
968 | } |
969 | } |
970 | |
971 | /* A null array is a concept that no longer exists in RESP3. However |
972 | * RESP2 had it, so API-wise we have this call, that will emit the correct |
973 | * RESP2 protocol, however for RESP3 the reply will always be just the |
974 | * Null type "_\r\n". */ |
975 | void addReplyNullArray(client *c) { |
976 | if (c->resp == 2) { |
977 | addReplyProto(c,"*-1\r\n" ,5); |
978 | } else { |
979 | addReplyProto(c,"_\r\n" ,3); |
980 | } |
981 | } |
982 | |
983 | /* Create the length prefix of a bulk reply, example: $2234 */ |
984 | void addReplyBulkLen(client *c, robj *obj) { |
985 | size_t len = stringObjectLen(obj); |
986 | |
987 | addReplyLongLongWithPrefix(c,len,'$'); |
988 | } |
989 | |
990 | /* Add a Redis Object as a bulk reply */ |
991 | void addReplyBulk(client *c, robj *obj) { |
992 | addReplyBulkLen(c,obj); |
993 | addReply(c,obj); |
994 | addReply(c,shared.crlf); |
995 | } |
996 | |
997 | /* Add a C buffer as bulk reply */ |
998 | void addReplyBulkCBuffer(client *c, const void *p, size_t len) { |
999 | addReplyLongLongWithPrefix(c,len,'$'); |
1000 | addReplyProto(c,p,len); |
1001 | addReply(c,shared.crlf); |
1002 | } |
1003 | |
1004 | /* Add sds to reply (takes ownership of sds and frees it) */ |
1005 | void addReplyBulkSds(client *c, sds s) { |
1006 | addReplyLongLongWithPrefix(c,sdslen(s),'$'); |
1007 | addReplySds(c,s); |
1008 | addReply(c,shared.crlf); |
1009 | } |
1010 | |
1011 | /* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */ |
1012 | void setDeferredReplyBulkSds(client *c, void *node, sds s) { |
1013 | sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n" , (unsigned)sdslen(s), s); |
1014 | setDeferredReply(c, node, reply, sdslen(reply)); |
1015 | sdsfree(reply); |
1016 | sdsfree(s); |
1017 | } |
1018 | |
1019 | /* Add a C null term string as bulk reply */ |
1020 | void addReplyBulkCString(client *c, const char *s) { |
1021 | if (s == NULL) { |
1022 | addReplyNull(c); |
1023 | } else { |
1024 | addReplyBulkCBuffer(c,s,strlen(s)); |
1025 | } |
1026 | } |
1027 | |
1028 | /* Add a long long as a bulk reply */ |
1029 | void addReplyBulkLongLong(client *c, long long ll) { |
1030 | char buf[64]; |
1031 | int len; |
1032 | |
1033 | len = ll2string(buf,64,ll); |
1034 | addReplyBulkCBuffer(c,buf,len); |
1035 | } |
1036 | |
1037 | /* Reply with a verbatim type having the specified extension. |
1038 | * |
1039 | * The 'ext' is the "extension" of the file, actually just a three |
1040 | * character type that describes the format of the verbatim string. |
1041 | * For instance "txt" means it should be interpreted as a text only |
1042 | * file by the receiver, "md " as markdown, and so forth. Only the |
1043 | * three first characters of the extension are used, and if the |
1044 | * provided one is shorter than that, the remaining is filled with |
1045 | * spaces. */ |
1046 | void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { |
1047 | if (c->resp == 2) { |
1048 | addReplyBulkCBuffer(c,s,len); |
1049 | } else { |
1050 | char buf[32]; |
1051 | size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:" ,len+4); |
1052 | char *p = buf+preflen-4; |
1053 | for (int i = 0; i < 3; i++) { |
1054 | if (*ext == '\0') { |
1055 | p[i] = ' '; |
1056 | } else { |
1057 | p[i] = *ext++; |
1058 | } |
1059 | } |
1060 | addReplyProto(c,buf,preflen); |
1061 | addReplyProto(c,s,len); |
1062 | addReplyProto(c,"\r\n" ,2); |
1063 | } |
1064 | } |
1065 | |
1066 | /* Add an array of C strings as status replies with a heading. |
1067 | * This function is typically invoked by from commands that support |
1068 | * subcommands in response to the 'help' subcommand. The help array |
1069 | * is terminated by NULL sentinel. */ |
1070 | void addReplyHelp(client *c, const char **help) { |
1071 | sds cmd = sdsnew((char*) c->argv[0]->ptr); |
1072 | void *blenp = addReplyDeferredLen(c); |
1073 | int blen = 0; |
1074 | |
1075 | sdstoupper(cmd); |
1076 | addReplyStatusFormat(c, |
1077 | "%s <subcommand> [<arg> [value] [opt] ...]. Subcommands are:" ,cmd); |
1078 | sdsfree(cmd); |
1079 | |
1080 | while (help[blen]) addReplyStatus(c,help[blen++]); |
1081 | |
1082 | addReplyStatus(c,"HELP" ); |
1083 | addReplyStatus(c," Prints this help." ); |
1084 | |
1085 | blen += 1; /* Account for the header. */ |
1086 | blen += 2; /* Account for the footer. */ |
1087 | setDeferredArrayLen(c,blenp,blen); |
1088 | } |
1089 | |
1090 | /* Add a suggestive error reply. |
1091 | * This function is typically invoked by from commands that support |
1092 | * subcommands in response to an unknown subcommand or argument error. */ |
1093 | void addReplySubcommandSyntaxError(client *c) { |
1094 | sds cmd = sdsnew((char*) c->argv[0]->ptr); |
1095 | sdstoupper(cmd); |
1096 | addReplyErrorFormat(c, |
1097 | "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP." , |
1098 | (char*)c->argv[1]->ptr,cmd); |
1099 | sdsfree(cmd); |
1100 | } |
1101 | |
/* Append 'src' client output buffers into 'dst' client output buffers.
 * This function clears the output buffers of 'src'
 *
 * - The static buffer of 'src' is appended to 'dst' via addReplyProto().
 * - The reply list of 'src' is concatenated onto 'dst' with listJoin(), and
 *   the reply_bytes accounting is transferred from 'src' to 'dst'.
 * - Error statistics deferred on 'src' (deferred_reply_errors) are replayed
 *   against 'dst' and then released.
 *
 * If 'src' is already marked CLIENT_CLOSE_ASAP, nothing is copied: 'dst' is
 * scheduled for asynchronous close instead.
 *
 * NOTE(review): on the two early returns after addReplyProto() (client not
 * writable, or CLIENT_CLOSE_AFTER_REPLY set on 'dst'), the reply list,
 * reply_bytes and bufpos of 'src' are left as they were. */
void AddReplyFromClient(client *dst, client *src) {
    /* If the source client contains a partial response due to client output
     * buffer limits, propagate that to the dest rather than copy a partial
     * reply. We don't wanna run the risk of copying partial response in case
     * for some reason the output limits don't reach the same decision (maybe
     * they changed) */
    if (src->flags & CLIENT_CLOSE_ASAP) {
        sds client = catClientInfoString(sdsempty(),dst);
        freeClientAsync(dst);
        serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits." , client);
        sdsfree(client);
        return;
    }

    /* First add the static buffer (either into the static buffer or reply list) */
    addReplyProto(dst,src->buf, src->bufpos);

    /* We need to check with prepareClientToWrite again (after addReplyProto)
     * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
    if (prepareClientToWrite(dst) != C_OK)
        return;

    /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
     * checks in it. */
    if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Concatenate the reply list into the dest */
    if (listLength(src->reply))
        listJoin(dst->reply,src->reply);
    /* The byte accounting follows the blocks that were just moved. */
    dst->reply_bytes += src->reply_bytes;
    src->reply_bytes = 0;
    src->bufpos = 0;

    if (src->deferred_reply_errors) {
        deferredAfterErrorReply(dst, src->deferred_reply_errors);
        listRelease(src->deferred_reply_errors);
        src->deferred_reply_errors = NULL;
    }

    /* Check output buffer limits */
    closeClientOnOutputBufferLimitReached(dst, 1);
}
1146 | |
1147 | /* Append the listed errors to the server error statistics. the input |
1148 | * list is not modified and remains the responsibility of the caller. */ |
1149 | void deferredAfterErrorReply(client *c, list *errors) { |
1150 | listIter li; |
1151 | listNode *ln; |
1152 | listRewind(errors,&li); |
1153 | while((ln = listNext(&li))) { |
1154 | sds err = ln->value; |
1155 | afterErrorReply(c, err, sdslen(err), 0); |
1156 | } |
1157 | } |
1158 | |
1159 | /* Logically copy 'src' replica client buffers info to 'dst' replica. |
1160 | * Basically increase referenced buffer block node reference count. */ |
1161 | void copyReplicaOutputBuffer(client *dst, client *src) { |
1162 | serverAssert(src->bufpos == 0 && listLength(src->reply) == 0); |
1163 | |
1164 | if (src->ref_repl_buf_node == NULL) return; |
1165 | dst->ref_repl_buf_node = src->ref_repl_buf_node; |
1166 | dst->ref_block_pos = src->ref_block_pos; |
1167 | ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++; |
1168 | } |
1169 | |
1170 | /* Return true if the specified client has pending reply buffers to write to |
1171 | * the socket. */ |
1172 | int clientHasPendingReplies(client *c) { |
1173 | if (getClientType(c) == CLIENT_TYPE_SLAVE) { |
1174 | /* Replicas use global shared replication buffer instead of |
1175 | * private output buffer. */ |
1176 | serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); |
1177 | if (c->ref_repl_buf_node == NULL) return 0; |
1178 | |
1179 | /* If the last replication buffer block content is totally sent, |
1180 | * we have nothing to send. */ |
1181 | listNode *ln = listLast(server.repl_buffer_blocks); |
1182 | replBufBlock *tail = listNodeValue(ln); |
1183 | if (ln == c->ref_repl_buf_node && |
1184 | c->ref_block_pos == tail->used) return 0; |
1185 | |
1186 | return 1; |
1187 | } else { |
1188 | return c->bufpos || listLength(c->reply); |
1189 | } |
1190 | } |
1191 | |
1192 | /* Return true if client connected from loopback interface */ |
1193 | int islocalClient(client *c) { |
1194 | /* unix-socket */ |
1195 | if (c->flags & CLIENT_UNIX_SOCKET) return 1; |
1196 | |
1197 | /* tcp */ |
1198 | char cip[NET_IP_STR_LEN+1] = { 0 }; |
1199 | connPeerToString(c->conn, cip, sizeof(cip)-1, NULL); |
1200 | |
1201 | return !strcmp(cip,"127.0.0.1" ) || !strcmp(cip,"::1" ); |
1202 | } |
1203 | |
1204 | void clientAcceptHandler(connection *conn) { |
1205 | client *c = connGetPrivateData(conn); |
1206 | |
1207 | if (connGetState(conn) != CONN_STATE_CONNECTED) { |
1208 | serverLog(LL_WARNING, |
1209 | "Error accepting a client connection: %s" , |
1210 | connGetLastError(conn)); |
1211 | freeClientAsync(c); |
1212 | return; |
1213 | } |
1214 | |
1215 | /* If the server is running in protected mode (the default) and there |
1216 | * is no password set, nor a specific interface is bound, we don't accept |
1217 | * requests from non loopback interfaces. Instead we try to explain the |
1218 | * user what to do to fix it if needed. */ |
1219 | if (server.protected_mode && |
1220 | DefaultUser->flags & USER_FLAG_NOPASS) |
1221 | { |
1222 | if (!islocalClient(c)) { |
1223 | char *err = |
1224 | "-DENIED Redis is running in protected mode because protected " |
1225 | "mode is enabled and no password is set for the default user. " |
1226 | "In this mode connections are only accepted from the loopback interface. " |
1227 | "If you want to connect from external computers to Redis you " |
1228 | "may adopt one of the following solutions: " |
1229 | "1) Just disable protected mode sending the command " |
1230 | "'CONFIG SET protected-mode no' from the loopback interface " |
1231 | "by connecting to Redis from the same host the server is " |
1232 | "running, however MAKE SURE Redis is not publicly accessible " |
1233 | "from internet if you do so. Use CONFIG REWRITE to make this " |
1234 | "change permanent. " |
1235 | "2) Alternatively you can just disable the protected mode by " |
1236 | "editing the Redis configuration file, and setting the protected " |
1237 | "mode option to 'no', and then restarting the server. " |
1238 | "3) If you started the server manually just for testing, restart " |
1239 | "it with the '--protected-mode no' option. " |
1240 | "4) Setup a an authentication password for the default user. " |
1241 | "NOTE: You only need to do one of the above things in order for " |
1242 | "the server to start accepting connections from the outside.\r\n" ; |
1243 | if (connWrite(c->conn,err,strlen(err)) == -1) { |
1244 | /* Nothing to do, Just to avoid the warning... */ |
1245 | } |
1246 | server.stat_rejected_conn++; |
1247 | freeClientAsync(c); |
1248 | return; |
1249 | } |
1250 | } |
1251 | |
1252 | server.stat_numconnections++; |
1253 | moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE, |
1254 | REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED, |
1255 | c); |
1256 | } |
1257 | |
1258 | #define MAX_ACCEPTS_PER_CALL 1000 |
1259 | static void acceptCommonHandler(connection *conn, int flags, char *ip) { |
1260 | client *c; |
1261 | char conninfo[100]; |
1262 | UNUSED(ip); |
1263 | |
1264 | if (connGetState(conn) != CONN_STATE_ACCEPTING) { |
1265 | serverLog(LL_VERBOSE, |
1266 | "Accepted client connection in error state: %s (conn: %s)" , |
1267 | connGetLastError(conn), |
1268 | connGetInfo(conn, conninfo, sizeof(conninfo))); |
1269 | connClose(conn); |
1270 | return; |
1271 | } |
1272 | |
1273 | /* Limit the number of connections we take at the same time. |
1274 | * |
1275 | * Admission control will happen before a client is created and connAccept() |
1276 | * called, because we don't want to even start transport-level negotiation |
1277 | * if rejected. */ |
1278 | if (listLength(server.clients) + getClusterConnectionsCount() |
1279 | >= server.maxclients) |
1280 | { |
1281 | char *err; |
1282 | if (server.cluster_enabled) |
1283 | err = "-ERR max number of clients + cluster " |
1284 | "connections reached\r\n" ; |
1285 | else |
1286 | err = "-ERR max number of clients reached\r\n" ; |
1287 | |
1288 | /* That's a best effort error message, don't check write errors. |
1289 | * Note that for TLS connections, no handshake was done yet so nothing |
1290 | * is written and the connection will just drop. */ |
1291 | if (connWrite(conn,err,strlen(err)) == -1) { |
1292 | /* Nothing to do, Just to avoid the warning... */ |
1293 | } |
1294 | server.stat_rejected_conn++; |
1295 | connClose(conn); |
1296 | return; |
1297 | } |
1298 | |
1299 | /* Create connection and client */ |
1300 | if ((c = createClient(conn)) == NULL) { |
1301 | serverLog(LL_WARNING, |
1302 | "Error registering fd event for the new client: %s (conn: %s)" , |
1303 | connGetLastError(conn), |
1304 | connGetInfo(conn, conninfo, sizeof(conninfo))); |
1305 | connClose(conn); /* May be already closed, just ignore errors */ |
1306 | return; |
1307 | } |
1308 | |
1309 | /* Last chance to keep flags */ |
1310 | c->flags |= flags; |
1311 | |
1312 | /* Initiate accept. |
1313 | * |
1314 | * Note that connAccept() is free to do two things here: |
1315 | * 1. Call clientAcceptHandler() immediately; |
1316 | * 2. Schedule a future call to clientAcceptHandler(). |
1317 | * |
1318 | * Because of that, we must do nothing else afterwards. |
1319 | */ |
1320 | if (connAccept(conn, clientAcceptHandler) == C_ERR) { |
1321 | char conninfo[100]; |
1322 | if (connGetState(conn) == CONN_STATE_ERROR) |
1323 | serverLog(LL_WARNING, |
1324 | "Error accepting a client connection: %s (conn: %s)" , |
1325 | connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); |
1326 | freeClient(connGetPrivateData(conn)); |
1327 | return; |
1328 | } |
1329 | } |
1330 | |
1331 | void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1332 | int cport, cfd, max = MAX_ACCEPTS_PER_CALL; |
1333 | char cip[NET_IP_STR_LEN]; |
1334 | UNUSED(el); |
1335 | UNUSED(mask); |
1336 | UNUSED(privdata); |
1337 | |
1338 | while(max--) { |
1339 | cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); |
1340 | if (cfd == ANET_ERR) { |
1341 | if (errno != EWOULDBLOCK) |
1342 | serverLog(LL_WARNING, |
1343 | "Accepting client connection: %s" , server.neterr); |
1344 | return; |
1345 | } |
1346 | serverLog(LL_VERBOSE,"Accepted %s:%d" , cip, cport); |
1347 | acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); |
1348 | } |
1349 | } |
1350 | |
1351 | void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1352 | int cport, cfd, max = MAX_ACCEPTS_PER_CALL; |
1353 | char cip[NET_IP_STR_LEN]; |
1354 | UNUSED(el); |
1355 | UNUSED(mask); |
1356 | UNUSED(privdata); |
1357 | |
1358 | while(max--) { |
1359 | cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); |
1360 | if (cfd == ANET_ERR) { |
1361 | if (errno != EWOULDBLOCK) |
1362 | serverLog(LL_WARNING, |
1363 | "Accepting client connection: %s" , server.neterr); |
1364 | return; |
1365 | } |
1366 | serverLog(LL_VERBOSE,"Accepted %s:%d" , cip, cport); |
1367 | acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip); |
1368 | } |
1369 | } |
1370 | |
1371 | void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1372 | int cfd, max = MAX_ACCEPTS_PER_CALL; |
1373 | UNUSED(el); |
1374 | UNUSED(mask); |
1375 | UNUSED(privdata); |
1376 | |
1377 | while(max--) { |
1378 | cfd = anetUnixAccept(server.neterr, fd); |
1379 | if (cfd == ANET_ERR) { |
1380 | if (errno != EWOULDBLOCK) |
1381 | serverLog(LL_WARNING, |
1382 | "Accepting client connection: %s" , server.neterr); |
1383 | return; |
1384 | } |
1385 | serverLog(LL_VERBOSE,"Accepted connection to %s" , server.unixsocket); |
1386 | acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL); |
1387 | } |
1388 | } |
1389 | |
1390 | void freeClientOriginalArgv(client *c) { |
1391 | /* We didn't rewrite this client */ |
1392 | if (!c->original_argv) return; |
1393 | |
1394 | for (int j = 0; j < c->original_argc; j++) |
1395 | decrRefCount(c->original_argv[j]); |
1396 | zfree(c->original_argv); |
1397 | c->original_argv = NULL; |
1398 | c->original_argc = 0; |
1399 | } |
1400 | |
1401 | void freeClientArgv(client *c) { |
1402 | int j; |
1403 | for (j = 0; j < c->argc; j++) |
1404 | decrRefCount(c->argv[j]); |
1405 | c->argc = 0; |
1406 | c->cmd = NULL; |
1407 | c->argv_len_sum = 0; |
1408 | c->argv_len = 0; |
1409 | zfree(c->argv); |
1410 | c->argv = NULL; |
1411 | } |
1412 | |
1413 | /* Close all the slaves connections. This is useful in chained replication |
1414 | * when we resync with our own master and want to force all our slaves to |
1415 | * resync with us as well. */ |
1416 | void disconnectSlaves(void) { |
1417 | listIter li; |
1418 | listNode *ln; |
1419 | listRewind(server.slaves,&li); |
1420 | while((ln = listNext(&li))) { |
1421 | freeClient((client*)ln->value); |
1422 | } |
1423 | } |
1424 | |
1425 | /* Check if there is any other slave waiting dumping RDB finished expect me. |
1426 | * This function is useful to judge current dumping RDB can be used for full |
1427 | * synchronization or not. */ |
1428 | int anyOtherSlaveWaitRdb(client *except_me) { |
1429 | listIter li; |
1430 | listNode *ln; |
1431 | |
1432 | listRewind(server.slaves, &li); |
1433 | while((ln = listNext(&li))) { |
1434 | client *slave = ln->value; |
1435 | if (slave != except_me && |
1436 | slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) |
1437 | { |
1438 | return 1; |
1439 | } |
1440 | } |
1441 | return 0; |
1442 | } |
1443 | |
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster().
 *
 * In order, it:
 *  - clears server.current_client if it points to 'c';
 *  - if 'c' still has a connection: removes it from server.clients and from
 *    the server.clients_index rax, detaches it from the diskless-replication
 *    rdb pipe if it is a replica waiting on one, closes the connection and
 *    sets c->conn to NULL (so a second call skips this whole part);
 *  - removes it from the pending-write, pending-read and unblocked lists,
 *    clearing the matching flags;
 *  - disables client-side tracking if it was enabled. */
void unlinkClient(client *c) {
    listNode *ln;

    /* If this is marked as current client unset it. */
    if (server.current_client == c) server.current_client = NULL;

    /* Certain operations must be done only if the client has an active connection.
     * If the client was already unlinked or if it's a "fake client" the
     * conn is already set to NULL. */
    if (c->conn) {
        /* Remove from the list of active clients. */
        if (c->client_list_node) {
            /* The index is keyed by the id as converted by htonu64(); the
             * removal key must use that same encoding. */
            uint64_t id = htonu64(c->id);
            raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
            listDelNode(server.clients,c->client_list_node);
            c->client_list_node = NULL;
        }

        /* Check if this is a replica waiting for diskless replication (rdb pipe),
         * in which case it needs to be cleaned from that list */
        if (c->flags & CLIENT_SLAVE &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.rdb_pipe_conns)
        {
            int i;
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                if (server.rdb_pipe_conns[i] == c->conn) {
                    rdbPipeWriteHandlerConnRemoved(c->conn);
                    server.rdb_pipe_conns[i] = NULL;
                    break;
                }
            }
        }
        connClose(c->conn);
        c->conn = NULL;
    }

    /* Remove from the list of pending writes if needed. */
    if (c->flags & CLIENT_PENDING_WRITE) {
        ln = listSearchKey(server.clients_pending_write,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_pending_write,ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
    }

    /* Remove from the list of pending reads if needed. */
    /* The pending-read list may only be touched while I/O threads are idle. */
    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
    if (c->pending_read_list_node != NULL) {
        listDelNode(server.clients_pending_read,c->pending_read_list_node);
        c->pending_read_list_node = NULL;
    }


    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients. */
    if (c->flags & CLIENT_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;
    }

    /* Clear the tracking status. */
    if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
1512 | |
/* Clear the client state to resemble a newly connected client.
 *
 * Resets: MONITOR mode (removing the client from server.monitors), client
 * side tracking, the selected DB (back to 0), the protocol (RESP2), the
 * authenticated user (back to the default, with modules notified), any open
 * transaction, all channel / shard-channel / pattern subscriptions, the client
 * name, and the flags listed at the end. The connection itself is kept.
 *
 * Replicas and masters cannot be reset: once MONITOR is handled, the client
 * must have neither CLIENT_SLAVE nor CLIENT_MASTER set (asserted). */
void clearClientConnectionState(client *c) {
    listNode *ln;

    /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
     * distinguish between the two.
     */
    if (c->flags & CLIENT_MONITOR) {
        ln = listSearchKey(server.monitors,c);
        serverAssert(ln != NULL);
        listDelNode(server.monitors,ln);

        c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
    }

    serverAssert(!(c->flags &(CLIENT_SLAVE|CLIENT_MASTER)));

    if (c->flags & CLIENT_TRACKING) disableTracking(c);
    selectDb(c,0);
    c->resp = 2;

    /* Back to the default user; modules are told the auth state changed. */
    clientSetDefaultAuth(c);
    moduleNotifyUserChanged(c);
    discardTransaction(c);

    /* Drop every kind of Pub/Sub subscription. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeShardAllChannels(c, 0);
    pubsubUnsubscribeAllPatterns(c,0);

    if (c->name) {
        decrRefCount(c->name);
        c->name = NULL;
    }

    /* Selectively clear state flags not covered above */
    c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
                  CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
}
1551 | |
/* Release client 'c': detach it from every server structure that may still
 * reference it, free everything it owns, and finally free the structure.
 *
 * Two paths return early without freeing the client's memory:
 *  - If CLIENT_PROTECTED is set, the release is deferred through
 *    freeClientAsync().
 *  - If 'c' is our master (server.master is set and 'c' has CLIENT_MASTER)
 *    and it has neither CLIENT_PROTOCOL_ERROR nor CLIENT_BLOCKED, it is
 *    handed to replicationCacheMaster() so that a partial resynchronization
 *    can be attempted later.
 *
 * On every other path 'c' is freed and must not be used after the call. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* For connected clients, call the disconnection event of modules hooks. */
    if (c->conn) {
        moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                              REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED,
                              c);
    }

    /* Notify module system that this client auth status changed. */
    moduleNotifyUserChanged(c);

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. Note that we need to do this here, because later
     * we may call replicationCacheMaster() and the client should already
     * be removed from the list of clients to free. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost." );
        if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
            /* 'c' is not freed on this path, so it must stop looking like
             * a client that is scheduled to be closed. */
            c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverLog(LL_WARNING,"Connection with replica %s lost." ,
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels, then release the
     * containers that tracked the subscriptions. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeShardAllChannels(c, 0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    dictRelease(c->pubsubshard_channels);

    /* Free data structures: pending output, any reference held into the
     * replication buffer, argument vectors and deferred errors. */
    listRelease(c->reply);
    zfree(c->buf);
    freeReplicaReferencedReplBuffer(c);
    freeClientArgv(c);
    freeClientOriginalArgv(c);
    if (c->deferred_reply_errors)
        listRelease(c->deferred_reply_errors);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        /* If no other replica is waiting for the RDB dump to finish, the
         * current child process does not need to continue, so we kill it:
         * the child then stops using memory, and a new child can be forked
         * as soon as possible for the next full synchronization or BGSAVE.
         * This is only done when no 'save' points are configured
         * (saveparamslen == 0): otherwise the RDB matters to the user for
         * data safety, and killing the child could delay a configured save. */
        if (server.saveparamslen == 0 &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.child_type == CHILD_TYPE_RDB &&
            server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
            anyOtherSlaveWaitRdb(c) == 0)
        {
            killRDBChild();
        }
        /* A replica in the middle of receiving the RDB payload still owns
         * the RDB file descriptor and the preamble string. */
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* MONITOR clients also carry CLIENT_SLAVE, but they are tracked in
         * server.monitors rather than server.slaves. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
        /* Fire the replica change modules event. */
        if (c->replstate == SLAVE_STATE_ONLINE)
            moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                                  REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
                                  NULL);
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* Remove the contribution that this client gave to our
     * incrementally computed memory usage. */
    server.stat_clients_type_memory[c->last_memory_type] -=
        c->last_memory_usage;
    /* Remove client from memory usage buckets */
    if (c->mem_usage_bucket) {
        c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
        listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    sdsfree(c->sockname);
    sdsfree(c->slave_addr);
    zfree(c);
}
1698 | |
1699 | /* Schedule a client to free it at a safe time in the serverCron() function. |
1700 | * This function is useful when we need to terminate a client but we are in |
1701 | * a context where calling freeClient() is not possible, because the client |
1702 | * should be valid for the continuation of the flow of the program. */ |
1703 | void freeClientAsync(client *c) { |
1704 | /* We need to handle concurrent access to the server.clients_to_close list |
1705 | * only in the freeClientAsync() function, since it's the only function that |
1706 | * may access the list while Redis uses I/O threads. All the other accesses |
1707 | * are in the context of the main thread while the other threads are |
1708 | * idle. */ |
1709 | if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_SCRIPT) return; |
1710 | c->flags |= CLIENT_CLOSE_ASAP; |
1711 | if (server.io_threads_num == 1) { |
1712 | /* no need to bother with locking if there's just one thread (the main thread) */ |
1713 | listAddNodeTail(server.clients_to_close,c); |
1714 | return; |
1715 | } |
1716 | static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; |
1717 | pthread_mutex_lock(&async_free_queue_mutex); |
1718 | listAddNodeTail(server.clients_to_close,c); |
1719 | pthread_mutex_unlock(&async_free_queue_mutex); |
1720 | } |
1721 | |
1722 | /* Log errors for invalid use and free the client in async way. |
1723 | * We will add additional information about the client to the message. */ |
1724 | void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...) { |
1725 | va_list ap; |
1726 | va_start(ap, fmt); |
1727 | sds info = sdscatvprintf(sdsempty(), fmt, ap); |
1728 | va_end(ap); |
1729 | |
1730 | sds client = catClientInfoString(sdsempty(), c); |
1731 | serverLog(LL_WARNING, "%s, disconnecting it: %s" , info, client); |
1732 | |
1733 | sdsfree(info); |
1734 | sdsfree(client); |
1735 | freeClientAsync(c); |
1736 | } |
1737 | |
1738 | /* Perform processing of the client before moving on to processing the next client |
1739 | * this is useful for performing operations that affect the global state but can't |
1740 | * wait until we're done with all clients. In other words can't wait until beforeSleep() |
1741 | * return C_ERR in case client is no longer valid after call. |
1742 | * The input client argument: c, may be NULL in case the previous client was |
1743 | * freed before the call. */ |
1744 | int beforeNextClient(client *c) { |
1745 | /* Skip the client processing if we're in an IO thread, in that case we'll perform |
1746 | this operation later (this function is called again) in the fan-in stage of the threading mechanism */ |
1747 | if (io_threads_op != IO_THREADS_OP_IDLE) |
1748 | return C_OK; |
1749 | /* Handle async frees */ |
1750 | /* Note: this doesn't make the server.clients_to_close list redundant because of |
1751 | * cases where we want an async free of a client other than myself. For example |
1752 | * in ACL modifications we disconnect clients authenticated to non-existent |
1753 | * users (see ACL LOAD). */ |
1754 | if (c && (c->flags & CLIENT_CLOSE_ASAP)) { |
1755 | freeClient(c); |
1756 | return C_ERR; |
1757 | } |
1758 | return C_OK; |
1759 | } |
1760 | |
1761 | /* Free the clients marked as CLOSE_ASAP, return the number of clients |
1762 | * freed. */ |
1763 | int freeClientsInAsyncFreeQueue(void) { |
1764 | int freed = 0; |
1765 | listIter li; |
1766 | listNode *ln; |
1767 | |
1768 | listRewind(server.clients_to_close,&li); |
1769 | while ((ln = listNext(&li)) != NULL) { |
1770 | client *c = listNodeValue(ln); |
1771 | |
1772 | if (c->flags & CLIENT_PROTECTED) continue; |
1773 | |
1774 | c->flags &= ~CLIENT_CLOSE_ASAP; |
1775 | freeClient(c); |
1776 | listDelNode(server.clients_to_close,ln); |
1777 | freed++; |
1778 | } |
1779 | return freed; |
1780 | } |
1781 | |
1782 | /* Return a client by ID, or NULL if the client ID is not in the set |
1783 | * of registered clients. Note that "fake clients", created with -1 as FD, |
1784 | * are not registered clients. */ |
1785 | client *lookupClientByID(uint64_t id) { |
1786 | id = htonu64(id); |
1787 | client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); |
1788 | return (c == raxNotFound) ? NULL : c; |
1789 | } |
1790 | |
/* Gather the pending output of a normal client into an iovec array and send
 * it with one connWritev() call. Called from _writeToClient() when the reply
 * list is not empty.
 *
 * The static buffer c->buf goes first, if it still holds unsent bytes.
 * Reply-list nodes follow, in order. Gathering stops at IOV_MAX entries or
 * once NET_MAX_WRITES_PER_EVENT bytes have been collected. Empty reply nodes
 * found while gathering are released on the spot.
 *
 * c->sentlen counts the bytes already sent from the static buffer while
 * c->bufpos > 0; otherwise it counts the bytes already sent from the first
 * reply-list node.
 *
 * Returns C_OK on success, including when there was nothing to send (in that
 * case '*nwritten' is not touched). Returns C_ERR when connWritev() returned
 * <= 0. '*nwritten' receives the connWritev() result. After a successful
 * write, fully-sent data is dropped: the static buffer is reset, and each
 * fully-sent reply node is removed and its size subtracted from
 * c->reply_bytes. */
static int _writevToClient(client *c, ssize_t *nwritten) {
    struct iovec iov[IOV_MAX];
    int iovcnt = 0;
    size_t iov_bytes_len = 0;
    /* If the static reply buffer is not empty,
     * add it to the iov array for writev() as well. */
    if (c->bufpos > 0) {
        iov[iovcnt].iov_base = c->buf + c->sentlen;
        iov[iovcnt].iov_len = c->bufpos - c->sentlen;
        iov_bytes_len += iov[iovcnt++].iov_len;
    }
    /* The first node of reply list might be incomplete from the last call,
     * thus it needs to be calibrated to get the actual data address and length.
     * When the static buffer is pending, c->sentlen refers to it instead, so
     * the first node starts at offset 0. */
    size_t offset = c->bufpos > 0 ? 0 : c->sentlen;
    listIter iter;
    listNode *next;
    clientReplyBlock *o;
    listRewind(c->reply, &iter);
    while ((next = listNext(&iter)) && iovcnt < IOV_MAX && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) {
        o = listNodeValue(next);
        if (o->used == 0) { /* empty node, just release it and skip. */
            c->reply_bytes -= o->size;
            listDelNode(c->reply, next);
            offset = 0;
            continue;
        }

        iov[iovcnt].iov_base = o->buf + offset;
        iov[iovcnt].iov_len = o->used - offset;
        iov_bytes_len += iov[iovcnt++].iov_len;
        offset = 0;
    }
    if (iovcnt == 0) return C_OK;
    *nwritten = connWritev(c->conn, iov, iovcnt);
    if (*nwritten <= 0) return C_ERR;

    /* Locate the new node which has leftover data and
     * release all nodes in front of it. */
    ssize_t remaining = *nwritten;
    if (c->bufpos > 0) { /* deal with static reply buffer first. */
        int buf_len = c->bufpos - c->sentlen;
        c->sentlen += remaining;
        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if (remaining >= buf_len) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
        /* On a partial write of the static buffer this goes to <= 0, so
         * the loop below leaves every reply node untouched. */
        remaining -= buf_len;
    }
    /* 'remaining' never exceeds the bytes gathered into iov above, so this
     * walk stops within the nodes that were actually sent. */
    listRewind(c->reply, &iter);
    while (remaining > 0) {
        next = listNext(&iter);
        o = listNodeValue(next);
        if (remaining < (ssize_t)(o->used - c->sentlen)) {
            c->sentlen += remaining;
            break;
        }
        remaining -= (ssize_t)(o->used - c->sentlen);
        c->reply_bytes -= o->size;
        listDelNode(c->reply, next);
        c->sentlen = 0;
    }

    return C_OK;
}
1862 | |
/* Perform one write of pending output for client 'c'; called by
 * writeToClient() in a loop.
 *
 * '*nwritten' is reset to 0 and then receives the result of the underlying
 * write, which may be <= 0 on failure. Returns C_OK on success (also when
 * nothing was written) and C_ERR when the underlying write returned <= 0.
 *
 * Replicas do not use c->buf or c->reply (this is asserted). Their output is
 * read from the shared replication buffer, starting at block
 * c->ref_repl_buf_node at offset c->ref_block_pos. Normal clients send the
 * reply list through _writevToClient(), which also covers the static buffer,
 * or only the static buffer when the reply list is empty. */
int _writeToClient(client *c, ssize_t *nwritten) {
    *nwritten = 0;
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

        replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
        serverAssert(o->used >= c->ref_block_pos);
        /* Send current block if it is not fully sent. */
        if (o->used > c->ref_block_pos) {
            *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
                                  o->used-c->ref_block_pos);
            if (*nwritten <= 0) return C_ERR;
            c->ref_block_pos += *nwritten;
        }

        /* If we fully sent the object on head, go to the next one.
         * Moving this replica's reference transfers one refcount from the
         * old block to the next one. The backlog is then given a chance to
         * trim a bounded number of blocks (presumably those no longer
         * referenced; see incrementalTrimReplicationBacklog()). */
        listNode *next = listNextNode(c->ref_repl_buf_node);
        if (next && c->ref_block_pos == o->used) {
            o->refcount--;
            ((replBufBlock *)(listNodeValue(next)))->refcount++;
            c->ref_repl_buf_node = next;
            c->ref_block_pos = 0;
            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
        }
        return C_OK;
    }

    /* When the reply list is not empty, it's better to use writev to save us some
     * system calls and TCP packets. */
    if (listLength(c->reply) > 0) {
        int ret = _writevToClient(c, nwritten);
        if (ret != C_OK) return ret;

        /* If there are no longer objects in the list, we expect
         * the count of reply bytes to be exactly zero. */
        if (listLength(c->reply) == 0)
            serverAssert(c->reply_bytes == 0);
    } else if (c->bufpos > 0) {
        *nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
    }

    return C_OK;
}
1920 | |
/* Write data in output buffers to client. Returns C_OK if the client is
 * still valid after the call. Returns C_ERR if the client was scheduled for
 * release with freeClientAsync(): either a write failed while the connection
 * is no longer in CONN_STATE_CONNECTED, or CLIENT_CLOSE_AFTER_REPLY is set
 * and the whole reply has been sent. If handler_installed is set and no
 * output is left, the write handler is removed.
 *
 * This function is called by threads, but always with handler_installed
 * set to 0. So when handler_installed is set to 0 the function must be
 * thread safe. */
int writeToClient(client *c, int handler_installed) {
    /* Update total number of writes on server */
    atomicIncr(server.stat_total_writes_processed, 1);

    ssize_t nwritten = 0, totwritten = 0;

    while(clientHasPendingReplies(c)) {
        int ret = _writeToClient(c, &nwritten);
        if (ret == C_ERR) break;
        totwritten += nwritten;
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }

    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        atomicIncr(server.stat_net_repl_output_bytes, totwritten);
    } else {
        atomicIncr(server.stat_net_output_bytes, totwritten);
    }

    /* A failed write is fatal only when the connection is no longer
     * connected. Otherwise the client is kept and retried later
     * (NOTE(review): presumably covers transient conditions such as a full
     * socket buffer; confirm against the connection layer). */
    if (nwritten == -1) {
        if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s" , connGetLastError(c->conn));
            freeClientAsync(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        /* Note that writeToClient() is called in a threaded way, but
         * aeDeleteFileEvent() is not thread safe: however writeToClient()
         * is always called with handler_installed set to 0 from threads
         * so we are fine. */
        if (handler_installed) {
            serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
            connSetWriteHandler(c->conn, NULL);
        }

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClientAsync(c);
            return C_ERR;
        }
    }
    /* Update client's memory usage after writing.
     * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
     * handleClientsWithPendingWritesUsingThreads(). */
    if (io_threads_op == IO_THREADS_OP_IDLE)
        updateClientMemUsage(c);
    return C_OK;
}
2002 | |
2003 | /* Write event handler. Just send data to the client. */ |
2004 | void sendReplyToClient(connection *conn) { |
2005 | client *c = connGetPrivateData(conn); |
2006 | writeToClient(c,1); |
2007 | } |
2008 | |
2009 | /* This function is called just before entering the event loop, in the hope |
2010 | * we can just write the replies to the client output buffer without any |
2011 | * need to use a syscall in order to install the writable event handler, |
2012 | * get it called, and so forth. */ |
2013 | int handleClientsWithPendingWrites(void) { |
2014 | listIter li; |
2015 | listNode *ln; |
2016 | int processed = listLength(server.clients_pending_write); |
2017 | |
2018 | listRewind(server.clients_pending_write,&li); |
2019 | while((ln = listNext(&li))) { |
2020 | client *c = listNodeValue(ln); |
2021 | c->flags &= ~CLIENT_PENDING_WRITE; |
2022 | listDelNode(server.clients_pending_write,ln); |
2023 | |
2024 | /* If a client is protected, don't do anything, |
2025 | * that may trigger write error or recreate handler. */ |
2026 | if (c->flags & CLIENT_PROTECTED) continue; |
2027 | |
2028 | /* Don't write to clients that are going to be closed anyway. */ |
2029 | if (c->flags & CLIENT_CLOSE_ASAP) continue; |
2030 | |
2031 | /* Try to write buffers to the client socket. */ |
2032 | if (writeToClient(c,0) == C_ERR) continue; |
2033 | |
2034 | /* If after the synchronous writes above we still have data to |
2035 | * output to the client, we need to install the writable handler. */ |
2036 | if (clientHasPendingReplies(c)) { |
2037 | installClientWriteHandler(c); |
2038 | } |
2039 | } |
2040 | return processed; |
2041 | } |
2042 | |
2043 | /* resetClient prepare the client to process the next command */ |
2044 | void resetClient(client *c) { |
2045 | redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; |
2046 | |
2047 | freeClientArgv(c); |
2048 | c->reqtype = 0; |
2049 | c->multibulklen = 0; |
2050 | c->bulklen = -1; |
2051 | c->slot = -1; |
2052 | |
2053 | if (c->deferred_reply_errors) |
2054 | listRelease(c->deferred_reply_errors); |
2055 | c->deferred_reply_errors = NULL; |
2056 | |
2057 | /* We clear the ASKING flag as well if we are not inside a MULTI, and |
2058 | * if what we just executed is not the ASKING command itself. */ |
2059 | if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) |
2060 | c->flags &= ~CLIENT_ASKING; |
2061 | |
2062 | /* We do the same for the CACHING command as well. It also affects |
2063 | * the next command or transaction executed, in a way very similar |
2064 | * to ASKING. */ |
2065 | if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) |
2066 | c->flags &= ~CLIENT_TRACKING_CACHING; |
2067 | |
2068 | /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply |
2069 | * to the next command will be sent, but set the flag if the command |
2070 | * we just processed was "CLIENT REPLY SKIP". */ |
2071 | c->flags &= ~CLIENT_REPLY_SKIP; |
2072 | if (c->flags & CLIENT_REPLY_SKIP_NEXT) { |
2073 | c->flags |= CLIENT_REPLY_SKIP; |
2074 | c->flags &= ~CLIENT_REPLY_SKIP_NEXT; |
2075 | } |
2076 | } |
2077 | |
2078 | /* This function is used when we want to re-enter the event loop but there |
2079 | * is the risk that the client we are dealing with will be freed in some |
2080 | * way. This happens for instance in: |
2081 | * |
2082 | * * DEBUG RELOAD and similar. |
2083 | * * When a Lua script is in -BUSY state. |
2084 | * |
2085 | * So the function will protect the client by doing two things: |
2086 | * |
2087 | * 1) It removes the file events. This way it is not possible that an |
2088 | * error is signaled on the socket, freeing the client. |
2089 | * 2) Moreover it makes sure that if the client is freed in a different code |
2090 | * path, it is not really released, but only marked for later release. */ |
2091 | void protectClient(client *c) { |
2092 | c->flags |= CLIENT_PROTECTED; |
2093 | if (c->conn) { |
2094 | connSetReadHandler(c->conn,NULL); |
2095 | connSetWriteHandler(c->conn,NULL); |
2096 | } |
2097 | } |
2098 | |
2099 | /* This will undo the client protection done by protectClient() */ |
2100 | void unprotectClient(client *c) { |
2101 | if (c->flags & CLIENT_PROTECTED) { |
2102 | c->flags &= ~CLIENT_PROTECTED; |
2103 | if (c->conn) { |
2104 | connSetReadHandler(c->conn,readQueryFromClient); |
2105 | if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); |
2106 | } |
2107 | } |
2108 | } |
2109 | |
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP.
 * This function consumes one line of the client query buffer, starting at
 * c->qb_pos, and sets up c->argv / c->argc from it.
 *
 * Returns C_OK when a line was consumed. Note that an empty line also
 * returns C_OK, with c->argc set to 0. Returns C_ERR when no complete line
 * is available yet. It also returns C_ERR on a protocol error (line too long,
 * unbalanced quotes, or a non-empty inline line coming from our master); in
 * that case setProtocolError() flags the client to be closed after the
 * reply.
 *
 * NOTE(review): strchr() stops at the first NUL byte, so a NUL before the
 * '\n' hides the line end from this search. */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request" );
            setProtocolError("too big inline request" ,c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request" );
        setProtocolError("unbalanced quotes in inline request" ,c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        sdsfreesplitres(argv,argc);
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master." );
        setProtocolError("Master using the inline protocol. Desync?" ,c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure. An empty line (argc == 0)
     * leaves the previous allocation in place. */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv_len = argc;
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    /* Create redis objects for all arguments. The sds strings produced by
     * sdssplitargs() are handed over to the objects, so only the outer
     * array is freed afterwards. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
        c->argv_len_sum += sdslen(argv[j]);
    }
    zfree(argv);
    return C_OK;
}
2190 | |
2191 | /* Helper function. Record protocol error details in server log, |
2192 | * and set the client as CLIENT_CLOSE_AFTER_REPLY and |
2193 | * CLIENT_PROTOCOL_ERROR. */ |
2194 | #define PROTO_DUMP_LEN 128 |
2195 | static void setProtocolError(const char *errstr, client *c) { |
2196 | if (server.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) { |
2197 | sds client = catClientInfoString(sdsempty(),c); |
2198 | |
2199 | /* Sample some protocol to given an idea about what was inside. */ |
2200 | char buf[256]; |
2201 | if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { |
2202 | snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'" , c->querybuf+c->qb_pos); |
2203 | } else { |
2204 | snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'" , PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); |
2205 | } |
2206 | |
2207 | /* Remove non printable chars. */ |
2208 | char *p = buf; |
2209 | while (*p != '\0') { |
2210 | if (!isprint(*p)) *p = '.'; |
2211 | p++; |
2212 | } |
2213 | |
2214 | /* Log all the client and protocol info. */ |
2215 | int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING : |
2216 | LL_VERBOSE; |
2217 | serverLog(loglevel, |
2218 | "Protocol error (%s) from client: %s. %s" , errstr, client, buf); |
2219 | sdsfree(client); |
2220 | } |
2221 | c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); |
2222 | } |
2223 | |
2224 | /* Process the query buffer for client 'c', setting up the client argument |
2225 | * vector for command execution. Returns C_OK if after running the function |
2226 | * the client has a well-formed ready to be processed command, otherwise |
2227 | * C_ERR if there is still to read more buffer to get the full command. |
2228 | * The function also returns C_ERR when there is a protocol error: in such a |
2229 | * case the client structure is setup to reply with the error and close |
2230 | * the connection. |
2231 | * |
2232 | * This function is called if processInputBuffer() detects that the next |
2233 | * command is in RESP format, so the first byte in the command is found |
2234 | * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ |
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    /* Phase 1: a new command starts here, so parse the "*<count>\r\n"
     * header announcing how many bulk arguments follow. */
    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            /* No terminator yet: wait for more data (C_ERR == "not ready"),
             * unless the pending header is already longer than
             * PROTO_INLINE_MAX_SIZE, which is treated as a protocol error. */
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > INT_MAX) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            /* Clients that still need to authenticate may send at most 10
             * arguments (and, below, bulks of at most 16384 bytes);
             * presumably this bounds what an unauthenticated peer can make
             * us allocate. */
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        /* Consume the header line including its trailing \r\n. */
        c->qb_pos = (newline-c->querybuf)+2;

        /* A zero or negative count produces an empty command: argc stays 0
         * and processInputBuffer() just resets the client. */
        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        /* Start with at most 1024 slots; argv is grown on demand below, so
         * a large announced count alone does not force a large allocation. */
        if (c->argv) zfree(c->argv);
        c->argv_len = min(c->multibulklen, 1024);
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    /* Phase 2: read the remaining "$<len>\r\n<payload>\r\n" arguments.
     * multibulklen counts the arguments still missing; bulklen == -1 means
     * the next "$<len>" header has not been parsed yet. State is kept in
     * the client so parsing can resume when more data arrives. */
    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            /* The master connection is exempt from proto_max_bulk_len. */
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            c->qb_pos = newline-c->querybuf+2;
            if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
                /* When the client is not a master client (because master
                 * client's querybuf can only be trimmed after data applied
                 * and sent to replicas).
                 *
                 * If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Check if we have space in argv, grow if needed */
            /* Double the capacity (saturating at INT_MAX), but never beyond
             * the number of arguments this command can still contain. */
            if (c->argc >= c->argv_len) {
                c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen);
                c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
            }

            /* Optimization: if a non-master client's buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (!(c->flags & CLIENT_MASTER) &&
                c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                /* The query buffer itself becomes the argument object, so
                 * a fresh query buffer is allocated right after. */
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulklen == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
2393 | |
2394 | /* Perform necessary tasks after a command was executed: |
2395 | * |
2396 | * 1. The client is reset unless there are reasons to avoid doing it. |
2397 | * 2. In the case of master clients, the replication offset is updated. |
2398 | * 3. Propagate commands we got from our master to replicas down the line. */ |
2399 | void commandProcessed(client *c) { |
2400 | /* If client is blocked(including paused), just return avoid reset and replicate. |
2401 | * |
2402 | * 1. Don't reset the client structure for blocked clients, so that the reply |
2403 | * callback will still be able to access the client argv and argc fields. |
2404 | * The client will be reset in unblockClient(). |
2405 | * 2. Don't update replication offset or propagate commands to replicas, |
2406 | * since we have not applied the command. */ |
2407 | if (c->flags & CLIENT_BLOCKED) return; |
2408 | |
2409 | resetClient(c); |
2410 | |
2411 | long long prev_offset = c->reploff; |
2412 | if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { |
2413 | /* Update the applied replication offset of our master. */ |
2414 | c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; |
2415 | } |
2416 | |
2417 | /* If the client is a master we need to compute the difference |
2418 | * between the applied offset before and after processing the buffer, |
2419 | * to understand how much of the replication stream was actually |
2420 | * applied to the master state: this quantity, and its corresponding |
2421 | * part of the replication stream, will be propagated to the |
2422 | * sub-replicas and to the replication backlog. */ |
2423 | if (c->flags & CLIENT_MASTER) { |
2424 | long long applied = c->reploff - prev_offset; |
2425 | if (applied) { |
2426 | replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied); |
2427 | c->repl_applied += applied; |
2428 | } |
2429 | } |
2430 | } |
2431 | |
2432 | /* This function calls processCommand(), but also performs a few sub tasks |
2433 | * for the client that are useful in that context: |
2434 | * |
2435 | * 1. It sets the current client to the client 'c'. |
2436 | * 2. calls commandProcessed() if the command was handled. |
2437 | * |
2438 | * The function returns C_ERR in case the client was freed as a side effect |
2439 | * of processing the command, otherwise C_OK is returned. */ |
2440 | int processCommandAndResetClient(client *c) { |
2441 | int deadclient = 0; |
2442 | client *old_client = server.current_client; |
2443 | server.current_client = c; |
2444 | if (processCommand(c) == C_OK) { |
2445 | commandProcessed(c); |
2446 | /* Update the client's memory to include output buffer growth following the |
2447 | * processed command. */ |
2448 | updateClientMemUsage(c); |
2449 | } |
2450 | |
2451 | if (server.current_client == NULL) deadclient = 1; |
2452 | /* |
2453 | * Restore the old client, this is needed because when a script |
2454 | * times out, we will get into this code from processEventsWhileBlocked. |
2455 | * Which will cause to set the server.current_client. If not restored |
2456 | * we will return 1 to our caller which will falsely indicate the client |
2457 | * is dead and will stop reading from its buffer. |
2458 | */ |
2459 | server.current_client = old_client; |
2460 | /* performEvictions may flush slave output buffers. This may |
2461 | * result in a slave, that may be the active client, to be |
2462 | * freed. */ |
2463 | return deadclient ? C_ERR : C_OK; |
2464 | } |
2465 | |
2466 | |
2467 | /* This function will execute any fully parsed commands pending on |
2468 | * the client. Returns C_ERR if the client is no longer valid after executing |
2469 | * the command, and C_OK for all other cases. */ |
2470 | int processPendingCommandAndInputBuffer(client *c) { |
2471 | if (c->flags & CLIENT_PENDING_COMMAND) { |
2472 | c->flags &= ~CLIENT_PENDING_COMMAND; |
2473 | if (processCommandAndResetClient(c) == C_ERR) { |
2474 | return C_ERR; |
2475 | } |
2476 | } |
2477 | |
2478 | /* Now process client if it has more data in it's buffer. |
2479 | * |
2480 | * Note: when a master client steps into this function, |
2481 | * it can always satisfy this condition, because its querbuf |
2482 | * contains data not applied. */ |
2483 | if (c->querybuf && sdslen(c->querybuf) > 0) { |
2484 | return processInputBuffer(c); |
2485 | } |
2486 | return C_OK; |
2487 | } |
2488 | |
/* Parse and execute as many complete commands as are buffered in
 * c->querybuf, starting at c->qb_pos.
 *
 * This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process.
 *
 * The loop stops early when the client is blocked, already has a pending
 * command, is our master while a yielding long command is running, is
 * flagged to be closed, or when the parser reports it needs more data.
 * When running in an I/O thread a parsed command is not executed; the
 * client is flagged CLIENT_PENDING_COMMAND instead.
 *
 * On exit the consumed prefix of the query buffer is trimmed: up to
 * repl_applied for our master, up to qb_pos for every other client.
 *
 * Returns C_ERR if the client was freed during processing (the caller must
 * not touch it any more), C_OK otherwise. */
int processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (isInsideYieldingLongCommand() && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        /* A leading '*' marks a RESP multibulk request; anything else is
         * parsed as an inline command. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        /* A non-C_OK result means "stop parsing for now": either a protocol
         * error or (for processMultibulkBuffer at least) an incomplete
         * command that needs more data. */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (io_threads_op != IO_THREADS_OP_IDLE) {
                serverAssert(io_threads_op == IO_THREADS_OP_READ);
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return C_ERR;
            }
        }
    }

    if (c->flags & CLIENT_MASTER) {
        /* If the client is a master, trim the querybuf to repl_applied,
         * since master client is very special, its querybuf not only
         * used to parse command, but also proxy to sub-replicas.
         *
         * Here are some scenarios we cannot trim to qb_pos:
         * 1. we don't receive complete command from master
         * 2. master client blocked cause of client pause
         * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
         *
         * In these scenarios, qb_pos points to the part of the current command
         * or the beginning of next command, and the current command is not applied yet,
         * so the repl_applied is not equal to qb_pos. */
        if (c->repl_applied) {
            sdsrange(c->querybuf,c->repl_applied,-1);
            c->qb_pos -= c->repl_applied;
            c->repl_applied = 0;
        }
    } else if (c->qb_pos) {
        /* Trim to pos */
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    /* Update client memory usage after processing the query buffer, this is
     * important in case the query buffer is big and wasn't drained during
     * the above loop (because of partially sent big commands). */
    /* Skipped while running inside the I/O threads. */
    if (io_threads_op == IO_THREADS_OP_IDLE)
        updateClientMemUsage(c);

    return C_OK;
}
2589 | |
/* Read handler for a client connection: read available bytes from 'conn'
 * into the client's query buffer, then parse and execute any complete
 * commands they form.
 *
 * - The read may be deferred to the I/O threads (postponeClientRead()).
 * - When a read returns -1 while the connection is still connected, the
 *   handler simply returns; presumably a "no data yet" condition, so
 *   confirm against connRead().
 * - The client is scheduled for asynchronous freeing in three cases:
 *   a read error with the connection no longer connected, EOF, or a
 *   non-master client exceeding client_max_querybuf_len. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultibulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        /* Bytes still missing for the current bulk plus its trailing \r\n;
         * negative if the buffer already holds more than that. */
        ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
        big_arg = 1;

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0) readlen = remaining;

        /* Master client needs expand the readlen when meet BIG_ARG(see #9100),
         * but doesn't need align to the next arg, we can read more data. */
        if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
            readlen = PROTO_IOBUF_LEN;
    }

    qblen = sdslen(c->querybuf);
    if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
        (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
        /* When reading a BIG_ARG we won't be reading more than that one arg
         * into the query buffer, so we don't need to pre-allocate more than we
         * need, so using the non-greedy growing. For an initial allocation of
         * the query buffer, we also don't wanna use the greedy growth, in order
         * to avoid collision with the RESIZE_THRESHOLD mechanism. */
        c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
    } else {
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

        /* Read as much as possible from the socket to save read(2) system calls. */
        readlen = sdsavail(c->querybuf);
    }
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            goto done;
        }
    } else if (nread == 0) {
        /* EOF: the peer closed the connection. */
        if (server.verbosity <= LL_VERBOSE) {
            sds info = catClientInfoString(sdsempty(), c);
            serverLog(LL_VERBOSE, "Client closed connection %s", info);
            sdsfree(info);
        }
        freeClientAsync(c);
        goto done;
    }

    /* connRead() wrote directly past the old end; commit the new length. */
    sdsIncrLen(c->querybuf,nread);
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) {
        /* read_reploff tracks every byte received from the master; see
         * commandProcessed() for how the applied offset is derived. */
        c->read_reploff += nread;
        atomicIncr(server.stat_net_repl_input_bytes, nread);
    } else {
        atomicIncr(server.stat_net_input_bytes, nread);
    }

    if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        goto done;
    }

    /* There is more data in the client input buffer, continue parsing it
     * and check if there is a full command to execute. */
    /* C_ERR means the client was freed: hand NULL to beforeNextClient(). */
    if (processInputBuffer(c) == C_ERR)
        c = NULL;

done:
    beforeNextClient(c);
}
2690 | |
2691 | /* A Redis "Address String" is a colon separated ip:port pair. |
2692 | * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234". |
2693 | * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234". |
2694 | * For Unix sockets we use path:0, like in "/tmp/redis:0". |
2695 | * |
2696 | * An Address String always fits inside a buffer of NET_ADDR_STR_LEN bytes, |
2697 | * including the null term. |
2698 | * |
2699 | * On failure the function still populates 'addr' with the "?:0" string in case |
2700 | * you want to relax error checking or need to display something anyway (see |
2701 | * anetFdToString implementation for more info). */ |
2702 | void genClientAddrString(client *client, char *addr, |
2703 | size_t addr_len, int fd_to_str_type) { |
2704 | if (client->flags & CLIENT_UNIX_SOCKET) { |
2705 | /* Unix socket client. */ |
2706 | snprintf(addr,addr_len,"%s:0" ,server.unixsocket); |
2707 | } else { |
2708 | /* TCP client. */ |
2709 | connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type); |
2710 | } |
2711 | } |
2712 | |
2713 | /* This function returns the client peer id, by creating and caching it |
2714 | * if client->peerid is NULL, otherwise returning the cached value. |
2715 | * The Peer ID never changes during the life of the client, however it |
2716 | * is expensive to compute. */ |
2717 | char *getClientPeerId(client *c) { |
2718 | char peerid[NET_ADDR_STR_LEN]; |
2719 | |
2720 | if (c->peerid == NULL) { |
2721 | genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME); |
2722 | c->peerid = sdsnew(peerid); |
2723 | } |
2724 | return c->peerid; |
2725 | } |
2726 | |
2727 | /* This function returns the client bound socket name, by creating and caching |
2728 | * it if client->sockname is NULL, otherwise returning the cached value. |
2729 | * The Socket Name never changes during the life of the client, however it |
2730 | * is expensive to compute. */ |
2731 | char *getClientSockname(client *c) { |
2732 | char sockname[NET_ADDR_STR_LEN]; |
2733 | |
2734 | if (c->sockname == NULL) { |
2735 | genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME); |
2736 | c->sockname = sdsnew(sockname); |
2737 | } |
2738 | return c->sockname; |
2739 | } |
2740 | |
/* Concatenate a string representing the state of a client in a human
 * readable format, into the sds string 's'.
 *
 * The output is a single line of space separated key=value fields (the
 * format used by CLIENT LIST / CLIENT INFO); no trailing newline is added.
 * Returns the possibly reallocated sds, as with other sdscat* functions. */
sds catClientInfoString(sds s, client *client) {
    /* 'flags' holds at most 15 flag letters (one of S/O plus the 14 below)
     * and the terminator: grow it if a new flag letter is added.
     * 'events' holds at most 'r', 'w' and the terminator. */
    char flags[16], events[3], conninfo[CONN_INFO_LEN], *p;

    p = flags;
    if (client->flags & CLIENT_SLAVE) {
        if (client->flags & CLIENT_MONITOR)
            *p++ = 'O';
        else
            *p++ = 'S';
    }
    if (client->flags & CLIENT_MASTER) *p++ = 'M';
    if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
    if (client->flags & CLIENT_MULTI) *p++ = 'x';
    if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
    if (client->flags & CLIENT_TRACKING) *p++ = 't';
    if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
    if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
    if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
    if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
    if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
    if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
    if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
    if (client->flags & CLIENT_READONLY) *p++ = 'r';
    if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
    /* 'N' means "no flags". */
    if (p == flags) *p++ = 'N';
    *p++ = '\0';

    /* Registered connection handlers: r = read, w = write. */
    p = events;
    if (client->conn) {
        if (connHasReadHandler(client->conn)) *p++ = 'r';
        if (connHasWriteHandler(client->conn)) *p++ = 'w';
    }
    *p = '\0';

    /* Compute the total memory consumed by this client. */
    size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);

    /* A replica referencing the shared replication buffer counts every
     * block from its current one up to the last one as pending output;
     * this is added to the 'oll' field below. */
    size_t used_blocks_of_repl_buf = 0;
    if (client->ref_repl_buf_node) {
        replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
        replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
        used_blocks_of_repl_buf = last->id - cur->id + 1;
    }

    sds ret = sdscatfmt(s,
        "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i ssub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
        (unsigned long long) client->id,
        getClientPeerId(client),
        getClientSockname(client),
        connGetInfo(client->conn, conninfo, sizeof(conninfo)),
        client->name ? (char*)client->name->ptr : "",
        (long long)(server.unixtime - client->ctime),
        (long long)(server.unixtime - client->lastinteraction),
        flags,
        client->db->id,
        (int) dictSize(client->pubsub_channels),
        (int) listLength(client->pubsub_patterns),
        (int) dictSize(client->pubsubshard_channels),
        (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
        (unsigned long long) sdslen(client->querybuf),
        (unsigned long long) sdsavail(client->querybuf),
        (unsigned long long) client->argv_len_sum,
        (unsigned long long) client->mstate.argv_len_sums,
        (unsigned long long) client->buf_usable_size,
        (unsigned long long) client->buf_peak,
        (unsigned long long) client->bufpos,
        (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
        (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
        (unsigned long long) total_mem,
        events,
        client->lastcmd ? client->lastcmd->fullname : "NULL",
        client->user ? client->user->name : "(superuser)",
        (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
        client->resp);
    return ret;
}
2819 | |
2820 | sds getAllClientsInfoString(int type) { |
2821 | listNode *ln; |
2822 | listIter li; |
2823 | client *client; |
2824 | sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients)); |
2825 | sdsclear(o); |
2826 | listRewind(server.clients,&li); |
2827 | while ((ln = listNext(&li)) != NULL) { |
2828 | client = listNodeValue(ln); |
2829 | if (type != -1 && getClientType(client) != type) continue; |
2830 | o = catClientInfoString(o,client); |
2831 | o = sdscatlen(o,"\n" ,1); |
2832 | } |
2833 | return o; |
2834 | } |
2835 | |
2836 | /* Returns C_OK if the name has been set or C_ERR if the name is invalid. */ |
2837 | int clientSetName(client *c, robj *name) { |
2838 | int len = (name != NULL) ? sdslen(name->ptr) : 0; |
2839 | |
2840 | /* Setting the client name to an empty string actually removes |
2841 | * the current name. */ |
2842 | if (len == 0) { |
2843 | if (c->name) decrRefCount(c->name); |
2844 | c->name = NULL; |
2845 | return C_OK; |
2846 | } |
2847 | |
2848 | /* Otherwise check if the charset is ok. We need to do this otherwise |
2849 | * CLIENT LIST format will break. You should always be able to |
2850 | * split by space to get the different fields. */ |
2851 | char *p = name->ptr; |
2852 | for (int j = 0; j < len; j++) { |
2853 | if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */ |
2854 | return C_ERR; |
2855 | } |
2856 | } |
2857 | if (c->name) decrRefCount(c->name); |
2858 | c->name = name; |
2859 | incrRefCount(name); |
2860 | return C_OK; |
2861 | } |
2862 | |
2863 | /* This function implements CLIENT SETNAME, including replying to the |
2864 | * user with an error if the charset is wrong (in that case C_ERR is |
2865 | * returned). If the function succeeded C_OK is returned, and it's up |
2866 | * to the caller to send a reply if needed. |
2867 | * |
2868 | * Setting an empty string as name has the effect of unsetting the |
2869 | * currently set name: the client will remain unnamed. |
2870 | * |
2871 | * This function is also used to implement the HELLO SETNAME option. */ |
2872 | int clientSetNameOrReply(client *c, robj *name) { |
2873 | int result = clientSetName(c, name); |
2874 | if (result == C_ERR) { |
2875 | addReplyError(c, |
2876 | "Client names cannot contain spaces, " |
2877 | "newlines or special characters." ); |
2878 | } |
2879 | return result; |
2880 | } |
2881 | |
2882 | /* Reset the client state to resemble a newly connected client. |
2883 | */ |
2884 | void resetCommand(client *c) { |
2885 | /* MONITOR clients are also marked with CLIENT_SLAVE, we need to |
2886 | * distinguish between the two. |
2887 | */ |
2888 | uint64_t flags = c->flags; |
2889 | if (flags & CLIENT_MONITOR) flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE); |
2890 | |
2891 | if (flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) { |
2892 | addReplyError(c,"can only reset normal client connections" ); |
2893 | return; |
2894 | } |
2895 | |
2896 | clearClientConnectionState(c); |
2897 | addReplyStatus(c,"RESET" ); |
2898 | } |
2899 | |
2900 | /* Disconnect the current client */ |
2901 | void quitCommand(client *c) { |
2902 | addReply(c,shared.ok); |
2903 | c->flags |= CLIENT_CLOSE_AFTER_REPLY; |
2904 | } |
2905 | |
2906 | void clientCommand(client *c) { |
2907 | listNode *ln; |
2908 | listIter li; |
2909 | |
2910 | if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help" )) { |
2911 | const char *help[] = { |
2912 | "CACHING (YES|NO)" , |
2913 | " Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes." , |
2914 | "GETREDIR" , |
2915 | " Return the client ID we are redirecting to when tracking is enabled." , |
2916 | "GETNAME" , |
2917 | " Return the name of the current connection." , |
2918 | "ID" , |
2919 | " Return the ID of the current connection." , |
2920 | "INFO" , |
2921 | " Return information about the current client connection." , |
2922 | "KILL <ip:port>" , |
2923 | " Kill connection made from <ip:port>." , |
2924 | "KILL <option> <value> [<option> <value> [...]]" , |
2925 | " Kill connections. Options are:" , |
2926 | " * ADDR (<ip:port>|<unixsocket>:0)" , |
2927 | " Kill connections made from the specified address" , |
2928 | " * LADDR (<ip:port>|<unixsocket>:0)" , |
2929 | " Kill connections made to specified local address" , |
2930 | " * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)" , |
2931 | " Kill connections by type." , |
2932 | " * USER <username>" , |
2933 | " Kill connections authenticated by <username>." , |
2934 | " * SKIPME (YES|NO)" , |
2935 | " Skip killing current connection (default: yes)." , |
2936 | "LIST [options ...]" , |
2937 | " Return information about client connections. Options:" , |
2938 | " * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)" , |
2939 | " Return clients of specified type." , |
2940 | "UNPAUSE" , |
2941 | " Stop the current client pause, resuming traffic." , |
2942 | "PAUSE <timeout> [WRITE|ALL]" , |
2943 | " Suspend all, or just write, clients for <timeout> milliseconds." , |
2944 | "REPLY (ON|OFF|SKIP)" , |
2945 | " Control the replies sent to the current connection." , |
2946 | "SETNAME <name>" , |
2947 | " Assign the name <name> to the current connection." , |
2948 | "UNBLOCK <clientid> [TIMEOUT|ERROR]" , |
2949 | " Unblock the specified blocked client." , |
2950 | "TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]" , |
2951 | " [OPTIN] [OPTOUT] [NOLOOP]" , |
2952 | " Control server assisted client side caching." , |
2953 | "TRACKINGINFO" , |
2954 | " Report tracking status for the current connection." , |
2955 | "NO-EVICT (ON|OFF)" , |
2956 | " Protect current client connection from eviction." , |
2957 | NULL |
2958 | }; |
2959 | addReplyHelp(c, help); |
2960 | } else if (!strcasecmp(c->argv[1]->ptr,"id" ) && c->argc == 2) { |
2961 | /* CLIENT ID */ |
2962 | addReplyLongLong(c,c->id); |
2963 | } else if (!strcasecmp(c->argv[1]->ptr,"info" ) && c->argc == 2) { |
2964 | /* CLIENT INFO */ |
2965 | sds o = catClientInfoString(sdsempty(), c); |
2966 | o = sdscatlen(o,"\n" ,1); |
2967 | addReplyVerbatim(c,o,sdslen(o),"txt" ); |
2968 | sdsfree(o); |
2969 | } else if (!strcasecmp(c->argv[1]->ptr,"list" )) { |
2970 | /* CLIENT LIST */ |
2971 | int type = -1; |
2972 | sds o = NULL; |
2973 | if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type" )) { |
2974 | type = getClientTypeByName(c->argv[3]->ptr); |
2975 | if (type == -1) { |
2976 | addReplyErrorFormat(c,"Unknown client type '%s'" , |
2977 | (char*) c->argv[3]->ptr); |
2978 | return; |
2979 | } |
2980 | } else if (c->argc > 3 && !strcasecmp(c->argv[2]->ptr,"id" )) { |
2981 | int j; |
2982 | o = sdsempty(); |
2983 | for (j = 3; j < c->argc; j++) { |
2984 | long long cid; |
2985 | if (getLongLongFromObjectOrReply(c, c->argv[j], &cid, |
2986 | "Invalid client ID" )) { |
2987 | sdsfree(o); |
2988 | return; |
2989 | } |
2990 | client *cl = lookupClientByID(cid); |
2991 | if (cl) { |
2992 | o = catClientInfoString(o, cl); |
2993 | o = sdscatlen(o, "\n" , 1); |
2994 | } |
2995 | } |
2996 | } else if (c->argc != 2) { |
2997 | addReplyErrorObject(c,shared.syntaxerr); |
2998 | return; |
2999 | } |
3000 | |
3001 | if (!o) |
3002 | o = getAllClientsInfoString(type); |
3003 | addReplyVerbatim(c,o,sdslen(o),"txt" ); |
3004 | sdsfree(o); |
3005 | } else if (!strcasecmp(c->argv[1]->ptr,"reply" ) && c->argc == 3) { |
3006 | /* CLIENT REPLY ON|OFF|SKIP */ |
3007 | if (!strcasecmp(c->argv[2]->ptr,"on" )) { |
3008 | c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF); |
3009 | addReply(c,shared.ok); |
3010 | } else if (!strcasecmp(c->argv[2]->ptr,"off" )) { |
3011 | c->flags |= CLIENT_REPLY_OFF; |
3012 | } else if (!strcasecmp(c->argv[2]->ptr,"skip" )) { |
3013 | if (!(c->flags & CLIENT_REPLY_OFF)) |
3014 | c->flags |= CLIENT_REPLY_SKIP_NEXT; |
3015 | } else { |
3016 | addReplyErrorObject(c,shared.syntaxerr); |
3017 | return; |
3018 | } |
3019 | } else if (!strcasecmp(c->argv[1]->ptr,"no-evict" ) && c->argc == 3) { |
3020 | /* CLIENT NO-EVICT ON|OFF */ |
3021 | if (!strcasecmp(c->argv[2]->ptr,"on" )) { |
3022 | c->flags |= CLIENT_NO_EVICT; |
3023 | addReply(c,shared.ok); |
3024 | } else if (!strcasecmp(c->argv[2]->ptr,"off" )) { |
3025 | c->flags &= ~CLIENT_NO_EVICT; |
3026 | addReply(c,shared.ok); |
3027 | } else { |
3028 | addReplyErrorObject(c,shared.syntaxerr); |
3029 | return; |
3030 | } |
3031 | } else if (!strcasecmp(c->argv[1]->ptr,"kill" )) { |
3032 | /* CLIENT KILL <ip:port> |
3033 | * CLIENT KILL <option> [value] ... <option> [value] */ |
3034 | char *addr = NULL; |
3035 | char *laddr = NULL; |
3036 | user *user = NULL; |
3037 | int type = -1; |
3038 | uint64_t id = 0; |
3039 | int skipme = 1; |
3040 | int killed = 0, close_this_client = 0; |
3041 | |
3042 | if (c->argc == 3) { |
3043 | /* Old style syntax: CLIENT KILL <addr> */ |
3044 | addr = c->argv[2]->ptr; |
3045 | skipme = 0; /* With the old form, you can kill yourself. */ |
3046 | } else if (c->argc > 3) { |
3047 | int i = 2; /* Next option index. */ |
3048 | |
3049 | /* New style syntax: parse options. */ |
3050 | while(i < c->argc) { |
3051 | int moreargs = c->argc > i+1; |
3052 | |
3053 | if (!strcasecmp(c->argv[i]->ptr,"id" ) && moreargs) { |
3054 | long tmp; |
3055 | |
3056 | if (getRangeLongFromObjectOrReply(c, c->argv[i+1], 1, LONG_MAX, &tmp, |
3057 | "client-id should be greater than 0" ) != C_OK) |
3058 | return; |
3059 | id = tmp; |
3060 | } else if (!strcasecmp(c->argv[i]->ptr,"type" ) && moreargs) { |
3061 | type = getClientTypeByName(c->argv[i+1]->ptr); |
3062 | if (type == -1) { |
3063 | addReplyErrorFormat(c,"Unknown client type '%s'" , |
3064 | (char*) c->argv[i+1]->ptr); |
3065 | return; |
3066 | } |
3067 | } else if (!strcasecmp(c->argv[i]->ptr,"addr" ) && moreargs) { |
3068 | addr = c->argv[i+1]->ptr; |
3069 | } else if (!strcasecmp(c->argv[i]->ptr,"laddr" ) && moreargs) { |
3070 | laddr = c->argv[i+1]->ptr; |
3071 | } else if (!strcasecmp(c->argv[i]->ptr,"user" ) && moreargs) { |
3072 | user = ACLGetUserByName(c->argv[i+1]->ptr, |
3073 | sdslen(c->argv[i+1]->ptr)); |
3074 | if (user == NULL) { |
3075 | addReplyErrorFormat(c,"No such user '%s'" , |
3076 | (char*) c->argv[i+1]->ptr); |
3077 | return; |
3078 | } |
3079 | } else if (!strcasecmp(c->argv[i]->ptr,"skipme" ) && moreargs) { |
3080 | if (!strcasecmp(c->argv[i+1]->ptr,"yes" )) { |
3081 | skipme = 1; |
3082 | } else if (!strcasecmp(c->argv[i+1]->ptr,"no" )) { |
3083 | skipme = 0; |
3084 | } else { |
3085 | addReplyErrorObject(c,shared.syntaxerr); |
3086 | return; |
3087 | } |
3088 | } else { |
3089 | addReplyErrorObject(c,shared.syntaxerr); |
3090 | return; |
3091 | } |
3092 | i += 2; |
3093 | } |
3094 | } else { |
3095 | addReplyErrorObject(c,shared.syntaxerr); |
3096 | return; |
3097 | } |
3098 | |
3099 | /* Iterate clients killing all the matching clients. */ |
3100 | listRewind(server.clients,&li); |
3101 | while ((ln = listNext(&li)) != NULL) { |
3102 | client *client = listNodeValue(ln); |
3103 | if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; |
3104 | if (laddr && strcmp(getClientSockname(client),laddr) != 0) continue; |
3105 | if (type != -1 && getClientType(client) != type) continue; |
3106 | if (id != 0 && client->id != id) continue; |
3107 | if (user && client->user != user) continue; |
3108 | if (c == client && skipme) continue; |
3109 | |
3110 | /* Kill it. */ |
3111 | if (c == client) { |
3112 | close_this_client = 1; |
3113 | } else { |
3114 | freeClient(client); |
3115 | } |
3116 | killed++; |
3117 | } |
3118 | |
3119 | /* Reply according to old/new format. */ |
3120 | if (c->argc == 3) { |
3121 | if (killed == 0) |
3122 | addReplyError(c,"No such client" ); |
3123 | else |
3124 | addReply(c,shared.ok); |
3125 | } else { |
3126 | addReplyLongLong(c,killed); |
3127 | } |
3128 | |
3129 | /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY |
3130 | * only after we queued the reply to its output buffers. */ |
3131 | if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY; |
3132 | } else if (!strcasecmp(c->argv[1]->ptr,"unblock" ) && (c->argc == 3 || |
3133 | c->argc == 4)) |
3134 | { |
3135 | /* CLIENT UNBLOCK <id> [timeout|error] */ |
3136 | long long id; |
3137 | int unblock_error = 0; |
3138 | |
3139 | if (c->argc == 4) { |
3140 | if (!strcasecmp(c->argv[3]->ptr,"timeout" )) { |
3141 | unblock_error = 0; |
3142 | } else if (!strcasecmp(c->argv[3]->ptr,"error" )) { |
3143 | unblock_error = 1; |
3144 | } else { |
3145 | addReplyError(c, |
3146 | "CLIENT UNBLOCK reason should be TIMEOUT or ERROR" ); |
3147 | return; |
3148 | } |
3149 | } |
3150 | if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL) |
3151 | != C_OK) return; |
3152 | struct client *target = lookupClientByID(id); |
3153 | /* Note that we never try to unblock a client blocked on a module command, which |
3154 | * doesn't have a timeout callback (even in the case of UNBLOCK ERROR). |
3155 | * The reason is that we assume that if a command doesn't expect to be timedout, |
3156 | * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ |
3157 | if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) { |
3158 | if (unblock_error) |
3159 | addReplyError(target, |
3160 | "-UNBLOCKED client unblocked via CLIENT UNBLOCK" ); |
3161 | else |
3162 | replyToBlockedClientTimedOut(target); |
3163 | unblockClient(target); |
3164 | updateStatsOnUnblock(target, 0, 0, 1); |
3165 | addReply(c,shared.cone); |
3166 | } else { |
3167 | addReply(c,shared.czero); |
3168 | } |
3169 | } else if (!strcasecmp(c->argv[1]->ptr,"setname" ) && c->argc == 3) { |
3170 | /* CLIENT SETNAME */ |
3171 | if (clientSetNameOrReply(c,c->argv[2]) == C_OK) |
3172 | addReply(c,shared.ok); |
3173 | } else if (!strcasecmp(c->argv[1]->ptr,"getname" ) && c->argc == 2) { |
3174 | /* CLIENT GETNAME */ |
3175 | if (c->name) |
3176 | addReplyBulk(c,c->name); |
3177 | else |
3178 | addReplyNull(c); |
3179 | } else if (!strcasecmp(c->argv[1]->ptr,"unpause" ) && c->argc == 2) { |
3180 | /* CLIENT UNPAUSE */ |
3181 | unpauseClients(PAUSE_BY_CLIENT_COMMAND); |
3182 | addReply(c,shared.ok); |
3183 | } else if (!strcasecmp(c->argv[1]->ptr,"pause" ) && (c->argc == 3 || |
3184 | c->argc == 4)) |
3185 | { |
3186 | /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */ |
3187 | mstime_t end; |
3188 | int type = CLIENT_PAUSE_ALL; |
3189 | if (c->argc == 4) { |
3190 | if (!strcasecmp(c->argv[3]->ptr,"write" )) { |
3191 | type = CLIENT_PAUSE_WRITE; |
3192 | } else if (!strcasecmp(c->argv[3]->ptr,"all" )) { |
3193 | type = CLIENT_PAUSE_ALL; |
3194 | } else { |
3195 | addReplyError(c, |
3196 | "CLIENT PAUSE mode must be WRITE or ALL" ); |
3197 | return; |
3198 | } |
3199 | } |
3200 | |
3201 | if (getTimeoutFromObjectOrReply(c,c->argv[2],&end, |
3202 | UNIT_MILLISECONDS) != C_OK) return; |
3203 | pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type); |
3204 | addReply(c,shared.ok); |
3205 | } else if (!strcasecmp(c->argv[1]->ptr,"tracking" ) && c->argc >= 3) { |
3206 | /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] |
3207 | * [PREFIX second] [OPTIN] [OPTOUT] [NOLOOP]... */ |
3208 | long long redir = 0; |
3209 | uint64_t options = 0; |
3210 | robj **prefix = NULL; |
3211 | size_t numprefix = 0; |
3212 | |
3213 | /* Parse the options. */ |
3214 | for (int j = 3; j < c->argc; j++) { |
3215 | int moreargs = (c->argc-1) - j; |
3216 | |
3217 | if (!strcasecmp(c->argv[j]->ptr,"redirect" ) && moreargs) { |
3218 | j++; |
3219 | if (redir != 0) { |
3220 | addReplyError(c,"A client can only redirect to a single " |
3221 | "other client" ); |
3222 | zfree(prefix); |
3223 | return; |
3224 | } |
3225 | |
3226 | if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != |
3227 | C_OK) |
3228 | { |
3229 | zfree(prefix); |
3230 | return; |
3231 | } |
3232 | /* We will require the client with the specified ID to exist |
3233 | * right now, even if it is possible that it gets disconnected |
3234 | * later. Still a valid sanity check. */ |
3235 | if (lookupClientByID(redir) == NULL) { |
3236 | addReplyError(c,"The client ID you want redirect to " |
3237 | "does not exist" ); |
3238 | zfree(prefix); |
3239 | return; |
3240 | } |
3241 | } else if (!strcasecmp(c->argv[j]->ptr,"bcast" )) { |
3242 | options |= CLIENT_TRACKING_BCAST; |
3243 | } else if (!strcasecmp(c->argv[j]->ptr,"optin" )) { |
3244 | options |= CLIENT_TRACKING_OPTIN; |
3245 | } else if (!strcasecmp(c->argv[j]->ptr,"optout" )) { |
3246 | options |= CLIENT_TRACKING_OPTOUT; |
3247 | } else if (!strcasecmp(c->argv[j]->ptr,"noloop" )) { |
3248 | options |= CLIENT_TRACKING_NOLOOP; |
3249 | } else if (!strcasecmp(c->argv[j]->ptr,"prefix" ) && moreargs) { |
3250 | j++; |
3251 | prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); |
3252 | prefix[numprefix++] = c->argv[j]; |
3253 | } else { |
3254 | zfree(prefix); |
3255 | addReplyErrorObject(c,shared.syntaxerr); |
3256 | return; |
3257 | } |
3258 | } |
3259 | |
3260 | /* Options are ok: enable or disable the tracking for this client. */ |
3261 | if (!strcasecmp(c->argv[2]->ptr,"on" )) { |
3262 | /* Before enabling tracking, make sure options are compatible |
3263 | * among each other and with the current state of the client. */ |
3264 | if (!(options & CLIENT_TRACKING_BCAST) && numprefix) { |
3265 | addReplyError(c, |
3266 | "PREFIX option requires BCAST mode to be enabled" ); |
3267 | zfree(prefix); |
3268 | return; |
3269 | } |
3270 | |
3271 | if (c->flags & CLIENT_TRACKING) { |
3272 | int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); |
3273 | int newbcast = !!(options & CLIENT_TRACKING_BCAST); |
3274 | if (oldbcast != newbcast) { |
3275 | addReplyError(c, |
3276 | "You can't switch BCAST mode on/off before disabling " |
3277 | "tracking for this client, and then re-enabling it with " |
3278 | "a different mode." ); |
3279 | zfree(prefix); |
3280 | return; |
3281 | } |
3282 | } |
3283 | |
3284 | if (options & CLIENT_TRACKING_BCAST && |
3285 | options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT)) |
3286 | { |
3287 | addReplyError(c, |
3288 | "OPTIN and OPTOUT are not compatible with BCAST" ); |
3289 | zfree(prefix); |
3290 | return; |
3291 | } |
3292 | |
3293 | if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT) |
3294 | { |
3295 | addReplyError(c, |
3296 | "You can't specify both OPTIN mode and OPTOUT mode" ); |
3297 | zfree(prefix); |
3298 | return; |
3299 | } |
3300 | |
3301 | if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) || |
3302 | (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN)) |
3303 | { |
3304 | addReplyError(c, |
3305 | "You can't switch OPTIN/OPTOUT mode before disabling " |
3306 | "tracking for this client, and then re-enabling it with " |
3307 | "a different mode." ); |
3308 | zfree(prefix); |
3309 | return; |
3310 | } |
3311 | |
3312 | if (options & CLIENT_TRACKING_BCAST) { |
3313 | if (!checkPrefixCollisionsOrReply(c,prefix,numprefix)) { |
3314 | zfree(prefix); |
3315 | return; |
3316 | } |
3317 | } |
3318 | |
3319 | enableTracking(c,redir,options,prefix,numprefix); |
3320 | } else if (!strcasecmp(c->argv[2]->ptr,"off" )) { |
3321 | disableTracking(c); |
3322 | } else { |
3323 | zfree(prefix); |
3324 | addReplyErrorObject(c,shared.syntaxerr); |
3325 | return; |
3326 | } |
3327 | zfree(prefix); |
3328 | addReply(c,shared.ok); |
3329 | } else if (!strcasecmp(c->argv[1]->ptr,"caching" ) && c->argc >= 3) { |
3330 | if (!(c->flags & CLIENT_TRACKING)) { |
3331 | addReplyError(c,"CLIENT CACHING can be called only when the " |
3332 | "client is in tracking mode with OPTIN or " |
3333 | "OPTOUT mode enabled" ); |
3334 | return; |
3335 | } |
3336 | |
3337 | char *opt = c->argv[2]->ptr; |
3338 | if (!strcasecmp(opt,"yes" )) { |
3339 | if (c->flags & CLIENT_TRACKING_OPTIN) { |
3340 | c->flags |= CLIENT_TRACKING_CACHING; |
3341 | } else { |
3342 | addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode." ); |
3343 | return; |
3344 | } |
3345 | } else if (!strcasecmp(opt,"no" )) { |
3346 | if (c->flags & CLIENT_TRACKING_OPTOUT) { |
3347 | c->flags |= CLIENT_TRACKING_CACHING; |
3348 | } else { |
3349 | addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode." ); |
3350 | return; |
3351 | } |
3352 | } else { |
3353 | addReplyErrorObject(c,shared.syntaxerr); |
3354 | return; |
3355 | } |
3356 | |
3357 | /* Common reply for when we succeeded. */ |
3358 | addReply(c,shared.ok); |
3359 | } else if (!strcasecmp(c->argv[1]->ptr,"getredir" ) && c->argc == 2) { |
3360 | /* CLIENT GETREDIR */ |
3361 | if (c->flags & CLIENT_TRACKING) { |
3362 | addReplyLongLong(c,c->client_tracking_redirection); |
3363 | } else { |
3364 | addReplyLongLong(c,-1); |
3365 | } |
3366 | } else if (!strcasecmp(c->argv[1]->ptr,"trackinginfo" ) && c->argc == 2) { |
3367 | addReplyMapLen(c,3); |
3368 | |
3369 | /* Flags */ |
3370 | addReplyBulkCString(c,"flags" ); |
3371 | void *arraylen_ptr = addReplyDeferredLen(c); |
3372 | int numflags = 0; |
3373 | addReplyBulkCString(c,c->flags & CLIENT_TRACKING ? "on" : "off" ); |
3374 | numflags++; |
3375 | if (c->flags & CLIENT_TRACKING_BCAST) { |
3376 | addReplyBulkCString(c,"bcast" ); |
3377 | numflags++; |
3378 | } |
3379 | if (c->flags & CLIENT_TRACKING_OPTIN) { |
3380 | addReplyBulkCString(c,"optin" ); |
3381 | numflags++; |
3382 | if (c->flags & CLIENT_TRACKING_CACHING) { |
3383 | addReplyBulkCString(c,"caching-yes" ); |
3384 | numflags++; |
3385 | } |
3386 | } |
3387 | if (c->flags & CLIENT_TRACKING_OPTOUT) { |
3388 | addReplyBulkCString(c,"optout" ); |
3389 | numflags++; |
3390 | if (c->flags & CLIENT_TRACKING_CACHING) { |
3391 | addReplyBulkCString(c,"caching-no" ); |
3392 | numflags++; |
3393 | } |
3394 | } |
3395 | if (c->flags & CLIENT_TRACKING_NOLOOP) { |
3396 | addReplyBulkCString(c,"noloop" ); |
3397 | numflags++; |
3398 | } |
3399 | if (c->flags & CLIENT_TRACKING_BROKEN_REDIR) { |
3400 | addReplyBulkCString(c,"broken_redirect" ); |
3401 | numflags++; |
3402 | } |
3403 | setDeferredSetLen(c,arraylen_ptr,numflags); |
3404 | |
3405 | /* Redirect */ |
3406 | addReplyBulkCString(c,"redirect" ); |
3407 | if (c->flags & CLIENT_TRACKING) { |
3408 | addReplyLongLong(c,c->client_tracking_redirection); |
3409 | } else { |
3410 | addReplyLongLong(c,-1); |
3411 | } |
3412 | |
3413 | /* Prefixes */ |
3414 | addReplyBulkCString(c,"prefixes" ); |
3415 | if (c->client_tracking_prefixes) { |
3416 | addReplyArrayLen(c,raxSize(c->client_tracking_prefixes)); |
3417 | raxIterator ri; |
3418 | raxStart(&ri,c->client_tracking_prefixes); |
3419 | raxSeek(&ri,"^" ,NULL,0); |
3420 | while(raxNext(&ri)) { |
3421 | addReplyBulkCBuffer(c,ri.key,ri.key_len); |
3422 | } |
3423 | raxStop(&ri); |
3424 | } else { |
3425 | addReplyArrayLen(c,0); |
3426 | } |
3427 | } else { |
3428 | addReplySubcommandSyntaxError(c); |
3429 | } |
3430 | } |
3431 | |
3432 | /* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */ |
3433 | void helloCommand(client *c) { |
3434 | long long ver = 0; |
3435 | int next_arg = 1; |
3436 | |
3437 | if (c->argc >= 2) { |
3438 | if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver, |
3439 | "Protocol version is not an integer or out of range" ) != C_OK) { |
3440 | return; |
3441 | } |
3442 | |
3443 | if (ver < 2 || ver > 3) { |
3444 | addReplyError(c,"-NOPROTO unsupported protocol version" ); |
3445 | return; |
3446 | } |
3447 | } |
3448 | |
3449 | for (int j = next_arg; j < c->argc; j++) { |
3450 | int moreargs = (c->argc-1) - j; |
3451 | const char *opt = c->argv[j]->ptr; |
3452 | if (!strcasecmp(opt,"AUTH" ) && moreargs >= 2) { |
3453 | redactClientCommandArgument(c, j+1); |
3454 | redactClientCommandArgument(c, j+2); |
3455 | if (ACLAuthenticateUser(c, c->argv[j+1], c->argv[j+2]) == C_ERR) { |
3456 | addReplyError(c,"-WRONGPASS invalid username-password pair or user is disabled." ); |
3457 | return; |
3458 | } |
3459 | j += 2; |
3460 | } else if (!strcasecmp(opt,"SETNAME" ) && moreargs) { |
3461 | if (clientSetNameOrReply(c, c->argv[j+1]) == C_ERR) return; |
3462 | j++; |
3463 | } else { |
3464 | addReplyErrorFormat(c,"Syntax error in HELLO option '%s'" ,opt); |
3465 | return; |
3466 | } |
3467 | } |
3468 | |
3469 | /* At this point we need to be authenticated to continue. */ |
3470 | if (!c->authenticated) { |
3471 | addReplyError(c,"-NOAUTH HELLO must be called with the client already " |
3472 | "authenticated, otherwise the HELLO AUTH <user> <pass> " |
3473 | "option can be used to authenticate the client and " |
3474 | "select the RESP protocol version at the same time" ); |
3475 | return; |
3476 | } |
3477 | |
3478 | /* Let's switch to the specified RESP mode. */ |
3479 | if (ver) c->resp = ver; |
3480 | addReplyMapLen(c,6 + !server.sentinel_mode); |
3481 | |
3482 | addReplyBulkCString(c,"server" ); |
3483 | addReplyBulkCString(c,"redis" ); |
3484 | |
3485 | addReplyBulkCString(c,"version" ); |
3486 | addReplyBulkCString(c,REDIS_VERSION); |
3487 | |
3488 | addReplyBulkCString(c,"proto" ); |
3489 | addReplyLongLong(c,c->resp); |
3490 | |
3491 | addReplyBulkCString(c,"id" ); |
3492 | addReplyLongLong(c,c->id); |
3493 | |
3494 | addReplyBulkCString(c,"mode" ); |
3495 | if (server.sentinel_mode) addReplyBulkCString(c,"sentinel" ); |
3496 | else if (server.cluster_enabled) addReplyBulkCString(c,"cluster" ); |
3497 | else addReplyBulkCString(c,"standalone" ); |
3498 | |
3499 | if (!server.sentinel_mode) { |
3500 | addReplyBulkCString(c,"role" ); |
3501 | addReplyBulkCString(c,server.masterhost ? "replica" : "master" ); |
3502 | } |
3503 | |
3504 | addReplyBulkCString(c,"modules" ); |
3505 | addReplyLoadedModules(c); |
3506 | } |
3507 | |
3508 | /* This callback is bound to POST and "Host:" command names. Those are not |
3509 | * really commands, but are used in security attacks in order to talk to |
3510 | * Redis instances via HTTP, with a technique called "cross protocol scripting" |
3511 | * which exploits the fact that services like Redis will discard invalid |
3512 | * HTTP headers and will process what follows. |
3513 | * |
3514 | * As a protection against this attack, Redis will terminate the connection |
3515 | * when a POST or "Host:" header is seen, and will log the event from |
3516 | * time to time (to avoid creating a DOS as a result of too many logs). */ |
3517 | void securityWarningCommand(client *c) { |
3518 | static time_t logged_time = 0; |
3519 | time_t now = time(NULL); |
3520 | |
3521 | if (llabs(now-logged_time) > 60) { |
3522 | serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted." ); |
3523 | logged_time = now; |
3524 | } |
3525 | freeClientAsync(c); |
3526 | } |
3527 | |
3528 | /* Keep track of the original command arguments so that we can generate |
3529 | * an accurate slowlog entry after the command has been executed. */ |
3530 | static void retainOriginalCommandVector(client *c) { |
3531 | /* We already rewrote this command, so don't rewrite it again */ |
3532 | if (c->original_argv) return; |
3533 | c->original_argc = c->argc; |
3534 | c->original_argv = zmalloc(sizeof(robj*)*(c->argc)); |
3535 | for (int j = 0; j < c->argc; j++) { |
3536 | c->original_argv[j] = c->argv[j]; |
3537 | incrRefCount(c->argv[j]); |
3538 | } |
3539 | } |
3540 | |
3541 | /* Redact a given argument to prevent it from being shown |
3542 | * in the slowlog. This information is stored in the |
3543 | * original_argv array. */ |
3544 | void redactClientCommandArgument(client *c, int argc) { |
3545 | retainOriginalCommandVector(c); |
3546 | if (c->original_argv[argc] == shared.redacted) { |
3547 | /* This argument has already been redacted */ |
3548 | return; |
3549 | } |
3550 | decrRefCount(c->original_argv[argc]); |
3551 | c->original_argv[argc] = shared.redacted; |
3552 | } |
3553 | |
3554 | /* Rewrite the command vector of the client. All the new objects ref count |
3555 | * is incremented. The old command vector is freed, and the old objects |
3556 | * ref count is decremented. */ |
3557 | void rewriteClientCommandVector(client *c, int argc, ...) { |
3558 | va_list ap; |
3559 | int j; |
3560 | robj **argv; /* The new argument vector */ |
3561 | |
3562 | argv = zmalloc(sizeof(robj*)*argc); |
3563 | va_start(ap,argc); |
3564 | for (j = 0; j < argc; j++) { |
3565 | robj *a; |
3566 | |
3567 | a = va_arg(ap, robj*); |
3568 | argv[j] = a; |
3569 | incrRefCount(a); |
3570 | } |
3571 | replaceClientCommandVector(c, argc, argv); |
3572 | va_end(ap); |
3573 | } |
3574 | |
3575 | /* Completely replace the client command vector with the provided one. */ |
3576 | void replaceClientCommandVector(client *c, int argc, robj **argv) { |
3577 | int j; |
3578 | retainOriginalCommandVector(c); |
3579 | freeClientArgv(c); |
3580 | c->argv = argv; |
3581 | c->argc = argc; |
3582 | c->argv_len_sum = 0; |
3583 | for (j = 0; j < c->argc; j++) |
3584 | if (c->argv[j]) |
3585 | c->argv_len_sum += getStringObjectLen(c->argv[j]); |
3586 | c->cmd = lookupCommandOrOriginal(c->argv,c->argc); |
3587 | serverAssertWithInfo(c,NULL,c->cmd != NULL); |
3588 | } |
3589 | |
3590 | /* Rewrite a single item in the command vector. |
3591 | * The new val ref count is incremented, and the old decremented. |
3592 | * |
3593 | * It is possible to specify an argument over the current size of the |
3594 | * argument vector: in this case the array of objects gets reallocated |
3595 | * and c->argc set to the max value. However it's up to the caller to |
3596 | * |
3597 | * 1. Make sure there are no "holes" and all the arguments are set. |
3598 | * 2. If the original argument vector was longer than the one we |
3599 | * want to end with, it's up to the caller to set c->argc and |
3600 | * free the no longer used objects on c->argv. */ |
3601 | void rewriteClientCommandArgument(client *c, int i, robj *newval) { |
3602 | robj *oldval; |
3603 | retainOriginalCommandVector(c); |
3604 | |
3605 | /* We need to handle both extending beyond argc (just update it and |
3606 | * initialize the new element) or beyond argv_len (realloc is needed). |
3607 | */ |
3608 | if (i >= c->argc) { |
3609 | if (i >= c->argv_len) { |
3610 | c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1)); |
3611 | c->argv_len = i+1; |
3612 | } |
3613 | c->argc = i+1; |
3614 | c->argv[i] = NULL; |
3615 | } |
3616 | oldval = c->argv[i]; |
3617 | if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); |
3618 | if (newval) c->argv_len_sum += getStringObjectLen(newval); |
3619 | c->argv[i] = newval; |
3620 | incrRefCount(newval); |
3621 | if (oldval) decrRefCount(oldval); |
3622 | |
3623 | /* If this is the command name make sure to fix c->cmd. */ |
3624 | if (i == 0) { |
3625 | c->cmd = lookupCommandOrOriginal(c->argv,c->argc); |
3626 | serverAssertWithInfo(c,NULL,c->cmd != NULL); |
3627 | } |
3628 | } |
3629 | |
3630 | /* This function returns the number of bytes that Redis is |
3631 | * using to store the reply still not read by the client. |
3632 | * |
3633 | * Note: this function is very fast so can be called as many time as |
3634 | * the caller wishes. The main usage of this function currently is |
3635 | * enforcing the client output length limits. */ |
3636 | size_t getClientOutputBufferMemoryUsage(client *c) { |
3637 | if (getClientType(c) == CLIENT_TYPE_SLAVE) { |
3638 | size_t repl_buf_size = 0; |
3639 | size_t repl_node_num = 0; |
3640 | size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock); |
3641 | if (c->ref_repl_buf_node) { |
3642 | replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); |
3643 | replBufBlock *cur = listNodeValue(c->ref_repl_buf_node); |
3644 | repl_buf_size = last->repl_offset + last->size - cur->repl_offset; |
3645 | repl_node_num = last->id - cur->id + 1; |
3646 | } |
3647 | return repl_buf_size + (repl_node_size*repl_node_num); |
3648 | } else { |
3649 | size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); |
3650 | return c->reply_bytes + (list_item_size*listLength(c->reply)); |
3651 | } |
3652 | } |
3653 | |
3654 | /* Returns the total client's memory usage. |
3655 | * Optionally, if output_buffer_mem_usage is not NULL, it fills it with |
3656 | * the client output buffer memory usage portion of the total. */ |
3657 | size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { |
3658 | size_t mem = getClientOutputBufferMemoryUsage(c); |
3659 | if (output_buffer_mem_usage != NULL) |
3660 | *output_buffer_mem_usage = mem; |
3661 | mem += sdsZmallocSize(c->querybuf); |
3662 | mem += zmalloc_size(c); |
3663 | mem += c->buf_usable_size; |
3664 | /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory |
3665 | * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to |
3666 | * spot problematic clients. */ |
3667 | mem += c->argv_len_sum + sizeof(robj*)*c->argc; |
3668 | mem += multiStateMemOverhead(c); |
3669 | |
3670 | /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers |
3671 | * to the strings themselves because they aren't stored per client. */ |
3672 | mem += pubsubMemOverhead(c); |
3673 | |
3674 | /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */ |
3675 | if (c->client_tracking_prefixes) |
3676 | mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*)); |
3677 | |
3678 | return mem; |
3679 | } |
3680 | |
3681 | /* Get the class of a client, used in order to enforce limits to different |
3682 | * classes of clients. |
3683 | * |
3684 | * The function will return one of the following: |
3685 | * CLIENT_TYPE_NORMAL -> Normal client |
3686 | * CLIENT_TYPE_SLAVE -> Slave |
3687 | * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels |
3688 | * CLIENT_TYPE_MASTER -> The client representing our replication master. |
3689 | */ |
3690 | int getClientType(client *c) { |
3691 | if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER; |
3692 | /* Even though MONITOR clients are marked as replicas, we |
3693 | * want the expose them as normal clients. */ |
3694 | if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) |
3695 | return CLIENT_TYPE_SLAVE; |
3696 | if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB; |
3697 | return CLIENT_TYPE_NORMAL; |
3698 | } |
3699 | |
3700 | int getClientTypeByName(char *name) { |
3701 | if (!strcasecmp(name,"normal" )) return CLIENT_TYPE_NORMAL; |
3702 | else if (!strcasecmp(name,"slave" )) return CLIENT_TYPE_SLAVE; |
3703 | else if (!strcasecmp(name,"replica" )) return CLIENT_TYPE_SLAVE; |
3704 | else if (!strcasecmp(name,"pubsub" )) return CLIENT_TYPE_PUBSUB; |
3705 | else if (!strcasecmp(name,"master" )) return CLIENT_TYPE_MASTER; |
3706 | else return -1; |
3707 | } |
3708 | |
3709 | char *getClientTypeName(int class) { |
3710 | switch(class) { |
3711 | case CLIENT_TYPE_NORMAL: return "normal" ; |
3712 | case CLIENT_TYPE_SLAVE: return "slave" ; |
3713 | case CLIENT_TYPE_PUBSUB: return "pubsub" ; |
3714 | case CLIENT_TYPE_MASTER: return "master" ; |
3715 | default: return NULL; |
3716 | } |
3717 | } |
3718 | |
3719 | /* The function checks if the client reached output buffer soft or hard |
3720 | * limit, and also update the state needed to check the soft limit as |
3721 | * a side effect. |
3722 | * |
3723 | * Return value: non-zero if the client reached the soft or the hard limit. |
3724 | * Otherwise zero is returned. */ |
3725 | int checkClientOutputBufferLimits(client *c) { |
3726 | int soft = 0, hard = 0, class; |
3727 | unsigned long used_mem = getClientOutputBufferMemoryUsage(c); |
3728 | |
3729 | class = getClientType(c); |
3730 | /* For the purpose of output buffer limiting, masters are handled |
3731 | * like normal clients. */ |
3732 | if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL; |
3733 | |
3734 | /* Note that it doesn't make sense to set the replica clients output buffer |
3735 | * limit lower than the repl-backlog-size config (partial sync will succeed |
3736 | * and then replica will get disconnected). |
3737 | * Such a configuration is ignored (the size of repl-backlog-size will be used). |
3738 | * This doesn't have memory consumption implications since the replica client |
3739 | * will share the backlog buffers memory. */ |
3740 | size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes; |
3741 | if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes && |
3742 | (long long)hard_limit_bytes < server.repl_backlog_size) |
3743 | hard_limit_bytes = server.repl_backlog_size; |
3744 | if (server.client_obuf_limits[class].hard_limit_bytes && |
3745 | used_mem >= hard_limit_bytes) |
3746 | hard = 1; |
3747 | if (server.client_obuf_limits[class].soft_limit_bytes && |
3748 | used_mem >= server.client_obuf_limits[class].soft_limit_bytes) |
3749 | soft = 1; |
3750 | |
3751 | /* We need to check if the soft limit is reached continuously for the |
3752 | * specified amount of seconds. */ |
3753 | if (soft) { |
3754 | if (c->obuf_soft_limit_reached_time == 0) { |
3755 | c->obuf_soft_limit_reached_time = server.unixtime; |
3756 | soft = 0; /* First time we see the soft limit reached */ |
3757 | } else { |
3758 | time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; |
3759 | |
3760 | if (elapsed <= |
3761 | server.client_obuf_limits[class].soft_limit_seconds) { |
3762 | soft = 0; /* The client still did not reached the max number of |
3763 | seconds for the soft limit to be considered |
3764 | reached. */ |
3765 | } |
3766 | } |
3767 | } else { |
3768 | c->obuf_soft_limit_reached_time = 0; |
3769 | } |
3770 | return soft || hard; |
3771 | } |
3772 | |
3773 | /* Asynchronously close a client if soft or hard limit is reached on the |
3774 | * output buffer size. The caller can check if the client will be closed |
3775 | * checking if the client CLIENT_CLOSE_ASAP flag is set. |
3776 | * |
3777 | * Note: we need to close the client asynchronously because this function is |
3778 | * called from contexts where the client can't be freed safely, i.e. from the |
3779 | * lower level functions pushing data inside the client output buffers. |
3780 | * When `async` is set to 0, we close the client immediately, this is |
3781 | * useful when called from cron. |
3782 | * |
3783 | * Returns 1 if client was (flagged) closed. */ |
3784 | int closeClientOnOutputBufferLimitReached(client *c, int async) { |
3785 | if (!c->conn) return 0; /* It is unsafe to free fake clients. */ |
3786 | serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); |
3787 | /* Note that c->reply_bytes is irrelevant for replica clients |
3788 | * (they use the global repl buffers). */ |
3789 | if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) || |
3790 | c->flags & CLIENT_CLOSE_ASAP) return 0; |
3791 | if (checkClientOutputBufferLimits(c)) { |
3792 | sds client = catClientInfoString(sdsempty(),c); |
3793 | |
3794 | if (async) { |
3795 | freeClientAsync(c); |
3796 | serverLog(LL_WARNING, |
3797 | "Client %s scheduled to be closed ASAP for overcoming of output buffer limits." , |
3798 | client); |
3799 | } else { |
3800 | freeClient(c); |
3801 | serverLog(LL_WARNING, |
3802 | "Client %s closed for overcoming of output buffer limits." , |
3803 | client); |
3804 | } |
3805 | sdsfree(client); |
3806 | return 1; |
3807 | } |
3808 | return 0; |
3809 | } |
3810 | |
3811 | /* Helper function used by performEvictions() in order to flush slaves |
3812 | * output buffers without returning control to the event loop. |
3813 | * This is also called by SHUTDOWN for a best-effort attempt to send |
3814 | * slaves the latest writes. */ |
3815 | void flushSlavesOutputBuffers(void) { |
3816 | listIter li; |
3817 | listNode *ln; |
3818 | |
3819 | listRewind(server.slaves,&li); |
3820 | while((ln = listNext(&li))) { |
3821 | client *slave = listNodeValue(ln); |
3822 | int can_receive_writes = connHasWriteHandler(slave->conn) || |
3823 | (slave->flags & CLIENT_PENDING_WRITE); |
3824 | |
3825 | /* We don't want to send the pending data to the replica in a few |
3826 | * cases: |
3827 | * |
3828 | * 1. For some reason there is neither the write handler installed |
3829 | * nor the client is flagged as to have pending writes: for some |
3830 | * reason this replica may not be set to receive data. This is |
3831 | * just for the sake of defensive programming. |
3832 | * |
3833 | * 2. The put_online_on_ack flag is true. To know why we don't want |
3834 | * to send data to the replica in this case, please grep for the |
3835 | * flag for this flag. |
3836 | * |
3837 | * 3. Obviously if the slave is not ONLINE. |
3838 | */ |
3839 | if (slave->replstate == SLAVE_STATE_ONLINE && |
3840 | can_receive_writes && |
3841 | !slave->repl_start_cmd_stream_on_ack && |
3842 | clientHasPendingReplies(slave)) |
3843 | { |
3844 | writeToClient(slave,0); |
3845 | } |
3846 | } |
3847 | } |
3848 | |
3849 | /* Compute current most restrictive pause type and its end time, aggregated for |
3850 | * all pause purposes. */ |
3851 | static void updateClientPauseTypeAndEndTime(void) { |
3852 | pause_type old_type = server.client_pause_type; |
3853 | pause_type type = CLIENT_PAUSE_OFF; |
3854 | mstime_t end = 0; |
3855 | for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { |
3856 | pause_event *p = server.client_pause_per_purpose[i]; |
3857 | if (p == NULL) { |
3858 | /* Nothing to do. */ |
3859 | } else if (p->end < server.mstime) { |
3860 | /* This one expired. */ |
3861 | zfree(p); |
3862 | server.client_pause_per_purpose[i] = NULL; |
3863 | } else if (p->type > type) { |
3864 | /* This type is the most restrictive so far. */ |
3865 | type = p->type; |
3866 | } |
3867 | } |
3868 | |
3869 | /* Find the furthest end time among the pause purposes of the most |
3870 | * restrictive type */ |
3871 | for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { |
3872 | pause_event *p = server.client_pause_per_purpose[i]; |
3873 | if (p != NULL && p->type == type && p->end > end) end = p->end; |
3874 | } |
3875 | server.client_pause_type = type; |
3876 | server.client_pause_end_time = end; |
3877 | |
3878 | /* If the pause type is less restrictive than before, we unblock all clients |
3879 | * so they are reprocessed (may get re-paused). */ |
3880 | if (type < old_type) { |
3881 | unblockPostponedClients(); |
3882 | } |
3883 | } |
3884 | |
3885 | /* Unblock all paused clients (ones that where blocked by BLOCKED_POSTPONE (possibly in processCommand). |
3886 | * This means they'll get re-processed in beforeSleep, and may get paused again if needed. */ |
3887 | void unblockPostponedClients() { |
3888 | listNode *ln; |
3889 | listIter li; |
3890 | listRewind(server.postponed_clients, &li); |
3891 | while ((ln = listNext(&li)) != NULL) { |
3892 | client *c = listNodeValue(ln); |
3893 | unblockClient(c); |
3894 | } |
3895 | } |
3896 | |
3897 | /* Pause clients up to the specified unixtime (in ms) for a given type of |
3898 | * commands. |
3899 | * |
3900 | * A main use case of this function is to allow pausing replication traffic |
3901 | * so that a failover without data loss to occur. Replicas will continue to receive |
3902 | * traffic to facilitate this functionality. |
3903 | * |
3904 | * This function is also internally used by Redis Cluster for the manual |
3905 | * failover procedure implemented by CLUSTER FAILOVER. |
3906 | * |
3907 | * The function always succeed, even if there is already a pause in progress. |
3908 | * In such a case, the duration is set to the maximum and new end time and the |
3909 | * type is set to the more restrictive type of pause. */ |
3910 | void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) { |
3911 | /* Manage pause type and end time per pause purpose. */ |
3912 | if (server.client_pause_per_purpose[purpose] == NULL) { |
3913 | server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event)); |
3914 | server.client_pause_per_purpose[purpose]->type = type; |
3915 | server.client_pause_per_purpose[purpose]->end = end; |
3916 | } else { |
3917 | pause_event *p = server.client_pause_per_purpose[purpose]; |
3918 | p->type = max(p->type, type); |
3919 | p->end = max(p->end, end); |
3920 | } |
3921 | updateClientPauseTypeAndEndTime(); |
3922 | |
3923 | /* We allow write commands that were queued |
3924 | * up before and after to execute. We need |
3925 | * to track this state so that we don't assert |
3926 | * in propagateNow(). */ |
3927 | if (server.in_exec) { |
3928 | server.client_pause_in_transaction = 1; |
3929 | } |
3930 | } |
3931 | |
3932 | /* Unpause clients and queue them for reprocessing. */ |
3933 | void unpauseClients(pause_purpose purpose) { |
3934 | if (server.client_pause_per_purpose[purpose] == NULL) return; |
3935 | zfree(server.client_pause_per_purpose[purpose]); |
3936 | server.client_pause_per_purpose[purpose] = NULL; |
3937 | updateClientPauseTypeAndEndTime(); |
3938 | } |
3939 | |
3940 | /* Returns true if clients are paused and false otherwise. */ |
3941 | int areClientsPaused(void) { |
3942 | return server.client_pause_type != CLIENT_PAUSE_OFF; |
3943 | } |
3944 | |
3945 | /* Checks if the current client pause has elapsed and unpause clients |
3946 | * if it has. Also returns true if clients are now paused and false |
3947 | * otherwise. */ |
3948 | int checkClientPauseTimeoutAndReturnIfPaused(void) { |
3949 | if (!areClientsPaused()) |
3950 | return 0; |
3951 | if (server.client_pause_end_time < server.mstime) { |
3952 | updateClientPauseTypeAndEndTime(); |
3953 | } |
3954 | return areClientsPaused(); |
3955 | } |
3956 | |
3957 | /* This function is called by Redis in order to process a few events from |
3958 | * time to time while blocked into some not interruptible operation. |
3959 | * This allows to reply to clients with the -LOADING error while loading the |
3960 | * data set at startup or after a full resynchronization with the master |
3961 | * and so forth. |
3962 | * |
3963 | * It calls the event loop in order to process a few events. Specifically we |
3964 | * try to call the event loop 4 times as long as we receive acknowledge that |
3965 | * some event was processed, in order to go forward with the accept, read, |
3966 | * write, close sequence needed to serve a client. |
3967 | * |
3968 | * The function returns the total number of events processed. */ |
3969 | void processEventsWhileBlocked(void) { |
3970 | int iterations = 4; /* See the function top-comment. */ |
3971 | |
3972 | /* Update our cached time since it is used to create and update the last |
3973 | * interaction time with clients and for other important things. */ |
3974 | updateCachedTime(0); |
3975 | |
3976 | /* Note: when we are processing events while blocked (for instance during |
3977 | * busy Lua scripts), we set a global flag. When such flag is set, we |
3978 | * avoid handling the read part of clients using threaded I/O. |
3979 | * See https://github.com/redis/redis/issues/6988 for more info. |
3980 | * Note that there could be cases of nested calls to this function, |
3981 | * specifically on a busy script during async_loading rdb, and scripts |
3982 | * that came from AOF. */ |
3983 | ProcessingEventsWhileBlocked++; |
3984 | while (iterations--) { |
3985 | long long startval = server.events_processed_while_blocked; |
3986 | long long ae_events = aeProcessEvents(server.el, |
3987 | AE_FILE_EVENTS|AE_DONT_WAIT| |
3988 | AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); |
3989 | /* Note that server.events_processed_while_blocked will also get |
3990 | * incremented by callbacks called by the event loop handlers. */ |
3991 | server.events_processed_while_blocked += ae_events; |
3992 | long long events = server.events_processed_while_blocked - startval; |
3993 | if (!events) break; |
3994 | } |
3995 | |
3996 | whileBlockedCron(); |
3997 | |
3998 | ProcessingEventsWhileBlocked--; |
3999 | serverAssert(ProcessingEventsWhileBlocked >= 0); |
4000 | } |
4001 | |
4002 | /* ========================================================================== |
4003 | * Threaded I/O |
4004 | * ========================================================================== */ |
4005 | |
/* Upper bound on the configured number of I/O threads, including the main
 * thread (checked in initThreadedIO()). */
#define IO_THREADS_MAX_NUM 128
/* Alignment used for each per-thread pending counter below. */
#define CACHE_LINE_SIZE 64

/* Per-thread pending-work counter. Each counter is aligned to its own
 * CACHE_LINE_SIZE boundary; presumably this avoids false sharing, since the
 * main thread and the I/O threads poll these counters in tight loops. */
typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending {
    redisAtomic unsigned long value;
} threads_pending;

/* Worker thread handles. Slot 0 stands for the main thread and is never
 * filled by initThreadedIO(). */
pthread_t io_threads[IO_THREADS_MAX_NUM];
/* Per-worker start/stop mutexes: held by the main thread while the workers
 * are stopped. Slot 0 is unused. */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
/* Number of clients each worker still has to serve; 0 means nothing to do. */
threads_pending io_threads_pending[IO_THREADS_MAX_NUM];
/* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE.
 * This is a plain (non-atomic) int. The main thread sets it before publishing
 * pending counts with setIOPendingCount(), and resets it to IDLE only after
 * every count has read back 0. Workers read it only after observing a
 * non-zero count. NOTE(review): this ordering relies on
 * atomicGetWithSync/atomicSetWithSync being sequentially consistent. Confirm
 * that, or make this variable atomic. */
int io_threads_op;

/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
4022 | |
4023 | static inline unsigned long getIOPendingCount(int i) { |
4024 | unsigned long count = 0; |
4025 | atomicGetWithSync(io_threads_pending[i].value, count); |
4026 | return count; |
4027 | } |
4028 | |
4029 | static inline void setIOPendingCount(int i, unsigned long count) { |
4030 | atomicSetWithSync(io_threads_pending[i].value, count); |
4031 | } |
4032 | |
/* Entry point of each additional I/O thread spawned by initThreadedIO().
 *
 * The thread loops forever:
 *  - it polls its pending count up to 1000000 times, waiting for the main
 *    thread to publish a non-zero value;
 *  - if still nothing is pending, it takes and releases its own mutex. While
 *    the main thread holds that mutex (threads stopped, see stopThreadedIO()),
 *    this parks the thread instead of letting it spin;
 *  - otherwise it serves every client in io_threads_list[id], writing or
 *    reading according to io_threads_op. It then empties the list and stores
 *    0 as its pending count, which signals completion to the main thread.
 *
 * The loop never exits; killIOThreads() stops these threads with
 * pthread_cancel(). */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.io_threads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients.
     * NOTE(review): initThreadedIO() passes (void*)(long)i; the intermediate
     * unsigned long cast is harmless for these small non-negative values. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        /* Wait for start: busy-poll for a while before considering parking. */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        /* Give the main thread a chance to stop this thread: blocks here
         * for as long as the main thread holds io_threads_mutex[id]. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        /* Signal completion: the main thread's fan-in loop waits for 0. */
        setIOPendingCount(id, 0);
    }
}
4078 | |
4079 | /* Initialize the data structures needed for threaded I/O. */ |
4080 | void initThreadedIO(void) { |
4081 | server.io_threads_active = 0; /* We start with threads not active. */ |
4082 | |
4083 | /* Indicate that io-threads are currently idle */ |
4084 | io_threads_op = IO_THREADS_OP_IDLE; |
4085 | |
4086 | /* Don't spawn any thread if the user selected a single thread: |
4087 | * we'll handle I/O directly from the main thread. */ |
4088 | if (server.io_threads_num == 1) return; |
4089 | |
4090 | if (server.io_threads_num > IO_THREADS_MAX_NUM) { |
4091 | serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " |
4092 | "The maximum number is %d." , IO_THREADS_MAX_NUM); |
4093 | exit(1); |
4094 | } |
4095 | |
4096 | /* Spawn and initialize the I/O threads. */ |
4097 | for (int i = 0; i < server.io_threads_num; i++) { |
4098 | /* Things we do for all the threads including the main thread. */ |
4099 | io_threads_list[i] = listCreate(); |
4100 | if (i == 0) continue; /* Thread 0 is the main thread. */ |
4101 | |
4102 | /* Things we do only for the additional threads. */ |
4103 | pthread_t tid; |
4104 | pthread_mutex_init(&io_threads_mutex[i],NULL); |
4105 | setIOPendingCount(i, 0); |
4106 | pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ |
4107 | if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { |
4108 | serverLog(LL_WARNING,"Fatal: Can't initialize IO thread." ); |
4109 | exit(1); |
4110 | } |
4111 | io_threads[i] = tid; |
4112 | } |
4113 | } |
4114 | |
4115 | void killIOThreads(void) { |
4116 | int err, j; |
4117 | for (j = 0; j < server.io_threads_num; j++) { |
4118 | if (io_threads[j] == pthread_self()) continue; |
4119 | if (io_threads[j] && pthread_cancel(io_threads[j]) == 0) { |
4120 | if ((err = pthread_join(io_threads[j],NULL)) != 0) { |
4121 | serverLog(LL_WARNING, |
4122 | "IO thread(tid:%lu) can not be joined: %s" , |
4123 | (unsigned long)io_threads[j], strerror(err)); |
4124 | } else { |
4125 | serverLog(LL_WARNING, |
4126 | "IO thread(tid:%lu) terminated" ,(unsigned long)io_threads[j]); |
4127 | } |
4128 | } |
4129 | } |
4130 | } |
4131 | |
4132 | void startThreadedIO(void) { |
4133 | serverAssert(server.io_threads_active == 0); |
4134 | for (int j = 1; j < server.io_threads_num; j++) |
4135 | pthread_mutex_unlock(&io_threads_mutex[j]); |
4136 | server.io_threads_active = 1; |
4137 | } |
4138 | |
4139 | void stopThreadedIO(void) { |
4140 | /* We may have still clients with pending reads when this function |
4141 | * is called: handle them before stopping the threads. */ |
4142 | handleClientsWithPendingReadsUsingThreads(); |
4143 | serverAssert(server.io_threads_active == 1); |
4144 | for (int j = 1; j < server.io_threads_num; j++) |
4145 | pthread_mutex_lock(&io_threads_mutex[j]); |
4146 | server.io_threads_active = 0; |
4147 | } |
4148 | |
4149 | /* This function checks if there are not enough pending clients to justify |
4150 | * taking the I/O threads active: in that case I/O threads are stopped if |
4151 | * currently active. We track the pending writes as a measure of clients |
4152 | * we need to handle in parallel, however the I/O threading is disabled |
4153 | * globally for reads as well if we have too little pending clients. |
4154 | * |
4155 | * The function returns 0 if the I/O threading should be used because there |
4156 | * are enough active threads, otherwise 1 is returned and the I/O threads |
4157 | * could be possibly stopped (if already active) as a side effect. */ |
4158 | int stopThreadedIOIfNeeded(void) { |
4159 | int pending = listLength(server.clients_pending_write); |
4160 | |
4161 | /* Return ASAP if IO threads are disabled (single threaded mode). */ |
4162 | if (server.io_threads_num == 1) return 1; |
4163 | |
4164 | if (pending < (server.io_threads_num*2)) { |
4165 | if (server.io_threads_active) stopThreadedIO(); |
4166 | return 1; |
4167 | } else { |
4168 | return 0; |
4169 | } |
4170 | } |
4171 | |
/* Write pending replies for every client in server.clients_pending_write,
 * spreading the work over the I/O threads when enough clients are pending.
 *
 * This function achieves thread safety using a fan-out -> fan-in paradigm:
 * Fan out: The main thread fans out work to the io-threads which block until
 * setIOPendingCount() is called with a value larger than 0 by the main thread.
 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
 * it can safely perform post-processing and return to normal synchronous
 * work.
 *
 * When io_threads_num is 1, or stopThreadedIOIfNeeded() reports too few
 * pending clients, it delegates to handleClientsWithPendingWrites() and
 * returns that result. Otherwise it returns the length of the pending-write
 * list on entry. That count also includes clients dropped from the list
 * because they were flagged CLIENT_CLOSE_ASAP. */
int handleClientsWithPendingWritesUsingThreads(void) {
    /* NOTE(review): the list length is narrowed to int here. */
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. List 0 is served by
     * the main thread itself further below. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Cleared for every visited client; the whole pending-write list is
         * emptied at the end of this function. */
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }

        /* Round-robin over all lists, including 0. Replicas do not advance
         * item_id, so they do not affect the distribution. */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. io_threads_op is written before the counts
     * are published. Workers read it only after seeing a non-zero count,
     * which relies on setIOPendingCount() being a synchronizing store. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work: busy-wait, no
     * sleeping, until every worker has stored 0 as its pending count. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* All workers are done: from here on only the main thread touches the
     * clients. */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage after we're done processing it in the io-threads */
        updateClientMemUsage(c);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c)) {
            installClientWriteHandler(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
4269 | |
4270 | /* Return 1 if we want to handle the client read later using threaded I/O. |
4271 | * This is called by the readable handler of the event loop. |
4272 | * As a side effect of calling this function the client is put in the |
4273 | * pending read clients and flagged as such. */ |
4274 | int postponeClientRead(client *c) { |
4275 | if (server.io_threads_active && |
4276 | server.io_threads_do_reads && |
4277 | !ProcessingEventsWhileBlocked && |
4278 | !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) && |
4279 | io_threads_op == IO_THREADS_OP_IDLE) |
4280 | { |
4281 | listAddNodeHead(server.clients_pending_read,c); |
4282 | c->pending_read_list_node = listFirst(server.clients_pending_read); |
4283 | return 1; |
4284 | } else { |
4285 | return 0; |
4286 | } |
4287 | } |
4288 | |
/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and processes them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures.
 * This function achieves thread safety using a fan-out -> fan-in paradigm:
 * Fan out: The main thread fans out work to the io-threads which block until
 * setIOPendingCount() is called with a value larger than 0 by the main thread.
 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
 * it can safely perform post-processing and return to normal synchronous
 * work.
 *
 * Returns 0 without doing anything when the I/O threads are not active or
 * threaded reads are disabled. Otherwise returns the number of clients that
 * were queued on entry. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. List 0 is served by
     * the main thread itself further below. The pending-read list itself is
     * left intact here; it is drained after the fan-in. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. io_threads_op is written before the counts
     * are published so that workers see IO_THREADS_OP_READ. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work (busy-wait until
     * every worker has stored 0 as its pending count). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to process the new buffers. Each client
     * is unlinked before it is processed, so the loop always makes progress
     * even when a client is skipped. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        listDelNode(server.clients_pending_read,ln);
        c->pending_read_list_node = NULL;

        /* postponeClientRead() never queues CLIENT_BLOCKED clients. */
        serverAssert(!(c->flags & CLIENT_BLOCKED));

        if (beforeNextClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* Once io-threads are idle we can update the client in the mem usage */
        updateClientMemUsage(c);

        if (processPendingCommandAndInputBuffer(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not put the client in pending write queue (it can't).
         */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            putClientInPendingWriteQueue(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
4382 | |
4383 | /* Returns the actual client eviction limit based on current configuration or |
4384 | * 0 if no limit. */ |
4385 | size_t getClientEvictionLimit(void) { |
4386 | size_t maxmemory_clients_actual = SIZE_MAX; |
4387 | |
4388 | /* Handle percentage of maxmemory*/ |
4389 | if (server.maxmemory_clients < 0 && server.maxmemory > 0) { |
4390 | unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100); |
4391 | if (maxmemory_clients_bytes <= SIZE_MAX) |
4392 | maxmemory_clients_actual = maxmemory_clients_bytes; |
4393 | } |
4394 | else if (server.maxmemory_clients > 0) |
4395 | maxmemory_clients_actual = server.maxmemory_clients; |
4396 | else |
4397 | return 0; |
4398 | |
4399 | /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate |
4400 | * at all with the server because of bad configuration */ |
4401 | if (maxmemory_clients_actual < 1024*128) |
4402 | maxmemory_clients_actual = 1024*128; |
4403 | |
4404 | return maxmemory_clients_actual; |
4405 | } |
4406 | |
4407 | void evictClients(void) { |
4408 | /* Start eviction from topmost bucket (largest clients) */ |
4409 | int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1; |
4410 | listIter bucket_iter; |
4411 | listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); |
4412 | size_t client_eviction_limit = getClientEvictionLimit(); |
4413 | if (client_eviction_limit == 0) |
4414 | return; |
4415 | while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + |
4416 | server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) { |
4417 | listNode *ln = listNext(&bucket_iter); |
4418 | if (ln) { |
4419 | client *c = ln->value; |
4420 | sds ci = catClientInfoString(sdsempty(),c); |
4421 | serverLog(LL_NOTICE, "Evicting client: %s" , ci); |
4422 | freeClient(c); |
4423 | sdsfree(ci); |
4424 | server.stat_evictedclients++; |
4425 | } else { |
4426 | curr_bucket--; |
4427 | if (curr_bucket < 0) { |
4428 | serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients" ); |
4429 | break; |
4430 | } |
4431 | listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); |
4432 | } |
4433 | } |
4434 | } |
4435 | |