1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "cluster.h"
32#include "atomicvar.h"
33#include "latency.h"
34#include "script.h"
35#include "functions.h"
36
37#include <signal.h>
38#include <ctype.h>
39
40/*-----------------------------------------------------------------------------
41 * C-level DB API
42 *----------------------------------------------------------------------------*/
43
44int expireIfNeeded(redisDb *db, robj *key, int force_delete_expired);
45int keyIsExpired(redisDb *db, robj *key);
46
47/* Update LFU when an object is accessed.
48 * Firstly, decrement the counter if the decrement time is reached.
49 * Then logarithmically increment the counter, and update the access time. */
50void updateLFU(robj *val) {
51 unsigned long counter = LFUDecrAndReturn(val);
52 counter = LFULogIncr(counter);
53 val->lru = (LFUGetTimeInMinutes()<<8) | counter;
54}
55
56/* Lookup a key for read or write operations, or return NULL if the key is not
57 * found in the specified DB. This function implements the functionality of
58 * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
59 *
60 * Side-effects of calling this function:
61 *
62 * 1. A key gets expired if it reached it's TTL.
63 * 2. The key's last access time is updated.
64 * 3. The global keys hits/misses stats are updated (reported in INFO).
65 * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
66 *
67 * Flags change the behavior of this command:
68 *
69 * LOOKUP_NONE (or zero): No special flags are passed.
70 * LOOKUP_NOTOUCH: Don't alter the last access time of the key.
71 * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
72 * LOOKUP_NOSTATS: Don't increment key hits/misses counters.
73 * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
74 * replicas, use separate keyspace stats and events (TODO)).
75 *
76 * Note: this function also returns NULL if the key is logically expired but
77 * still existing, in case this is a replica and the LOOKUP_WRITE is not set.
78 * Even if the key expiry is master-driven, we can correctly report a key is
79 * expired on replicas even if the master is lagging expiring our key via DELs
80 * in the replication link. */
81robj *lookupKey(redisDb *db, robj *key, int flags) {
82 dictEntry *de = dictFind(db->dict,key->ptr);
83 robj *val = NULL;
84 if (de) {
85 val = dictGetVal(de);
86 /* Forcing deletion of expired keys on a replica makes the replica
87 * inconsistent with the master. We forbid it on readonly replicas, but
88 * we have to allow it on writable replicas to make write commands
89 * behave consistently.
90 *
91 * It's possible that the WRITE flag is set even during a readonly
92 * command, since the command may trigger events that cause modules to
93 * perform additional writes. */
94 int is_ro_replica = server.masterhost && server.repl_slave_ro;
95 int force_delete_expired = flags & LOOKUP_WRITE && !is_ro_replica;
96 if (expireIfNeeded(db, key, force_delete_expired)) {
97 /* The key is no longer valid. */
98 val = NULL;
99 }
100 }
101
102 if (val) {
103 /* Update the access time for the ageing algorithm.
104 * Don't do it if we have a saving child, as this will trigger
105 * a copy on write madness. */
106 if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
107 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
108 updateLFU(val);
109 } else {
110 val->lru = LRU_CLOCK();
111 }
112 }
113
114 if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
115 server.stat_keyspace_hits++;
116 /* TODO: Use separate hits stats for WRITE */
117 } else {
118 if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
119 notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
120 if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
121 server.stat_keyspace_misses++;
122 /* TODO: Use separate misses stats and notify event for WRITE */
123 }
124
125 return val;
126}
127
128/* Lookup a key for read operations, or return NULL if the key is not found
129 * in the specified DB.
130 *
131 * This API should not be used when we write to the key after obtaining
132 * the object linked to the key, but only for read only operations.
133 *
134 * This function is equivalent to lookupKey(). The point of using this function
135 * rather than lookupKey() directly is to indicate that the purpose is to read
136 * the key. */
137robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
138 serverAssert(!(flags & LOOKUP_WRITE));
139 return lookupKey(db, key, flags);
140}
141
142/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
143 * common case. */
144robj *lookupKeyRead(redisDb *db, robj *key) {
145 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
146}
147
148/* Lookup a key for write operations, and as a side effect, if needed, expires
149 * the key if its TTL is reached. It's equivalent to lookupKey() with the
150 * LOOKUP_WRITE flag added.
151 *
152 * Returns the linked value object if the key exists or NULL if the key
153 * does not exist in the specified DB. */
154robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
155 return lookupKey(db, key, flags | LOOKUP_WRITE);
156}
157
158robj *lookupKeyWrite(redisDb *db, robj *key) {
159 return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
160}
161
162robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
163 robj *o = lookupKeyRead(c->db, key);
164 if (!o) addReplyOrErrorObject(c, reply);
165 return o;
166}
167
168robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
169 robj *o = lookupKeyWrite(c->db, key);
170 if (!o) addReplyOrErrorObject(c, reply);
171 return o;
172}
173
174/* Add the key to the DB. It's up to the caller to increment the reference
175 * counter of the value if needed.
176 *
177 * The program is aborted if the key already exists. */
178void dbAdd(redisDb *db, robj *key, robj *val) {
179 sds copy = sdsdup(key->ptr);
180 dictEntry *de = dictAddRaw(db->dict, copy, NULL);
181 serverAssertWithInfo(NULL, key, de != NULL);
182 dictSetVal(db->dict, de, val);
183 signalKeyAsReady(db, key, val->type);
184 if (server.cluster_enabled) slotToKeyAddEntry(de, db);
185 notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
186}
187
188/* This is a special version of dbAdd() that is used only when loading
189 * keys from the RDB file: the key is passed as an SDS string that is
190 * retained by the function (and not freed by the caller).
191 *
192 * Moreover this function will not abort if the key is already busy, to
193 * give more control to the caller, nor will signal the key as ready
194 * since it is not useful in this context.
195 *
196 * The function returns 1 if the key was added to the database, taking
197 * ownership of the SDS string, otherwise 0 is returned, and is up to the
198 * caller to free the SDS string. */
199int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
200 dictEntry *de = dictAddRaw(db->dict, key, NULL);
201 if (de == NULL) return 0;
202 dictSetVal(db->dict, de, val);
203 if (server.cluster_enabled) slotToKeyAddEntry(de, db);
204 return 1;
205}
206
207/* Overwrite an existing key with a new value. Incrementing the reference
208 * count of the new value is up to the caller.
209 * This function does not modify the expire time of the existing key.
210 *
211 * The program is aborted if the key was not already present. */
212void dbOverwrite(redisDb *db, robj *key, robj *val) {
213 dictEntry *de = dictFind(db->dict,key->ptr);
214
215 serverAssertWithInfo(NULL,key,de != NULL);
216 dictEntry auxentry = *de;
217 robj *old = dictGetVal(de);
218 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
219 val->lru = old->lru;
220 }
221 /* Although the key is not really deleted from the database, we regard
222 * overwrite as two steps of unlink+add, so we still need to call the unlink
223 * callback of the module. */
224 moduleNotifyKeyUnlink(key,old,db->id);
225 /* We want to try to unblock any client using a blocking XREADGROUP */
226 if (old->type == OBJ_STREAM)
227 signalKeyAsReady(db,key,old->type);
228 dictSetVal(db->dict, de, val);
229
230 if (server.lazyfree_lazy_server_del) {
231 freeObjAsync(key,old,db->id);
232 dictSetVal(db->dict, &auxentry, NULL);
233 }
234
235 dictFreeVal(db->dict, &auxentry);
236}
237
238/* High level Set operation. This function can be used in order to set
239 * a key, whatever it was existing or not, to a new object.
240 *
241 * 1) The ref count of the value object is incremented.
242 * 2) clients WATCHing for the destination key notified.
243 * 3) The expire time of the key is reset (the key is made persistent),
244 * unless 'SETKEY_KEEPTTL' is enabled in flags.
245 * 4) The key lookup can take place outside this interface outcome will be
246 * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
247 *
248 * All the new keys in the database should be created via this interface.
249 * The client 'c' argument may be set to NULL if the operation is performed
250 * in a context where there is no clear client performing the operation. */
251void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
252 int keyfound = 0;
253
254 if (flags & SETKEY_ALREADY_EXIST)
255 keyfound = 1;
256 else if (!(flags & SETKEY_DOESNT_EXIST))
257 keyfound = (lookupKeyWrite(db,key) != NULL);
258
259 if (!keyfound) {
260 dbAdd(db,key,val);
261 } else {
262 dbOverwrite(db,key,val);
263 }
264 incrRefCount(val);
265 if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
266 if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c,db,key);
267}
268
269/* Return a random key, in form of a Redis object.
270 * If there are no keys, NULL is returned.
271 *
272 * The function makes sure to return keys not already expired. */
273robj *dbRandomKey(redisDb *db) {
274 dictEntry *de;
275 int maxtries = 100;
276 int allvolatile = dictSize(db->dict) == dictSize(db->expires);
277
278 while(1) {
279 sds key;
280 robj *keyobj;
281
282 de = dictGetFairRandomKey(db->dict);
283 if (de == NULL) return NULL;
284
285 key = dictGetKey(de);
286 keyobj = createStringObject(key,sdslen(key));
287 if (dictFind(db->expires,key)) {
288 if (allvolatile && server.masterhost && --maxtries == 0) {
289 /* If the DB is composed only of keys with an expire set,
290 * it could happen that all the keys are already logically
291 * expired in the slave, so the function cannot stop because
292 * expireIfNeeded() is false, nor it can stop because
293 * dictGetFairRandomKey() returns NULL (there are keys to return).
294 * To prevent the infinite loop we do some tries, but if there
295 * are the conditions for an infinite loop, eventually we
296 * return a key name that may be already expired. */
297 return keyobj;
298 }
299 if (expireIfNeeded(db,keyobj,0)) {
300 decrRefCount(keyobj);
301 continue; /* search for another key. This expired. */
302 }
303 }
304 return keyobj;
305 }
306}
307
308/* Helper for sync and async delete. */
309static int dbGenericDelete(redisDb *db, robj *key, int async) {
310 /* Deleting an entry from the expires dict will not free the sds of
311 * the key, because it is shared with the main dictionary. */
312 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
313 dictEntry *de = dictUnlink(db->dict,key->ptr);
314 if (de) {
315 robj *val = dictGetVal(de);
316 /* Tells the module that the key has been unlinked from the database. */
317 moduleNotifyKeyUnlink(key,val,db->id);
318 /* We want to try to unblock any client using a blocking XREADGROUP */
319 if (val->type == OBJ_STREAM)
320 signalKeyAsReady(db,key,val->type);
321 if (async) {
322 freeObjAsync(key, val, db->id);
323 dictSetVal(db->dict, de, NULL);
324 }
325 if (server.cluster_enabled) slotToKeyDelEntry(de, db);
326 dictFreeUnlinkedEntry(db->dict,de);
327 return 1;
328 } else {
329 return 0;
330 }
331}
332
333/* Delete a key, value, and associated expiration entry if any, from the DB */
334int dbSyncDelete(redisDb *db, robj *key) {
335 return dbGenericDelete(db, key, 0);
336}
337
338/* Delete a key, value, and associated expiration entry if any, from the DB. If
339 * the value consists of many allocations, it may be freed asynchronously. */
340int dbAsyncDelete(redisDb *db, robj *key) {
341 return dbGenericDelete(db, key, 1);
342}
343
344/* This is a wrapper whose behavior depends on the Redis lazy free
345 * configuration. Deletes the key synchronously or asynchronously. */
346int dbDelete(redisDb *db, robj *key) {
347 return dbGenericDelete(db, key, server.lazyfree_lazy_server_del);
348}
349
350/* Prepare the string object stored at 'key' to be modified destructively
351 * to implement commands like SETBIT or APPEND.
352 *
353 * An object is usually ready to be modified unless one of the two conditions
354 * are true:
355 *
356 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
357 * other users.
358 * 2) The object encoding is not "RAW".
359 *
360 * If the object is found in one of the above conditions (or both) by the
361 * function, an unshared / not-encoded copy of the string object is stored
362 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
363 * returned.
364 *
365 * USAGE:
366 *
367 * The object 'o' is what the caller already obtained by looking up 'key'
368 * in 'db', the usage pattern looks like this:
369 *
370 * o = lookupKeyWrite(db,key);
371 * if (checkType(c,o,OBJ_STRING)) return;
372 * o = dbUnshareStringValue(db,key,o);
373 *
374 * At this point the caller is ready to modify the object, for example
375 * using an sdscat() call to append some data, or anything else.
376 */
377robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
378 serverAssert(o->type == OBJ_STRING);
379 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
380 robj *decoded = getDecodedObject(o);
381 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
382 decrRefCount(decoded);
383 dbOverwrite(db,key,o);
384 }
385 return o;
386}
387
388/* Remove all keys from the database(s) structure. The dbarray argument
389 * may not be the server main DBs (could be a temporary DB).
390 *
391 * The dbnum can be -1 if all the DBs should be emptied, or the specified
392 * DB index if we want to empty only a single database.
393 * The function returns the number of keys removed from the database(s). */
394long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
395 void(callback)(dict*))
396{
397 long long removed = 0;
398 int startdb, enddb;
399
400 if (dbnum == -1) {
401 startdb = 0;
402 enddb = server.dbnum-1;
403 } else {
404 startdb = enddb = dbnum;
405 }
406
407 for (int j = startdb; j <= enddb; j++) {
408 removed += dictSize(dbarray[j].dict);
409 if (async) {
410 emptyDbAsync(&dbarray[j]);
411 } else {
412 dictEmpty(dbarray[j].dict,callback);
413 dictEmpty(dbarray[j].expires,callback);
414 }
415 /* Because all keys of database are removed, reset average ttl. */
416 dbarray[j].avg_ttl = 0;
417 dbarray[j].expires_cursor = 0;
418 }
419
420 return removed;
421}
422
423/* Remove all data (keys and functions) from all the databases in a
424 * Redis server. If callback is given the function is called from
425 * time to time to signal that work is in progress.
426 *
427 * The dbnum can be -1 if all the DBs should be flushed, or the specified
428 * DB number if we want to flush only a single Redis database number.
429 *
430 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
431 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
432 * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
433 * to specify that we do not want to delete the functions.
434 *
435 * On success the function returns the number of keys removed from the
436 * database(s). Otherwise -1 is returned in the specific case the
437 * DB number is out of range, and errno is set to EINVAL. */
438long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
439 int async = (flags & EMPTYDB_ASYNC);
440 int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
441 RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
442 long long removed = 0;
443
444 if (dbnum < -1 || dbnum >= server.dbnum) {
445 errno = EINVAL;
446 return -1;
447 }
448
449 /* Fire the flushdb modules event. */
450 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
451 REDISMODULE_SUBEVENT_FLUSHDB_START,
452 &fi);
453
454 /* Make sure the WATCHed keys are affected by the FLUSH* commands.
455 * Note that we need to call the function while the keys are still
456 * there. */
457 signalFlushedDb(dbnum, async);
458
459 /* Empty redis database structure. */
460 removed = emptyDbStructure(server.db, dbnum, async, callback);
461
462 /* Flush slots to keys map if enable cluster, we can flush entire
463 * slots to keys map whatever dbnum because only support one DB
464 * in cluster mode. */
465 if (server.cluster_enabled) slotToKeyFlush(server.db);
466
467 if (dbnum == -1) flushSlaveKeysWithExpireList();
468
469 if (with_functions) {
470 serverAssert(dbnum == -1);
471 functionsLibCtxClearCurrent(async);
472 }
473
474 /* Also fire the end event. Note that this event will fire almost
475 * immediately after the start event if the flush is asynchronous. */
476 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
477 REDISMODULE_SUBEVENT_FLUSHDB_END,
478 &fi);
479
480 return removed;
481}
482
483/* Initialize temporary db on replica for use during diskless replication. */
484redisDb *initTempDb(void) {
485 redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
486 for (int i=0; i<server.dbnum; i++) {
487 tempDb[i].dict = dictCreate(&dbDictType);
488 tempDb[i].expires = dictCreate(&dbExpiresDictType);
489 tempDb[i].slots_to_keys = NULL;
490 }
491
492 if (server.cluster_enabled) {
493 /* Prepare temp slot to key map to be written during async diskless replication. */
494 slotToKeyInit(tempDb);
495 }
496
497 return tempDb;
498}
499
500/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
501void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
502 int async = 1;
503
504 /* Release temp DBs. */
505 emptyDbStructure(tempDb, -1, async, callback);
506 for (int i=0; i<server.dbnum; i++) {
507 dictRelease(tempDb[i].dict);
508 dictRelease(tempDb[i].expires);
509 }
510
511 if (server.cluster_enabled) {
512 /* Release temp slot to key map. */
513 slotToKeyDestroy(tempDb);
514 }
515
516 zfree(tempDb);
517}
518
519int selectDb(client *c, int id) {
520 if (id < 0 || id >= server.dbnum)
521 return C_ERR;
522 c->db = &server.db[id];
523 return C_OK;
524}
525
526long long dbTotalServerKeyCount() {
527 long long total = 0;
528 int j;
529 for (j = 0; j < server.dbnum; j++) {
530 total += dictSize(server.db[j].dict);
531 }
532 return total;
533}
534
535/*-----------------------------------------------------------------------------
536 * Hooks for key space changes.
537 *
538 * Every time a key in the database is modified the function
539 * signalModifiedKey() is called.
540 *
541 * Every time a DB is flushed the function signalFlushDb() is called.
542 *----------------------------------------------------------------------------*/
543
544/* Note that the 'c' argument may be NULL if the key was modified out of
545 * a context of a client. */
546void signalModifiedKey(client *c, redisDb *db, robj *key) {
547 touchWatchedKey(db,key);
548 trackingInvalidateKey(c,key,1);
549}
550
551void signalFlushedDb(int dbid, int async) {
552 int startdb, enddb;
553 if (dbid == -1) {
554 startdb = 0;
555 enddb = server.dbnum-1;
556 } else {
557 startdb = enddb = dbid;
558 }
559
560 for (int j = startdb; j <= enddb; j++) {
561 scanDatabaseForDeletedStreams(&server.db[j], NULL);
562 touchAllWatchedKeysInDb(&server.db[j], NULL);
563 }
564
565 trackingInvalidateKeysOnFlush(async);
566
567 /* Changes in this method may take place in swapMainDbWithTempDb as well,
568 * where we execute similar calls, but with subtle differences as it's
569 * not simply flushing db. */
570}
571
572/*-----------------------------------------------------------------------------
573 * Type agnostic commands operating on the key space
574 *----------------------------------------------------------------------------*/
575
576/* Return the set of flags to use for the emptyDb() call for FLUSHALL
577 * and FLUSHDB commands.
578 *
579 * sync: flushes the database in an sync manner.
580 * async: flushes the database in an async manner.
581 * no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
582 *
583 * On success C_OK is returned and the flags are stored in *flags, otherwise
584 * C_ERR is returned and the function sends an error to the client. */
585int getFlushCommandFlags(client *c, int *flags) {
586 /* Parse the optional ASYNC option. */
587 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
588 *flags = EMPTYDB_NO_FLAGS;
589 } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
590 *flags = EMPTYDB_ASYNC;
591 } else if (c->argc == 1) {
592 *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
593 } else {
594 addReplyErrorObject(c,shared.syntaxerr);
595 return C_ERR;
596 }
597 return C_OK;
598}
599
600/* Flushes the whole server data set. */
601void flushAllDataAndResetRDB(int flags) {
602 server.dirty += emptyData(-1,flags,NULL);
603 if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
604 if (server.saveparamslen > 0) {
605 rdbSaveInfo rsi, *rsiptr;
606 rsiptr = rdbPopulateSaveInfo(&rsi);
607 rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
608 }
609
610#if defined(USE_JEMALLOC)
611 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
612 * for large databases, flushdb blocks for long anyway, so a bit more won't
613 * harm and this way the flush and purge will be synchronous. */
614 if (!(flags & EMPTYDB_ASYNC))
615 jemalloc_purge();
616#endif
617}
618
619/* FLUSHDB [ASYNC]
620 *
621 * Flushes the currently SELECTed Redis DB. */
622void flushdbCommand(client *c) {
623 int flags;
624
625 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
626 /* flushdb should not flush the functions */
627 server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
628
629 /* Without the forceCommandPropagation, when DB was already empty,
630 * FLUSHDB will not be replicated nor put into the AOF. */
631 forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
632
633 addReply(c,shared.ok);
634
635#if defined(USE_JEMALLOC)
636 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
637 * for large databases, flushdb blocks for long anyway, so a bit more won't
638 * harm and this way the flush and purge will be synchronous. */
639 if (!(flags & EMPTYDB_ASYNC))
640 jemalloc_purge();
641#endif
642}
643
644/* FLUSHALL [ASYNC]
645 *
646 * Flushes the whole server data set. */
647void flushallCommand(client *c) {
648 int flags;
649 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
650 /* flushall should not flush the functions */
651 flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
652
653 /* Without the forceCommandPropagation, when DBs were already empty,
654 * FLUSHALL will not be replicated nor put into the AOF. */
655 forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
656
657 addReply(c,shared.ok);
658}
659
660/* This command implements DEL and LAZYDEL. */
661void delGenericCommand(client *c, int lazy) {
662 int numdel = 0, j;
663
664 for (j = 1; j < c->argc; j++) {
665 expireIfNeeded(c->db,c->argv[j],0);
666 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
667 dbSyncDelete(c->db,c->argv[j]);
668 if (deleted) {
669 signalModifiedKey(c,c->db,c->argv[j]);
670 notifyKeyspaceEvent(NOTIFY_GENERIC,
671 "del",c->argv[j],c->db->id);
672 server.dirty++;
673 numdel++;
674 }
675 }
676 addReplyLongLong(c,numdel);
677}
678
679void delCommand(client *c) {
680 delGenericCommand(c,server.lazyfree_lazy_user_del);
681}
682
683void unlinkCommand(client *c) {
684 delGenericCommand(c,1);
685}
686
687/* EXISTS key1 key2 ... key_N.
688 * Return value is the number of keys existing. */
689void existsCommand(client *c) {
690 long long count = 0;
691 int j;
692
693 for (j = 1; j < c->argc; j++) {
694 if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
695 }
696 addReplyLongLong(c,count);
697}
698
699void selectCommand(client *c) {
700 int id;
701
702 if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
703 return;
704
705 if (server.cluster_enabled && id != 0) {
706 addReplyError(c,"SELECT is not allowed in cluster mode");
707 return;
708 }
709 if (selectDb(c,id) == C_ERR) {
710 addReplyError(c,"DB index is out of range");
711 } else {
712 addReply(c,shared.ok);
713 }
714}
715
716void randomkeyCommand(client *c) {
717 robj *key;
718
719 if ((key = dbRandomKey(c->db)) == NULL) {
720 addReplyNull(c);
721 return;
722 }
723
724 addReplyBulk(c,key);
725 decrRefCount(key);
726}
727
728void keysCommand(client *c) {
729 dictIterator *di;
730 dictEntry *de;
731 sds pattern = c->argv[1]->ptr;
732 int plen = sdslen(pattern), allkeys;
733 unsigned long numkeys = 0;
734 void *replylen = addReplyDeferredLen(c);
735
736 di = dictGetSafeIterator(c->db->dict);
737 allkeys = (pattern[0] == '*' && plen == 1);
738 while((de = dictNext(di)) != NULL) {
739 sds key = dictGetKey(de);
740 robj *keyobj;
741
742 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
743 keyobj = createStringObject(key,sdslen(key));
744 if (!keyIsExpired(c->db,keyobj)) {
745 addReplyBulk(c,keyobj);
746 numkeys++;
747 }
748 decrRefCount(keyobj);
749 }
750 }
751 dictReleaseIterator(di);
752 setDeferredArrayLen(c,replylen,numkeys);
753}
754
755/* This callback is used by scanGenericCommand in order to collect elements
756 * returned by the dictionary iterator into a list. */
757void scanCallback(void *privdata, const dictEntry *de) {
758 void **pd = (void**) privdata;
759 list *keys = pd[0];
760 robj *o = pd[1];
761 robj *key, *val = NULL;
762
763 if (o == NULL) {
764 sds sdskey = dictGetKey(de);
765 key = createStringObject(sdskey, sdslen(sdskey));
766 } else if (o->type == OBJ_SET) {
767 sds keysds = dictGetKey(de);
768 key = createStringObject(keysds,sdslen(keysds));
769 } else if (o->type == OBJ_HASH) {
770 sds sdskey = dictGetKey(de);
771 sds sdsval = dictGetVal(de);
772 key = createStringObject(sdskey,sdslen(sdskey));
773 val = createStringObject(sdsval,sdslen(sdsval));
774 } else if (o->type == OBJ_ZSET) {
775 sds sdskey = dictGetKey(de);
776 key = createStringObject(sdskey,sdslen(sdskey));
777 val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
778 } else {
779 serverPanic("Type not handled in SCAN callback.");
780 }
781
782 listAddNodeTail(keys, key);
783 if (val) listAddNodeTail(keys, val);
784}
785
786/* Try to parse a SCAN cursor stored at object 'o':
787 * if the cursor is valid, store it as unsigned integer into *cursor and
788 * returns C_OK. Otherwise return C_ERR and send an error to the
789 * client. */
790int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
791 char *eptr;
792
793 /* Use strtoul() because we need an *unsigned* long, so
794 * getLongLongFromObject() does not cover the whole cursor space. */
795 errno = 0;
796 *cursor = strtoul(o->ptr, &eptr, 10);
797 if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
798 {
799 addReplyError(c, "invalid cursor");
800 return C_ERR;
801 }
802 return C_OK;
803}
804
805/* This command implements SCAN, HSCAN and SSCAN commands.
806 * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
807 * if 'o' is NULL the command will operate on the dictionary associated with
808 * the current database.
809 *
810 * When 'o' is not NULL the function assumes that the first argument in
811 * the client arguments vector is a key so it skips it before iterating
812 * in order to parse options.
813 *
814 * In the case of a Hash object the function returns both the field and value
815 * of every element on the Hash. */
816void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
817 int i, j;
818 list *keys = listCreate();
819 listNode *node, *nextnode;
820 long count = 10;
821 sds pat = NULL;
822 sds typename = NULL;
823 int patlen = 0, use_pattern = 0;
824 dict *ht;
825
826 /* Object must be NULL (to iterate keys names), or the type of the object
827 * must be Set, Sorted Set, or Hash. */
828 serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
829 o->type == OBJ_ZSET);
830
831 /* Set i to the first option argument. The previous one is the cursor. */
832 i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
833
834 /* Step 1: Parse options. */
835 while (i < c->argc) {
836 j = c->argc - i;
837 if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
838 if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
839 != C_OK)
840 {
841 goto cleanup;
842 }
843
844 if (count < 1) {
845 addReplyErrorObject(c,shared.syntaxerr);
846 goto cleanup;
847 }
848
849 i += 2;
850 } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
851 pat = c->argv[i+1]->ptr;
852 patlen = sdslen(pat);
853
854 /* The pattern always matches if it is exactly "*", so it is
855 * equivalent to disabling it. */
856 use_pattern = !(patlen == 1 && pat[0] == '*');
857
858 i += 2;
859 } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
860 /* SCAN for a particular type only applies to the db dict */
861 typename = c->argv[i+1]->ptr;
862 i+= 2;
863 } else {
864 addReplyErrorObject(c,shared.syntaxerr);
865 goto cleanup;
866 }
867 }
868
869 /* Step 2: Iterate the collection.
870 *
871 * Note that if the object is encoded with a listpack, intset, or any other
872 * representation that is not a hash table, we are sure that it is also
873 * composed of a small number of elements. So to avoid taking state we
874 * just return everything inside the object in a single call, setting the
875 * cursor to zero to signal the end of the iteration. */
876
877 /* Handle the case of a hash table. */
878 ht = NULL;
879 if (o == NULL) {
880 ht = c->db->dict;
881 } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
882 ht = o->ptr;
883 } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
884 ht = o->ptr;
885 count *= 2; /* We return key / value for this type. */
886 } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
887 zset *zs = o->ptr;
888 ht = zs->dict;
889 count *= 2; /* We return key / value for this type. */
890 }
891
892 if (ht) {
893 void *privdata[2];
894 /* We set the max number of iterations to ten times the specified
895 * COUNT, so if the hash table is in a pathological state (very
896 * sparsely populated) we avoid to block too much time at the cost
897 * of returning no or very few elements. */
898 long maxiterations = count*10;
899
900 /* We pass two pointers to the callback: the list to which it will
901 * add new elements, and the object containing the dictionary so that
902 * it is possible to fetch more data in a type-dependent way. */
903 privdata[0] = keys;
904 privdata[1] = o;
905 do {
906 cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
907 } while (cursor &&
908 maxiterations-- &&
909 listLength(keys) < (unsigned long)count);
910 } else if (o->type == OBJ_SET) {
911 int pos = 0;
912 int64_t ll;
913
914 while(intsetGet(o->ptr,pos++,&ll))
915 listAddNodeTail(keys,createStringObjectFromLongLong(ll));
916 cursor = 0;
917 } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
918 unsigned char *p = lpFirst(o->ptr);
919 unsigned char *vstr;
920 int64_t vlen;
921 unsigned char intbuf[LP_INTBUF_SIZE];
922
923 while(p) {
924 vstr = lpGet(p,&vlen,intbuf);
925 listAddNodeTail(keys, createStringObject((char*)vstr,vlen));
926 p = lpNext(o->ptr,p);
927 }
928 cursor = 0;
929 } else {
930 serverPanic("Not handled encoding in SCAN.");
931 }
932
933 /* Step 3: Filter elements. */
934 node = listFirst(keys);
935 while (node) {
936 robj *kobj = listNodeValue(node);
937 nextnode = listNextNode(node);
938 int filter = 0;
939
940 /* Filter element if it does not match the pattern. */
941 if (use_pattern) {
942 if (sdsEncodedObject(kobj)) {
943 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
944 filter = 1;
945 } else {
946 char buf[LONG_STR_SIZE];
947 int len;
948
949 serverAssert(kobj->encoding == OBJ_ENCODING_INT);
950 len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
951 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
952 }
953 }
954
955 /* Filter an element if it isn't the type we want. */
956 if (!filter && o == NULL && typename){
957 robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
958 char* type = getObjectTypeName(typecheck);
959 if (strcasecmp((char*) typename, type)) filter = 1;
960 }
961
962 /* Filter element if it is an expired key. */
963 if (!filter && o == NULL && expireIfNeeded(c->db, kobj, 0)) filter = 1;
964
965 /* Remove the element and its associated value if needed. */
966 if (filter) {
967 decrRefCount(kobj);
968 listDelNode(keys, node);
969 }
970
971 /* If this is a hash or a sorted set, we have a flat list of
972 * key-value elements, so if this element was filtered, remove the
973 * value, or skip it if it was not filtered: we only match keys. */
974 if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
975 node = nextnode;
976 serverAssert(node); /* assertion for valgrind (avoid NPD) */
977 nextnode = listNextNode(node);
978 if (filter) {
979 kobj = listNodeValue(node);
980 decrRefCount(kobj);
981 listDelNode(keys, node);
982 }
983 }
984 node = nextnode;
985 }
986
987 /* Step 4: Reply to the client. */
988 addReplyArrayLen(c, 2);
989 addReplyBulkLongLong(c,cursor);
990
991 addReplyArrayLen(c, listLength(keys));
992 while ((node = listFirst(keys)) != NULL) {
993 robj *kobj = listNodeValue(node);
994 addReplyBulk(c, kobj);
995 decrRefCount(kobj);
996 listDelNode(keys, node);
997 }
998
999cleanup:
1000 listSetFreeMethod(keys,decrRefCountVoid);
1001 listRelease(keys);
1002}
1003
1004/* The SCAN command completely relies on scanGenericCommand. */
1005void scanCommand(client *c) {
1006 unsigned long cursor;
1007 if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
1008 scanGenericCommand(c,NULL,cursor);
1009}
1010
1011void dbsizeCommand(client *c) {
1012 addReplyLongLong(c,dictSize(c->db->dict));
1013}
1014
1015void lastsaveCommand(client *c) {
1016 addReplyLongLong(c,server.lastsave);
1017}
1018
1019char* getObjectTypeName(robj *o) {
1020 char* type;
1021 if (o == NULL) {
1022 type = "none";
1023 } else {
1024 switch(o->type) {
1025 case OBJ_STRING: type = "string"; break;
1026 case OBJ_LIST: type = "list"; break;
1027 case OBJ_SET: type = "set"; break;
1028 case OBJ_ZSET: type = "zset"; break;
1029 case OBJ_HASH: type = "hash"; break;
1030 case OBJ_STREAM: type = "stream"; break;
1031 case OBJ_MODULE: {
1032 moduleValue *mv = o->ptr;
1033 type = mv->type->name;
1034 }; break;
1035 default: type = "unknown"; break;
1036 }
1037 }
1038 return type;
1039}
1040
1041void typeCommand(client *c) {
1042 robj *o;
1043 o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
1044 addReplyStatus(c, getObjectTypeName(o));
1045}
1046
1047void shutdownCommand(client *c) {
1048 int flags = SHUTDOWN_NOFLAGS;
1049 int abort = 0;
1050 for (int i = 1; i < c->argc; i++) {
1051 if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
1052 flags |= SHUTDOWN_NOSAVE;
1053 } else if (!strcasecmp(c->argv[i]->ptr,"save")) {
1054 flags |= SHUTDOWN_SAVE;
1055 } else if (!strcasecmp(c->argv[i]->ptr, "now")) {
1056 flags |= SHUTDOWN_NOW;
1057 } else if (!strcasecmp(c->argv[i]->ptr, "force")) {
1058 flags |= SHUTDOWN_FORCE;
1059 } else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
1060 abort = 1;
1061 } else {
1062 addReplyErrorObject(c,shared.syntaxerr);
1063 return;
1064 }
1065 }
1066 if ((abort && flags != SHUTDOWN_NOFLAGS) ||
1067 (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
1068 {
1069 /* Illegal combo. */
1070 addReplyErrorObject(c,shared.syntaxerr);
1071 return;
1072 }
1073
1074 if (abort) {
1075 if (abortShutdown() == C_OK)
1076 addReply(c, shared.ok);
1077 else
1078 addReplyError(c, "No shutdown in progress.");
1079 return;
1080 }
1081
1082 if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
1083 addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
1084 return;
1085 }
1086
1087 if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) {
1088 /* Script timed out. Shutdown allowed only with the NOSAVE flag. See
1089 * also processCommand where these errors are returned. */
1090 if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
1091 addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
1092 } else if (server.busy_module_yield_flags) {
1093 addReplyErrorObject(c, shared.slowmoduleerr);
1094 } else if (scriptIsEval()) {
1095 addReplyErrorObject(c, shared.slowevalerr);
1096 } else {
1097 addReplyErrorObject(c, shared.slowscripterr);
1098 }
1099 return;
1100 }
1101
1102 blockClient(c, BLOCKED_SHUTDOWN);
1103 if (prepareForShutdown(flags) == C_OK) exit(0);
1104 /* If we're here, then shutdown is ongoing (the client is still blocked) or
1105 * failed (the client has received an error). */
1106}
1107
1108void renameGenericCommand(client *c, int nx) {
1109 robj *o;
1110 long long expire;
1111 int samekey = 0;
1112
1113 /* When source and dest key is the same, no operation is performed,
1114 * if the key exists, however we still return an error on unexisting key. */
1115 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
1116
1117 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
1118 return;
1119
1120 if (samekey) {
1121 addReply(c,nx ? shared.czero : shared.ok);
1122 return;
1123 }
1124
1125 incrRefCount(o);
1126 expire = getExpire(c->db,c->argv[1]);
1127 if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
1128 if (nx) {
1129 decrRefCount(o);
1130 addReply(c,shared.czero);
1131 return;
1132 }
1133 /* Overwrite: delete the old key before creating the new one
1134 * with the same name. */
1135 dbDelete(c->db,c->argv[2]);
1136 }
1137 dbAdd(c->db,c->argv[2],o);
1138 if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
1139 dbDelete(c->db,c->argv[1]);
1140 signalModifiedKey(c,c->db,c->argv[1]);
1141 signalModifiedKey(c,c->db,c->argv[2]);
1142 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
1143 c->argv[1],c->db->id);
1144 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
1145 c->argv[2],c->db->id);
1146 server.dirty++;
1147 addReply(c,nx ? shared.cone : shared.ok);
1148}
1149
1150void renameCommand(client *c) {
1151 renameGenericCommand(c,0);
1152}
1153
1154void renamenxCommand(client *c) {
1155 renameGenericCommand(c,1);
1156}
1157
1158void moveCommand(client *c) {
1159 robj *o;
1160 redisDb *src, *dst;
1161 int srcid, dbid;
1162 long long expire;
1163
1164 if (server.cluster_enabled) {
1165 addReplyError(c,"MOVE is not allowed in cluster mode");
1166 return;
1167 }
1168
1169 /* Obtain source and target DB pointers */
1170 src = c->db;
1171 srcid = c->db->id;
1172
1173 if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
1174 return;
1175
1176 if (selectDb(c,dbid) == C_ERR) {
1177 addReplyError(c,"DB index is out of range");
1178 return;
1179 }
1180 dst = c->db;
1181 selectDb(c,srcid); /* Back to the source DB */
1182
1183 /* If the user is moving using as target the same
1184 * DB as the source DB it is probably an error. */
1185 if (src == dst) {
1186 addReplyErrorObject(c,shared.sameobjecterr);
1187 return;
1188 }
1189
1190 /* Check if the element exists and get a reference */
1191 o = lookupKeyWrite(c->db,c->argv[1]);
1192 if (!o) {
1193 addReply(c,shared.czero);
1194 return;
1195 }
1196 expire = getExpire(c->db,c->argv[1]);
1197
1198 /* Return zero if the key already exists in the target DB */
1199 if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
1200 addReply(c,shared.czero);
1201 return;
1202 }
1203 dbAdd(dst,c->argv[1],o);
1204 if (expire != -1) setExpire(c,dst,c->argv[1],expire);
1205 incrRefCount(o);
1206
1207 /* OK! key moved, free the entry in the source DB */
1208 dbDelete(src,c->argv[1]);
1209 signalModifiedKey(c,src,c->argv[1]);
1210 signalModifiedKey(c,dst,c->argv[1]);
1211 notifyKeyspaceEvent(NOTIFY_GENERIC,
1212 "move_from",c->argv[1],src->id);
1213 notifyKeyspaceEvent(NOTIFY_GENERIC,
1214 "move_to",c->argv[1],dst->id);
1215
1216 server.dirty++;
1217 addReply(c,shared.cone);
1218}
1219
1220void copyCommand(client *c) {
1221 robj *o;
1222 redisDb *src, *dst;
1223 int srcid, dbid;
1224 long long expire;
1225 int j, replace = 0, delete = 0;
1226
1227 /* Obtain source and target DB pointers
1228 * Default target DB is the same as the source DB
1229 * Parse the REPLACE option and targetDB option. */
1230 src = c->db;
1231 dst = c->db;
1232 srcid = c->db->id;
1233 dbid = c->db->id;
1234 for (j = 3; j < c->argc; j++) {
1235 int additional = c->argc - j - 1;
1236 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
1237 replace = 1;
1238 } else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
1239 if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK)
1240 return;
1241
1242 if (selectDb(c, dbid) == C_ERR) {
1243 addReplyError(c,"DB index is out of range");
1244 return;
1245 }
1246 dst = c->db;
1247 selectDb(c,srcid); /* Back to the source DB */
1248 j++; /* Consume additional arg. */
1249 } else {
1250 addReplyErrorObject(c,shared.syntaxerr);
1251 return;
1252 }
1253 }
1254
1255 if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
1256 addReplyError(c,"Copying to another database is not allowed in cluster mode");
1257 return;
1258 }
1259
1260 /* If the user select the same DB as
1261 * the source DB and using newkey as the same key
1262 * it is probably an error. */
1263 robj *key = c->argv[1];
1264 robj *newkey = c->argv[2];
1265 if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
1266 addReplyErrorObject(c,shared.sameobjecterr);
1267 return;
1268 }
1269
1270 /* Check if the element exists and get a reference */
1271 o = lookupKeyRead(c->db, key);
1272 if (!o) {
1273 addReply(c,shared.czero);
1274 return;
1275 }
1276 expire = getExpire(c->db,key);
1277
1278 /* Return zero if the key already exists in the target DB.
1279 * If REPLACE option is selected, delete newkey from targetDB. */
1280 if (lookupKeyWrite(dst,newkey) != NULL) {
1281 if (replace) {
1282 delete = 1;
1283 } else {
1284 addReply(c,shared.czero);
1285 return;
1286 }
1287 }
1288
1289 /* Duplicate object according to object's type. */
1290 robj *newobj;
1291 switch(o->type) {
1292 case OBJ_STRING: newobj = dupStringObject(o); break;
1293 case OBJ_LIST: newobj = listTypeDup(o); break;
1294 case OBJ_SET: newobj = setTypeDup(o); break;
1295 case OBJ_ZSET: newobj = zsetDup(o); break;
1296 case OBJ_HASH: newobj = hashTypeDup(o); break;
1297 case OBJ_STREAM: newobj = streamDup(o); break;
1298 case OBJ_MODULE:
1299 newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
1300 if (!newobj) return;
1301 break;
1302 default:
1303 addReplyError(c, "unknown type object");
1304 return;
1305 }
1306
1307 if (delete) {
1308 dbDelete(dst,newkey);
1309 }
1310
1311 dbAdd(dst,newkey,newobj);
1312 if (expire != -1) setExpire(c, dst, newkey, expire);
1313
1314 /* OK! key copied */
1315 signalModifiedKey(c,dst,c->argv[2]);
1316 notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
1317
1318 server.dirty++;
1319 addReply(c,shared.cone);
1320}
1321
1322/* Helper function for dbSwapDatabases(): scans the list of keys that have
1323 * one or more blocked clients for B[LR]POP or other blocking commands
1324 * and signal the keys as ready if they are of the right type. See the comment
1325 * where the function is used for more info. */
1326void scanDatabaseForReadyKeys(redisDb *db) {
1327 dictEntry *de;
1328 dictIterator *di = dictGetSafeIterator(db->blocking_keys);
1329 while((de = dictNext(di)) != NULL) {
1330 robj *key = dictGetKey(de);
1331 dictEntry *kde = dictFind(db->dict,key->ptr);
1332 if (kde) {
1333 robj *value = dictGetVal(kde);
1334 signalKeyAsReady(db, key, value->type);
1335 }
1336 }
1337 dictReleaseIterator(di);
1338}
1339
1340/* Since we are unblocking XREADGROUP clients in the event the
1341 * key was deleted/overwritten we must do the same in case the
1342 * database was flushed/swapped. */
1343void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) {
1344 /* Optimization: If no clients are in type BLOCKED_STREAM,
1345 * we can skip this loop. */
1346 if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
1347
1348 dictEntry *de;
1349 dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
1350 while((de = dictNext(di)) != NULL) {
1351 robj *key = dictGetKey(de);
1352 int was_stream = 0, is_stream = 0;
1353
1354 dictEntry *kde = dictFind(emptied->dict, key->ptr);
1355 if (kde) {
1356 robj *value = dictGetVal(kde);
1357 was_stream = value->type == OBJ_STREAM;
1358 }
1359 if (replaced_with) {
1360 dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
1361 if (kde) {
1362 robj *value = dictGetVal(kde);
1363 is_stream = value->type == OBJ_STREAM;
1364 }
1365 }
1366 /* We want to try to unblock any client using a blocking XREADGROUP */
1367 if (was_stream && !is_stream)
1368 signalKeyAsReady(emptied, key, OBJ_STREAM);
1369 }
1370 dictReleaseIterator(di);
1371}
1372
1373/* Swap two databases at runtime so that all clients will magically see
1374 * the new database even if already connected. Note that the client
1375 * structure c->db points to a given DB, so we need to be smarter and
1376 * swap the underlying referenced structures, otherwise we would need
1377 * to fix all the references to the Redis DB structure.
1378 *
1379 * Returns C_ERR if at least one of the DB ids are out of range, otherwise
1380 * C_OK is returned. */
1381int dbSwapDatabases(int id1, int id2) {
1382 if (id1 < 0 || id1 >= server.dbnum ||
1383 id2 < 0 || id2 >= server.dbnum) return C_ERR;
1384 if (id1 == id2) return C_OK;
1385 redisDb aux = server.db[id1];
1386 redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1387
1388 /* Swapdb should make transaction fail if there is any
1389 * client watching keys */
1390 touchAllWatchedKeysInDb(db1, db2);
1391 touchAllWatchedKeysInDb(db2, db1);
1392
1393 /* Try to unblock any XREADGROUP clients if the key no longer exists. */
1394 scanDatabaseForDeletedStreams(db1, db2);
1395 scanDatabaseForDeletedStreams(db2, db1);
1396
1397 /* Swap hash tables. Note that we don't swap blocking_keys,
1398 * ready_keys and watched_keys, since we want clients to
1399 * remain in the same DB they were. */
1400 db1->dict = db2->dict;
1401 db1->expires = db2->expires;
1402 db1->avg_ttl = db2->avg_ttl;
1403 db1->expires_cursor = db2->expires_cursor;
1404
1405 db2->dict = aux.dict;
1406 db2->expires = aux.expires;
1407 db2->avg_ttl = aux.avg_ttl;
1408 db2->expires_cursor = aux.expires_cursor;
1409
1410 /* Now we need to handle clients blocked on lists: as an effect
1411 * of swapping the two DBs, a client that was waiting for list
1412 * X in a given DB, may now actually be unblocked if X happens
1413 * to exist in the new version of the DB, after the swap.
1414 *
1415 * However normally we only do this check for efficiency reasons
1416 * in dbAdd() when a list is created. So here we need to rescan
1417 * the list of clients blocked on lists and signal lists as ready
1418 * if needed. */
1419 scanDatabaseForReadyKeys(db1);
1420 scanDatabaseForReadyKeys(db2);
1421 return C_OK;
1422}
1423
1424/* Logically, this discards (flushes) the old main database, and apply the newly loaded
1425 * database (temp) as the main (active) database, the actual freeing of old database
1426 * (which will now be placed in the temp one) is done later. */
1427void swapMainDbWithTempDb(redisDb *tempDb) {
1428 if (server.cluster_enabled) {
1429 /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */
1430 clusterSlotToKeyMapping *aux = server.db->slots_to_keys;
1431 server.db->slots_to_keys = tempDb->slots_to_keys;
1432 tempDb->slots_to_keys = aux;
1433 }
1434
1435 for (int i=0; i<server.dbnum; i++) {
1436 redisDb aux = server.db[i];
1437 redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
1438
1439 /* Swapping databases should make transaction fail if there is any
1440 * client watching keys. */
1441 touchAllWatchedKeysInDb(activedb, newdb);
1442
1443 /* Try to unblock any XREADGROUP clients if the key no longer exists. */
1444 scanDatabaseForDeletedStreams(activedb, newdb);
1445
1446 /* Swap hash tables. Note that we don't swap blocking_keys,
1447 * ready_keys and watched_keys, since clients
1448 * remain in the same DB they were. */
1449 activedb->dict = newdb->dict;
1450 activedb->expires = newdb->expires;
1451 activedb->avg_ttl = newdb->avg_ttl;
1452 activedb->expires_cursor = newdb->expires_cursor;
1453
1454 newdb->dict = aux.dict;
1455 newdb->expires = aux.expires;
1456 newdb->avg_ttl = aux.avg_ttl;
1457 newdb->expires_cursor = aux.expires_cursor;
1458
1459 /* Now we need to handle clients blocked on lists: as an effect
1460 * of swapping the two DBs, a client that was waiting for list
1461 * X in a given DB, may now actually be unblocked if X happens
1462 * to exist in the new version of the DB, after the swap.
1463 *
1464 * However normally we only do this check for efficiency reasons
1465 * in dbAdd() when a list is created. So here we need to rescan
1466 * the list of clients blocked on lists and signal lists as ready
1467 * if needed. */
1468 scanDatabaseForReadyKeys(activedb);
1469 }
1470
1471 trackingInvalidateKeysOnFlush(1);
1472 flushSlaveKeysWithExpireList();
1473}
1474
1475/* SWAPDB db1 db2 */
1476void swapdbCommand(client *c) {
1477 int id1, id2;
1478
1479 /* Not allowed in cluster mode: we have just DB 0 there. */
1480 if (server.cluster_enabled) {
1481 addReplyError(c,"SWAPDB is not allowed in cluster mode");
1482 return;
1483 }
1484
1485 /* Get the two DBs indexes. */
1486 if (getIntFromObjectOrReply(c, c->argv[1], &id1,
1487 "invalid first DB index") != C_OK)
1488 return;
1489
1490 if (getIntFromObjectOrReply(c, c->argv[2], &id2,
1491 "invalid second DB index") != C_OK)
1492 return;
1493
1494 /* Swap... */
1495 if (dbSwapDatabases(id1,id2) == C_ERR) {
1496 addReplyError(c,"DB index is out of range");
1497 return;
1498 } else {
1499 RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
1500 moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
1501 server.dirty++;
1502 addReply(c,shared.ok);
1503 }
1504}
1505
1506/*-----------------------------------------------------------------------------
1507 * Expires API
1508 *----------------------------------------------------------------------------*/
1509
1510int removeExpire(redisDb *db, robj *key) {
1511 /* An expire may only be removed if there is a corresponding entry in the
1512 * main dict. Otherwise, the key will never be freed. */
1513 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
1514 return dictDelete(db->expires,key->ptr) == DICT_OK;
1515}
1516
1517/* Set an expire to the specified key. If the expire is set in the context
1518 * of an user calling a command 'c' is the client, otherwise 'c' is set
1519 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
1520 * after which the key will no longer be considered valid. */
1521void setExpire(client *c, redisDb *db, robj *key, long long when) {
1522 dictEntry *kde, *de;
1523
1524 /* Reuse the sds from the main dict in the expire dict */
1525 kde = dictFind(db->dict,key->ptr);
1526 serverAssertWithInfo(NULL,key,kde != NULL);
1527 de = dictAddOrFind(db->expires,dictGetKey(kde));
1528 dictSetSignedIntegerVal(de,when);
1529
1530 int writable_slave = server.masterhost && server.repl_slave_ro == 0;
1531 if (c && writable_slave && !(c->flags & CLIENT_MASTER))
1532 rememberSlaveKeyWithExpire(db,key);
1533}
1534
1535/* Return the expire time of the specified key, or -1 if no expire
1536 * is associated with this key (i.e. the key is non volatile) */
1537long long getExpire(redisDb *db, robj *key) {
1538 dictEntry *de;
1539
1540 /* No expire? return ASAP */
1541 if (dictSize(db->expires) == 0 ||
1542 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
1543
1544 /* The entry was found in the expire dict, this means it should also
1545 * be present in the main dict (safety check). */
1546 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
1547 return dictGetSignedIntegerVal(de);
1548}
1549
1550/* Delete the specified expired key and propagate expire. */
1551void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
1552 mstime_t expire_latency;
1553 latencyStartMonitor(expire_latency);
1554 if (server.lazyfree_lazy_expire)
1555 dbAsyncDelete(db,keyobj);
1556 else
1557 dbSyncDelete(db,keyobj);
1558 latencyEndMonitor(expire_latency);
1559 latencyAddSampleIfNeeded("expire-del",expire_latency);
1560 notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
1561 signalModifiedKey(NULL, db, keyobj);
1562 propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
1563 server.stat_expiredkeys++;
1564}
1565
1566/* Propagate expires into slaves and the AOF file.
1567 * When a key expires in the master, a DEL operation for this key is sent
1568 * to all the slaves and the AOF file if enabled.
1569 *
1570 * This way the key expiry is centralized in one place, and since both
1571 * AOF and the master->slave link guarantee operation ordering, everything
1572 * will be consistent even if we allow write operations against expiring
1573 * keys.
1574 *
1575 * This function may be called from:
1576 * 1. Within call(): Example: Lazy-expire on key access.
1577 * In this case the caller doesn't have to do anything
1578 * because call() handles server.also_propagate(); or
1579 * 2. Outside of call(): Example: Active-expire, eviction.
1580 * In this the caller must remember to call
1581 * propagatePendingCommands, preferably at the end of
1582 * the deletion batch, so that DELs will be wrapped
1583 * in MULTI/EXEC */
1584void propagateDeletion(redisDb *db, robj *key, int lazy) {
1585 robj *argv[2];
1586
1587 argv[0] = lazy ? shared.unlink : shared.del;
1588 argv[1] = key;
1589 incrRefCount(argv[0]);
1590 incrRefCount(argv[1]);
1591
1592 /* If the master decided to expire a key we must propagate it to replicas no matter what..
1593 * Even if module executed a command without asking for propagation. */
1594 int prev_replication_allowed = server.replication_allowed;
1595 server.replication_allowed = 1;
1596 alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
1597 server.replication_allowed = prev_replication_allowed;
1598
1599 decrRefCount(argv[0]);
1600 decrRefCount(argv[1]);
1601}
1602
1603/* Check if the key is expired. */
1604int keyIsExpired(redisDb *db, robj *key) {
1605 mstime_t when = getExpire(db,key);
1606 mstime_t now;
1607
1608 if (when < 0) return 0; /* No expire for this key */
1609
1610 /* Don't expire anything while loading. It will be done later. */
1611 if (server.loading) return 0;
1612
1613 /* If we are in the context of a Lua script, we pretend that time is
1614 * blocked to when the Lua script started. This way a key can expire
1615 * only the first time it is accessed and not in the middle of the
1616 * script execution, making propagation to slaves / AOF consistent.
1617 * See issue #1525 on Github for more information. */
1618 if (server.script_caller) {
1619 now = scriptTimeSnapshot();
1620 }
1621 /* If we are in the middle of a command execution, we still want to use
1622 * a reference time that does not change: in that case we just use the
1623 * cached time, that we update before each call in the call() function.
1624 * This way we avoid that commands such as RPOPLPUSH or similar, that
1625 * may re-open the same key multiple times, can invalidate an already
1626 * open object in a next call, if the next call will see the key expired,
1627 * while the first did not. */
1628 else if (server.fixed_time_expire > 0) {
1629 now = server.mstime;
1630 }
1631 /* For the other cases, we want to use the most fresh time we have. */
1632 else {
1633 now = mstime();
1634 }
1635
1636 /* The key expired if the current (virtual or real) time is greater
1637 * than the expire time of the key. */
1638 return now > when;
1639}
1640
1641/* This function is called when we are going to perform some operation
1642 * in a given key, but such key may be already logically expired even if
1643 * it still exists in the database. The main way this function is called
1644 * is via lookupKey*() family of functions.
1645 *
1646 * The behavior of the function depends on the replication role of the
1647 * instance, because by default replicas do not delete expired keys. They
1648 * wait for DELs from the master for consistency matters. However even
1649 * replicas will try to have a coherent return value for the function,
1650 * so that read commands executed in the replica side will be able to
1651 * behave like if the key is expired even if still present (because the
1652 * master has yet to propagate the DEL).
1653 *
1654 * In masters as a side effect of finding a key which is expired, such
1655 * key will be evicted from the database. Also this may trigger the
1656 * propagation of a DEL/UNLINK command in AOF / replication stream.
1657 *
1658 * On replicas, this function does not delete expired keys by default, but
1659 * it still returns 1 if the key is logically expired. To force deletion
1660 * of logically expired keys even on replicas, set force_delete_expired to
1661 * a non-zero value. Note though that if the current client is executing
1662 * replicated commands from the master, keys are never considered expired.
1663 *
1664 * The return value of the function is 0 if the key is still valid,
1665 * otherwise the function returns 1 if the key is expired. */
1666int expireIfNeeded(redisDb *db, robj *key, int force_delete_expired) {
1667 if (!keyIsExpired(db,key)) return 0;
1668
1669 /* If we are running in the context of a replica, instead of
1670 * evicting the expired key from the database, we return ASAP:
1671 * the replica key expiration is controlled by the master that will
1672 * send us synthesized DEL operations for expired keys. The
1673 * exception is when write operations are performed on writable
1674 * replicas.
1675 *
1676 * Still we try to return the right information to the caller,
1677 * that is, 0 if we think the key should be still valid, 1 if
1678 * we think the key is expired at this time.
1679 *
1680 * When replicating commands from the master, keys are never considered
1681 * expired. */
1682 if (server.masterhost != NULL) {
1683 if (server.current_client == server.master) return 0;
1684 if (!force_delete_expired) return 1;
1685 }
1686
1687 /* If clients are paused, we keep the current dataset constant,
1688 * but return to the client what we believe is the right state. Typically,
1689 * at the end of the pause we will properly expire the key OR we will
1690 * have failed over and the new primary will send us the expire. */
1691 if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
1692
1693 /* Delete the key */
1694 deleteExpiredKeyAndPropagate(db,key);
1695 return 1;
1696}
1697
1698/* -----------------------------------------------------------------------------
1699 * API to get key arguments from commands
1700 * ---------------------------------------------------------------------------*/
1701
1702/* Prepare the getKeysResult struct to hold numkeys, either by using the
1703 * pre-allocated keysbuf or by allocating a new array on the heap.
1704 *
1705 * This function must be called at least once before starting to populate
1706 * the result, and can be called repeatedly to enlarge the result array.
1707 */
1708keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) {
1709 /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
1710 * buffer here. */
1711 if (!result->keys) {
1712 serverAssert(!result->numkeys);
1713 result->keys = result->keysbuf;
1714 }
1715
1716 /* Resize if necessary */
1717 if (numkeys > result->size) {
1718 if (result->keys != result->keysbuf) {
1719 /* We're not using a static buffer, just (re)alloc */
1720 result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference));
1721 } else {
1722 /* We are using a static buffer, copy its contents */
1723 result->keys = zmalloc(numkeys * sizeof(keyReference));
1724 if (result->numkeys)
1725 memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference));
1726 }
1727 result->size = numkeys;
1728 }
1729
1730 return result->keys;
1731}
1732
1733/* Returns a bitmask with all the flags found in any of the key specs of the command.
1734 * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */
1735int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) {
1736 int64_t flags = 0;
1737 for (int j = 0; j < cmd->key_specs_num; j++) {
1738 keySpec *spec = cmd->key_specs + j;
1739 flags |= inv? ~spec->flags : spec->flags;
1740 }
1741 return flags;
1742}
1743
1744/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error.
1745 * There are several flags that can be used to modify how this function finds keys in a command.
1746 *
1747 * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys.
1748 * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys
1749 * found in other valid keyspecs.
1750 */
1751int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
1752 int j, i, k = 0, last, first, step;
1753 keyReference *keys;
1754
1755 for (j = 0; j < cmd->key_specs_num; j++) {
1756 keySpec *spec = cmd->key_specs + j;
1757 serverAssert(spec->begin_search_type != KSPEC_BS_INVALID);
1758 /* Skip specs that represent 'fake' keys */
1759 if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) {
1760 continue;
1761 }
1762
1763 first = 0;
1764 if (spec->begin_search_type == KSPEC_BS_INDEX) {
1765 first = spec->bs.index.pos;
1766 } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) {
1767 int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom;
1768 int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1;
1769 for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) {
1770 if (i >= argc || i < 1)
1771 break;
1772 if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) {
1773 first = i+1;
1774 break;
1775 }
1776 }
1777 /* keyword not found */
1778 if (!first) {
1779 continue;
1780 }
1781 } else {
1782 /* unknown spec */
1783 goto invalid_spec;
1784 }
1785
1786 if (spec->find_keys_type == KSPEC_FK_RANGE) {
1787 step = spec->fk.range.keystep;
1788 if (spec->fk.range.lastkey >= 0) {
1789 last = first + spec->fk.range.lastkey;
1790 } else {
1791 if (!spec->fk.range.limit) {
1792 last = argc + spec->fk.range.lastkey;
1793 } else {
1794 serverAssert(spec->fk.range.lastkey == -1);
1795 last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey);
1796 }
1797 }
1798 } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) {
1799 step = spec->fk.keynum.keystep;
1800 long long numkeys;
1801 if (spec->fk.keynum.keynumidx >= argc)
1802 goto invalid_spec;
1803
1804 sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr;
1805 if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) {
1806 /* Unable to parse the numkeys argument or it was invalid */
1807 goto invalid_spec;
1808 }
1809
1810 first += spec->fk.keynum.firstkey;
1811 last = first + (int)numkeys-1;
1812 } else {
1813 /* unknown spec */
1814 goto invalid_spec;
1815 }
1816
1817 int count = ((last - first)+1);
1818 keys = getKeysPrepareResult(result, count);
1819
1820 /* First or last is out of bounds, which indicates a syntax error */
1821 if (last >= argc || last < first || first >= argc) {
1822 goto invalid_spec;
1823 }
1824
1825 for (i = first; i <= last; i += step) {
1826 if (i >= argc || i < first) {
1827 /* Modules commands, and standard commands with a not fixed number
1828 * of arguments (negative arity parameter) do not have dispatch
1829 * time arity checks, so we need to handle the case where the user
1830 * passed an invalid number of arguments here. In this case we
1831 * return no keys and expect the command implementation to report
1832 * an arity or syntax error. */
1833 if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
1834 continue;
1835 } else {
1836 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
1837 }
1838 }
1839 keys[k].pos = i;
1840 keys[k++].flags = spec->flags;
1841 }
1842
1843 /* Handle incomplete specs (only after we added the current spec
1844 * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */
1845 if (spec->flags & CMD_KEY_INCOMPLETE) {
1846 goto invalid_spec;
1847 }
1848
1849 /* Done with this spec */
1850 continue;
1851
1852invalid_spec:
1853 if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) {
1854 continue;
1855 } else {
1856 result->numkeys = 0;
1857 return -1;
1858 }
1859 }
1860
1861 result->numkeys = k;
1862 return k;
1863}
1864
1865/* Return all the arguments that are keys in the command passed via argc / argv.
1866 * This function will eventually replace getKeysFromCommand.
1867 *
1868 * The command returns the positions of all the key arguments inside the array,
1869 * so the actual return value is a heap allocated array of integers. The
1870 * length of the array is returned by reference into *numkeys.
1871 *
1872 * Along with the position, this command also returns the flags that are
1873 * associated with how Redis will access the key.
1874 *
1875 * 'cmd' must be point to the corresponding entry into the redisCommand
1876 * table, according to the command name in argv[0]. */
1877int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
1878 /* The command has at least one key-spec not marked as NOT_KEY */
1879 int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY);
1880 /* The command has at least one key-spec marked as VARIABLE_FLAGS */
1881 int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS);
1882
1883 /* Flags indicating that we have a getkeys callback */
1884 int has_module_getkeys = cmd->flags & CMD_MODULE_GETKEYS;
1885
1886 /* The key-spec that's auto generated by RM_CreateCommand sets VARIABLE_FLAGS since no flags are given.
1887 * If the module provides getkeys callback, we'll prefer it, but if it didn't, we'll use key-spec anyway. */
1888 if ((cmd->flags & CMD_MODULE) && has_varflags && !has_module_getkeys)
1889 has_varflags = 0;
1890
1891 /* We prefer key-specs if there are any, and their flags are reliable. */
1892 if (has_keyspec && !has_varflags) {
1893 int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result);
1894 if (ret >= 0)
1895 return ret;
1896 /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec),
1897 * fallback to the callback method. */
1898 }
1899
1900 /* Resort to getkeys callback methods. */
1901 if (has_module_getkeys)
1902 return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
1903
1904 /* We use native getkeys as a last resort, since not all these native getkeys provide
1905 * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/
1906 if (cmd->getkeys_proc)
1907 return cmd->getkeys_proc(cmd,argv,argc,result);
1908 return 0;
1909}
1910
1911/* This function returns a sanity check if the command may have keys. */
1912int doesCommandHaveKeys(struct redisCommand *cmd) {
1913 return cmd->getkeys_proc || /* has getkeys_proc (non modules) */
1914 (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */
1915 (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */
1916}
1917
1918/* A simplified channel spec table that contains all of the redis commands
1919 * and which channels they have and how they are accessed. */
1920typedef struct ChannelSpecs {
1921 redisCommandProc *proc; /* Command procedure to match against */
1922 uint64_t flags; /* CMD_CHANNEL_* flags for this command */
1923 int start; /* The initial position of the first channel */
1924 int count; /* The number of channels, or -1 if all remaining
1925 * arguments are channels. */
1926} ChannelSpecs;
1927
1928ChannelSpecs commands_with_channels[] = {
1929 {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
1930 {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
1931 {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
1932 {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
1933 {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1},
1934 {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
1935 {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
1936 {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
1937 {NULL,0} /* Terminator. */
1938};
1939
1940/* Returns 1 if the command may access any channels matched by the flags
1941 * argument. */
1942int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) {
1943 /* If a module declares get channels, we are just going to assume
1944 * has channels. This API is allowed to return false positives. */
1945 if (cmd->flags & CMD_MODULE_GETCHANNELS) {
1946 return 1;
1947 }
1948 for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
1949 if (cmd->proc == spec->proc) {
1950 return !!(spec->flags & flags);
1951 }
1952 }
1953 return 0;
1954}
1955
1956/* Return all the arguments that are channels in the command passed via argc / argv.
1957 * This function behaves similar to getKeysFromCommandWithSpecs, but with channels
1958 * instead of keys.
1959 *
1960 * The command returns the positions of all the channel arguments inside the array,
1961 * so the actual return value is a heap allocated array of integers. The
1962 * length of the array is returned by reference into *numkeys.
1963 *
1964 * Along with the position, this command also returns the flags that are
1965 * associated with how Redis will access the channel.
1966 *
1967 * 'cmd' must be point to the corresponding entry into the redisCommand
1968 * table, according to the command name in argv[0]. */
1969int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1970 keyReference *keys;
1971 /* If a module declares get channels, use that. */
1972 if (cmd->flags & CMD_MODULE_GETCHANNELS) {
1973 return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result);
1974 }
1975 /* Otherwise check the channel spec table */
1976 for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) {
1977 if (cmd->proc == spec->proc) {
1978 int start = spec->start;
1979 int stop = (spec->count == -1) ? argc : start + spec->count;
1980 if (stop > argc) stop = argc;
1981 int count = 0;
1982 keys = getKeysPrepareResult(result, stop - start);
1983 for (int i = start; i < stop; i++ ) {
1984 keys[count].pos = i;
1985 keys[count++].flags = spec->flags;
1986 }
1987 result->numkeys = count;
1988 return count;
1989 }
1990 }
1991 return 0;
1992}
1993
1994/* The base case is to use the keys position as given in the command table
1995 * (firstkey, lastkey, step).
1996 * This function works only on command with the legacy_range_key_spec,
1997 * all other commands should be handled by getkeys_proc.
1998 *
1999 * If the commands keyspec is incomplete, no keys will be returned, and the provided
2000 * keys function should be called instead.
2001 *
2002 * NOTE: This function does not guarantee populating the flags for
2003 * the keys, in order to get flags you should use getKeysUsingKeySpecs. */
2004int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2005 int j, i = 0, last, first, step;
2006 keyReference *keys;
2007 UNUSED(argv);
2008
2009 if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) {
2010 result->numkeys = 0;
2011 return 0;
2012 }
2013
2014 first = cmd->legacy_range_key_spec.bs.index.pos;
2015 last = cmd->legacy_range_key_spec.fk.range.lastkey;
2016 if (last >= 0)
2017 last += first;
2018 step = cmd->legacy_range_key_spec.fk.range.keystep;
2019
2020 if (last < 0) last = argc+last;
2021
2022 int count = ((last - first)+1);
2023 keys = getKeysPrepareResult(result, count);
2024
2025 for (j = first; j <= last; j += step) {
2026 if (j >= argc || j < first) {
2027 /* Modules commands, and standard commands with a not fixed number
2028 * of arguments (negative arity parameter) do not have dispatch
2029 * time arity checks, so we need to handle the case where the user
2030 * passed an invalid number of arguments here. In this case we
2031 * return no keys and expect the command implementation to report
2032 * an arity or syntax error. */
2033 if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
2034 result->numkeys = 0;
2035 return 0;
2036 } else {
2037 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
2038 }
2039 }
2040 keys[i].pos = j;
2041 /* Flags are omitted from legacy key specs */
2042 keys[i++].flags = 0;
2043 }
2044 result->numkeys = i;
2045 return i;
2046}
2047
2048/* Return all the arguments that are keys in the command passed via argc / argv.
2049 *
2050 * The command returns the positions of all the key arguments inside the array,
2051 * so the actual return value is a heap allocated array of integers. The
2052 * length of the array is returned by reference into *numkeys.
2053 *
2054 * 'cmd' must be point to the corresponding entry into the redisCommand
2055 * table, according to the command name in argv[0].
2056 *
2057 * This function uses the command table if a command-specific helper function
2058 * is not required, otherwise it calls the command-specific function. */
2059int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2060 if (cmd->flags & CMD_MODULE_GETKEYS) {
2061 return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
2062 } else if (cmd->getkeys_proc) {
2063 return cmd->getkeys_proc(cmd,argv,argc,result);
2064 } else {
2065 return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result);
2066 }
2067}
2068
2069/* Free the result of getKeysFromCommand. */
2070void getKeysFreeResult(getKeysResult *result) {
2071 if (result && result->keys != result->keysbuf)
2072 zfree(result->keys);
2073}
2074
2075/* Helper function to extract keys from following commands:
2076 * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options>
2077 *
2078 * eg:
2079 * ZUNION <num-keys> <key> <key> ... <key> <options>
2080 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
2081 *
2082 * 'storeKeyOfs': destkey index, 0 means destkey not exists.
2083 * 'keyCountOfs': num-keys index.
2084 * 'firstKeyOfs': firstkey index.
2085 * 'keyStep': the interval of each key, usually this value is 1.
2086 *
2087 * The commands using this functoin have a fully defined keyspec, so returning flags isn't needed. */
2088int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep,
2089 robj **argv, int argc, getKeysResult *result) {
2090 int i, num;
2091 keyReference *keys;
2092
2093 num = atoi(argv[keyCountOfs]->ptr);
2094 /* Sanity check. Don't return any key if the command is going to
2095 * reply with syntax error. (no input keys). */
2096 if (num < 1 || num > (argc - firstKeyOfs)/keyStep) {
2097 result->numkeys = 0;
2098 return 0;
2099 }
2100
2101 int numkeys = storeKeyOfs ? num + 1 : num;
2102 keys = getKeysPrepareResult(result, numkeys);
2103 result->numkeys = numkeys;
2104
2105 /* Add all key positions for argv[firstKeyOfs...n] to keys[] */
2106 for (i = 0; i < num; i++) {
2107 keys[i].pos = firstKeyOfs+(i*keyStep);
2108 keys[i].flags = 0;
2109 }
2110
2111 if (storeKeyOfs) {
2112 keys[num].pos = storeKeyOfs;
2113 keys[num].flags = 0;
2114 }
2115 return result->numkeys;
2116}
2117
2118int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2119 UNUSED(cmd);
2120 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
2121}
2122
2123int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2124 UNUSED(cmd);
2125 return genericGetKeys(1, 2, 3, 1, argv, argc, result);
2126}
2127
2128int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2129 UNUSED(cmd);
2130 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
2131}
2132
2133int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2134 UNUSED(cmd);
2135 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
2136}
2137
2138int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2139 UNUSED(cmd);
2140 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
2141}
2142
2143int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2144 UNUSED(cmd);
2145 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
2146}
2147
2148int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2149 UNUSED(cmd);
2150 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
2151}
2152
2153int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2154 UNUSED(cmd);
2155 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
2156}
2157
2158int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2159 UNUSED(cmd);
2160 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
2161}
2162
2163/* Helper function to extract keys from the SORT RO command.
2164 *
2165 * SORT <sort-key>
2166 *
2167 * The second argument of SORT is always a key, however an arbitrary number of
2168 * keys may be accessed while doing the sort (the BY and GET args), so the
2169 * key-spec declares incomplete keys which is why we have to provide a concrete
2170 * implementation to fetch the keys.
2171 *
2172 * This command declares incomplete keys, so the flags are correctly set for this function */
2173int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2174 keyReference *keys;
2175 UNUSED(cmd);
2176 UNUSED(argv);
2177 UNUSED(argc);
2178
2179 keys = getKeysPrepareResult(result, 1);
2180 keys[0].pos = 1; /* <sort-key> is always present. */
2181 keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
2182 return 1;
2183}
2184
2185/* Helper function to extract keys from the SORT command.
2186 *
2187 * SORT <sort-key> ... STORE <store-key> ...
2188 *
2189 * The first argument of SORT is always a key, however a list of options
2190 * follow in SQL-alike style. Here we parse just the minimum in order to
2191 * correctly identify keys in the "STORE" option.
2192 *
2193 * This command declares incomplete keys, so the flags are correctly set for this function */
2194int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2195 int i, j, num, found_store = 0;
2196 keyReference *keys;
2197 UNUSED(cmd);
2198
2199 num = 0;
2200 keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
2201 keys[num].pos = 1; /* <sort-key> is always present. */
2202 keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
2203
2204 /* Search for STORE option. By default we consider options to don't
2205 * have arguments, so if we find an unknown option name we scan the
2206 * next. However there are options with 1 or 2 arguments, so we
2207 * provide a list here in order to skip the right number of args. */
2208 struct {
2209 char *name;
2210 int skip;
2211 } skiplist[] = {
2212 {"limit", 2},
2213 {"get", 1},
2214 {"by", 1},
2215 {NULL, 0} /* End of elements. */
2216 };
2217
2218 for (i = 2; i < argc; i++) {
2219 for (j = 0; skiplist[j].name != NULL; j++) {
2220 if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
2221 i += skiplist[j].skip;
2222 break;
2223 } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
2224 /* Note: we don't increment "num" here and continue the loop
2225 * to be sure to process the *last* "STORE" option if multiple
2226 * ones are provided. This is same behavior as SORT. */
2227 found_store = 1;
2228 keys[num].pos = i+1; /* <store-key> */
2229 keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
2230 break;
2231 }
2232 }
2233 }
2234 result->numkeys = num + found_store;
2235 return result->numkeys;
2236}
2237
2238/* This command declares incomplete keys, so the flags are correctly set for this function */
2239int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2240 int i, num, first;
2241 keyReference *keys;
2242 UNUSED(cmd);
2243
2244 /* Assume the obvious form. */
2245 first = 3;
2246 num = 1;
2247
2248 /* But check for the extended one with the KEYS option. */
2249 if (argc > 6) {
2250 for (i = 6; i < argc; i++) {
2251 if (!strcasecmp(argv[i]->ptr,"keys") &&
2252 sdslen(argv[3]->ptr) == 0)
2253 {
2254 first = i+1;
2255 num = argc-first;
2256 break;
2257 }
2258 }
2259 }
2260
2261 keys = getKeysPrepareResult(result, num);
2262 for (i = 0; i < num; i++) {
2263 keys[i].pos = first+i;
2264 keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE;
2265 }
2266 result->numkeys = num;
2267 return num;
2268}
2269
2270/* Helper function to extract keys from following commands:
2271 * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
2272 * [COUNT count] [STORE key] [STOREDIST key]
2273 * GEORADIUSBYMEMBER key member radius unit ... options ...
2274 *
2275 * This command has a fully defined keyspec, so returning flags isn't needed. */
2276int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2277 int i, num;
2278 keyReference *keys;
2279 UNUSED(cmd);
2280
2281 /* Check for the presence of the stored key in the command */
2282 int stored_key = -1;
2283 for (i = 5; i < argc; i++) {
2284 char *arg = argv[i]->ptr;
2285 /* For the case when user specifies both "store" and "storedist" options, the
2286 * second key specified would override the first key. This behavior is kept
2287 * the same as in georadiusCommand method.
2288 */
2289 if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
2290 stored_key = i+1;
2291 i++;
2292 }
2293 }
2294 num = 1 + (stored_key == -1 ? 0 : 1);
2295
2296 /* Keys in the command come from two places:
2297 * argv[1] = key,
2298 * argv[5...n] = stored key if present
2299 */
2300 keys = getKeysPrepareResult(result, num);
2301
2302 /* Add all key positions to keys[] */
2303 keys[0].pos = 1;
2304 keys[0].flags = 0;
2305 if(num > 1) {
2306 keys[1].pos = stored_key;
2307 keys[1].flags = 0;
2308 }
2309 result->numkeys = num;
2310 return num;
2311}
2312
2313/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
2314 * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
2315 *
2316 * This command has a fully defined keyspec, so returning flags isn't needed. */
2317int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2318 int i, num = 0;
2319 keyReference *keys;
2320 UNUSED(cmd);
2321
2322 /* We need to parse the options of the command in order to seek the first
2323 * "STREAMS" string which is actually the option. This is needed because
2324 * "STREAMS" could also be the name of the consumer group and even the
2325 * name of the stream key. */
2326 int streams_pos = -1;
2327 for (i = 1; i < argc; i++) {
2328 char *arg = argv[i]->ptr;
2329 if (!strcasecmp(arg, "block")) {
2330 i++; /* Skip option argument. */
2331 } else if (!strcasecmp(arg, "count")) {
2332 i++; /* Skip option argument. */
2333 } else if (!strcasecmp(arg, "group")) {
2334 i += 2; /* Skip option argument. */
2335 } else if (!strcasecmp(arg, "noack")) {
2336 /* Nothing to do. */
2337 } else if (!strcasecmp(arg, "streams")) {
2338 streams_pos = i;
2339 break;
2340 } else {
2341 break; /* Syntax error. */
2342 }
2343 }
2344 if (streams_pos != -1) num = argc - streams_pos - 1;
2345
2346 /* Syntax error. */
2347 if (streams_pos == -1 || num == 0 || num % 2 != 0) {
2348 result->numkeys = 0;
2349 return 0;
2350 }
2351 num /= 2; /* We have half the keys as there are arguments because
2352 there are also the IDs, one per key. */
2353
2354 keys = getKeysPrepareResult(result, num);
2355 for (i = streams_pos+1; i < argc-num; i++) {
2356 keys[i-streams_pos-1].pos = i;
2357 keys[i-streams_pos-1].flags = 0;
2358 }
2359 result->numkeys = num;
2360 return num;
2361}
2362
2363/* Helper function to extract keys from the SET command, which may have
2364 * a read flag if the GET argument is passed in. */
2365int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2366 keyReference *keys;
2367 UNUSED(cmd);
2368
2369 keys = getKeysPrepareResult(result, 1);
2370 keys[0].pos = 1; /* We always know the position */
2371 result->numkeys = 1;
2372
2373 for (int i = 3; i < argc; i++) {
2374 char *arg = argv[i]->ptr;
2375 if ((arg[0] == 'g' || arg[0] == 'G') &&
2376 (arg[1] == 'e' || arg[1] == 'E') &&
2377 (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0')
2378 {
2379 keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
2380 return 1;
2381 }
2382 }
2383
2384 keys[0].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
2385 return 1;
2386}
2387
2388/* Helper function to extract keys from the BITFIELD command, which may be
2389 * read-only if the BITFIELD GET subcommand is used. */
2390int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
2391 keyReference *keys;
2392 UNUSED(cmd);
2393
2394 keys = getKeysPrepareResult(result, 1);
2395 keys[0].pos = 1; /* We always know the position */
2396 result->numkeys = 1;
2397
2398 for (int i = 2; i < argc; i++) {
2399 int remargs = argc - i - 1; /* Remaining args other than current. */
2400 char *arg = argv[i]->ptr;
2401 if (!strcasecmp(arg, "get") && remargs >= 2) {
2402 keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
2403 return 1;
2404 }
2405 }
2406
2407 keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
2408 return 1;
2409}
2410