1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "cluster.h" |
32 | #include "atomicvar.h" |
33 | #include "latency.h" |
34 | #include "script.h" |
35 | #include "functions.h" |
36 | |
37 | #include <signal.h> |
38 | #include <ctype.h> |
39 | |
40 | /*----------------------------------------------------------------------------- |
41 | * C-level DB API |
42 | *----------------------------------------------------------------------------*/ |
43 | |
/* Forward declarations of helpers that are used further down in this file
 * (for instance by lookupKey() and keysCommand()) before the point where
 * their definitions appear. */
int expireIfNeeded(redisDb *db, robj *key, int force_delete_expired);
int keyIsExpired(redisDb *db, robj *key);
46 | |
47 | /* Update LFU when an object is accessed. |
48 | * Firstly, decrement the counter if the decrement time is reached. |
49 | * Then logarithmically increment the counter, and update the access time. */ |
50 | void updateLFU(robj *val) { |
51 | unsigned long counter = LFUDecrAndReturn(val); |
52 | counter = LFULogIncr(counter); |
53 | val->lru = (LFUGetTimeInMinutes()<<8) | counter; |
54 | } |
55 | |
56 | /* Lookup a key for read or write operations, or return NULL if the key is not |
57 | * found in the specified DB. This function implements the functionality of |
58 | * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants. |
59 | * |
60 | * Side-effects of calling this function: |
61 | * |
62 | * 1. A key gets expired if it reached it's TTL. |
63 | * 2. The key's last access time is updated. |
64 | * 3. The global keys hits/misses stats are updated (reported in INFO). |
65 | * 4. If keyspace notifications are enabled, a "keymiss" notification is fired. |
66 | * |
67 | * Flags change the behavior of this command: |
68 | * |
69 | * LOOKUP_NONE (or zero): No special flags are passed. |
70 | * LOOKUP_NOTOUCH: Don't alter the last access time of the key. |
71 | * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss. |
72 | * LOOKUP_NOSTATS: Don't increment key hits/misses counters. |
73 | * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on |
74 | * replicas, use separate keyspace stats and events (TODO)). |
75 | * |
76 | * Note: this function also returns NULL if the key is logically expired but |
77 | * still existing, in case this is a replica and the LOOKUP_WRITE is not set. |
78 | * Even if the key expiry is master-driven, we can correctly report a key is |
79 | * expired on replicas even if the master is lagging expiring our key via DELs |
80 | * in the replication link. */ |
81 | robj *lookupKey(redisDb *db, robj *key, int flags) { |
82 | dictEntry *de = dictFind(db->dict,key->ptr); |
83 | robj *val = NULL; |
84 | if (de) { |
85 | val = dictGetVal(de); |
86 | /* Forcing deletion of expired keys on a replica makes the replica |
87 | * inconsistent with the master. We forbid it on readonly replicas, but |
88 | * we have to allow it on writable replicas to make write commands |
89 | * behave consistently. |
90 | * |
91 | * It's possible that the WRITE flag is set even during a readonly |
92 | * command, since the command may trigger events that cause modules to |
93 | * perform additional writes. */ |
94 | int is_ro_replica = server.masterhost && server.repl_slave_ro; |
95 | int force_delete_expired = flags & LOOKUP_WRITE && !is_ro_replica; |
96 | if (expireIfNeeded(db, key, force_delete_expired)) { |
97 | /* The key is no longer valid. */ |
98 | val = NULL; |
99 | } |
100 | } |
101 | |
102 | if (val) { |
103 | /* Update the access time for the ageing algorithm. |
104 | * Don't do it if we have a saving child, as this will trigger |
105 | * a copy on write madness. */ |
106 | if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ |
107 | if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { |
108 | updateLFU(val); |
109 | } else { |
110 | val->lru = LRU_CLOCK(); |
111 | } |
112 | } |
113 | |
114 | if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) |
115 | server.stat_keyspace_hits++; |
116 | /* TODO: Use separate hits stats for WRITE */ |
117 | } else { |
118 | if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) |
119 | notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss" , key, db->id); |
120 | if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) |
121 | server.stat_keyspace_misses++; |
122 | /* TODO: Use separate misses stats and notify event for WRITE */ |
123 | } |
124 | |
125 | return val; |
126 | } |
127 | |
128 | /* Lookup a key for read operations, or return NULL if the key is not found |
129 | * in the specified DB. |
130 | * |
131 | * This API should not be used when we write to the key after obtaining |
132 | * the object linked to the key, but only for read only operations. |
133 | * |
134 | * This function is equivalent to lookupKey(). The point of using this function |
135 | * rather than lookupKey() directly is to indicate that the purpose is to read |
136 | * the key. */ |
137 | robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { |
138 | serverAssert(!(flags & LOOKUP_WRITE)); |
139 | return lookupKey(db, key, flags); |
140 | } |
141 | |
142 | /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the |
143 | * common case. */ |
144 | robj *lookupKeyRead(redisDb *db, robj *key) { |
145 | return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); |
146 | } |
147 | |
148 | /* Lookup a key for write operations, and as a side effect, if needed, expires |
149 | * the key if its TTL is reached. It's equivalent to lookupKey() with the |
150 | * LOOKUP_WRITE flag added. |
151 | * |
152 | * Returns the linked value object if the key exists or NULL if the key |
153 | * does not exist in the specified DB. */ |
154 | robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { |
155 | return lookupKey(db, key, flags | LOOKUP_WRITE); |
156 | } |
157 | |
158 | robj *lookupKeyWrite(redisDb *db, robj *key) { |
159 | return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); |
160 | } |
161 | |
162 | robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { |
163 | robj *o = lookupKeyRead(c->db, key); |
164 | if (!o) addReplyOrErrorObject(c, reply); |
165 | return o; |
166 | } |
167 | |
168 | robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { |
169 | robj *o = lookupKeyWrite(c->db, key); |
170 | if (!o) addReplyOrErrorObject(c, reply); |
171 | return o; |
172 | } |
173 | |
174 | /* Add the key to the DB. It's up to the caller to increment the reference |
175 | * counter of the value if needed. |
176 | * |
177 | * The program is aborted if the key already exists. */ |
178 | void dbAdd(redisDb *db, robj *key, robj *val) { |
179 | sds copy = sdsdup(key->ptr); |
180 | dictEntry *de = dictAddRaw(db->dict, copy, NULL); |
181 | serverAssertWithInfo(NULL, key, de != NULL); |
182 | dictSetVal(db->dict, de, val); |
183 | signalKeyAsReady(db, key, val->type); |
184 | if (server.cluster_enabled) slotToKeyAddEntry(de, db); |
185 | notifyKeyspaceEvent(NOTIFY_NEW,"new" ,key,db->id); |
186 | } |
187 | |
188 | /* This is a special version of dbAdd() that is used only when loading |
189 | * keys from the RDB file: the key is passed as an SDS string that is |
190 | * retained by the function (and not freed by the caller). |
191 | * |
192 | * Moreover this function will not abort if the key is already busy, to |
193 | * give more control to the caller, nor will signal the key as ready |
194 | * since it is not useful in this context. |
195 | * |
196 | * The function returns 1 if the key was added to the database, taking |
197 | * ownership of the SDS string, otherwise 0 is returned, and is up to the |
198 | * caller to free the SDS string. */ |
199 | int dbAddRDBLoad(redisDb *db, sds key, robj *val) { |
200 | dictEntry *de = dictAddRaw(db->dict, key, NULL); |
201 | if (de == NULL) return 0; |
202 | dictSetVal(db->dict, de, val); |
203 | if (server.cluster_enabled) slotToKeyAddEntry(de, db); |
204 | return 1; |
205 | } |
206 | |
/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 * This function does not modify the expire time of the existing key.
 *
 * When the maxmemory policy is LFU based, the 'lru' field of the old value
 * is carried over to the new one.
 *
 * The old value is disposed of before returning: it is handed to
 * freeObjAsync() when server.lazyfree_lazy_server_del is set, otherwise it
 * is released through dictFreeVal().
 *
 * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
    dictEntry *de = dictFind(db->dict,key->ptr);

    serverAssertWithInfo(NULL,key,de != NULL);
    /* Snapshot of the entry taken while it still references the old value:
     * this copy, not 'de', is what is passed to dictFreeVal() at the end,
     * after 'de' has been switched to the new value. */
    dictEntry auxentry = *de;
    robj *old = dictGetVal(de);
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        val->lru = old->lru;
    }
    /* Although the key is not really deleted from the database, we regard
     * overwrite as two steps of unlink+add, so we still need to call the unlink
     * callback of the module. */
    moduleNotifyKeyUnlink(key,old,db->id);
    /* We want to try to unblock any client using a blocking XREADGROUP */
    if (old->type == OBJ_STREAM)
        signalKeyAsReady(db,key,old->type);
    dictSetVal(db->dict, de, val);

    if (server.lazyfree_lazy_server_del) {
        /* The old value is now in the hands of freeObjAsync(): clear it in
         * the snapshot, presumably so that the dictFreeVal() call below does
         * not release it a second time. */
        freeObjAsync(key,old,db->id);
        dictSetVal(db->dict, &auxentry, NULL);
    }

    dictFreeVal(db->dict, &auxentry);
}
237 | |
238 | /* High level Set operation. This function can be used in order to set |
239 | * a key, whatever it was existing or not, to a new object. |
240 | * |
241 | * 1) The ref count of the value object is incremented. |
242 | * 2) clients WATCHing for the destination key notified. |
243 | * 3) The expire time of the key is reset (the key is made persistent), |
244 | * unless 'SETKEY_KEEPTTL' is enabled in flags. |
245 | * 4) The key lookup can take place outside this interface outcome will be |
246 | * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST' |
247 | * |
248 | * All the new keys in the database should be created via this interface. |
249 | * The client 'c' argument may be set to NULL if the operation is performed |
250 | * in a context where there is no clear client performing the operation. */ |
251 | void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { |
252 | int keyfound = 0; |
253 | |
254 | if (flags & SETKEY_ALREADY_EXIST) |
255 | keyfound = 1; |
256 | else if (!(flags & SETKEY_DOESNT_EXIST)) |
257 | keyfound = (lookupKeyWrite(db,key) != NULL); |
258 | |
259 | if (!keyfound) { |
260 | dbAdd(db,key,val); |
261 | } else { |
262 | dbOverwrite(db,key,val); |
263 | } |
264 | incrRefCount(val); |
265 | if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key); |
266 | if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c,db,key); |
267 | } |
268 | |
269 | /* Return a random key, in form of a Redis object. |
270 | * If there are no keys, NULL is returned. |
271 | * |
272 | * The function makes sure to return keys not already expired. */ |
273 | robj *dbRandomKey(redisDb *db) { |
274 | dictEntry *de; |
275 | int maxtries = 100; |
276 | int allvolatile = dictSize(db->dict) == dictSize(db->expires); |
277 | |
278 | while(1) { |
279 | sds key; |
280 | robj *keyobj; |
281 | |
282 | de = dictGetFairRandomKey(db->dict); |
283 | if (de == NULL) return NULL; |
284 | |
285 | key = dictGetKey(de); |
286 | keyobj = createStringObject(key,sdslen(key)); |
287 | if (dictFind(db->expires,key)) { |
288 | if (allvolatile && server.masterhost && --maxtries == 0) { |
289 | /* If the DB is composed only of keys with an expire set, |
290 | * it could happen that all the keys are already logically |
291 | * expired in the slave, so the function cannot stop because |
292 | * expireIfNeeded() is false, nor it can stop because |
293 | * dictGetFairRandomKey() returns NULL (there are keys to return). |
294 | * To prevent the infinite loop we do some tries, but if there |
295 | * are the conditions for an infinite loop, eventually we |
296 | * return a key name that may be already expired. */ |
297 | return keyobj; |
298 | } |
299 | if (expireIfNeeded(db,keyobj,0)) { |
300 | decrRefCount(keyobj); |
301 | continue; /* search for another key. This expired. */ |
302 | } |
303 | } |
304 | return keyobj; |
305 | } |
306 | } |
307 | |
308 | /* Helper for sync and async delete. */ |
309 | static int dbGenericDelete(redisDb *db, robj *key, int async) { |
310 | /* Deleting an entry from the expires dict will not free the sds of |
311 | * the key, because it is shared with the main dictionary. */ |
312 | if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); |
313 | dictEntry *de = dictUnlink(db->dict,key->ptr); |
314 | if (de) { |
315 | robj *val = dictGetVal(de); |
316 | /* Tells the module that the key has been unlinked from the database. */ |
317 | moduleNotifyKeyUnlink(key,val,db->id); |
318 | /* We want to try to unblock any client using a blocking XREADGROUP */ |
319 | if (val->type == OBJ_STREAM) |
320 | signalKeyAsReady(db,key,val->type); |
321 | if (async) { |
322 | freeObjAsync(key, val, db->id); |
323 | dictSetVal(db->dict, de, NULL); |
324 | } |
325 | if (server.cluster_enabled) slotToKeyDelEntry(de, db); |
326 | dictFreeUnlinkedEntry(db->dict,de); |
327 | return 1; |
328 | } else { |
329 | return 0; |
330 | } |
331 | } |
332 | |
333 | /* Delete a key, value, and associated expiration entry if any, from the DB */ |
334 | int dbSyncDelete(redisDb *db, robj *key) { |
335 | return dbGenericDelete(db, key, 0); |
336 | } |
337 | |
338 | /* Delete a key, value, and associated expiration entry if any, from the DB. If |
339 | * the value consists of many allocations, it may be freed asynchronously. */ |
340 | int dbAsyncDelete(redisDb *db, robj *key) { |
341 | return dbGenericDelete(db, key, 1); |
342 | } |
343 | |
344 | /* This is a wrapper whose behavior depends on the Redis lazy free |
345 | * configuration. Deletes the key synchronously or asynchronously. */ |
346 | int dbDelete(redisDb *db, robj *key) { |
347 | return dbGenericDelete(db, key, server.lazyfree_lazy_server_del); |
348 | } |
349 | |
350 | /* Prepare the string object stored at 'key' to be modified destructively |
351 | * to implement commands like SETBIT or APPEND. |
352 | * |
353 | * An object is usually ready to be modified unless one of the two conditions |
354 | * are true: |
355 | * |
356 | * 1) The object 'o' is shared (refcount > 1), we don't want to affect |
357 | * other users. |
358 | * 2) The object encoding is not "RAW". |
359 | * |
360 | * If the object is found in one of the above conditions (or both) by the |
361 | * function, an unshared / not-encoded copy of the string object is stored |
362 | * at 'key' in the specified 'db'. Otherwise the object 'o' itself is |
363 | * returned. |
364 | * |
365 | * USAGE: |
366 | * |
367 | * The object 'o' is what the caller already obtained by looking up 'key' |
368 | * in 'db', the usage pattern looks like this: |
369 | * |
370 | * o = lookupKeyWrite(db,key); |
371 | * if (checkType(c,o,OBJ_STRING)) return; |
372 | * o = dbUnshareStringValue(db,key,o); |
373 | * |
374 | * At this point the caller is ready to modify the object, for example |
375 | * using an sdscat() call to append some data, or anything else. |
376 | */ |
377 | robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { |
378 | serverAssert(o->type == OBJ_STRING); |
379 | if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { |
380 | robj *decoded = getDecodedObject(o); |
381 | o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); |
382 | decrRefCount(decoded); |
383 | dbOverwrite(db,key,o); |
384 | } |
385 | return o; |
386 | } |
387 | |
388 | /* Remove all keys from the database(s) structure. The dbarray argument |
389 | * may not be the server main DBs (could be a temporary DB). |
390 | * |
391 | * The dbnum can be -1 if all the DBs should be emptied, or the specified |
392 | * DB index if we want to empty only a single database. |
393 | * The function returns the number of keys removed from the database(s). */ |
394 | long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, |
395 | void(callback)(dict*)) |
396 | { |
397 | long long removed = 0; |
398 | int startdb, enddb; |
399 | |
400 | if (dbnum == -1) { |
401 | startdb = 0; |
402 | enddb = server.dbnum-1; |
403 | } else { |
404 | startdb = enddb = dbnum; |
405 | } |
406 | |
407 | for (int j = startdb; j <= enddb; j++) { |
408 | removed += dictSize(dbarray[j].dict); |
409 | if (async) { |
410 | emptyDbAsync(&dbarray[j]); |
411 | } else { |
412 | dictEmpty(dbarray[j].dict,callback); |
413 | dictEmpty(dbarray[j].expires,callback); |
414 | } |
415 | /* Because all keys of database are removed, reset average ttl. */ |
416 | dbarray[j].avg_ttl = 0; |
417 | dbarray[j].expires_cursor = 0; |
418 | } |
419 | |
420 | return removed; |
421 | } |
422 | |
/* Remove all data (keys and functions) from all the databases in a
 * Redis server. If callback is given the function is called from
 * time to time to signal that work is in progress.
 *
 * The dbnum can be -1 if all the DBs should be flushed, or the specified
 * DB number if we want to flush only a single Redis database number.
 *
 * Flags can be EMPTYDB_NO_FLAGS if no special flags are specified, or
 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
 * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
 * to specify that we do not want to delete the functions.
 *
 * On success the function returns the number of keys removed from the
 * database(s). Otherwise -1 is returned in the specific case the
 * DB number is out of range, and errno is set to EINVAL. */
long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
    int async = (flags & EMPTYDB_ASYNC);
    int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
    /* Payload of the module events fired below: the same structure is passed
     * to both the START and the END sub-event. */
    RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
    long long removed = 0;

    /* Validate the DB number before any side effect takes place. */
    if (dbnum < -1 || dbnum >= server.dbnum) {
        errno = EINVAL;
        return -1;
    }

    /* Fire the flushdb modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
                          REDISMODULE_SUBEVENT_FLUSHDB_START,
                          &fi);

    /* Make sure the WATCHed keys are affected by the FLUSH* commands.
     * Note that we need to call the function while the keys are still
     * there. */
    signalFlushedDb(dbnum, async);

    /* Empty redis database structure. */
    removed = emptyDbStructure(server.db, dbnum, async, callback);

    /* Flush slots to keys map if enable cluster, we can flush entire
     * slots to keys map whatever dbnum because only support one DB
     * in cluster mode. */
    if (server.cluster_enabled) slotToKeyFlush(server.db);

    if (dbnum == -1) flushSlaveKeysWithExpireList();

    /* Functions can only be deleted as part of a flush of all the DBs:
     * callers flushing a single DB have to pass EMPTYDB_NOFUNCTIONS (as
     * flushdbCommand() does), otherwise the assertion below fails. */
    if (with_functions) {
        serverAssert(dbnum == -1);
        functionsLibCtxClearCurrent(async);
    }

    /* Also fire the end event. Note that this event will fire almost
     * immediately after the start event if the flush is asynchronous. */
    moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
                          REDISMODULE_SUBEVENT_FLUSHDB_END,
                          &fi);

    return removed;
}
482 | |
483 | /* Initialize temporary db on replica for use during diskless replication. */ |
484 | redisDb *initTempDb(void) { |
485 | redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); |
486 | for (int i=0; i<server.dbnum; i++) { |
487 | tempDb[i].dict = dictCreate(&dbDictType); |
488 | tempDb[i].expires = dictCreate(&dbExpiresDictType); |
489 | tempDb[i].slots_to_keys = NULL; |
490 | } |
491 | |
492 | if (server.cluster_enabled) { |
493 | /* Prepare temp slot to key map to be written during async diskless replication. */ |
494 | slotToKeyInit(tempDb); |
495 | } |
496 | |
497 | return tempDb; |
498 | } |
499 | |
500 | /* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ |
501 | void discardTempDb(redisDb *tempDb, void(callback)(dict*)) { |
502 | int async = 1; |
503 | |
504 | /* Release temp DBs. */ |
505 | emptyDbStructure(tempDb, -1, async, callback); |
506 | for (int i=0; i<server.dbnum; i++) { |
507 | dictRelease(tempDb[i].dict); |
508 | dictRelease(tempDb[i].expires); |
509 | } |
510 | |
511 | if (server.cluster_enabled) { |
512 | /* Release temp slot to key map. */ |
513 | slotToKeyDestroy(tempDb); |
514 | } |
515 | |
516 | zfree(tempDb); |
517 | } |
518 | |
519 | int selectDb(client *c, int id) { |
520 | if (id < 0 || id >= server.dbnum) |
521 | return C_ERR; |
522 | c->db = &server.db[id]; |
523 | return C_OK; |
524 | } |
525 | |
526 | long long dbTotalServerKeyCount() { |
527 | long long total = 0; |
528 | int j; |
529 | for (j = 0; j < server.dbnum; j++) { |
530 | total += dictSize(server.db[j].dict); |
531 | } |
532 | return total; |
533 | } |
534 | |
535 | /*----------------------------------------------------------------------------- |
536 | * Hooks for key space changes. |
537 | * |
538 | * Every time a key in the database is modified the function |
539 | * signalModifiedKey() is called. |
540 | * |
 * Every time a DB is flushed the function signalFlushedDb() is called.
542 | *----------------------------------------------------------------------------*/ |
543 | |
544 | /* Note that the 'c' argument may be NULL if the key was modified out of |
545 | * a context of a client. */ |
546 | void signalModifiedKey(client *c, redisDb *db, robj *key) { |
547 | touchWatchedKey(db,key); |
548 | trackingInvalidateKey(c,key,1); |
549 | } |
550 | |
551 | void signalFlushedDb(int dbid, int async) { |
552 | int startdb, enddb; |
553 | if (dbid == -1) { |
554 | startdb = 0; |
555 | enddb = server.dbnum-1; |
556 | } else { |
557 | startdb = enddb = dbid; |
558 | } |
559 | |
560 | for (int j = startdb; j <= enddb; j++) { |
561 | scanDatabaseForDeletedStreams(&server.db[j], NULL); |
562 | touchAllWatchedKeysInDb(&server.db[j], NULL); |
563 | } |
564 | |
565 | trackingInvalidateKeysOnFlush(async); |
566 | |
567 | /* Changes in this method may take place in swapMainDbWithTempDb as well, |
568 | * where we execute similar calls, but with subtle differences as it's |
569 | * not simply flushing db. */ |
570 | } |
571 | |
572 | /*----------------------------------------------------------------------------- |
573 | * Type agnostic commands operating on the key space |
574 | *----------------------------------------------------------------------------*/ |
575 | |
576 | /* Return the set of flags to use for the emptyDb() call for FLUSHALL |
577 | * and FLUSHDB commands. |
578 | * |
579 | * sync: flushes the database in an sync manner. |
580 | * async: flushes the database in an async manner. |
581 | * no option: determine sync or async according to the value of lazyfree-lazy-user-flush. |
582 | * |
583 | * On success C_OK is returned and the flags are stored in *flags, otherwise |
584 | * C_ERR is returned and the function sends an error to the client. */ |
585 | int getFlushCommandFlags(client *c, int *flags) { |
586 | /* Parse the optional ASYNC option. */ |
587 | if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync" )) { |
588 | *flags = EMPTYDB_NO_FLAGS; |
589 | } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async" )) { |
590 | *flags = EMPTYDB_ASYNC; |
591 | } else if (c->argc == 1) { |
592 | *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; |
593 | } else { |
594 | addReplyErrorObject(c,shared.syntaxerr); |
595 | return C_ERR; |
596 | } |
597 | return C_OK; |
598 | } |
599 | |
600 | /* Flushes the whole server data set. */ |
601 | void flushAllDataAndResetRDB(int flags) { |
602 | server.dirty += emptyData(-1,flags,NULL); |
603 | if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); |
604 | if (server.saveparamslen > 0) { |
605 | rdbSaveInfo rsi, *rsiptr; |
606 | rsiptr = rdbPopulateSaveInfo(&rsi); |
607 | rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr); |
608 | } |
609 | |
610 | #if defined(USE_JEMALLOC) |
611 | /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. |
612 | * for large databases, flushdb blocks for long anyway, so a bit more won't |
613 | * harm and this way the flush and purge will be synchronous. */ |
614 | if (!(flags & EMPTYDB_ASYNC)) |
615 | jemalloc_purge(); |
616 | #endif |
617 | } |
618 | |
619 | /* FLUSHDB [ASYNC] |
620 | * |
621 | * Flushes the currently SELECTed Redis DB. */ |
622 | void flushdbCommand(client *c) { |
623 | int flags; |
624 | |
625 | if (getFlushCommandFlags(c,&flags) == C_ERR) return; |
626 | /* flushdb should not flush the functions */ |
627 | server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); |
628 | |
629 | /* Without the forceCommandPropagation, when DB was already empty, |
630 | * FLUSHDB will not be replicated nor put into the AOF. */ |
631 | forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); |
632 | |
633 | addReply(c,shared.ok); |
634 | |
635 | #if defined(USE_JEMALLOC) |
636 | /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. |
637 | * for large databases, flushdb blocks for long anyway, so a bit more won't |
638 | * harm and this way the flush and purge will be synchronous. */ |
639 | if (!(flags & EMPTYDB_ASYNC)) |
640 | jemalloc_purge(); |
641 | #endif |
642 | } |
643 | |
644 | /* FLUSHALL [ASYNC] |
645 | * |
646 | * Flushes the whole server data set. */ |
647 | void flushallCommand(client *c) { |
648 | int flags; |
649 | if (getFlushCommandFlags(c,&flags) == C_ERR) return; |
650 | /* flushall should not flush the functions */ |
651 | flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); |
652 | |
653 | /* Without the forceCommandPropagation, when DBs were already empty, |
654 | * FLUSHALL will not be replicated nor put into the AOF. */ |
655 | forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); |
656 | |
657 | addReply(c,shared.ok); |
658 | } |
659 | |
660 | /* This command implements DEL and LAZYDEL. */ |
661 | void delGenericCommand(client *c, int lazy) { |
662 | int numdel = 0, j; |
663 | |
664 | for (j = 1; j < c->argc; j++) { |
665 | expireIfNeeded(c->db,c->argv[j],0); |
666 | int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : |
667 | dbSyncDelete(c->db,c->argv[j]); |
668 | if (deleted) { |
669 | signalModifiedKey(c,c->db,c->argv[j]); |
670 | notifyKeyspaceEvent(NOTIFY_GENERIC, |
671 | "del" ,c->argv[j],c->db->id); |
672 | server.dirty++; |
673 | numdel++; |
674 | } |
675 | } |
676 | addReplyLongLong(c,numdel); |
677 | } |
678 | |
679 | void delCommand(client *c) { |
680 | delGenericCommand(c,server.lazyfree_lazy_user_del); |
681 | } |
682 | |
683 | void unlinkCommand(client *c) { |
684 | delGenericCommand(c,1); |
685 | } |
686 | |
687 | /* EXISTS key1 key2 ... key_N. |
688 | * Return value is the number of keys existing. */ |
689 | void existsCommand(client *c) { |
690 | long long count = 0; |
691 | int j; |
692 | |
693 | for (j = 1; j < c->argc; j++) { |
694 | if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++; |
695 | } |
696 | addReplyLongLong(c,count); |
697 | } |
698 | |
699 | void selectCommand(client *c) { |
700 | int id; |
701 | |
702 | if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK) |
703 | return; |
704 | |
705 | if (server.cluster_enabled && id != 0) { |
706 | addReplyError(c,"SELECT is not allowed in cluster mode" ); |
707 | return; |
708 | } |
709 | if (selectDb(c,id) == C_ERR) { |
710 | addReplyError(c,"DB index is out of range" ); |
711 | } else { |
712 | addReply(c,shared.ok); |
713 | } |
714 | } |
715 | |
716 | void randomkeyCommand(client *c) { |
717 | robj *key; |
718 | |
719 | if ((key = dbRandomKey(c->db)) == NULL) { |
720 | addReplyNull(c); |
721 | return; |
722 | } |
723 | |
724 | addReplyBulk(c,key); |
725 | decrRefCount(key); |
726 | } |
727 | |
728 | void keysCommand(client *c) { |
729 | dictIterator *di; |
730 | dictEntry *de; |
731 | sds pattern = c->argv[1]->ptr; |
732 | int plen = sdslen(pattern), allkeys; |
733 | unsigned long numkeys = 0; |
734 | void *replylen = addReplyDeferredLen(c); |
735 | |
736 | di = dictGetSafeIterator(c->db->dict); |
737 | allkeys = (pattern[0] == '*' && plen == 1); |
738 | while((de = dictNext(di)) != NULL) { |
739 | sds key = dictGetKey(de); |
740 | robj *keyobj; |
741 | |
742 | if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { |
743 | keyobj = createStringObject(key,sdslen(key)); |
744 | if (!keyIsExpired(c->db,keyobj)) { |
745 | addReplyBulk(c,keyobj); |
746 | numkeys++; |
747 | } |
748 | decrRefCount(keyobj); |
749 | } |
750 | } |
751 | dictReleaseIterator(di); |
752 | setDeferredArrayLen(c,replylen,numkeys); |
753 | } |
754 | |
755 | /* This callback is used by scanGenericCommand in order to collect elements |
756 | * returned by the dictionary iterator into a list. */ |
757 | void scanCallback(void *privdata, const dictEntry *de) { |
758 | void **pd = (void**) privdata; |
759 | list *keys = pd[0]; |
760 | robj *o = pd[1]; |
761 | robj *key, *val = NULL; |
762 | |
763 | if (o == NULL) { |
764 | sds sdskey = dictGetKey(de); |
765 | key = createStringObject(sdskey, sdslen(sdskey)); |
766 | } else if (o->type == OBJ_SET) { |
767 | sds keysds = dictGetKey(de); |
768 | key = createStringObject(keysds,sdslen(keysds)); |
769 | } else if (o->type == OBJ_HASH) { |
770 | sds sdskey = dictGetKey(de); |
771 | sds sdsval = dictGetVal(de); |
772 | key = createStringObject(sdskey,sdslen(sdskey)); |
773 | val = createStringObject(sdsval,sdslen(sdsval)); |
774 | } else if (o->type == OBJ_ZSET) { |
775 | sds sdskey = dictGetKey(de); |
776 | key = createStringObject(sdskey,sdslen(sdskey)); |
777 | val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0); |
778 | } else { |
779 | serverPanic("Type not handled in SCAN callback." ); |
780 | } |
781 | |
782 | listAddNodeTail(keys, key); |
783 | if (val) listAddNodeTail(keys, val); |
784 | } |
785 | |
786 | /* Try to parse a SCAN cursor stored at object 'o': |
787 | * if the cursor is valid, store it as unsigned integer into *cursor and |
788 | * returns C_OK. Otherwise return C_ERR and send an error to the |
789 | * client. */ |
790 | int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { |
791 | char *eptr; |
792 | |
793 | /* Use strtoul() because we need an *unsigned* long, so |
794 | * getLongLongFromObject() does not cover the whole cursor space. */ |
795 | errno = 0; |
796 | *cursor = strtoul(o->ptr, &eptr, 10); |
797 | if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE) |
798 | { |
799 | addReplyError(c, "invalid cursor" ); |
800 | return C_ERR; |
801 | } |
802 | return C_OK; |
803 | } |
804 | |
/* Shared implementation of SCAN, HSCAN, SSCAN and of the sorted set
 * variant (ZSCAN).
 *
 * If object 'o' is passed, then it must be a Hash, Set or Zset object,
 * otherwise if 'o' is NULL the command will operate on the dictionary
 * associated with the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
 * 'cursor' is the already parsed cursor the iteration resumes from.
 *
 * Supported options: COUNT <n> (n >= 1, default 10), MATCH <pattern>, and
 * TYPE <name> (accepted only when 'o' is NULL).
 *
 * The reply is a two element array: the next cursor (0 when the iteration
 * is complete) and the array of collected elements. In the case of a Hash
 * the function returns both the field and value of every element, in the
 * case of a Zset both the member and its score. */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
    int i, j;
    /* Collected elements (robj *). Every exit path goes through 'cleanup',
     * where whatever is still in the list is released. */
    list *keys = listCreate();
    listNode *node, *nextnode;
    long count = 10;
    sds pat = NULL;
    sds typename = NULL;
    int patlen = 0, use_pattern = 0;
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
                 o->type == OBJ_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */

    /* Step 1: Parse options. Every option takes exactly one value, so 'j'
     * (the number of arguments left, option name included) must be at
     * least 2 for an option to be accepted. */
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count" ) && j >= 2) {
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != C_OK)
            {
                goto cleanup;
            }

            /* A COUNT lower than 1 is reported as a syntax error. */
            if (count < 1) {
                addReplyErrorObject(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "match" ) && j >= 2) {
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(patlen == 1 && pat[0] == '*');

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "type" ) && o == NULL && j >= 2) {
            /* SCAN for a particular type only applies to the db dict */
            typename = c->argv[i+1]->ptr;
            i+= 2;
        } else {
            /* Unknown option, option missing its value, or TYPE used while
             * scanning an object. */
            addReplyErrorObject(c,shared.syntaxerr);
            goto cleanup;
        }
    }

    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a listpack, intset, or any other
     * representation that is not a hash table, we are sure that it is also
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

    /* Handle the case of a hash table: 'ht' stays NULL when the object is
     * not backed by one. */
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
        /* NOTE(review): 'count' comes from the client and is only checked to
         * be >= 1, so this doubling (and the count*10 below) looks like it
         * can overflow a long for huge COUNT values -- confirm. */
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid blocking for too long, at the cost
         * of returning no or very few elements. */
        long maxiterations = count*10;

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        /* Keep scanning until the iteration is over (cursor back to zero),
         * the iteration budget is exhausted, or enough elements have been
         * collected. */
        do {
            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);
    } else if (o->type == OBJ_SET) {
        /* Set not backed by a hash table: read it as an intset and return
         * every member at once. */
        int pos = 0;
        int64_t ll;

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
        /* Hash or sorted set not backed by a hash table: walk the listpack
         * from the first entry and return every entry at once, in listpack
         * order. */
        unsigned char *p = lpFirst(o->ptr);
        unsigned char *vstr;
        int64_t vlen;
        unsigned char intbuf[LP_INTBUF_SIZE];

        while(p) {
            vstr = lpGet(p,&vlen,intbuf);
            listAddNodeTail(keys, createStringObject((char*)vstr,vlen));
            p = lpNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        serverPanic("Not handled encoding in SCAN." );
    }

    /* Step 3: Filter elements. 'nextnode' is fetched before any deletion so
     * that the walk can continue after 'node' is removed from the list. */
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
        int filter = 0;

        /* Filter element if it does not match the pattern. */
        if (use_pattern) {
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                /* Integer encoded element: match the pattern against its
                 * decimal representation. */
                char buf[LONG_STR_SIZE];
                int len;

                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }

        /* Filter an element if it isn't the type we want. When the lookup
         * returns NULL the type name compared against is "none". */
        if (!filter && o == NULL && typename){
            robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
            char* type = getObjectTypeName(typecheck);
            if (strcasecmp((char*) typename, type)) filter = 1;
        }

        /* Filter element if it is an expired key. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj, 0)) filter = 1;

        /* Remove the element and its associated value if needed. */
        if (filter) {
            decrRefCount(kobj);
            listDelNode(keys, node);
        }

        /* If this is a hash or a sorted set, we have a flat list of
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
            node = nextnode;
            serverAssert(node); /* assertion for valgrind (avoid NPD) */
            nextnode = listNextNode(node);
            if (filter) {
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
        }
        node = nextnode;
    }

    /* Step 4: Reply to the client with the next cursor followed by the
     * surviving elements. The list is consumed while replying. */
    addReplyArrayLen(c, 2);
    addReplyBulkLongLong(c,cursor);

    addReplyArrayLen(c, listLength(keys));
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
        listDelNode(keys, node);
    }

cleanup:
    /* Install the free method only now, so that listRelease() drops the
     * reference of any element still in the list (error paths), while the
     * listDelNode() calls above, which already released the elements by
     * hand, are not affected. */
    listSetFreeMethod(keys,decrRefCountVoid);
    listRelease(keys);
}
1003 | |
1004 | /* The SCAN command completely relies on scanGenericCommand. */ |
1005 | void scanCommand(client *c) { |
1006 | unsigned long cursor; |
1007 | if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return; |
1008 | scanGenericCommand(c,NULL,cursor); |
1009 | } |
1010 | |
1011 | void dbsizeCommand(client *c) { |
1012 | addReplyLongLong(c,dictSize(c->db->dict)); |
1013 | } |
1014 | |
1015 | void lastsaveCommand(client *c) { |
1016 | addReplyLongLong(c,server.lastsave); |
1017 | } |
1018 | |
1019 | char* getObjectTypeName(robj *o) { |
1020 | char* type; |
1021 | if (o == NULL) { |
1022 | type = "none" ; |
1023 | } else { |
1024 | switch(o->type) { |
1025 | case OBJ_STRING: type = "string" ; break; |
1026 | case OBJ_LIST: type = "list" ; break; |
1027 | case OBJ_SET: type = "set" ; break; |
1028 | case OBJ_ZSET: type = "zset" ; break; |
1029 | case OBJ_HASH: type = "hash" ; break; |
1030 | case OBJ_STREAM: type = "stream" ; break; |
1031 | case OBJ_MODULE: { |
1032 | moduleValue *mv = o->ptr; |
1033 | type = mv->type->name; |
1034 | }; break; |
1035 | default: type = "unknown" ; break; |
1036 | } |
1037 | } |
1038 | return type; |
1039 | } |
1040 | |
1041 | void typeCommand(client *c) { |
1042 | robj *o; |
1043 | o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); |
1044 | addReplyStatus(c, getObjectTypeName(o)); |
1045 | } |
1046 | |
1047 | void shutdownCommand(client *c) { |
1048 | int flags = SHUTDOWN_NOFLAGS; |
1049 | int abort = 0; |
1050 | for (int i = 1; i < c->argc; i++) { |
1051 | if (!strcasecmp(c->argv[i]->ptr,"nosave" )) { |
1052 | flags |= SHUTDOWN_NOSAVE; |
1053 | } else if (!strcasecmp(c->argv[i]->ptr,"save" )) { |
1054 | flags |= SHUTDOWN_SAVE; |
1055 | } else if (!strcasecmp(c->argv[i]->ptr, "now" )) { |
1056 | flags |= SHUTDOWN_NOW; |
1057 | } else if (!strcasecmp(c->argv[i]->ptr, "force" )) { |
1058 | flags |= SHUTDOWN_FORCE; |
1059 | } else if (!strcasecmp(c->argv[i]->ptr, "abort" )) { |
1060 | abort = 1; |
1061 | } else { |
1062 | addReplyErrorObject(c,shared.syntaxerr); |
1063 | return; |
1064 | } |
1065 | } |
1066 | if ((abort && flags != SHUTDOWN_NOFLAGS) || |
1067 | (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE)) |
1068 | { |
1069 | /* Illegal combo. */ |
1070 | addReplyErrorObject(c,shared.syntaxerr); |
1071 | return; |
1072 | } |
1073 | |
1074 | if (abort) { |
1075 | if (abortShutdown() == C_OK) |
1076 | addReply(c, shared.ok); |
1077 | else |
1078 | addReplyError(c, "No shutdown in progress." ); |
1079 | return; |
1080 | } |
1081 | |
1082 | if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) { |
1083 | addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client" ); |
1084 | return; |
1085 | } |
1086 | |
1087 | if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) { |
1088 | /* Script timed out. Shutdown allowed only with the NOSAVE flag. See |
1089 | * also processCommand where these errors are returned. */ |
1090 | if (server.busy_module_yield_flags && server.busy_module_yield_reply) { |
1091 | addReplyErrorFormat(c, "-BUSY %s" , server.busy_module_yield_reply); |
1092 | } else if (server.busy_module_yield_flags) { |
1093 | addReplyErrorObject(c, shared.slowmoduleerr); |
1094 | } else if (scriptIsEval()) { |
1095 | addReplyErrorObject(c, shared.slowevalerr); |
1096 | } else { |
1097 | addReplyErrorObject(c, shared.slowscripterr); |
1098 | } |
1099 | return; |
1100 | } |
1101 | |
1102 | blockClient(c, BLOCKED_SHUTDOWN); |
1103 | if (prepareForShutdown(flags) == C_OK) exit(0); |
1104 | /* If we're here, then shutdown is ongoing (the client is still blocked) or |
1105 | * failed (the client has received an error). */ |
1106 | } |
1107 | |
1108 | void renameGenericCommand(client *c, int nx) { |
1109 | robj *o; |
1110 | long long expire; |
1111 | int samekey = 0; |
1112 | |
1113 | /* When source and dest key is the same, no operation is performed, |
1114 | * if the key exists, however we still return an error on unexisting key. */ |
1115 | if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; |
1116 | |
1117 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) |
1118 | return; |
1119 | |
1120 | if (samekey) { |
1121 | addReply(c,nx ? shared.czero : shared.ok); |
1122 | return; |
1123 | } |
1124 | |
1125 | incrRefCount(o); |
1126 | expire = getExpire(c->db,c->argv[1]); |
1127 | if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { |
1128 | if (nx) { |
1129 | decrRefCount(o); |
1130 | addReply(c,shared.czero); |
1131 | return; |
1132 | } |
1133 | /* Overwrite: delete the old key before creating the new one |
1134 | * with the same name. */ |
1135 | dbDelete(c->db,c->argv[2]); |
1136 | } |
1137 | dbAdd(c->db,c->argv[2],o); |
1138 | if (expire != -1) setExpire(c,c->db,c->argv[2],expire); |
1139 | dbDelete(c->db,c->argv[1]); |
1140 | signalModifiedKey(c,c->db,c->argv[1]); |
1141 | signalModifiedKey(c,c->db,c->argv[2]); |
1142 | notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from" , |
1143 | c->argv[1],c->db->id); |
1144 | notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to" , |
1145 | c->argv[2],c->db->id); |
1146 | server.dirty++; |
1147 | addReply(c,nx ? shared.cone : shared.ok); |
1148 | } |
1149 | |
1150 | void renameCommand(client *c) { |
1151 | renameGenericCommand(c,0); |
1152 | } |
1153 | |
1154 | void renamenxCommand(client *c) { |
1155 | renameGenericCommand(c,1); |
1156 | } |
1157 | |
1158 | void moveCommand(client *c) { |
1159 | robj *o; |
1160 | redisDb *src, *dst; |
1161 | int srcid, dbid; |
1162 | long long expire; |
1163 | |
1164 | if (server.cluster_enabled) { |
1165 | addReplyError(c,"MOVE is not allowed in cluster mode" ); |
1166 | return; |
1167 | } |
1168 | |
1169 | /* Obtain source and target DB pointers */ |
1170 | src = c->db; |
1171 | srcid = c->db->id; |
1172 | |
1173 | if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) |
1174 | return; |
1175 | |
1176 | if (selectDb(c,dbid) == C_ERR) { |
1177 | addReplyError(c,"DB index is out of range" ); |
1178 | return; |
1179 | } |
1180 | dst = c->db; |
1181 | selectDb(c,srcid); /* Back to the source DB */ |
1182 | |
1183 | /* If the user is moving using as target the same |
1184 | * DB as the source DB it is probably an error. */ |
1185 | if (src == dst) { |
1186 | addReplyErrorObject(c,shared.sameobjecterr); |
1187 | return; |
1188 | } |
1189 | |
1190 | /* Check if the element exists and get a reference */ |
1191 | o = lookupKeyWrite(c->db,c->argv[1]); |
1192 | if (!o) { |
1193 | addReply(c,shared.czero); |
1194 | return; |
1195 | } |
1196 | expire = getExpire(c->db,c->argv[1]); |
1197 | |
1198 | /* Return zero if the key already exists in the target DB */ |
1199 | if (lookupKeyWrite(dst,c->argv[1]) != NULL) { |
1200 | addReply(c,shared.czero); |
1201 | return; |
1202 | } |
1203 | dbAdd(dst,c->argv[1],o); |
1204 | if (expire != -1) setExpire(c,dst,c->argv[1],expire); |
1205 | incrRefCount(o); |
1206 | |
1207 | /* OK! key moved, free the entry in the source DB */ |
1208 | dbDelete(src,c->argv[1]); |
1209 | signalModifiedKey(c,src,c->argv[1]); |
1210 | signalModifiedKey(c,dst,c->argv[1]); |
1211 | notifyKeyspaceEvent(NOTIFY_GENERIC, |
1212 | "move_from" ,c->argv[1],src->id); |
1213 | notifyKeyspaceEvent(NOTIFY_GENERIC, |
1214 | "move_to" ,c->argv[1],dst->id); |
1215 | |
1216 | server.dirty++; |
1217 | addReply(c,shared.cone); |
1218 | } |
1219 | |
1220 | void copyCommand(client *c) { |
1221 | robj *o; |
1222 | redisDb *src, *dst; |
1223 | int srcid, dbid; |
1224 | long long expire; |
1225 | int j, replace = 0, delete = 0; |
1226 | |
1227 | /* Obtain source and target DB pointers |
1228 | * Default target DB is the same as the source DB |
1229 | * Parse the REPLACE option and targetDB option. */ |
1230 | src = c->db; |
1231 | dst = c->db; |
1232 | srcid = c->db->id; |
1233 | dbid = c->db->id; |
1234 | for (j = 3; j < c->argc; j++) { |
1235 | int additional = c->argc - j - 1; |
1236 | if (!strcasecmp(c->argv[j]->ptr,"replace" )) { |
1237 | replace = 1; |
1238 | } else if (!strcasecmp(c->argv[j]->ptr, "db" ) && additional >= 1) { |
1239 | if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK) |
1240 | return; |
1241 | |
1242 | if (selectDb(c, dbid) == C_ERR) { |
1243 | addReplyError(c,"DB index is out of range" ); |
1244 | return; |
1245 | } |
1246 | dst = c->db; |
1247 | selectDb(c,srcid); /* Back to the source DB */ |
1248 | j++; /* Consume additional arg. */ |
1249 | } else { |
1250 | addReplyErrorObject(c,shared.syntaxerr); |
1251 | return; |
1252 | } |
1253 | } |
1254 | |
1255 | if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) { |
1256 | addReplyError(c,"Copying to another database is not allowed in cluster mode" ); |
1257 | return; |
1258 | } |
1259 | |
1260 | /* If the user select the same DB as |
1261 | * the source DB and using newkey as the same key |
1262 | * it is probably an error. */ |
1263 | robj *key = c->argv[1]; |
1264 | robj *newkey = c->argv[2]; |
1265 | if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) { |
1266 | addReplyErrorObject(c,shared.sameobjecterr); |
1267 | return; |
1268 | } |
1269 | |
1270 | /* Check if the element exists and get a reference */ |
1271 | o = lookupKeyRead(c->db, key); |
1272 | if (!o) { |
1273 | addReply(c,shared.czero); |
1274 | return; |
1275 | } |
1276 | expire = getExpire(c->db,key); |
1277 | |
1278 | /* Return zero if the key already exists in the target DB. |
1279 | * If REPLACE option is selected, delete newkey from targetDB. */ |
1280 | if (lookupKeyWrite(dst,newkey) != NULL) { |
1281 | if (replace) { |
1282 | delete = 1; |
1283 | } else { |
1284 | addReply(c,shared.czero); |
1285 | return; |
1286 | } |
1287 | } |
1288 | |
1289 | /* Duplicate object according to object's type. */ |
1290 | robj *newobj; |
1291 | switch(o->type) { |
1292 | case OBJ_STRING: newobj = dupStringObject(o); break; |
1293 | case OBJ_LIST: newobj = listTypeDup(o); break; |
1294 | case OBJ_SET: newobj = setTypeDup(o); break; |
1295 | case OBJ_ZSET: newobj = zsetDup(o); break; |
1296 | case OBJ_HASH: newobj = hashTypeDup(o); break; |
1297 | case OBJ_STREAM: newobj = streamDup(o); break; |
1298 | case OBJ_MODULE: |
1299 | newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o); |
1300 | if (!newobj) return; |
1301 | break; |
1302 | default: |
1303 | addReplyError(c, "unknown type object" ); |
1304 | return; |
1305 | } |
1306 | |
1307 | if (delete) { |
1308 | dbDelete(dst,newkey); |
1309 | } |
1310 | |
1311 | dbAdd(dst,newkey,newobj); |
1312 | if (expire != -1) setExpire(c, dst, newkey, expire); |
1313 | |
1314 | /* OK! key copied */ |
1315 | signalModifiedKey(c,dst,c->argv[2]); |
1316 | notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to" ,c->argv[2],dst->id); |
1317 | |
1318 | server.dirty++; |
1319 | addReply(c,shared.cone); |
1320 | } |
1321 | |
1322 | /* Helper function for dbSwapDatabases(): scans the list of keys that have |
1323 | * one or more blocked clients for B[LR]POP or other blocking commands |
1324 | * and signal the keys as ready if they are of the right type. See the comment |
1325 | * where the function is used for more info. */ |
1326 | void scanDatabaseForReadyKeys(redisDb *db) { |
1327 | dictEntry *de; |
1328 | dictIterator *di = dictGetSafeIterator(db->blocking_keys); |
1329 | while((de = dictNext(di)) != NULL) { |
1330 | robj *key = dictGetKey(de); |
1331 | dictEntry *kde = dictFind(db->dict,key->ptr); |
1332 | if (kde) { |
1333 | robj *value = dictGetVal(kde); |
1334 | signalKeyAsReady(db, key, value->type); |
1335 | } |
1336 | } |
1337 | dictReleaseIterator(di); |
1338 | } |
1339 | |
1340 | /* Since we are unblocking XREADGROUP clients in the event the |
1341 | * key was deleted/overwritten we must do the same in case the |
1342 | * database was flushed/swapped. */ |
1343 | void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) { |
1344 | /* Optimization: If no clients are in type BLOCKED_STREAM, |
1345 | * we can skip this loop. */ |
1346 | if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; |
1347 | |
1348 | dictEntry *de; |
1349 | dictIterator *di = dictGetSafeIterator(emptied->blocking_keys); |
1350 | while((de = dictNext(di)) != NULL) { |
1351 | robj *key = dictGetKey(de); |
1352 | int was_stream = 0, is_stream = 0; |
1353 | |
1354 | dictEntry *kde = dictFind(emptied->dict, key->ptr); |
1355 | if (kde) { |
1356 | robj *value = dictGetVal(kde); |
1357 | was_stream = value->type == OBJ_STREAM; |
1358 | } |
1359 | if (replaced_with) { |
1360 | dictEntry *kde = dictFind(replaced_with->dict, key->ptr); |
1361 | if (kde) { |
1362 | robj *value = dictGetVal(kde); |
1363 | is_stream = value->type == OBJ_STREAM; |
1364 | } |
1365 | } |
1366 | /* We want to try to unblock any client using a blocking XREADGROUP */ |
1367 | if (was_stream && !is_stream) |
1368 | signalKeyAsReady(emptied, key, OBJ_STREAM); |
1369 | } |
1370 | dictReleaseIterator(di); |
1371 | } |
1372 | |
1373 | /* Swap two databases at runtime so that all clients will magically see |
1374 | * the new database even if already connected. Note that the client |
1375 | * structure c->db points to a given DB, so we need to be smarter and |
1376 | * swap the underlying referenced structures, otherwise we would need |
1377 | * to fix all the references to the Redis DB structure. |
1378 | * |
1379 | * Returns C_ERR if at least one of the DB ids are out of range, otherwise |
1380 | * C_OK is returned. */ |
1381 | int dbSwapDatabases(int id1, int id2) { |
1382 | if (id1 < 0 || id1 >= server.dbnum || |
1383 | id2 < 0 || id2 >= server.dbnum) return C_ERR; |
1384 | if (id1 == id2) return C_OK; |
1385 | redisDb aux = server.db[id1]; |
1386 | redisDb *db1 = &server.db[id1], *db2 = &server.db[id2]; |
1387 | |
1388 | /* Swapdb should make transaction fail if there is any |
1389 | * client watching keys */ |
1390 | touchAllWatchedKeysInDb(db1, db2); |
1391 | touchAllWatchedKeysInDb(db2, db1); |
1392 | |
1393 | /* Try to unblock any XREADGROUP clients if the key no longer exists. */ |
1394 | scanDatabaseForDeletedStreams(db1, db2); |
1395 | scanDatabaseForDeletedStreams(db2, db1); |
1396 | |
1397 | /* Swap hash tables. Note that we don't swap blocking_keys, |
1398 | * ready_keys and watched_keys, since we want clients to |
1399 | * remain in the same DB they were. */ |
1400 | db1->dict = db2->dict; |
1401 | db1->expires = db2->expires; |
1402 | db1->avg_ttl = db2->avg_ttl; |
1403 | db1->expires_cursor = db2->expires_cursor; |
1404 | |
1405 | db2->dict = aux.dict; |
1406 | db2->expires = aux.expires; |
1407 | db2->avg_ttl = aux.avg_ttl; |
1408 | db2->expires_cursor = aux.expires_cursor; |
1409 | |
1410 | /* Now we need to handle clients blocked on lists: as an effect |
1411 | * of swapping the two DBs, a client that was waiting for list |
1412 | * X in a given DB, may now actually be unblocked if X happens |
1413 | * to exist in the new version of the DB, after the swap. |
1414 | * |
1415 | * However normally we only do this check for efficiency reasons |
1416 | * in dbAdd() when a list is created. So here we need to rescan |
1417 | * the list of clients blocked on lists and signal lists as ready |
1418 | * if needed. */ |
1419 | scanDatabaseForReadyKeys(db1); |
1420 | scanDatabaseForReadyKeys(db2); |
1421 | return C_OK; |
1422 | } |
1423 | |
1424 | /* Logically, this discards (flushes) the old main database, and apply the newly loaded |
1425 | * database (temp) as the main (active) database, the actual freeing of old database |
1426 | * (which will now be placed in the temp one) is done later. */ |
1427 | void swapMainDbWithTempDb(redisDb *tempDb) { |
1428 | if (server.cluster_enabled) { |
1429 | /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */ |
1430 | clusterSlotToKeyMapping *aux = server.db->slots_to_keys; |
1431 | server.db->slots_to_keys = tempDb->slots_to_keys; |
1432 | tempDb->slots_to_keys = aux; |
1433 | } |
1434 | |
1435 | for (int i=0; i<server.dbnum; i++) { |
1436 | redisDb aux = server.db[i]; |
1437 | redisDb *activedb = &server.db[i], *newdb = &tempDb[i]; |
1438 | |
1439 | /* Swapping databases should make transaction fail if there is any |
1440 | * client watching keys. */ |
1441 | touchAllWatchedKeysInDb(activedb, newdb); |
1442 | |
1443 | /* Try to unblock any XREADGROUP clients if the key no longer exists. */ |
1444 | scanDatabaseForDeletedStreams(activedb, newdb); |
1445 | |
1446 | /* Swap hash tables. Note that we don't swap blocking_keys, |
1447 | * ready_keys and watched_keys, since clients |
1448 | * remain in the same DB they were. */ |
1449 | activedb->dict = newdb->dict; |
1450 | activedb->expires = newdb->expires; |
1451 | activedb->avg_ttl = newdb->avg_ttl; |
1452 | activedb->expires_cursor = newdb->expires_cursor; |
1453 | |
1454 | newdb->dict = aux.dict; |
1455 | newdb->expires = aux.expires; |
1456 | newdb->avg_ttl = aux.avg_ttl; |
1457 | newdb->expires_cursor = aux.expires_cursor; |
1458 | |
1459 | /* Now we need to handle clients blocked on lists: as an effect |
1460 | * of swapping the two DBs, a client that was waiting for list |
1461 | * X in a given DB, may now actually be unblocked if X happens |
1462 | * to exist in the new version of the DB, after the swap. |
1463 | * |
1464 | * However normally we only do this check for efficiency reasons |
1465 | * in dbAdd() when a list is created. So here we need to rescan |
1466 | * the list of clients blocked on lists and signal lists as ready |
1467 | * if needed. */ |
1468 | scanDatabaseForReadyKeys(activedb); |
1469 | } |
1470 | |
1471 | trackingInvalidateKeysOnFlush(1); |
1472 | flushSlaveKeysWithExpireList(); |
1473 | } |
1474 | |
1475 | /* SWAPDB db1 db2 */ |
1476 | void swapdbCommand(client *c) { |
1477 | int id1, id2; |
1478 | |
1479 | /* Not allowed in cluster mode: we have just DB 0 there. */ |
1480 | if (server.cluster_enabled) { |
1481 | addReplyError(c,"SWAPDB is not allowed in cluster mode" ); |
1482 | return; |
1483 | } |
1484 | |
1485 | /* Get the two DBs indexes. */ |
1486 | if (getIntFromObjectOrReply(c, c->argv[1], &id1, |
1487 | "invalid first DB index" ) != C_OK) |
1488 | return; |
1489 | |
1490 | if (getIntFromObjectOrReply(c, c->argv[2], &id2, |
1491 | "invalid second DB index" ) != C_OK) |
1492 | return; |
1493 | |
1494 | /* Swap... */ |
1495 | if (dbSwapDatabases(id1,id2) == C_ERR) { |
1496 | addReplyError(c,"DB index is out of range" ); |
1497 | return; |
1498 | } else { |
1499 | RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2}; |
1500 | moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si); |
1501 | server.dirty++; |
1502 | addReply(c,shared.ok); |
1503 | } |
1504 | } |
1505 | |
1506 | /*----------------------------------------------------------------------------- |
1507 | * Expires API |
1508 | *----------------------------------------------------------------------------*/ |
1509 | |
1510 | int removeExpire(redisDb *db, robj *key) { |
1511 | /* An expire may only be removed if there is a corresponding entry in the |
1512 | * main dict. Otherwise, the key will never be freed. */ |
1513 | serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); |
1514 | return dictDelete(db->expires,key->ptr) == DICT_OK; |
1515 | } |
1516 | |
1517 | /* Set an expire to the specified key. If the expire is set in the context |
1518 | * of an user calling a command 'c' is the client, otherwise 'c' is set |
1519 | * to NULL. The 'when' parameter is the absolute unix time in milliseconds |
1520 | * after which the key will no longer be considered valid. */ |
1521 | void setExpire(client *c, redisDb *db, robj *key, long long when) { |
1522 | dictEntry *kde, *de; |
1523 | |
1524 | /* Reuse the sds from the main dict in the expire dict */ |
1525 | kde = dictFind(db->dict,key->ptr); |
1526 | serverAssertWithInfo(NULL,key,kde != NULL); |
1527 | de = dictAddOrFind(db->expires,dictGetKey(kde)); |
1528 | dictSetSignedIntegerVal(de,when); |
1529 | |
1530 | int writable_slave = server.masterhost && server.repl_slave_ro == 0; |
1531 | if (c && writable_slave && !(c->flags & CLIENT_MASTER)) |
1532 | rememberSlaveKeyWithExpire(db,key); |
1533 | } |
1534 | |
1535 | /* Return the expire time of the specified key, or -1 if no expire |
1536 | * is associated with this key (i.e. the key is non volatile) */ |
1537 | long long getExpire(redisDb *db, robj *key) { |
1538 | dictEntry *de; |
1539 | |
1540 | /* No expire? return ASAP */ |
1541 | if (dictSize(db->expires) == 0 || |
1542 | (de = dictFind(db->expires,key->ptr)) == NULL) return -1; |
1543 | |
1544 | /* The entry was found in the expire dict, this means it should also |
1545 | * be present in the main dict (safety check). */ |
1546 | serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); |
1547 | return dictGetSignedIntegerVal(de); |
1548 | } |
1549 | |
1550 | /* Delete the specified expired key and propagate expire. */ |
1551 | void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { |
1552 | mstime_t expire_latency; |
1553 | latencyStartMonitor(expire_latency); |
1554 | if (server.lazyfree_lazy_expire) |
1555 | dbAsyncDelete(db,keyobj); |
1556 | else |
1557 | dbSyncDelete(db,keyobj); |
1558 | latencyEndMonitor(expire_latency); |
1559 | latencyAddSampleIfNeeded("expire-del" ,expire_latency); |
1560 | notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired" ,keyobj,db->id); |
1561 | signalModifiedKey(NULL, db, keyobj); |
1562 | propagateDeletion(db,keyobj,server.lazyfree_lazy_expire); |
1563 | server.stat_expiredkeys++; |
1564 | } |
1565 | |
1566 | /* Propagate expires into slaves and the AOF file. |
1567 | * When a key expires in the master, a DEL operation for this key is sent |
1568 | * to all the slaves and the AOF file if enabled. |
1569 | * |
1570 | * This way the key expiry is centralized in one place, and since both |
1571 | * AOF and the master->slave link guarantee operation ordering, everything |
1572 | * will be consistent even if we allow write operations against expiring |
1573 | * keys. |
1574 | * |
1575 | * This function may be called from: |
1576 | * 1. Within call(): Example: Lazy-expire on key access. |
1577 | * In this case the caller doesn't have to do anything |
1578 | * because call() handles server.also_propagate(); or |
1579 | * 2. Outside of call(): Example: Active-expire, eviction. |
1580 | * In this the caller must remember to call |
1581 | * propagatePendingCommands, preferably at the end of |
1582 | * the deletion batch, so that DELs will be wrapped |
1583 | * in MULTI/EXEC */ |
1584 | void propagateDeletion(redisDb *db, robj *key, int lazy) { |
1585 | robj *argv[2]; |
1586 | |
1587 | argv[0] = lazy ? shared.unlink : shared.del; |
1588 | argv[1] = key; |
1589 | incrRefCount(argv[0]); |
1590 | incrRefCount(argv[1]); |
1591 | |
1592 | /* If the master decided to expire a key we must propagate it to replicas no matter what.. |
1593 | * Even if module executed a command without asking for propagation. */ |
1594 | int prev_replication_allowed = server.replication_allowed; |
1595 | server.replication_allowed = 1; |
1596 | alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); |
1597 | server.replication_allowed = prev_replication_allowed; |
1598 | |
1599 | decrRefCount(argv[0]); |
1600 | decrRefCount(argv[1]); |
1601 | } |
1602 | |
1603 | /* Check if the key is expired. */ |
1604 | int keyIsExpired(redisDb *db, robj *key) { |
1605 | mstime_t when = getExpire(db,key); |
1606 | mstime_t now; |
1607 | |
1608 | if (when < 0) return 0; /* No expire for this key */ |
1609 | |
1610 | /* Don't expire anything while loading. It will be done later. */ |
1611 | if (server.loading) return 0; |
1612 | |
1613 | /* If we are in the context of a Lua script, we pretend that time is |
1614 | * blocked to when the Lua script started. This way a key can expire |
1615 | * only the first time it is accessed and not in the middle of the |
1616 | * script execution, making propagation to slaves / AOF consistent. |
1617 | * See issue #1525 on Github for more information. */ |
1618 | if (server.script_caller) { |
1619 | now = scriptTimeSnapshot(); |
1620 | } |
1621 | /* If we are in the middle of a command execution, we still want to use |
1622 | * a reference time that does not change: in that case we just use the |
1623 | * cached time, that we update before each call in the call() function. |
1624 | * This way we avoid that commands such as RPOPLPUSH or similar, that |
1625 | * may re-open the same key multiple times, can invalidate an already |
1626 | * open object in a next call, if the next call will see the key expired, |
1627 | * while the first did not. */ |
1628 | else if (server.fixed_time_expire > 0) { |
1629 | now = server.mstime; |
1630 | } |
1631 | /* For the other cases, we want to use the most fresh time we have. */ |
1632 | else { |
1633 | now = mstime(); |
1634 | } |
1635 | |
1636 | /* The key expired if the current (virtual or real) time is greater |
1637 | * than the expire time of the key. */ |
1638 | return now > when; |
1639 | } |
1640 | |
1641 | /* This function is called when we are going to perform some operation |
1642 | * in a given key, but such key may be already logically expired even if |
1643 | * it still exists in the database. The main way this function is called |
1644 | * is via lookupKey*() family of functions. |
1645 | * |
1646 | * The behavior of the function depends on the replication role of the |
1647 | * instance, because by default replicas do not delete expired keys. They |
1648 | * wait for DELs from the master for consistency matters. However even |
1649 | * replicas will try to have a coherent return value for the function, |
1650 | * so that read commands executed in the replica side will be able to |
1651 | * behave like if the key is expired even if still present (because the |
1652 | * master has yet to propagate the DEL). |
1653 | * |
1654 | * In masters as a side effect of finding a key which is expired, such |
1655 | * key will be evicted from the database. Also this may trigger the |
1656 | * propagation of a DEL/UNLINK command in AOF / replication stream. |
1657 | * |
1658 | * On replicas, this function does not delete expired keys by default, but |
1659 | * it still returns 1 if the key is logically expired. To force deletion |
1660 | * of logically expired keys even on replicas, set force_delete_expired to |
1661 | * a non-zero value. Note though that if the current client is executing |
1662 | * replicated commands from the master, keys are never considered expired. |
1663 | * |
1664 | * The return value of the function is 0 if the key is still valid, |
1665 | * otherwise the function returns 1 if the key is expired. */ |
1666 | int expireIfNeeded(redisDb *db, robj *key, int force_delete_expired) { |
1667 | if (!keyIsExpired(db,key)) return 0; |
1668 | |
1669 | /* If we are running in the context of a replica, instead of |
1670 | * evicting the expired key from the database, we return ASAP: |
1671 | * the replica key expiration is controlled by the master that will |
1672 | * send us synthesized DEL operations for expired keys. The |
1673 | * exception is when write operations are performed on writable |
1674 | * replicas. |
1675 | * |
1676 | * Still we try to return the right information to the caller, |
1677 | * that is, 0 if we think the key should be still valid, 1 if |
1678 | * we think the key is expired at this time. |
1679 | * |
1680 | * When replicating commands from the master, keys are never considered |
1681 | * expired. */ |
1682 | if (server.masterhost != NULL) { |
1683 | if (server.current_client == server.master) return 0; |
1684 | if (!force_delete_expired) return 1; |
1685 | } |
1686 | |
1687 | /* If clients are paused, we keep the current dataset constant, |
1688 | * but return to the client what we believe is the right state. Typically, |
1689 | * at the end of the pause we will properly expire the key OR we will |
1690 | * have failed over and the new primary will send us the expire. */ |
1691 | if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; |
1692 | |
1693 | /* Delete the key */ |
1694 | deleteExpiredKeyAndPropagate(db,key); |
1695 | return 1; |
1696 | } |
1697 | |
1698 | /* ----------------------------------------------------------------------------- |
1699 | * API to get key arguments from commands |
1700 | * ---------------------------------------------------------------------------*/ |
1701 | |
1702 | /* Prepare the getKeysResult struct to hold numkeys, either by using the |
1703 | * pre-allocated keysbuf or by allocating a new array on the heap. |
1704 | * |
1705 | * This function must be called at least once before starting to populate |
1706 | * the result, and can be called repeatedly to enlarge the result array. |
1707 | */ |
1708 | keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) { |
1709 | /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack |
1710 | * buffer here. */ |
1711 | if (!result->keys) { |
1712 | serverAssert(!result->numkeys); |
1713 | result->keys = result->keysbuf; |
1714 | } |
1715 | |
1716 | /* Resize if necessary */ |
1717 | if (numkeys > result->size) { |
1718 | if (result->keys != result->keysbuf) { |
1719 | /* We're not using a static buffer, just (re)alloc */ |
1720 | result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference)); |
1721 | } else { |
1722 | /* We are using a static buffer, copy its contents */ |
1723 | result->keys = zmalloc(numkeys * sizeof(keyReference)); |
1724 | if (result->numkeys) |
1725 | memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference)); |
1726 | } |
1727 | result->size = numkeys; |
1728 | } |
1729 | |
1730 | return result->keys; |
1731 | } |
1732 | |
1733 | /* Returns a bitmask with all the flags found in any of the key specs of the command. |
1734 | * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */ |
1735 | int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) { |
1736 | int64_t flags = 0; |
1737 | for (int j = 0; j < cmd->key_specs_num; j++) { |
1738 | keySpec *spec = cmd->key_specs + j; |
1739 | flags |= inv? ~spec->flags : spec->flags; |
1740 | } |
1741 | return flags; |
1742 | } |
1743 | |
1744 | /* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error. |
1745 | * There are several flags that can be used to modify how this function finds keys in a command. |
1746 | * |
1747 | * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys. |
1748 | * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys |
1749 | * found in other valid keyspecs. |
1750 | */ |
1751 | int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { |
1752 | int j, i, k = 0, last, first, step; |
1753 | keyReference *keys; |
1754 | |
1755 | for (j = 0; j < cmd->key_specs_num; j++) { |
1756 | keySpec *spec = cmd->key_specs + j; |
1757 | serverAssert(spec->begin_search_type != KSPEC_BS_INVALID); |
1758 | /* Skip specs that represent 'fake' keys */ |
1759 | if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) { |
1760 | continue; |
1761 | } |
1762 | |
1763 | first = 0; |
1764 | if (spec->begin_search_type == KSPEC_BS_INDEX) { |
1765 | first = spec->bs.index.pos; |
1766 | } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) { |
1767 | int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom; |
1768 | int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1; |
1769 | for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) { |
1770 | if (i >= argc || i < 1) |
1771 | break; |
1772 | if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) { |
1773 | first = i+1; |
1774 | break; |
1775 | } |
1776 | } |
1777 | /* keyword not found */ |
1778 | if (!first) { |
1779 | continue; |
1780 | } |
1781 | } else { |
1782 | /* unknown spec */ |
1783 | goto invalid_spec; |
1784 | } |
1785 | |
1786 | if (spec->find_keys_type == KSPEC_FK_RANGE) { |
1787 | step = spec->fk.range.keystep; |
1788 | if (spec->fk.range.lastkey >= 0) { |
1789 | last = first + spec->fk.range.lastkey; |
1790 | } else { |
1791 | if (!spec->fk.range.limit) { |
1792 | last = argc + spec->fk.range.lastkey; |
1793 | } else { |
1794 | serverAssert(spec->fk.range.lastkey == -1); |
1795 | last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey); |
1796 | } |
1797 | } |
1798 | } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) { |
1799 | step = spec->fk.keynum.keystep; |
1800 | long long numkeys; |
1801 | if (spec->fk.keynum.keynumidx >= argc) |
1802 | goto invalid_spec; |
1803 | |
1804 | sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr; |
1805 | if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) { |
1806 | /* Unable to parse the numkeys argument or it was invalid */ |
1807 | goto invalid_spec; |
1808 | } |
1809 | |
1810 | first += spec->fk.keynum.firstkey; |
1811 | last = first + (int)numkeys-1; |
1812 | } else { |
1813 | /* unknown spec */ |
1814 | goto invalid_spec; |
1815 | } |
1816 | |
1817 | int count = ((last - first)+1); |
1818 | keys = getKeysPrepareResult(result, count); |
1819 | |
1820 | /* First or last is out of bounds, which indicates a syntax error */ |
1821 | if (last >= argc || last < first || first >= argc) { |
1822 | goto invalid_spec; |
1823 | } |
1824 | |
1825 | for (i = first; i <= last; i += step) { |
1826 | if (i >= argc || i < first) { |
1827 | /* Modules commands, and standard commands with a not fixed number |
1828 | * of arguments (negative arity parameter) do not have dispatch |
1829 | * time arity checks, so we need to handle the case where the user |
1830 | * passed an invalid number of arguments here. In this case we |
1831 | * return no keys and expect the command implementation to report |
1832 | * an arity or syntax error. */ |
1833 | if (cmd->flags & CMD_MODULE || cmd->arity < 0) { |
1834 | continue; |
1835 | } else { |
1836 | serverPanic("Redis built-in command declared keys positions not matching the arity requirements." ); |
1837 | } |
1838 | } |
1839 | keys[k].pos = i; |
1840 | keys[k++].flags = spec->flags; |
1841 | } |
1842 | |
1843 | /* Handle incomplete specs (only after we added the current spec |
1844 | * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */ |
1845 | if (spec->flags & CMD_KEY_INCOMPLETE) { |
1846 | goto invalid_spec; |
1847 | } |
1848 | |
1849 | /* Done with this spec */ |
1850 | continue; |
1851 | |
1852 | invalid_spec: |
1853 | if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) { |
1854 | continue; |
1855 | } else { |
1856 | result->numkeys = 0; |
1857 | return -1; |
1858 | } |
1859 | } |
1860 | |
1861 | result->numkeys = k; |
1862 | return k; |
1863 | } |
1864 | |
1865 | /* Return all the arguments that are keys in the command passed via argc / argv. |
1866 | * This function will eventually replace getKeysFromCommand. |
1867 | * |
1868 | * The command returns the positions of all the key arguments inside the array, |
1869 | * so the actual return value is a heap allocated array of integers. The |
1870 | * length of the array is returned by reference into *numkeys. |
1871 | * |
1872 | * Along with the position, this command also returns the flags that are |
1873 | * associated with how Redis will access the key. |
1874 | * |
1875 | * 'cmd' must be point to the corresponding entry into the redisCommand |
1876 | * table, according to the command name in argv[0]. */ |
1877 | int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { |
1878 | /* The command has at least one key-spec not marked as NOT_KEY */ |
1879 | int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); |
1880 | /* The command has at least one key-spec marked as VARIABLE_FLAGS */ |
1881 | int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS); |
1882 | |
1883 | /* Flags indicating that we have a getkeys callback */ |
1884 | int has_module_getkeys = cmd->flags & CMD_MODULE_GETKEYS; |
1885 | |
1886 | /* The key-spec that's auto generated by RM_CreateCommand sets VARIABLE_FLAGS since no flags are given. |
1887 | * If the module provides getkeys callback, we'll prefer it, but if it didn't, we'll use key-spec anyway. */ |
1888 | if ((cmd->flags & CMD_MODULE) && has_varflags && !has_module_getkeys) |
1889 | has_varflags = 0; |
1890 | |
1891 | /* We prefer key-specs if there are any, and their flags are reliable. */ |
1892 | if (has_keyspec && !has_varflags) { |
1893 | int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result); |
1894 | if (ret >= 0) |
1895 | return ret; |
1896 | /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec), |
1897 | * fallback to the callback method. */ |
1898 | } |
1899 | |
1900 | /* Resort to getkeys callback methods. */ |
1901 | if (has_module_getkeys) |
1902 | return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); |
1903 | |
1904 | /* We use native getkeys as a last resort, since not all these native getkeys provide |
1905 | * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/ |
1906 | if (cmd->getkeys_proc) |
1907 | return cmd->getkeys_proc(cmd,argv,argc,result); |
1908 | return 0; |
1909 | } |
1910 | |
1911 | /* This function returns a sanity check if the command may have keys. */ |
1912 | int doesCommandHaveKeys(struct redisCommand *cmd) { |
1913 | return cmd->getkeys_proc || /* has getkeys_proc (non modules) */ |
1914 | (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */ |
1915 | (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */ |
1916 | } |
1917 | |
1918 | /* A simplified channel spec table that contains all of the redis commands |
1919 | * and which channels they have and how they are accessed. */ |
1920 | typedef struct ChannelSpecs { |
1921 | redisCommandProc *proc; /* Command procedure to match against */ |
1922 | uint64_t flags; /* CMD_CHANNEL_* flags for this command */ |
1923 | int start; /* The initial position of the first channel */ |
1924 | int count; /* The number of channels, or -1 if all remaining |
1925 | * arguments are channels. */ |
1926 | } ChannelSpecs; |
1927 | |
1928 | ChannelSpecs commands_with_channels[] = { |
1929 | {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, |
1930 | {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, |
1931 | {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, |
1932 | {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, |
1933 | {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1}, |
1934 | {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, |
1935 | {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, |
1936 | {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, |
1937 | {NULL,0} /* Terminator. */ |
1938 | }; |
1939 | |
1940 | /* Returns 1 if the command may access any channels matched by the flags |
1941 | * argument. */ |
1942 | int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) { |
1943 | /* If a module declares get channels, we are just going to assume |
1944 | * has channels. This API is allowed to return false positives. */ |
1945 | if (cmd->flags & CMD_MODULE_GETCHANNELS) { |
1946 | return 1; |
1947 | } |
1948 | for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) { |
1949 | if (cmd->proc == spec->proc) { |
1950 | return !!(spec->flags & flags); |
1951 | } |
1952 | } |
1953 | return 0; |
1954 | } |
1955 | |
1956 | /* Return all the arguments that are channels in the command passed via argc / argv. |
1957 | * This function behaves similar to getKeysFromCommandWithSpecs, but with channels |
1958 | * instead of keys. |
1959 | * |
1960 | * The command returns the positions of all the channel arguments inside the array, |
1961 | * so the actual return value is a heap allocated array of integers. The |
1962 | * length of the array is returned by reference into *numkeys. |
1963 | * |
1964 | * Along with the position, this command also returns the flags that are |
1965 | * associated with how Redis will access the channel. |
1966 | * |
1967 | * 'cmd' must be point to the corresponding entry into the redisCommand |
1968 | * table, according to the command name in argv[0]. */ |
1969 | int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
1970 | keyReference *keys; |
1971 | /* If a module declares get channels, use that. */ |
1972 | if (cmd->flags & CMD_MODULE_GETCHANNELS) { |
1973 | return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result); |
1974 | } |
1975 | /* Otherwise check the channel spec table */ |
1976 | for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) { |
1977 | if (cmd->proc == spec->proc) { |
1978 | int start = spec->start; |
1979 | int stop = (spec->count == -1) ? argc : start + spec->count; |
1980 | if (stop > argc) stop = argc; |
1981 | int count = 0; |
1982 | keys = getKeysPrepareResult(result, stop - start); |
1983 | for (int i = start; i < stop; i++ ) { |
1984 | keys[count].pos = i; |
1985 | keys[count++].flags = spec->flags; |
1986 | } |
1987 | result->numkeys = count; |
1988 | return count; |
1989 | } |
1990 | } |
1991 | return 0; |
1992 | } |
1993 | |
1994 | /* The base case is to use the keys position as given in the command table |
1995 | * (firstkey, lastkey, step). |
1996 | * This function works only on command with the legacy_range_key_spec, |
1997 | * all other commands should be handled by getkeys_proc. |
1998 | * |
1999 | * If the commands keyspec is incomplete, no keys will be returned, and the provided |
2000 | * keys function should be called instead. |
2001 | * |
2002 | * NOTE: This function does not guarantee populating the flags for |
2003 | * the keys, in order to get flags you should use getKeysUsingKeySpecs. */ |
2004 | int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2005 | int j, i = 0, last, first, step; |
2006 | keyReference *keys; |
2007 | UNUSED(argv); |
2008 | |
2009 | if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) { |
2010 | result->numkeys = 0; |
2011 | return 0; |
2012 | } |
2013 | |
2014 | first = cmd->legacy_range_key_spec.bs.index.pos; |
2015 | last = cmd->legacy_range_key_spec.fk.range.lastkey; |
2016 | if (last >= 0) |
2017 | last += first; |
2018 | step = cmd->legacy_range_key_spec.fk.range.keystep; |
2019 | |
2020 | if (last < 0) last = argc+last; |
2021 | |
2022 | int count = ((last - first)+1); |
2023 | keys = getKeysPrepareResult(result, count); |
2024 | |
2025 | for (j = first; j <= last; j += step) { |
2026 | if (j >= argc || j < first) { |
2027 | /* Modules commands, and standard commands with a not fixed number |
2028 | * of arguments (negative arity parameter) do not have dispatch |
2029 | * time arity checks, so we need to handle the case where the user |
2030 | * passed an invalid number of arguments here. In this case we |
2031 | * return no keys and expect the command implementation to report |
2032 | * an arity or syntax error. */ |
2033 | if (cmd->flags & CMD_MODULE || cmd->arity < 0) { |
2034 | result->numkeys = 0; |
2035 | return 0; |
2036 | } else { |
2037 | serverPanic("Redis built-in command declared keys positions not matching the arity requirements." ); |
2038 | } |
2039 | } |
2040 | keys[i].pos = j; |
2041 | /* Flags are omitted from legacy key specs */ |
2042 | keys[i++].flags = 0; |
2043 | } |
2044 | result->numkeys = i; |
2045 | return i; |
2046 | } |
2047 | |
2048 | /* Return all the arguments that are keys in the command passed via argc / argv. |
2049 | * |
2050 | * The command returns the positions of all the key arguments inside the array, |
2051 | * so the actual return value is a heap allocated array of integers. The |
2052 | * length of the array is returned by reference into *numkeys. |
2053 | * |
2054 | * 'cmd' must be point to the corresponding entry into the redisCommand |
2055 | * table, according to the command name in argv[0]. |
2056 | * |
2057 | * This function uses the command table if a command-specific helper function |
2058 | * is not required, otherwise it calls the command-specific function. */ |
2059 | int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2060 | if (cmd->flags & CMD_MODULE_GETKEYS) { |
2061 | return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); |
2062 | } else if (cmd->getkeys_proc) { |
2063 | return cmd->getkeys_proc(cmd,argv,argc,result); |
2064 | } else { |
2065 | return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result); |
2066 | } |
2067 | } |
2068 | |
2069 | /* Free the result of getKeysFromCommand. */ |
2070 | void getKeysFreeResult(getKeysResult *result) { |
2071 | if (result && result->keys != result->keysbuf) |
2072 | zfree(result->keys); |
2073 | } |
2074 | |
2075 | /* Helper function to extract keys from following commands: |
2076 | * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options> |
2077 | * |
2078 | * eg: |
2079 | * ZUNION <num-keys> <key> <key> ... <key> <options> |
2080 | * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options> |
2081 | * |
2082 | * 'storeKeyOfs': destkey index, 0 means destkey not exists. |
2083 | * 'keyCountOfs': num-keys index. |
2084 | * 'firstKeyOfs': firstkey index. |
2085 | * 'keyStep': the interval of each key, usually this value is 1. |
2086 | * |
2087 | * The commands using this functoin have a fully defined keyspec, so returning flags isn't needed. */ |
2088 | int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep, |
2089 | robj **argv, int argc, getKeysResult *result) { |
2090 | int i, num; |
2091 | keyReference *keys; |
2092 | |
2093 | num = atoi(argv[keyCountOfs]->ptr); |
2094 | /* Sanity check. Don't return any key if the command is going to |
2095 | * reply with syntax error. (no input keys). */ |
2096 | if (num < 1 || num > (argc - firstKeyOfs)/keyStep) { |
2097 | result->numkeys = 0; |
2098 | return 0; |
2099 | } |
2100 | |
2101 | int numkeys = storeKeyOfs ? num + 1 : num; |
2102 | keys = getKeysPrepareResult(result, numkeys); |
2103 | result->numkeys = numkeys; |
2104 | |
2105 | /* Add all key positions for argv[firstKeyOfs...n] to keys[] */ |
2106 | for (i = 0; i < num; i++) { |
2107 | keys[i].pos = firstKeyOfs+(i*keyStep); |
2108 | keys[i].flags = 0; |
2109 | } |
2110 | |
2111 | if (storeKeyOfs) { |
2112 | keys[num].pos = storeKeyOfs; |
2113 | keys[num].flags = 0; |
2114 | } |
2115 | return result->numkeys; |
2116 | } |
2117 | |
2118 | int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2119 | UNUSED(cmd); |
2120 | return genericGetKeys(0, 1, 2, 1, argv, argc, result); |
2121 | } |
2122 | |
2123 | int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2124 | UNUSED(cmd); |
2125 | return genericGetKeys(1, 2, 3, 1, argv, argc, result); |
2126 | } |
2127 | |
2128 | int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2129 | UNUSED(cmd); |
2130 | return genericGetKeys(0, 1, 2, 1, argv, argc, result); |
2131 | } |
2132 | |
2133 | int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2134 | UNUSED(cmd); |
2135 | return genericGetKeys(0, 2, 3, 1, argv, argc, result); |
2136 | } |
2137 | |
2138 | int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2139 | UNUSED(cmd); |
2140 | return genericGetKeys(0, 2, 3, 1, argv, argc, result); |
2141 | } |
2142 | |
2143 | int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2144 | UNUSED(cmd); |
2145 | return genericGetKeys(0, 1, 2, 1, argv, argc, result); |
2146 | } |
2147 | |
2148 | int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2149 | UNUSED(cmd); |
2150 | return genericGetKeys(0, 2, 3, 1, argv, argc, result); |
2151 | } |
2152 | |
2153 | int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2154 | UNUSED(cmd); |
2155 | return genericGetKeys(0, 1, 2, 1, argv, argc, result); |
2156 | } |
2157 | |
2158 | int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2159 | UNUSED(cmd); |
2160 | return genericGetKeys(0, 2, 3, 1, argv, argc, result); |
2161 | } |
2162 | |
2163 | /* Helper function to extract keys from the SORT RO command. |
2164 | * |
2165 | * SORT <sort-key> |
2166 | * |
2167 | * The second argument of SORT is always a key, however an arbitrary number of |
2168 | * keys may be accessed while doing the sort (the BY and GET args), so the |
2169 | * key-spec declares incomplete keys which is why we have to provide a concrete |
2170 | * implementation to fetch the keys. |
2171 | * |
2172 | * This command declares incomplete keys, so the flags are correctly set for this function */ |
2173 | int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2174 | keyReference *keys; |
2175 | UNUSED(cmd); |
2176 | UNUSED(argv); |
2177 | UNUSED(argc); |
2178 | |
2179 | keys = getKeysPrepareResult(result, 1); |
2180 | keys[0].pos = 1; /* <sort-key> is always present. */ |
2181 | keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; |
2182 | return 1; |
2183 | } |
2184 | |
2185 | /* Helper function to extract keys from the SORT command. |
2186 | * |
2187 | * SORT <sort-key> ... STORE <store-key> ... |
2188 | * |
2189 | * The first argument of SORT is always a key, however a list of options |
2190 | * follow in SQL-alike style. Here we parse just the minimum in order to |
2191 | * correctly identify keys in the "STORE" option. |
2192 | * |
2193 | * This command declares incomplete keys, so the flags are correctly set for this function */ |
2194 | int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2195 | int i, j, num, found_store = 0; |
2196 | keyReference *keys; |
2197 | UNUSED(cmd); |
2198 | |
2199 | num = 0; |
2200 | keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */ |
2201 | keys[num].pos = 1; /* <sort-key> is always present. */ |
2202 | keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS; |
2203 | |
2204 | /* Search for STORE option. By default we consider options to don't |
2205 | * have arguments, so if we find an unknown option name we scan the |
2206 | * next. However there are options with 1 or 2 arguments, so we |
2207 | * provide a list here in order to skip the right number of args. */ |
2208 | struct { |
2209 | char *name; |
2210 | int skip; |
2211 | } skiplist[] = { |
2212 | {"limit" , 2}, |
2213 | {"get" , 1}, |
2214 | {"by" , 1}, |
2215 | {NULL, 0} /* End of elements. */ |
2216 | }; |
2217 | |
2218 | for (i = 2; i < argc; i++) { |
2219 | for (j = 0; skiplist[j].name != NULL; j++) { |
2220 | if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) { |
2221 | i += skiplist[j].skip; |
2222 | break; |
2223 | } else if (!strcasecmp(argv[i]->ptr,"store" ) && i+1 < argc) { |
2224 | /* Note: we don't increment "num" here and continue the loop |
2225 | * to be sure to process the *last* "STORE" option if multiple |
2226 | * ones are provided. This is same behavior as SORT. */ |
2227 | found_store = 1; |
2228 | keys[num].pos = i+1; /* <store-key> */ |
2229 | keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE; |
2230 | break; |
2231 | } |
2232 | } |
2233 | } |
2234 | result->numkeys = num + found_store; |
2235 | return result->numkeys; |
2236 | } |
2237 | |
2238 | /* This command declares incomplete keys, so the flags are correctly set for this function */ |
2239 | int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2240 | int i, num, first; |
2241 | keyReference *keys; |
2242 | UNUSED(cmd); |
2243 | |
2244 | /* Assume the obvious form. */ |
2245 | first = 3; |
2246 | num = 1; |
2247 | |
2248 | /* But check for the extended one with the KEYS option. */ |
2249 | if (argc > 6) { |
2250 | for (i = 6; i < argc; i++) { |
2251 | if (!strcasecmp(argv[i]->ptr,"keys" ) && |
2252 | sdslen(argv[3]->ptr) == 0) |
2253 | { |
2254 | first = i+1; |
2255 | num = argc-first; |
2256 | break; |
2257 | } |
2258 | } |
2259 | } |
2260 | |
2261 | keys = getKeysPrepareResult(result, num); |
2262 | for (i = 0; i < num; i++) { |
2263 | keys[i].pos = first+i; |
2264 | keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE; |
2265 | } |
2266 | result->numkeys = num; |
2267 | return num; |
2268 | } |
2269 | |
2270 | /* Helper function to extract keys from following commands: |
2271 | * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC] |
2272 | * [COUNT count] [STORE key] [STOREDIST key] |
2273 | * GEORADIUSBYMEMBER key member radius unit ... options ... |
2274 | * |
2275 | * This command has a fully defined keyspec, so returning flags isn't needed. */ |
2276 | int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2277 | int i, num; |
2278 | keyReference *keys; |
2279 | UNUSED(cmd); |
2280 | |
2281 | /* Check for the presence of the stored key in the command */ |
2282 | int stored_key = -1; |
2283 | for (i = 5; i < argc; i++) { |
2284 | char *arg = argv[i]->ptr; |
2285 | /* For the case when user specifies both "store" and "storedist" options, the |
2286 | * second key specified would override the first key. This behavior is kept |
2287 | * the same as in georadiusCommand method. |
2288 | */ |
2289 | if ((!strcasecmp(arg, "store" ) || !strcasecmp(arg, "storedist" )) && ((i+1) < argc)) { |
2290 | stored_key = i+1; |
2291 | i++; |
2292 | } |
2293 | } |
2294 | num = 1 + (stored_key == -1 ? 0 : 1); |
2295 | |
2296 | /* Keys in the command come from two places: |
2297 | * argv[1] = key, |
2298 | * argv[5...n] = stored key if present |
2299 | */ |
2300 | keys = getKeysPrepareResult(result, num); |
2301 | |
2302 | /* Add all key positions to keys[] */ |
2303 | keys[0].pos = 1; |
2304 | keys[0].flags = 0; |
2305 | if(num > 1) { |
2306 | keys[1].pos = stored_key; |
2307 | keys[1].flags = 0; |
2308 | } |
2309 | result->numkeys = num; |
2310 | return num; |
2311 | } |
2312 | |
2313 | /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] |
2314 | * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N |
2315 | * |
2316 | * This command has a fully defined keyspec, so returning flags isn't needed. */ |
2317 | int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2318 | int i, num = 0; |
2319 | keyReference *keys; |
2320 | UNUSED(cmd); |
2321 | |
2322 | /* We need to parse the options of the command in order to seek the first |
2323 | * "STREAMS" string which is actually the option. This is needed because |
2324 | * "STREAMS" could also be the name of the consumer group and even the |
2325 | * name of the stream key. */ |
2326 | int streams_pos = -1; |
2327 | for (i = 1; i < argc; i++) { |
2328 | char *arg = argv[i]->ptr; |
2329 | if (!strcasecmp(arg, "block" )) { |
2330 | i++; /* Skip option argument. */ |
2331 | } else if (!strcasecmp(arg, "count" )) { |
2332 | i++; /* Skip option argument. */ |
2333 | } else if (!strcasecmp(arg, "group" )) { |
2334 | i += 2; /* Skip option argument. */ |
2335 | } else if (!strcasecmp(arg, "noack" )) { |
2336 | /* Nothing to do. */ |
2337 | } else if (!strcasecmp(arg, "streams" )) { |
2338 | streams_pos = i; |
2339 | break; |
2340 | } else { |
2341 | break; /* Syntax error. */ |
2342 | } |
2343 | } |
2344 | if (streams_pos != -1) num = argc - streams_pos - 1; |
2345 | |
2346 | /* Syntax error. */ |
2347 | if (streams_pos == -1 || num == 0 || num % 2 != 0) { |
2348 | result->numkeys = 0; |
2349 | return 0; |
2350 | } |
2351 | num /= 2; /* We have half the keys as there are arguments because |
2352 | there are also the IDs, one per key. */ |
2353 | |
2354 | keys = getKeysPrepareResult(result, num); |
2355 | for (i = streams_pos+1; i < argc-num; i++) { |
2356 | keys[i-streams_pos-1].pos = i; |
2357 | keys[i-streams_pos-1].flags = 0; |
2358 | } |
2359 | result->numkeys = num; |
2360 | return num; |
2361 | } |
2362 | |
2363 | /* Helper function to extract keys from the SET command, which may have |
2364 | * a read flag if the GET argument is passed in. */ |
2365 | int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2366 | keyReference *keys; |
2367 | UNUSED(cmd); |
2368 | |
2369 | keys = getKeysPrepareResult(result, 1); |
2370 | keys[0].pos = 1; /* We always know the position */ |
2371 | result->numkeys = 1; |
2372 | |
2373 | for (int i = 3; i < argc; i++) { |
2374 | char *arg = argv[i]->ptr; |
2375 | if ((arg[0] == 'g' || arg[0] == 'G') && |
2376 | (arg[1] == 'e' || arg[1] == 'E') && |
2377 | (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0') |
2378 | { |
2379 | keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE; |
2380 | return 1; |
2381 | } |
2382 | } |
2383 | |
2384 | keys[0].flags = CMD_KEY_OW | CMD_KEY_UPDATE; |
2385 | return 1; |
2386 | } |
2387 | |
2388 | /* Helper function to extract keys from the BITFIELD command, which may be |
2389 | * read-only if the BITFIELD GET subcommand is used. */ |
2390 | int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { |
2391 | keyReference *keys; |
2392 | UNUSED(cmd); |
2393 | |
2394 | keys = getKeysPrepareResult(result, 1); |
2395 | keys[0].pos = 1; /* We always know the position */ |
2396 | result->numkeys = 1; |
2397 | |
2398 | for (int i = 2; i < argc; i++) { |
2399 | int remargs = argc - i - 1; /* Remaining args other than current. */ |
2400 | char *arg = argv[i]->ptr; |
2401 | if (!strcasecmp(arg, "get" ) && remargs >= 2) { |
2402 | keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; |
2403 | return 1; |
2404 | } |
2405 | } |
2406 | |
2407 | keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE; |
2408 | return 1; |
2409 | } |
2410 | |