1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31/*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35/* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view").
42 *
43 * Note that the SDS string representing the element is the same in both
44 * the hash table and skiplist in order to save memory. What we do in order
45 * to manage the shared SDS string more easily is to free the SDS string
46 * only in zslFreeNode(). The dictionary has no value free method set.
47 * So we should always remove an element from the dictionary, and later from
48 * the skiplist.
49 *
50 * This skiplist implementation is almost a C translation of the original
51 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
52 * Alternative to Balanced Trees", modified in three ways:
53 * a) this implementation allows for repeated scores.
54 * b) the comparison is not just by key (our 'score') but by satellite data.
55 * c) there is a back pointer, so it's a doubly linked list with the back
56 * pointers being only at "level 1". This allows to traverse the list
57 * from tail to head, useful for ZREVRANGE. */
58
59#include "server.h"
60#include <math.h>
61
62/*-----------------------------------------------------------------------------
63 * Skiplist implementation of the low level API
64 *----------------------------------------------------------------------------*/
65
/* Forward declarations: these two lexicographic range helpers are defined
 * further down in this file, but zslDeleteRangeByLex() calls them before
 * their definitions appear. */
int zslLexValueGteMin(sds value, zlexrangespec *spec);
int zslLexValueLteMax(sds value, zlexrangespec *spec);
68
69/* Create a skiplist node with the specified number of levels.
70 * The SDS string 'ele' is referenced by the node after the call. */
71zskiplistNode *zslCreateNode(int level, double score, sds ele) {
72 zskiplistNode *zn =
73 zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
74 zn->score = score;
75 zn->ele = ele;
76 return zn;
77}
78
79/* Create a new skiplist. */
80zskiplist *zslCreate(void) {
81 int j;
82 zskiplist *zsl;
83
84 zsl = zmalloc(sizeof(*zsl));
85 zsl->level = 1;
86 zsl->length = 0;
87 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
88 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
89 zsl->header->level[j].forward = NULL;
90 zsl->header->level[j].span = 0;
91 }
92 zsl->header->backward = NULL;
93 zsl->tail = NULL;
94 return zsl;
95}
96
97/* Free the specified skiplist node. The referenced SDS string representation
98 * of the element is freed too, unless node->ele is set to NULL before calling
99 * this function. */
100void zslFreeNode(zskiplistNode *node) {
101 sdsfree(node->ele);
102 zfree(node);
103}
104
105/* Free a whole skiplist. */
106void zslFree(zskiplist *zsl) {
107 zskiplistNode *node = zsl->header->level[0].forward, *next;
108
109 zfree(zsl->header);
110 while(node) {
111 next = node->level[0].forward;
112 zslFreeNode(node);
113 node = next;
114 }
115 zfree(zsl);
116}
117
118/* Returns a random level for the new skiplist node we are going to create.
119 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
120 * (both inclusive), with a powerlaw-alike distribution where higher
121 * levels are less likely to be returned. */
122int zslRandomLevel(void) {
123 static const int threshold = ZSKIPLIST_P*RAND_MAX;
124 int level = 1;
125 while (random() < threshold)
126 level += 1;
127 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
128}
129
130/* Insert a new node in the skiplist. Assumes the element does not already
131 * exist (up to the caller to enforce that). The skiplist takes ownership
132 * of the passed SDS string 'ele'. */
133zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
134 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
135 unsigned long rank[ZSKIPLIST_MAXLEVEL];
136 int i, level;
137
138 serverAssert(!isnan(score));
139 x = zsl->header;
140 for (i = zsl->level-1; i >= 0; i--) {
141 /* store rank that is crossed to reach the insert position */
142 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
143 while (x->level[i].forward &&
144 (x->level[i].forward->score < score ||
145 (x->level[i].forward->score == score &&
146 sdscmp(x->level[i].forward->ele,ele) < 0)))
147 {
148 rank[i] += x->level[i].span;
149 x = x->level[i].forward;
150 }
151 update[i] = x;
152 }
153 /* we assume the element is not already inside, since we allow duplicated
154 * scores, reinserting the same element should never happen since the
155 * caller of zslInsert() should test in the hash table if the element is
156 * already inside or not. */
157 level = zslRandomLevel();
158 if (level > zsl->level) {
159 for (i = zsl->level; i < level; i++) {
160 rank[i] = 0;
161 update[i] = zsl->header;
162 update[i]->level[i].span = zsl->length;
163 }
164 zsl->level = level;
165 }
166 x = zslCreateNode(level,score,ele);
167 for (i = 0; i < level; i++) {
168 x->level[i].forward = update[i]->level[i].forward;
169 update[i]->level[i].forward = x;
170
171 /* update span covered by update[i] as x is inserted here */
172 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
173 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
174 }
175
176 /* increment span for untouched levels */
177 for (i = level; i < zsl->level; i++) {
178 update[i]->level[i].span++;
179 }
180
181 x->backward = (update[0] == zsl->header) ? NULL : update[0];
182 if (x->level[0].forward)
183 x->level[0].forward->backward = x;
184 else
185 zsl->tail = x;
186 zsl->length++;
187 return x;
188}
189
190/* Internal function used by zslDelete, zslDeleteRangeByScore and
191 * zslDeleteRangeByRank. */
192void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
193 int i;
194 for (i = 0; i < zsl->level; i++) {
195 if (update[i]->level[i].forward == x) {
196 update[i]->level[i].span += x->level[i].span - 1;
197 update[i]->level[i].forward = x->level[i].forward;
198 } else {
199 update[i]->level[i].span -= 1;
200 }
201 }
202 if (x->level[0].forward) {
203 x->level[0].forward->backward = x->backward;
204 } else {
205 zsl->tail = x->backward;
206 }
207 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
208 zsl->level--;
209 zsl->length--;
210}
211
212/* Delete an element with matching score/element from the skiplist.
213 * The function returns 1 if the node was found and deleted, otherwise
214 * 0 is returned.
215 *
216 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
217 * it is not freed (but just unlinked) and *node is set to the node pointer,
218 * so that it is possible for the caller to reuse the node (including the
219 * referenced SDS string at node->ele). */
220int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
221 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
222 int i;
223
224 x = zsl->header;
225 for (i = zsl->level-1; i >= 0; i--) {
226 while (x->level[i].forward &&
227 (x->level[i].forward->score < score ||
228 (x->level[i].forward->score == score &&
229 sdscmp(x->level[i].forward->ele,ele) < 0)))
230 {
231 x = x->level[i].forward;
232 }
233 update[i] = x;
234 }
235 /* We may have multiple elements with the same score, what we need
236 * is to find the element with both the right score and object. */
237 x = x->level[0].forward;
238 if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
239 zslDeleteNode(zsl, x, update);
240 if (!node)
241 zslFreeNode(x);
242 else
243 *node = x;
244 return 1;
245 }
246 return 0; /* not found */
247}
248
249/* Update the score of an element inside the sorted set skiplist.
250 * Note that the element must exist and must match 'score'.
251 * This function does not update the score in the hash table side, the
252 * caller should take care of it.
253 *
254 * Note that this function attempts to just update the node, in case after
255 * the score update, the node would be exactly at the same position.
256 * Otherwise the skiplist is modified by removing and re-adding a new
257 * element, which is more costly.
258 *
259 * The function returns the updated element skiplist node pointer. */
260zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
261 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
262 int i;
263
264 /* We need to seek to element to update to start: this is useful anyway,
265 * we'll have to update or remove it. */
266 x = zsl->header;
267 for (i = zsl->level-1; i >= 0; i--) {
268 while (x->level[i].forward &&
269 (x->level[i].forward->score < curscore ||
270 (x->level[i].forward->score == curscore &&
271 sdscmp(x->level[i].forward->ele,ele) < 0)))
272 {
273 x = x->level[i].forward;
274 }
275 update[i] = x;
276 }
277
278 /* Jump to our element: note that this function assumes that the
279 * element with the matching score exists. */
280 x = x->level[0].forward;
281 serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
282
283 /* If the node, after the score update, would be still exactly
284 * at the same position, we can just update the score without
285 * actually removing and re-inserting the element in the skiplist. */
286 if ((x->backward == NULL || x->backward->score < newscore) &&
287 (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
288 {
289 x->score = newscore;
290 return x;
291 }
292
293 /* No way to reuse the old node: we need to remove and insert a new
294 * one at a different place. */
295 zslDeleteNode(zsl, x, update);
296 zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
297 /* We reused the old node x->ele SDS string, free the node now
298 * since zslInsert created a new one. */
299 x->ele = NULL;
300 zslFreeNode(x);
301 return newnode;
302}
303
304int zslValueGteMin(double value, zrangespec *spec) {
305 return spec->minex ? (value > spec->min) : (value >= spec->min);
306}
307
308int zslValueLteMax(double value, zrangespec *spec) {
309 return spec->maxex ? (value < spec->max) : (value <= spec->max);
310}
311
312/* Returns if there is a part of the zset is in range. */
313int zslIsInRange(zskiplist *zsl, zrangespec *range) {
314 zskiplistNode *x;
315
316 /* Test for ranges that will always be empty. */
317 if (range->min > range->max ||
318 (range->min == range->max && (range->minex || range->maxex)))
319 return 0;
320 x = zsl->tail;
321 if (x == NULL || !zslValueGteMin(x->score,range))
322 return 0;
323 x = zsl->header->level[0].forward;
324 if (x == NULL || !zslValueLteMax(x->score,range))
325 return 0;
326 return 1;
327}
328
329/* Find the first node that is contained in the specified range.
330 * Returns NULL when no element is contained in the range. */
331zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
332 zskiplistNode *x;
333 int i;
334
335 /* If everything is out of range, return early. */
336 if (!zslIsInRange(zsl,range)) return NULL;
337
338 x = zsl->header;
339 for (i = zsl->level-1; i >= 0; i--) {
340 /* Go forward while *OUT* of range. */
341 while (x->level[i].forward &&
342 !zslValueGteMin(x->level[i].forward->score,range))
343 x = x->level[i].forward;
344 }
345
346 /* This is an inner range, so the next node cannot be NULL. */
347 x = x->level[0].forward;
348 serverAssert(x != NULL);
349
350 /* Check if score <= max. */
351 if (!zslValueLteMax(x->score,range)) return NULL;
352 return x;
353}
354
355/* Find the last node that is contained in the specified range.
356 * Returns NULL when no element is contained in the range. */
357zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
358 zskiplistNode *x;
359 int i;
360
361 /* If everything is out of range, return early. */
362 if (!zslIsInRange(zsl,range)) return NULL;
363
364 x = zsl->header;
365 for (i = zsl->level-1; i >= 0; i--) {
366 /* Go forward while *IN* range. */
367 while (x->level[i].forward &&
368 zslValueLteMax(x->level[i].forward->score,range))
369 x = x->level[i].forward;
370 }
371
372 /* This is an inner range, so this node cannot be NULL. */
373 serverAssert(x != NULL);
374
375 /* Check if score >= min. */
376 if (!zslValueGteMin(x->score,range)) return NULL;
377 return x;
378}
379
380/* Delete all the elements with score between min and max from the skiplist.
381 * Both min and max can be inclusive or exclusive (see range->minex and
382 * range->maxex). When inclusive a score >= min && score <= max is deleted.
383 * Note that this function takes the reference to the hash table view of the
384 * sorted set, in order to remove the elements from the hash table too. */
385unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
386 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
387 unsigned long removed = 0;
388 int i;
389
390 x = zsl->header;
391 for (i = zsl->level-1; i >= 0; i--) {
392 while (x->level[i].forward &&
393 !zslValueGteMin(x->level[i].forward->score, range))
394 x = x->level[i].forward;
395 update[i] = x;
396 }
397
398 /* Current node is the last with score < or <= min. */
399 x = x->level[0].forward;
400
401 /* Delete nodes while in range. */
402 while (x && zslValueLteMax(x->score, range)) {
403 zskiplistNode *next = x->level[0].forward;
404 zslDeleteNode(zsl,x,update);
405 dictDelete(dict,x->ele);
406 zslFreeNode(x); /* Here is where x->ele is actually released. */
407 removed++;
408 x = next;
409 }
410 return removed;
411}
412
413unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
414 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
415 unsigned long removed = 0;
416 int i;
417
418
419 x = zsl->header;
420 for (i = zsl->level-1; i >= 0; i--) {
421 while (x->level[i].forward &&
422 !zslLexValueGteMin(x->level[i].forward->ele,range))
423 x = x->level[i].forward;
424 update[i] = x;
425 }
426
427 /* Current node is the last with score < or <= min. */
428 x = x->level[0].forward;
429
430 /* Delete nodes while in range. */
431 while (x && zslLexValueLteMax(x->ele,range)) {
432 zskiplistNode *next = x->level[0].forward;
433 zslDeleteNode(zsl,x,update);
434 dictDelete(dict,x->ele);
435 zslFreeNode(x); /* Here is where x->ele is actually released. */
436 removed++;
437 x = next;
438 }
439 return removed;
440}
441
442/* Delete all the elements with rank between start and end from the skiplist.
443 * Start and end are inclusive. Note that start and end need to be 1-based */
444unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
445 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
446 unsigned long traversed = 0, removed = 0;
447 int i;
448
449 x = zsl->header;
450 for (i = zsl->level-1; i >= 0; i--) {
451 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
452 traversed += x->level[i].span;
453 x = x->level[i].forward;
454 }
455 update[i] = x;
456 }
457
458 traversed++;
459 x = x->level[0].forward;
460 while (x && traversed <= end) {
461 zskiplistNode *next = x->level[0].forward;
462 zslDeleteNode(zsl,x,update);
463 dictDelete(dict,x->ele);
464 zslFreeNode(x);
465 removed++;
466 traversed++;
467 x = next;
468 }
469 return removed;
470}
471
472/* Find the rank for an element by both score and key.
473 * Returns 0 when the element cannot be found, rank otherwise.
474 * Note that the rank is 1-based due to the span of zsl->header to the
475 * first element. */
476unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
477 zskiplistNode *x;
478 unsigned long rank = 0;
479 int i;
480
481 x = zsl->header;
482 for (i = zsl->level-1; i >= 0; i--) {
483 while (x->level[i].forward &&
484 (x->level[i].forward->score < score ||
485 (x->level[i].forward->score == score &&
486 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
487 rank += x->level[i].span;
488 x = x->level[i].forward;
489 }
490
491 /* x might be equal to zsl->header, so test if obj is non-NULL */
492 if (x->ele && x->score == score && sdscmp(x->ele,ele) == 0) {
493 return rank;
494 }
495 }
496 return 0;
497}
498
499/* Finds an element by its rank. The rank argument needs to be 1-based. */
500zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
501 zskiplistNode *x;
502 unsigned long traversed = 0;
503 int i;
504
505 x = zsl->header;
506 for (i = zsl->level-1; i >= 0; i--) {
507 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
508 {
509 traversed += x->level[i].span;
510 x = x->level[i].forward;
511 }
512 if (traversed == rank) {
513 return x;
514 }
515 }
516 return NULL;
517}
518
519/* Populate the rangespec according to the objects min and max. */
520static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
521 char *eptr;
522 spec->minex = spec->maxex = 0;
523
524 /* Parse the min-max interval. If one of the values is prefixed
525 * by the "(" character, it's considered "open". For instance
526 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
527 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
528 if (min->encoding == OBJ_ENCODING_INT) {
529 spec->min = (long)min->ptr;
530 } else {
531 if (((char*)min->ptr)[0] == '(') {
532 spec->min = strtod((char*)min->ptr+1,&eptr);
533 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
534 spec->minex = 1;
535 } else {
536 spec->min = strtod((char*)min->ptr,&eptr);
537 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
538 }
539 }
540 if (max->encoding == OBJ_ENCODING_INT) {
541 spec->max = (long)max->ptr;
542 } else {
543 if (((char*)max->ptr)[0] == '(') {
544 spec->max = strtod((char*)max->ptr+1,&eptr);
545 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
546 spec->maxex = 1;
547 } else {
548 spec->max = strtod((char*)max->ptr,&eptr);
549 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
550 }
551 }
552
553 return C_OK;
554}
555
556/* ------------------------ Lexicographic ranges ---------------------------- */
557
558/* Parse max or min argument of ZRANGEBYLEX.
559 * (foo means foo (open interval)
560 * [foo means foo (closed interval)
561 * - means the min string possible
562 * + means the max string possible
563 *
564 * If the string is valid the *dest pointer is set to the redis object
565 * that will be used for the comparison, and ex will be set to 0 or 1
566 * respectively if the item is exclusive or inclusive. C_OK will be
567 * returned.
568 *
569 * If the string is not a valid range C_ERR is returned, and the value
570 * of *dest and *ex is undefined. */
571int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
572 char *c = item->ptr;
573
574 switch(c[0]) {
575 case '+':
576 if (c[1] != '\0') return C_ERR;
577 *ex = 1;
578 *dest = shared.maxstring;
579 return C_OK;
580 case '-':
581 if (c[1] != '\0') return C_ERR;
582 *ex = 1;
583 *dest = shared.minstring;
584 return C_OK;
585 case '(':
586 *ex = 1;
587 *dest = sdsnewlen(c+1,sdslen(c)-1);
588 return C_OK;
589 case '[':
590 *ex = 0;
591 *dest = sdsnewlen(c+1,sdslen(c)-1);
592 return C_OK;
593 default:
594 return C_ERR;
595 }
596}
597
598/* Free a lex range structure, must be called only after zslParseLexRange()
599 * populated the structure with success (C_OK returned). */
600void zslFreeLexRange(zlexrangespec *spec) {
601 if (spec->min != shared.minstring &&
602 spec->min != shared.maxstring) sdsfree(spec->min);
603 if (spec->max != shared.minstring &&
604 spec->max != shared.maxstring) sdsfree(spec->max);
605}
606
607/* Populate the lex rangespec according to the objects min and max.
608 *
609 * Return C_OK on success. On error C_ERR is returned.
610 * When OK is returned the structure must be freed with zslFreeLexRange(),
611 * otherwise no release is needed. */
612int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
613 /* The range can't be valid if objects are integer encoded.
614 * Every item must start with ( or [. */
615 if (min->encoding == OBJ_ENCODING_INT ||
616 max->encoding == OBJ_ENCODING_INT) return C_ERR;
617
618 spec->min = spec->max = NULL;
619 if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
620 zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
621 zslFreeLexRange(spec);
622 return C_ERR;
623 } else {
624 return C_OK;
625 }
626}
627
628/* This is just a wrapper to sdscmp() that is able to
629 * handle shared.minstring and shared.maxstring as the equivalent of
630 * -inf and +inf for strings */
631int sdscmplex(sds a, sds b) {
632 if (a == b) return 0;
633 if (a == shared.minstring || b == shared.maxstring) return -1;
634 if (a == shared.maxstring || b == shared.minstring) return 1;
635 return sdscmp(a,b);
636}
637
638int zslLexValueGteMin(sds value, zlexrangespec *spec) {
639 return spec->minex ?
640 (sdscmplex(value,spec->min) > 0) :
641 (sdscmplex(value,spec->min) >= 0);
642}
643
644int zslLexValueLteMax(sds value, zlexrangespec *spec) {
645 return spec->maxex ?
646 (sdscmplex(value,spec->max) < 0) :
647 (sdscmplex(value,spec->max) <= 0);
648}
649
650/* Returns if there is a part of the zset is in the lex range. */
651int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
652 zskiplistNode *x;
653
654 /* Test for ranges that will always be empty. */
655 int cmp = sdscmplex(range->min,range->max);
656 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
657 return 0;
658 x = zsl->tail;
659 if (x == NULL || !zslLexValueGteMin(x->ele,range))
660 return 0;
661 x = zsl->header->level[0].forward;
662 if (x == NULL || !zslLexValueLteMax(x->ele,range))
663 return 0;
664 return 1;
665}
666
667/* Find the first node that is contained in the specified lex range.
668 * Returns NULL when no element is contained in the range. */
669zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
670 zskiplistNode *x;
671 int i;
672
673 /* If everything is out of range, return early. */
674 if (!zslIsInLexRange(zsl,range)) return NULL;
675
676 x = zsl->header;
677 for (i = zsl->level-1; i >= 0; i--) {
678 /* Go forward while *OUT* of range. */
679 while (x->level[i].forward &&
680 !zslLexValueGteMin(x->level[i].forward->ele,range))
681 x = x->level[i].forward;
682 }
683
684 /* This is an inner range, so the next node cannot be NULL. */
685 x = x->level[0].forward;
686 serverAssert(x != NULL);
687
688 /* Check if score <= max. */
689 if (!zslLexValueLteMax(x->ele,range)) return NULL;
690 return x;
691}
692
693/* Find the last node that is contained in the specified range.
694 * Returns NULL when no element is contained in the range. */
695zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
696 zskiplistNode *x;
697 int i;
698
699 /* If everything is out of range, return early. */
700 if (!zslIsInLexRange(zsl,range)) return NULL;
701
702 x = zsl->header;
703 for (i = zsl->level-1; i >= 0; i--) {
704 /* Go forward while *IN* range. */
705 while (x->level[i].forward &&
706 zslLexValueLteMax(x->level[i].forward->ele,range))
707 x = x->level[i].forward;
708 }
709
710 /* This is an inner range, so this node cannot be NULL. */
711 serverAssert(x != NULL);
712
713 /* Check if score >= min. */
714 if (!zslLexValueGteMin(x->ele,range)) return NULL;
715 return x;
716}
717
718/*-----------------------------------------------------------------------------
719 * Listpack-backed sorted set API
720 *----------------------------------------------------------------------------*/
721
/* Convert the first 'vlen' bytes of 'vstr' (not necessarily
 * NUL-terminated) to a double with strtod(). The bytes are copied into a
 * local 128-byte buffer first, so input longer than 127 bytes is silently
 * truncated before parsing. */
double zzlStrtod(unsigned char *vstr, unsigned int vlen) {
    char buf[128];
    const unsigned int maxlen = sizeof(buf) - 1;
    unsigned int n = (vlen > maxlen) ? maxlen : vlen;

    memcpy(buf,vstr,n);
    buf[n] = '\0';
    return strtod(buf,NULL);
}
730
731double zzlGetScore(unsigned char *sptr) {
732 unsigned char *vstr;
733 unsigned int vlen;
734 long long vlong;
735 double score;
736
737 serverAssert(sptr != NULL);
738 vstr = lpGetValue(sptr,&vlen,&vlong);
739
740 if (vstr) {
741 score = zzlStrtod(vstr,vlen);
742 } else {
743 score = vlong;
744 }
745
746 return score;
747}
748
749/* Return a listpack element as an SDS string. */
750sds lpGetObject(unsigned char *sptr) {
751 unsigned char *vstr;
752 unsigned int vlen;
753 long long vlong;
754
755 serverAssert(sptr != NULL);
756 vstr = lpGetValue(sptr,&vlen,&vlong);
757
758 if (vstr) {
759 return sdsnewlen((char*)vstr,vlen);
760 } else {
761 return sdsfromlonglong(vlong);
762 }
763}
764
/* Compare the listpack entry 'eptr' with the 'clen' bytes at 'cstr'.
 * The common prefix is compared with memcmp(); when it is equal the result
 * is the length difference (entry length minus clen). An entry that
 * lpGetValue() reports as an integer is first rendered as a string with
 * ll2string(). */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char numbuf[32];
    unsigned char *str;
    unsigned int len;
    long long ival;
    int common, cmp;

    str = lpGetValue(eptr,&len,&ival);
    if (str == NULL) {
        /* Integer entry: compare against its decimal representation. */
        len = ll2string((char*)numbuf,sizeof(numbuf),ival);
        str = numbuf;
    }

    common = (len < clen) ? len : clen;
    cmp = memcmp(str,cstr,common);
    return (cmp != 0) ? cmp : (int)(len-clen);
}
785
/* Return the number of (element,score) pairs in the listpack: each pair
 * takes two listpack entries, so this is half of lpLength(). */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int pairs = lpLength(zl)/2;
    return pairs;
}
789
790/* Move to next entry based on the values in eptr and sptr. Both are set to
791 * NULL when there is no next entry. */
792void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
793 unsigned char *_eptr, *_sptr;
794 serverAssert(*eptr != NULL && *sptr != NULL);
795
796 _eptr = lpNext(zl,*sptr);
797 if (_eptr != NULL) {
798 _sptr = lpNext(zl,_eptr);
799 serverAssert(_sptr != NULL);
800 } else {
801 /* No next entry. */
802 _sptr = NULL;
803 }
804
805 *eptr = _eptr;
806 *sptr = _sptr;
807}
808
809/* Move to the previous entry based on the values in eptr and sptr. Both are
810 * set to NULL when there is no prev entry. */
811void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
812 unsigned char *_eptr, *_sptr;
813 serverAssert(*eptr != NULL && *sptr != NULL);
814
815 _sptr = lpPrev(zl,*eptr);
816 if (_sptr != NULL) {
817 _eptr = lpPrev(zl,_sptr);
818 serverAssert(_eptr != NULL);
819 } else {
820 /* No previous entry. */
821 _eptr = NULL;
822 }
823
824 *eptr = _eptr;
825 *sptr = _sptr;
826}
827
828/* Returns if there is a part of the zset is in range. Should only be used
829 * internally by zzlFirstInRange and zzlLastInRange. */
830int zzlIsInRange(unsigned char *zl, zrangespec *range) {
831 unsigned char *p;
832 double score;
833
834 /* Test for ranges that will always be empty. */
835 if (range->min > range->max ||
836 (range->min == range->max && (range->minex || range->maxex)))
837 return 0;
838
839 p = lpSeek(zl,-1); /* Last score. */
840 if (p == NULL) return 0; /* Empty sorted set */
841 score = zzlGetScore(p);
842 if (!zslValueGteMin(score,range))
843 return 0;
844
845 p = lpSeek(zl,1); /* First score. */
846 serverAssert(p != NULL);
847 score = zzlGetScore(p);
848 if (!zslValueLteMax(score,range))
849 return 0;
850
851 return 1;
852}
853
854/* Find pointer to the first element contained in the specified range.
855 * Returns NULL when no element is contained in the range. */
856unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
857 unsigned char *eptr = lpSeek(zl,0), *sptr;
858 double score;
859
860 /* If everything is out of range, return early. */
861 if (!zzlIsInRange(zl,range)) return NULL;
862
863 while (eptr != NULL) {
864 sptr = lpNext(zl,eptr);
865 serverAssert(sptr != NULL);
866
867 score = zzlGetScore(sptr);
868 if (zslValueGteMin(score,range)) {
869 /* Check if score <= max. */
870 if (zslValueLteMax(score,range))
871 return eptr;
872 return NULL;
873 }
874
875 /* Move to next element. */
876 eptr = lpNext(zl,sptr);
877 }
878
879 return NULL;
880}
881
882/* Find pointer to the last element contained in the specified range.
883 * Returns NULL when no element is contained in the range. */
884unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
885 unsigned char *eptr = lpSeek(zl,-2), *sptr;
886 double score;
887
888 /* If everything is out of range, return early. */
889 if (!zzlIsInRange(zl,range)) return NULL;
890
891 while (eptr != NULL) {
892 sptr = lpNext(zl,eptr);
893 serverAssert(sptr != NULL);
894
895 score = zzlGetScore(sptr);
896 if (zslValueLteMax(score,range)) {
897 /* Check if score >= min. */
898 if (zslValueGteMin(score,range))
899 return eptr;
900 return NULL;
901 }
902
903 /* Move to previous element by moving to the score of previous element.
904 * When this returns NULL, we know there also is no element. */
905 sptr = lpPrev(zl,eptr);
906 if (sptr != NULL)
907 serverAssert((eptr = lpPrev(zl,sptr)) != NULL);
908 else
909 eptr = NULL;
910 }
911
912 return NULL;
913}
914
915int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
916 sds value = lpGetObject(p);
917 int res = zslLexValueGteMin(value,spec);
918 sdsfree(value);
919 return res;
920}
921
922int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
923 sds value = lpGetObject(p);
924 int res = zslLexValueLteMax(value,spec);
925 sdsfree(value);
926 return res;
927}
928
929/* Returns if there is a part of the zset is in range. Should only be used
930 * internally by zzlFirstInLexRange and zzlLastInLexRange. */
931int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
932 unsigned char *p;
933
934 /* Test for ranges that will always be empty. */
935 int cmp = sdscmplex(range->min,range->max);
936 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
937 return 0;
938
939 p = lpSeek(zl,-2); /* Last element. */
940 if (p == NULL) return 0;
941 if (!zzlLexValueGteMin(p,range))
942 return 0;
943
944 p = lpSeek(zl,0); /* First element. */
945 serverAssert(p != NULL);
946 if (!zzlLexValueLteMax(p,range))
947 return 0;
948
949 return 1;
950}
951
952/* Find pointer to the first element contained in the specified lex range.
953 * Returns NULL when no element is contained in the range. */
954unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
955 unsigned char *eptr = lpSeek(zl,0), *sptr;
956
957 /* If everything is out of range, return early. */
958 if (!zzlIsInLexRange(zl,range)) return NULL;
959
960 while (eptr != NULL) {
961 if (zzlLexValueGteMin(eptr,range)) {
962 /* Check if score <= max. */
963 if (zzlLexValueLteMax(eptr,range))
964 return eptr;
965 return NULL;
966 }
967
968 /* Move to next element. */
969 sptr = lpNext(zl,eptr); /* This element score. Skip it. */
970 serverAssert(sptr != NULL);
971 eptr = lpNext(zl,sptr); /* Next element. */
972 }
973
974 return NULL;
975}
976
977/* Find pointer to the last element contained in the specified lex range.
978 * Returns NULL when no element is contained in the range. */
979unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
980 unsigned char *eptr = lpSeek(zl,-2), *sptr;
981
982 /* If everything is out of range, return early. */
983 if (!zzlIsInLexRange(zl,range)) return NULL;
984
985 while (eptr != NULL) {
986 if (zzlLexValueLteMax(eptr,range)) {
987 /* Check if score >= min. */
988 if (zzlLexValueGteMin(eptr,range))
989 return eptr;
990 return NULL;
991 }
992
993 /* Move to previous element by moving to the score of previous element.
994 * When this returns NULL, we know there also is no element. */
995 sptr = lpPrev(zl,eptr);
996 if (sptr != NULL)
997 serverAssert((eptr = lpPrev(zl,sptr)) != NULL);
998 else
999 eptr = NULL;
1000 }
1001
1002 return NULL;
1003}
1004
1005unsigned char *zzlFind(unsigned char *lp, sds ele, double *score) {
1006 unsigned char *eptr, *sptr;
1007
1008 if ((eptr = lpFirst(lp)) == NULL) return NULL;
1009 eptr = lpFind(lp, eptr, (unsigned char*)ele, sdslen(ele), 1);
1010 if (eptr) {
1011 sptr = lpNext(lp,eptr);
1012 serverAssert(sptr != NULL);
1013
1014 /* Matching element, pull out score. */
1015 if (score != NULL) *score = zzlGetScore(sptr);
1016 return eptr;
1017 }
1018
1019 return NULL;
1020}
1021
/* Remove the (element,score) pair whose member entry is 'eptr' and return
 * the listpack pointer handed back by lpDeleteRangeWithEntry().
 * lpDeleteRangeWithEntry() receives the address of a private copy of the
 * position, so the caller's own pointer variable is never written. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;

    zl = lpDeleteRangeWithEntry(zl,&cursor,2);
    return zl;
}
1027
1028unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
1029 unsigned char *sptr;
1030 char scorebuf[MAX_D2STRING_CHARS];
1031 int scorelen;
1032 long long lscore;
1033 int score_is_long = double2ll(score, &lscore);
1034 if (!score_is_long)
1035 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
1036 if (eptr == NULL) {
1037 zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele));
1038 if (score_is_long)
1039 zl = lpAppendInteger(zl,lscore);
1040 else
1041 zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
1042 } else {
1043 /* Insert member before the element 'eptr'. */
1044 zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr);
1045
1046 /* Insert score after the member. */
1047 if (score_is_long)
1048 zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL);
1049 else
1050 zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
1051 }
1052 return zl;
1053}
1054
1055/* Insert (element,score) pair in listpack. This function assumes the element is
1056 * not yet present in the list. */
1057unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
1058 unsigned char *eptr = lpSeek(zl,0), *sptr;
1059 double s;
1060
1061 while (eptr != NULL) {
1062 sptr = lpNext(zl,eptr);
1063 serverAssert(sptr != NULL);
1064 s = zzlGetScore(sptr);
1065
1066 if (s > score) {
1067 /* First element with score larger than score for element to be
1068 * inserted. This means we should take its spot in the list to
1069 * maintain ordering. */
1070 zl = zzlInsertAt(zl,eptr,ele,score);
1071 break;
1072 } else if (s == score) {
1073 /* Ensure lexicographical ordering for elements. */
1074 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
1075 zl = zzlInsertAt(zl,eptr,ele,score);
1076 break;
1077 }
1078 }
1079
1080 /* Move to next element. */
1081 eptr = lpNext(zl,sptr);
1082 }
1083
1084 /* Push on tail of list when it was not yet inserted. */
1085 if (eptr == NULL)
1086 zl = zzlInsertAt(zl,NULL,ele,score);
1087 return zl;
1088}
1089
1090unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
1091 unsigned char *eptr, *sptr;
1092 double score;
1093 unsigned long num = 0;
1094
1095 if (deleted != NULL) *deleted = 0;
1096
1097 eptr = zzlFirstInRange(zl,range);
1098 if (eptr == NULL) return zl;
1099
1100 /* When the tail of the listpack is deleted, eptr will be NULL. */
1101 while (eptr && (sptr = lpNext(zl,eptr)) != NULL) {
1102 score = zzlGetScore(sptr);
1103 if (zslValueLteMax(score,range)) {
1104 /* Delete both the element and the score. */
1105 zl = lpDeleteRangeWithEntry(zl,&eptr,2);
1106 num++;
1107 } else {
1108 /* No longer in range. */
1109 break;
1110 }
1111 }
1112
1113 if (deleted != NULL) *deleted = num;
1114 return zl;
1115}
1116
1117unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
1118 unsigned char *eptr, *sptr;
1119 unsigned long num = 0;
1120
1121 if (deleted != NULL) *deleted = 0;
1122
1123 eptr = zzlFirstInLexRange(zl,range);
1124 if (eptr == NULL) return zl;
1125
1126 /* When the tail of the listpack is deleted, eptr will be NULL. */
1127 while (eptr && (sptr = lpNext(zl,eptr)) != NULL) {
1128 if (zzlLexValueLteMax(eptr,range)) {
1129 /* Delete both the element and the score. */
1130 zl = lpDeleteRangeWithEntry(zl,&eptr,2);
1131 num++;
1132 } else {
1133 /* No longer in range. */
1134 break;
1135 }
1136 }
1137
1138 if (deleted != NULL) *deleted = num;
1139 return zl;
1140}
1141
/* Delete all the elements with rank between start and end from the
 * listpack. Start and end are inclusive and 1-based. No bounds checking is
 * done here. When 'deleted' is not NULL it is set to the number of pairs in
 * the requested interval, end-start+1. Returns the resulting listpack
 * pointer. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int num = (end-start)+1;

    if (deleted) *deleted = num;

    /* Each element takes two listpack entries (member and score), hence
     * both the starting index and the count are doubled. */
    return lpDeleteRange(zl,2*(start-1),2*num);
}
1150
1151/*-----------------------------------------------------------------------------
1152 * Common sorted set API
1153 *----------------------------------------------------------------------------*/
1154
1155unsigned long zsetLength(const robj *zobj) {
1156 unsigned long length = 0;
1157 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
1158 length = zzlLength(zobj->ptr);
1159 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1160 length = ((const zset*)zobj->ptr)->zsl->length;
1161 } else {
1162 serverPanic("Unknown sorted set encoding");
1163 }
1164 return length;
1165}
1166
1167void zsetConvert(robj *zobj, int encoding) {
1168 zset *zs;
1169 zskiplistNode *node, *next;
1170 sds ele;
1171 double score;
1172
1173 if (zobj->encoding == encoding) return;
1174 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
1175 unsigned char *zl = zobj->ptr;
1176 unsigned char *eptr, *sptr;
1177 unsigned char *vstr;
1178 unsigned int vlen;
1179 long long vlong;
1180
1181 if (encoding != OBJ_ENCODING_SKIPLIST)
1182 serverPanic("Unknown target encoding");
1183
1184 zs = zmalloc(sizeof(*zs));
1185 zs->dict = dictCreate(&zsetDictType);
1186 zs->zsl = zslCreate();
1187
1188 eptr = lpSeek(zl,0);
1189 if (eptr != NULL) {
1190 sptr = lpNext(zl,eptr);
1191 serverAssertWithInfo(NULL,zobj,sptr != NULL);
1192 }
1193
1194 while (eptr != NULL) {
1195 score = zzlGetScore(sptr);
1196 vstr = lpGetValue(eptr,&vlen,&vlong);
1197 if (vstr == NULL)
1198 ele = sdsfromlonglong(vlong);
1199 else
1200 ele = sdsnewlen((char*)vstr,vlen);
1201
1202 node = zslInsert(zs->zsl,score,ele);
1203 serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
1204 zzlNext(zl,&eptr,&sptr);
1205 }
1206
1207 zfree(zobj->ptr);
1208 zobj->ptr = zs;
1209 zobj->encoding = OBJ_ENCODING_SKIPLIST;
1210 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1211 unsigned char *zl = lpNew(0);
1212
1213 if (encoding != OBJ_ENCODING_LISTPACK)
1214 serverPanic("Unknown target encoding");
1215
1216 /* Approach similar to zslFree(), since we want to free the skiplist at
1217 * the same time as creating the listpack. */
1218 zs = zobj->ptr;
1219 dictRelease(zs->dict);
1220 node = zs->zsl->header->level[0].forward;
1221 zfree(zs->zsl->header);
1222 zfree(zs->zsl);
1223
1224 while (node) {
1225 zl = zzlInsertAt(zl,NULL,node->ele,node->score);
1226 next = node->level[0].forward;
1227 zslFreeNode(node);
1228 node = next;
1229 }
1230
1231 zfree(zs);
1232 zobj->ptr = zl;
1233 zobj->encoding = OBJ_ENCODING_LISTPACK;
1234 } else {
1235 serverPanic("Unknown sorted set encoding");
1236 }
1237}
1238
1239/* Convert the sorted set object into a listpack if it is not already a listpack
1240 * and if the number of elements and the maximum element size and total elements size
1241 * are within the expected ranges. */
1242void zsetConvertToListpackIfNeeded(robj *zobj, size_t maxelelen, size_t totelelen) {
1243 if (zobj->encoding == OBJ_ENCODING_LISTPACK) return;
1244 zset *zset = zobj->ptr;
1245
1246 if (zset->zsl->length <= server.zset_max_listpack_entries &&
1247 maxelelen <= server.zset_max_listpack_value &&
1248 lpSafeToAdd(NULL, totelelen))
1249 {
1250 zsetConvert(zobj,OBJ_ENCODING_LISTPACK);
1251 }
1252}
1253
1254/* Return (by reference) the score of the specified member of the sorted set
1255 * storing it into *score. If the element does not exist C_ERR is returned
1256 * otherwise C_OK is returned and *score is correctly populated.
1257 * If 'zobj' or 'member' is NULL, C_ERR is returned. */
1258int zsetScore(robj *zobj, sds member, double *score) {
1259 if (!zobj || !member) return C_ERR;
1260
1261 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
1262 if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
1263 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1264 zset *zs = zobj->ptr;
1265 dictEntry *de = dictFind(zs->dict, member);
1266 if (de == NULL) return C_ERR;
1267 *score = *(double*)dictGetVal(de);
1268 } else {
1269 serverPanic("Unknown sorted set encoding");
1270 }
1271 return C_OK;
1272}
1273
/* Add a new element or update the score of an existing element in a sorted
 * set, regardless of its encoding.
 *
 * The set of flags change the command behavior.
 *
 * The input flags ('in_flags') are the following:
 *
 * ZADD_IN_INCR: Increment the current element score by 'score' instead of
 *               updating the current element score. If the element does not
 *               exist, 'score' is used as is (i.e. 0 is assumed as previous
 *               score).
 * ZADD_IN_NX:   Perform the operation only if the element does not exist.
 * ZADD_IN_XX:   Perform the operation only if the element already exist.
 * ZADD_IN_GT:   Perform the operation on existing elements only if the new
 *               score is greater than the current score.
 * ZADD_IN_LT:   Perform the operation on existing elements only if the new
 *               score is less than the current score.
 *
 * When the element is added, or an existing element passes the NX/GT/LT
 * checks, the resulting score is stored in '*newscore' if 'newscore' is not
 * NULL. This is how ZADD_IN_INCR callers obtain the incremented score.
 * '*newscore' is left untouched on the NOP and NAN paths.
 *
 * The returned flags, stored in '*out_flags' (which must not be NULL, it is
 * written unconditionally), are the following:
 *
 * ZADD_OUT_NAN:     The resulting score is not a number.
 * ZADD_OUT_ADDED:   The element was added (not present before the call).
 * ZADD_OUT_UPDATED: The element score was updated.
 * ZADD_OUT_NOP:     No operation was performed because of NX, XX, GT or LT.
 *
 * Return value:
 *
 * The function returns 1 on success, and sets the appropriate flags
 * ADDED or UPDATED to signal what happened during the operation (note that
 * none could be set if we re-added an element using the same score it used
 * to have, or in the case a zero increment is used).
 *
 * The function returns 0 on error, currently only when the increment
 * produces a NAN condition, or when the 'score' value is NAN since the
 * start.
 *
 * The command as a side effect of adding a new element may convert the sorted
 * set internal encoding from listpack to hashtable+skiplist.
 *
 * Memory management of 'ele':
 *
 * The function does not take ownership of the 'ele' SDS string, but copies
 * it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    *out_flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *out_flags = ZADD_OUT_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries ||
                sdslen(ele) > server.zset_max_listpack_value ||
                !lpSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                /* Do not return here: the element is added by the skiplist
                 * block below, now that the encoding has changed. */
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (newscore) *newscore = score;
                *out_flags |= ZADD_OUT_ADDED;
                return 1;
            }
        } else {
            /* XX given but the element does not exist: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    }

    /* Note that the above block handling listpack would have either returned or
     * converted the key to skiplist. */
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not remove the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* The skiplist and the dict share a private copy of 'ele'; the
             * caller keeps ownership of the string it passed in. */
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            /* XX given but the element does not exist: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}
1450
1451/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict,
1452 * returning 1 if the element existed and was deleted, 0 otherwise (the
1453 * element was not there). It does not resize the dict after deleting the
1454 * element. */
1455static int zsetRemoveFromSkiplist(zset *zs, sds ele) {
1456 dictEntry *de;
1457 double score;
1458
1459 de = dictUnlink(zs->dict,ele);
1460 if (de != NULL) {
1461 /* Get the score in order to delete from the skiplist later. */
1462 score = *(double*)dictGetVal(de);
1463
1464 /* Delete from the hash table and later from the skiplist.
1465 * Note that the order is important: deleting from the skiplist
1466 * actually releases the SDS string representing the element,
1467 * which is shared between the skiplist and the hash table, so
1468 * we need to delete from the skiplist as the final step. */
1469 dictFreeUnlinkedEntry(zs->dict,de);
1470
1471 /* Delete from skiplist. */
1472 int retval = zslDelete(zs->zsl,score,ele,NULL);
1473 serverAssert(retval);
1474
1475 return 1;
1476 }
1477
1478 return 0;
1479}
1480
1481/* Delete the element 'ele' from the sorted set, returning 1 if the element
1482 * existed and was deleted, 0 otherwise (the element was not there). */
1483int zsetDel(robj *zobj, sds ele) {
1484 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
1485 unsigned char *eptr;
1486
1487 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
1488 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1489 return 1;
1490 }
1491 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1492 zset *zs = zobj->ptr;
1493 if (zsetRemoveFromSkiplist(zs, ele)) {
1494 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1495 return 1;
1496 }
1497 } else {
1498 serverPanic("Unknown sorted set encoding");
1499 }
1500 return 0; /* No such element found. */
1501}
1502
1503/* Given a sorted set object returns the 0-based rank of the object or
1504 * -1 if the object does not exist.
1505 *
1506 * For rank we mean the position of the element in the sorted collection
1507 * of elements. So the first element has rank 0, the second rank 1, and so
1508 * forth up to length-1 elements.
1509 *
1510 * If 'reverse' is false, the rank is returned considering as first element
1511 * the one with the lowest score. Otherwise if 'reverse' is non-zero
1512 * the rank is computed considering as element with rank 0 the one with
1513 * the highest score. */
1514long zsetRank(robj *zobj, sds ele, int reverse) {
1515 unsigned long llen;
1516 unsigned long rank;
1517
1518 llen = zsetLength(zobj);
1519
1520 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
1521 unsigned char *zl = zobj->ptr;
1522 unsigned char *eptr, *sptr;
1523
1524 eptr = lpSeek(zl,0);
1525 serverAssert(eptr != NULL);
1526 sptr = lpNext(zl,eptr);
1527 serverAssert(sptr != NULL);
1528
1529 rank = 1;
1530 while(eptr != NULL) {
1531 if (lpCompare(eptr,(unsigned char*)ele,sdslen(ele)))
1532 break;
1533 rank++;
1534 zzlNext(zl,&eptr,&sptr);
1535 }
1536
1537 if (eptr != NULL) {
1538 if (reverse)
1539 return llen-rank;
1540 else
1541 return rank-1;
1542 } else {
1543 return -1;
1544 }
1545 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1546 zset *zs = zobj->ptr;
1547 zskiplist *zsl = zs->zsl;
1548 dictEntry *de;
1549 double score;
1550
1551 de = dictFind(zs->dict,ele);
1552 if (de != NULL) {
1553 score = *(double*)dictGetVal(de);
1554 rank = zslGetRank(zsl,score,ele);
1555 /* Existing elements always have a rank. */
1556 serverAssert(rank != 0);
1557 if (reverse)
1558 return llen-rank;
1559 else
1560 return rank-1;
1561 } else {
1562 return -1;
1563 }
1564 } else {
1565 serverPanic("Unknown sorted set encoding");
1566 }
1567}
1568
1569/* This is a helper function for the COPY command.
1570 * Duplicate a sorted set object, with the guarantee that the returned object
1571 * has the same encoding as the original one.
1572 *
1573 * The resulting object always has refcount set to 1 */
1574robj *zsetDup(robj *o) {
1575 robj *zobj;
1576 zset *zs;
1577 zset *new_zs;
1578
1579 serverAssert(o->type == OBJ_ZSET);
1580
1581 /* Create a new sorted set object that have the same encoding as the original object's encoding */
1582 if (o->encoding == OBJ_ENCODING_LISTPACK) {
1583 unsigned char *zl = o->ptr;
1584 size_t sz = lpBytes(zl);
1585 unsigned char *new_zl = zmalloc(sz);
1586 memcpy(new_zl, zl, sz);
1587 zobj = createObject(OBJ_ZSET, new_zl);
1588 zobj->encoding = OBJ_ENCODING_LISTPACK;
1589 } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
1590 zobj = createZsetObject();
1591 zs = o->ptr;
1592 new_zs = zobj->ptr;
1593 dictExpand(new_zs->dict,dictSize(zs->dict));
1594 zskiplist *zsl = zs->zsl;
1595 zskiplistNode *ln;
1596 sds ele;
1597 long llen = zsetLength(o);
1598
1599 /* We copy the skiplist elements from the greatest to the
1600 * smallest (that's trivial since the elements are already ordered in
1601 * the skiplist): this improves the load process, since the next loaded
1602 * element will always be the smaller, so adding to the skiplist
1603 * will always immediately stop at the head, making the insertion
1604 * O(1) instead of O(log(N)). */
1605 ln = zsl->tail;
1606 while (llen--) {
1607 ele = ln->ele;
1608 sds new_ele = sdsdup(ele);
1609 zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele);
1610 dictAdd(new_zs->dict,new_ele,&znode->score);
1611 ln = ln->backward;
1612 }
1613 } else {
1614 serverPanic("Unknown sorted set encoding");
1615 }
1616 return zobj;
1617}
1618
1619/* Create a new sds string from the listpack entry. */
1620sds zsetSdsFromListpackEntry(listpackEntry *e) {
1621 return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval);
1622}
1623
1624/* Reply with bulk string from the listpack entry. */
1625void zsetReplyFromListpackEntry(client *c, listpackEntry *e) {
1626 if (e->sval)
1627 addReplyBulkCBuffer(c, e->sval, e->slen);
1628 else
1629 addReplyBulkLongLong(c, e->lval);
1630}
1631
1632
1633/* Return random element from a non empty zset.
1634 * 'key' and 'val' will be set to hold the element.
1635 * The memory in `key` is not to be freed or modified by the caller.
1636 * 'score' can be NULL in which case it's not extracted. */
1637void zsetTypeRandomElement(robj *zsetobj, unsigned long zsetsize, listpackEntry *key, double *score) {
1638 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST) {
1639 zset *zs = zsetobj->ptr;
1640 dictEntry *de = dictGetFairRandomKey(zs->dict);
1641 sds s = dictGetKey(de);
1642 key->sval = (unsigned char*)s;
1643 key->slen = sdslen(s);
1644 if (score)
1645 *score = *(double*)dictGetVal(de);
1646 } else if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) {
1647 listpackEntry val;
1648 lpRandomPair(zsetobj->ptr, zsetsize, key, &val);
1649 if (score) {
1650 if (val.sval) {
1651 *score = zzlStrtod(val.sval,val.slen);
1652 } else {
1653 *score = (double)val.lval;
1654 }
1655 }
1656 } else {
1657 serverPanic("Unknown zset encoding");
1658 }
1659}
1660
1661/*-----------------------------------------------------------------------------
1662 * Sorted set commands
1663 *----------------------------------------------------------------------------*/
1664
/* This generic command implements both ZADD and ZINCRBY.
 *
 * 'flags' holds the ZADD_IN_* flags preset by the caller; the options found
 * on the command line (NX, XX, INCR, GT, LT) are OR-ed into it, while CH
 * only affects the reply.
 *
 * Reply:
 * - With INCR (or ZINCRBY): the new score, or a null reply when the
 *   element was not processed.
 * - Otherwise: the number of added elements, or added+updated with CH.
 *
 * All the scores are parsed before the sorted set is touched, so a
 * malformed score aborts the command without modifying anything. */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements, ch = 0;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_IN_INCR) != 0;
    int nx = (flags & ZADD_IN_NX) != 0;
    int xx = (flags & ZADD_IN_XX) != 0;
    int gt = (flags & ZADD_IN_GT) != 0;
    int lt = (flags & ZADD_IN_LT) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    /* Note that XX is compatible with either GT or LT */

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. From here on every exit path
     * goes through 'cleanup' so that 'scores' is released. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* Start directly with the skiplist encoding when listpacks are
         * disabled or the first member is already too long for one. */
        if (server.zset_max_listpack_entries == 0 ||
            server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetListpackObject();
        }
        dbAdd(c->db,key,zobj);
    }

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = 0;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_OUT_ADDED) added++;
        if (retflags & ZADD_OUT_UPDATED) updated++;
        if (!(retflags & ZADD_OUT_NOP)) processed++;
        /* NOTE(review): zsetAdd() leaves 'newscore' unwritten when it
         * reports ZADD_OUT_NOP, so this copy may read an indeterminate
         * value. 'score' is only sent to the client when 'processed' is
         * non-zero, so the value is not used in that case. */
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    /* Keyspace signalling happens only when the set really changed. */
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}
1793
1794void zaddCommand(client *c) {
1795 zaddGenericCommand(c,ZADD_IN_NONE);
1796}
1797
1798void zincrbyCommand(client *c) {
1799 zaddGenericCommand(c,ZADD_IN_INCR);
1800}
1801
1802void zremCommand(client *c) {
1803 robj *key = c->argv[1];
1804 robj *zobj;
1805 int deleted = 0, keyremoved = 0, j;
1806
1807 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1808 checkType(c,zobj,OBJ_ZSET)) return;
1809
1810 for (j = 2; j < c->argc; j++) {
1811 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
1812 if (zsetLength(zobj) == 0) {
1813 dbDelete(c->db,key);
1814 keyremoved = 1;
1815 break;
1816 }
1817 }
1818
1819 if (deleted) {
1820 notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
1821 if (keyremoved)
1822 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
1823 signalModifiedKey(c,c->db,key);
1824 server.dirty += deleted;
1825 }
1826 addReplyLongLong(c,deleted);
1827}
1828
/* Kind of range a generic range command works on: by rank, by score or by
 * lexicographical order. ZRANGE_AUTO is the zero value. */
typedef enum {
    ZRANGE_AUTO  = 0,
    ZRANGE_RANK  = 1,
    ZRANGE_SCORE = 2,
    ZRANGE_LEX   = 3
} zrange_type;
1835
/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands.
 *
 * 'rangetype' selects how argv[2] and argv[3] are interpreted. Any value
 * other than ZRANGE_RANK, ZRANGE_SCORE or ZRANGE_LEX triggers a panic while
 * parsing. The reply is the number of deleted elements; the key is removed
 * when the sorted set becomes empty. */
void zremrangeGenericCommand(client *c, zrange_type rangetype) {
    robj *key = c->argv[1];
    robj *zobj;
    int keyremoved = 0;
    unsigned long deleted = 0;
    zrangespec range;        /* Only set for ZRANGE_SCORE. */
    zlexrangespec lexrange;  /* Only set for ZRANGE_LEX. */
    long start, end, llen;   /* Only set for ZRANGE_RANK. */
    char *notify_type = NULL;

    /* Step 1: Parse the range. */
    if (rangetype == ZRANGE_RANK) {
        notify_type = "zremrangebyrank";
        if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
            (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
            return;
    } else if (rangetype == ZRANGE_SCORE) {
        notify_type = "zremrangebyscore";
        if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
            addReplyError(c,"min or max is not a float");
            return;
        }
    } else if (rangetype == ZRANGE_LEX) {
        notify_type = "zremrangebylex";
        /* NOTE(review): on failure we return without calling
         * zslFreeLexRange(); presumably zslParseLexRange() leaves nothing
         * to free in that case -- confirm. */
        if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
            addReplyError(c,"min or max not valid string range item");
            return;
        }
    } else {
        serverPanic("unknown rangetype %d", (int)rangetype);
    }

    /* Step 2: Lookup & range sanity checks if needed. From here on every
     * exit goes through 'cleanup' so that a parsed lex range is freed. */
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    if (rangetype == ZRANGE_RANK) {
        /* Sanitize indexes. Negative indexes count from the end. */
        llen = zsetLength(zobj);
        if (start < 0) start = llen+start;
        if (end < 0) end = llen+end;
        if (start < 0) start = 0;

        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c,shared.czero);
            goto cleanup;
        }
        if (end >= llen) end = llen-1;
    }

    /* Step 3: Perform the range deletion operation. The rank based helpers
     * take 1-based ranks, hence the +1 on the 0-based 'start' and 'end'. */
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
        switch(rangetype) {
        case ZRANGE_AUTO:
        case ZRANGE_RANK:
            zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
            break;
        case ZRANGE_SCORE:
            zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
            break;
        case ZRANGE_LEX:
            zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
            break;
        }
        if (zzlLength(zobj->ptr) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        switch(rangetype) {
        case ZRANGE_AUTO:
        case ZRANGE_RANK:
            deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
            break;
        case ZRANGE_SCORE:
            deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
            break;
        case ZRANGE_LEX:
            deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
            break;
        }
        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
        if (dictSize(zs->dict) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* Step 4: Notifications and reply. */
    if (deleted) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
    }
    server.dirty += deleted;
    addReplyLongLong(c,deleted);

cleanup:
    /* 'lexrange' is only initialized by the ZRANGE_LEX parsing branch. */
    if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
}
1943
1944void zremrangebyrankCommand(client *c) {
1945 zremrangeGenericCommand(c,ZRANGE_RANK);
1946}
1947
/* Command handler for range removal by score: forwards to the shared
 * zremrangeGenericCommand() implementation with ZRANGE_SCORE. */
void zremrangebyscoreCommand(client *c) {
    zremrangeGenericCommand(c,ZRANGE_SCORE);
}
1951
/* Command handler for range removal by lexicographical range: forwards to the
 * shared zremrangeGenericCommand() implementation with ZRANGE_LEX. */
void zremrangebylexCommand(client *c) {
    zremrangeGenericCommand(c,ZRANGE_LEX);
}
1955
/* One input operand of the union/intersection/difference operations, plus the
 * iteration state used by the zui*() helpers. Which member of 'iter' is live
 * depends on 'type' and 'encoding'. */
typedef struct {
    robj *subject; /* Source object, NULL when the key does not exist. */
    int type; /* Set, sorted set */
    int encoding; /* Encoding of 'subject' captured when the key was read. */
    double weight; /* Multiplier applied to the scores of this input. */

    union {
        /* Set iterators. */
        union _iterset {
            struct {
                intset *is;
                int ii; /* Position of the next element to return. */
            } is;
            struct {
                dict *dict;
                dictIterator *di;
                dictEntry *de; /* Entry to return next, NULL at the end. */
            } ht;
        } set;

        /* Sorted set iterators. */
        union _iterzset {
            struct {
                unsigned char *zl;
                /* Element and score entries of the next pair to return. */
                unsigned char *eptr, *sptr;
            } zl;
            struct {
                zset *zs;
                zskiplistNode *node; /* Node to return next, NULL at the end. */
            } sl;
        } zset;
    } iter;
} zsetopsrc;
1989
1990
/* Use dirty flags for pointers that need to be cleaned up in the next
 * iteration over the zsetopval. The dirty flag for the long long value is
 * special, since long long values don't need cleanup. Instead, it means that
 * we already checked that "ell" holds a long long, or tried to convert another
 * representation into a long long value. When this was successful,
 * OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_SDS 1 /* 'ele' was allocated by us and must be freed. */
#define OPVAL_DIRTY_LL 2  /* Conversion to long long was already attempted. */
#define OPVAL_VALID_LL 4  /* 'ell' holds a valid long long. */
2000
/* Store value retrieved from the iterator. The element is available in at
 * least one of three forms: an SDS string ('ele'), a raw buffer
 * ('estr'/'elen') or an integer ('ell'). The zui*FromValue() helpers convert
 * between them on demand. */
typedef struct {
    int flags; /* OPVAL_* bits. */
    unsigned char _buf[32]; /* Private buffer. */
    sds ele; /* SDS form of the element, or NULL. */
    unsigned char *estr; /* Raw buffer form of the element, or NULL. */
    unsigned int elen; /* Length of 'estr' in bytes. */
    long long ell; /* Integer form of the element. */
    double score; /* Score of the element (1.0 for plain set members). */
} zsetopval;
2011
/* Short aliases for the iterator unions declared inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
2014
2015void zuiInitIterator(zsetopsrc *op) {
2016 if (op->subject == NULL)
2017 return;
2018
2019 if (op->type == OBJ_SET) {
2020 iterset *it = &op->iter.set;
2021 if (op->encoding == OBJ_ENCODING_INTSET) {
2022 it->is.is = op->subject->ptr;
2023 it->is.ii = 0;
2024 } else if (op->encoding == OBJ_ENCODING_HT) {
2025 it->ht.dict = op->subject->ptr;
2026 it->ht.di = dictGetIterator(op->subject->ptr);
2027 it->ht.de = dictNext(it->ht.di);
2028 } else {
2029 serverPanic("Unknown set encoding");
2030 }
2031 } else if (op->type == OBJ_ZSET) {
2032 /* Sorted sets are traversed in reverse order to optimize for
2033 * the insertion of the elements in a new list as in
2034 * ZDIFF/ZINTER/ZUNION */
2035 iterzset *it = &op->iter.zset;
2036 if (op->encoding == OBJ_ENCODING_LISTPACK) {
2037 it->zl.zl = op->subject->ptr;
2038 it->zl.eptr = lpSeek(it->zl.zl,-2);
2039 if (it->zl.eptr != NULL) {
2040 it->zl.sptr = lpNext(it->zl.zl,it->zl.eptr);
2041 serverAssert(it->zl.sptr != NULL);
2042 }
2043 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2044 it->sl.zs = op->subject->ptr;
2045 it->sl.node = it->sl.zs->zsl->tail;
2046 } else {
2047 serverPanic("Unknown sorted set encoding");
2048 }
2049 } else {
2050 serverPanic("Unsupported type");
2051 }
2052}
2053
2054void zuiClearIterator(zsetopsrc *op) {
2055 if (op->subject == NULL)
2056 return;
2057
2058 if (op->type == OBJ_SET) {
2059 iterset *it = &op->iter.set;
2060 if (op->encoding == OBJ_ENCODING_INTSET) {
2061 UNUSED(it); /* skip */
2062 } else if (op->encoding == OBJ_ENCODING_HT) {
2063 dictReleaseIterator(it->ht.di);
2064 } else {
2065 serverPanic("Unknown set encoding");
2066 }
2067 } else if (op->type == OBJ_ZSET) {
2068 iterzset *it = &op->iter.zset;
2069 if (op->encoding == OBJ_ENCODING_LISTPACK) {
2070 UNUSED(it); /* skip */
2071 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2072 UNUSED(it); /* skip */
2073 } else {
2074 serverPanic("Unknown sorted set encoding");
2075 }
2076 } else {
2077 serverPanic("Unsupported type");
2078 }
2079}
2080
2081void zuiDiscardDirtyValue(zsetopval *val) {
2082 if (val->flags & OPVAL_DIRTY_SDS) {
2083 sdsfree(val->ele);
2084 val->ele = NULL;
2085 val->flags &= ~OPVAL_DIRTY_SDS;
2086 }
2087}
2088
2089unsigned long zuiLength(zsetopsrc *op) {
2090 if (op->subject == NULL)
2091 return 0;
2092
2093 if (op->type == OBJ_SET) {
2094 if (op->encoding == OBJ_ENCODING_INTSET) {
2095 return intsetLen(op->subject->ptr);
2096 } else if (op->encoding == OBJ_ENCODING_HT) {
2097 dict *ht = op->subject->ptr;
2098 return dictSize(ht);
2099 } else {
2100 serverPanic("Unknown set encoding");
2101 }
2102 } else if (op->type == OBJ_ZSET) {
2103 if (op->encoding == OBJ_ENCODING_LISTPACK) {
2104 return zzlLength(op->subject->ptr);
2105 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2106 zset *zs = op->subject->ptr;
2107 return zs->zsl->length;
2108 } else {
2109 serverPanic("Unknown sorted set encoding");
2110 }
2111 } else {
2112 serverPanic("Unsupported type");
2113 }
2114}
2115
2116/* Check if the current value is valid. If so, store it in the passed structure
2117 * and move to the next element. If not valid, this means we have reached the
2118 * end of the structure and can abort. */
2119int zuiNext(zsetopsrc *op, zsetopval *val) {
2120 if (op->subject == NULL)
2121 return 0;
2122
2123 zuiDiscardDirtyValue(val);
2124
2125 memset(val,0,sizeof(zsetopval));
2126
2127 if (op->type == OBJ_SET) {
2128 iterset *it = &op->iter.set;
2129 if (op->encoding == OBJ_ENCODING_INTSET) {
2130 int64_t ell;
2131
2132 if (!intsetGet(it->is.is,it->is.ii,&ell))
2133 return 0;
2134 val->ell = ell;
2135 val->score = 1.0;
2136
2137 /* Move to next element. */
2138 it->is.ii++;
2139 } else if (op->encoding == OBJ_ENCODING_HT) {
2140 if (it->ht.de == NULL)
2141 return 0;
2142 val->ele = dictGetKey(it->ht.de);
2143 val->score = 1.0;
2144
2145 /* Move to next element. */
2146 it->ht.de = dictNext(it->ht.di);
2147 } else {
2148 serverPanic("Unknown set encoding");
2149 }
2150 } else if (op->type == OBJ_ZSET) {
2151 iterzset *it = &op->iter.zset;
2152 if (op->encoding == OBJ_ENCODING_LISTPACK) {
2153 /* No need to check both, but better be explicit. */
2154 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
2155 return 0;
2156 val->estr = lpGetValue(it->zl.eptr,&val->elen,&val->ell);
2157 val->score = zzlGetScore(it->zl.sptr);
2158
2159 /* Move to next element (going backwards, see zuiInitIterator). */
2160 zzlPrev(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
2161 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2162 if (it->sl.node == NULL)
2163 return 0;
2164 val->ele = it->sl.node->ele;
2165 val->score = it->sl.node->score;
2166
2167 /* Move to next element. (going backwards, see zuiInitIterator) */
2168 it->sl.node = it->sl.node->backward;
2169 } else {
2170 serverPanic("Unknown sorted set encoding");
2171 }
2172 } else {
2173 serverPanic("Unsupported type");
2174 }
2175 return 1;
2176}
2177
2178int zuiLongLongFromValue(zsetopval *val) {
2179 if (!(val->flags & OPVAL_DIRTY_LL)) {
2180 val->flags |= OPVAL_DIRTY_LL;
2181
2182 if (val->ele != NULL) {
2183 if (string2ll(val->ele,sdslen(val->ele),&val->ell))
2184 val->flags |= OPVAL_VALID_LL;
2185 } else if (val->estr != NULL) {
2186 if (string2ll((char*)val->estr,val->elen,&val->ell))
2187 val->flags |= OPVAL_VALID_LL;
2188 } else {
2189 /* The long long was already set, flag as valid. */
2190 val->flags |= OPVAL_VALID_LL;
2191 }
2192 }
2193 return val->flags & OPVAL_VALID_LL;
2194}
2195
2196sds zuiSdsFromValue(zsetopval *val) {
2197 if (val->ele == NULL) {
2198 if (val->estr != NULL) {
2199 val->ele = sdsnewlen((char*)val->estr,val->elen);
2200 } else {
2201 val->ele = sdsfromlonglong(val->ell);
2202 }
2203 val->flags |= OPVAL_DIRTY_SDS;
2204 }
2205 return val->ele;
2206}
2207
2208/* This is different from zuiSdsFromValue since returns a new SDS string
2209 * which is up to the caller to free. */
2210sds zuiNewSdsFromValue(zsetopval *val) {
2211 if (val->flags & OPVAL_DIRTY_SDS) {
2212 /* We have already one to return! */
2213 sds ele = val->ele;
2214 val->flags &= ~OPVAL_DIRTY_SDS;
2215 val->ele = NULL;
2216 return ele;
2217 } else if (val->ele) {
2218 return sdsdup(val->ele);
2219 } else if (val->estr) {
2220 return sdsnewlen((char*)val->estr,val->elen);
2221 } else {
2222 return sdsfromlonglong(val->ell);
2223 }
2224}
2225
2226int zuiBufferFromValue(zsetopval *val) {
2227 if (val->estr == NULL) {
2228 if (val->ele != NULL) {
2229 val->elen = sdslen(val->ele);
2230 val->estr = (unsigned char*)val->ele;
2231 } else {
2232 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
2233 val->estr = val->_buf;
2234 }
2235 }
2236 return 1;
2237}
2238
2239/* Find value pointed to by val in the source pointer to by op. When found,
2240 * return 1 and store its score in target. Return 0 otherwise. */
2241int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
2242 if (op->subject == NULL)
2243 return 0;
2244
2245 if (op->type == OBJ_SET) {
2246 if (op->encoding == OBJ_ENCODING_INTSET) {
2247 if (zuiLongLongFromValue(val) &&
2248 intsetFind(op->subject->ptr,val->ell))
2249 {
2250 *score = 1.0;
2251 return 1;
2252 } else {
2253 return 0;
2254 }
2255 } else if (op->encoding == OBJ_ENCODING_HT) {
2256 dict *ht = op->subject->ptr;
2257 zuiSdsFromValue(val);
2258 if (dictFind(ht,val->ele) != NULL) {
2259 *score = 1.0;
2260 return 1;
2261 } else {
2262 return 0;
2263 }
2264 } else {
2265 serverPanic("Unknown set encoding");
2266 }
2267 } else if (op->type == OBJ_ZSET) {
2268 zuiSdsFromValue(val);
2269
2270 if (op->encoding == OBJ_ENCODING_LISTPACK) {
2271 if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
2272 /* Score is already set by zzlFind. */
2273 return 1;
2274 } else {
2275 return 0;
2276 }
2277 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2278 zset *zs = op->subject->ptr;
2279 dictEntry *de;
2280 if ((de = dictFind(zs->dict,val->ele)) != NULL) {
2281 *score = *(double*)dictGetVal(de);
2282 return 1;
2283 } else {
2284 return 0;
2285 }
2286 } else {
2287 serverPanic("Unknown sorted set encoding");
2288 }
2289 } else {
2290 serverPanic("Unsupported type");
2291 }
2292}
2293
2294int zuiCompareByCardinality(const void *s1, const void *s2) {
2295 unsigned long first = zuiLength((zsetopsrc*)s1);
2296 unsigned long second = zuiLength((zsetopsrc*)s2);
2297 if (first > second) return 1;
2298 if (first < second) return -1;
2299 return 0;
2300}
2301
/* qsort() comparator ordering zsetopsrc entries by descending number of
 * elements: the ascending comparator with its arguments swapped. */
static int zuiCompareByRevCardinality(const void *s1, const void *s2) {
    return zuiCompareByCardinality(s2, s1);
}
2305
/* Aggregation modes accepted by zunionInterAggregate(). */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored as the value of dict entry '_e', or 1.0 when the entry has no
 * value. */
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
2310
2311inline static void zunionInterAggregate(double *target, double val, int aggregate) {
2312 if (aggregate == REDIS_AGGR_SUM) {
2313 *target = *target + val;
2314 /* The result of adding two doubles is NaN when one variable
2315 * is +inf and the other is -inf. When these numbers are added,
2316 * we maintain the convention of the result being 0.0. */
2317 if (isnan(*target)) *target = 0.0;
2318 } else if (aggregate == REDIS_AGGR_MIN) {
2319 *target = val < *target ? val : *target;
2320 } else if (aggregate == REDIS_AGGR_MAX) {
2321 *target = val > *target ? val : *target;
2322 } else {
2323 /* safety net */
2324 serverPanic("Unknown ZUNION/INTER aggregate type");
2325 }
2326}
2327
2328static size_t zsetDictGetMaxElementLength(dict *d, size_t *totallen) {
2329 dictIterator *di;
2330 dictEntry *de;
2331 size_t maxelelen = 0;
2332
2333 di = dictGetIterator(d);
2334
2335 while((de = dictNext(di)) != NULL) {
2336 sds ele = dictGetKey(de);
2337 if (sdslen(ele) > maxelelen) maxelelen = sdslen(ele);
2338 if (totallen)
2339 (*totallen) += sdslen(ele);
2340 }
2341
2342 dictReleaseIterator(di);
2343
2344 return maxelelen;
2345}
2346
2347static void zdiffAlgorithm1(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) {
2348 /* DIFF Algorithm 1:
2349 *
2350 * We perform the diff by iterating all the elements of the first set,
2351 * and only adding it to the target set if the element does not exist
2352 * into all the other sets.
2353 *
2354 * This way we perform at max N*M operations, where N is the size of
2355 * the first set, and M the number of sets.
2356 *
2357 * There is also a O(K*log(K)) cost for adding the resulting elements
2358 * to the target set, where K is the final size of the target set.
2359 *
2360 * The final complexity of this algorithm is O(N*M + K*log(K)). */
2361 int j;
2362 zsetopval zval;
2363 zskiplistNode *znode;
2364 sds tmp;
2365
2366 /* With algorithm 1 it is better to order the sets to subtract
2367 * by decreasing size, so that we are more likely to find
2368 * duplicated elements ASAP. */
2369 qsort(src+1,setnum-1,sizeof(zsetopsrc),zuiCompareByRevCardinality);
2370
2371 memset(&zval, 0, sizeof(zval));
2372 zuiInitIterator(&src[0]);
2373 while (zuiNext(&src[0],&zval)) {
2374 double value;
2375 int exists = 0;
2376
2377 for (j = 1; j < setnum; j++) {
2378 /* It is not safe to access the zset we are
2379 * iterating, so explicitly check for equal object.
2380 * This check isn't really needed anymore since we already
2381 * check for a duplicate set in the zsetChooseDiffAlgorithm
2382 * function, but we're leaving it for future-proofing. */
2383 if (src[j].subject == src[0].subject ||
2384 zuiFind(&src[j],&zval,&value)) {
2385 exists = 1;
2386 break;
2387 }
2388 }
2389
2390 if (!exists) {
2391 tmp = zuiNewSdsFromValue(&zval);
2392 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2393 dictAdd(dstzset->dict,tmp,&znode->score);
2394 if (sdslen(tmp) > *maxelelen) *maxelelen = sdslen(tmp);
2395 (*totelelen) += sdslen(tmp);
2396 }
2397 }
2398 zuiClearIterator(&src[0]);
2399}
2400
2401
2402static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) {
2403 /* DIFF Algorithm 2:
2404 *
2405 * Add all the elements of the first set to the auxiliary set.
2406 * Then remove all the elements of all the next sets from it.
2407 *
2408
2409 * This is O(L + (N-K)log(N)) where L is the sum of all the elements in every
2410 * set, N is the size of the first set, and K is the size of the result set.
2411 *
2412 * Note that from the (L-N) dict searches, (N-K) got to the zsetRemoveFromSkiplist
2413 * which costs log(N)
2414 *
2415 * There is also a O(K) cost at the end for finding the largest element
2416 * size, but this doesn't change the algorithm complexity since K < L, and
2417 * O(2L) is the same as O(L). */
2418 int j;
2419 int cardinality = 0;
2420 zsetopval zval;
2421 zskiplistNode *znode;
2422 sds tmp;
2423
2424 for (j = 0; j < setnum; j++) {
2425 if (zuiLength(&src[j]) == 0) continue;
2426
2427 memset(&zval, 0, sizeof(zval));
2428 zuiInitIterator(&src[j]);
2429 while (zuiNext(&src[j],&zval)) {
2430 if (j == 0) {
2431 tmp = zuiNewSdsFromValue(&zval);
2432 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2433 dictAdd(dstzset->dict,tmp,&znode->score);
2434 cardinality++;
2435 } else {
2436 tmp = zuiSdsFromValue(&zval);
2437 if (zsetRemoveFromSkiplist(dstzset, tmp)) {
2438 cardinality--;
2439 }
2440 }
2441
2442 /* Exit if result set is empty as any additional removal
2443 * of elements will have no effect. */
2444 if (cardinality == 0) break;
2445 }
2446 zuiClearIterator(&src[j]);
2447
2448 if (cardinality == 0) break;
2449 }
2450
2451 /* Resize dict if needed after removing multiple elements */
2452 if (htNeedsResize(dstzset->dict)) dictResize(dstzset->dict);
2453
2454 /* Using this algorithm, we can't calculate the max element as we go,
2455 * we have to iterate through all elements to find the max one after. */
2456 *maxelelen = zsetDictGetMaxElementLength(dstzset->dict, totelelen);
2457}
2458
2459static int zsetChooseDiffAlgorithm(zsetopsrc *src, long setnum) {
2460 int j;
2461
2462 /* Select what DIFF algorithm to use.
2463 *
2464 * Algorithm 1 is O(N*M + K*log(K)) where N is the size of the
2465 * first set, M the total number of sets, and K is the size of the
2466 * result set.
2467 *
2468 * Algorithm 2 is O(L + (N-K)log(N)) where L is the total number of elements
2469 * in all the sets, N is the size of the first set, and K is the size of the
2470 * result set.
2471 *
2472 * We compute what is the best bet with the current input here. */
2473 long long algo_one_work = 0;
2474 long long algo_two_work = 0;
2475
2476 for (j = 0; j < setnum; j++) {
2477 /* If any other set is equal to the first set, there is nothing to be
2478 * done, since we would remove all elements anyway. */
2479 if (j > 0 && src[0].subject == src[j].subject) {
2480 return 0;
2481 }
2482
2483 algo_one_work += zuiLength(&src[0]);
2484 algo_two_work += zuiLength(&src[j]);
2485 }
2486
2487 /* Algorithm 1 has better constant times and performs less operations
2488 * if there are elements in common. Give it some advantage. */
2489 algo_one_work /= 2;
2490 return (algo_one_work <= algo_two_work) ? 1 : 2;
2491}
2492
2493static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) {
2494 /* Skip everything if the smallest input is empty. */
2495 if (zuiLength(&src[0]) > 0) {
2496 int diff_algo = zsetChooseDiffAlgorithm(src, setnum);
2497 if (diff_algo == 1) {
2498 zdiffAlgorithm1(src, setnum, dstzset, maxelelen, totelelen);
2499 } else if (diff_algo == 2) {
2500 zdiffAlgorithm2(src, setnum, dstzset, maxelelen, totelelen);
2501 } else if (diff_algo != 0) {
2502 serverPanic("Unknown algorithm");
2503 }
2504 }
2505}
2506
/* Dict type of the accumulator used by the union operation to map elements
 * to their aggregated score. Keys are SDS strings; no key/value dup or
 * destructor callbacks are installed, so releasing the dict does not free
 * the keys. */
dictType setAccumulatorDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL,                      /* val destructor */
    NULL                       /* allow to expand */
};
2516
/* The zunionInterDiffGenericCommand() function is called in order to implement the
 * following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE,
 * ZINTERCARD.
 *
 * 'dstkey' is the destination key for the *STORE variants, or NULL when the
 * result (or its cardinality) is sent back to the client.
 *
 * 'numkeysIndex' is the argv position of the number of keys. For the
 * ZUNION/ZINTER/ZDIFF commands this value is 1, for the
 * ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE commands this value is 2.
 *
 * 'op' SET_OP_INTER, SET_OP_UNION or SET_OP_DIFF.
 *
 * 'cardinality_only' is currently only applicable when 'op' is SET_OP_INTER.
 * It is used by ZINTERCARD to only return the cardinality, with minimum
 * processing and memory overheads.
 */
void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op,
                                   int cardinality_only) {
    int i, j;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    sds tmp;
    size_t maxelelen = 0, totelelen = 0;
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;
    int withscores = 0;
    unsigned long cardinality = 0;
    long limit = 0; /* Stop searching after reaching the limit. 0 means unlimited. */

    /* expect setnum input keys to be given */
    if ((getLongFromObjectOrReply(c, c->argv[numkeysIndex], &setnum, NULL) != C_OK))
        return;

    if (setnum < 1) {
        addReplyErrorFormat(c,
            "at least 1 input key is needed for '%s' command", c->cmd->fullname);
        return;
    }

    /* test if the expected number of keys would overflow */
    if (setnum > (c->argc-(numkeysIndex+1))) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }

    /* read keys to be used for input */
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    for (i = 0, j = numkeysIndex+1; i < setnum; i++, j++) {
        robj *obj = lookupKeyRead(c->db, c->argv[j]);
        if (obj != NULL) {
            /* Both sorted sets and plain sets are accepted as inputs. */
            if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
                zfree(src);
                addReplyErrorObject(c,shared.wrongtypeerr);
                return;
            }

            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        } else {
            /* Missing keys are handled as empty inputs by the zui* helpers. */
            src[i].subject = NULL;
        }

        /* Default all weights to 1. */
        src[i].weight = 1.0;
    }

    /* parse optional extra arguments */
    if (j < c->argc) {
        int remaining = c->argc - j;

        while (remaining) {
            if (op != SET_OP_DIFF && !cardinality_only &&
                remaining >= (setnum + 1) &&
                !strcasecmp(c->argv[j]->ptr,"weights"))
            {
                /* WEIGHTS is followed by exactly one weight per input key. */
                j++; remaining--;
                for (i = 0; i < setnum; i++, j++, remaining--) {
                    if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                            "weight value is not a float") != C_OK)
                    {
                        zfree(src);
                        return;
                    }
                }
            } else if (op != SET_OP_DIFF && !cardinality_only &&
                       remaining >= 2 &&
                       !strcasecmp(c->argv[j]->ptr,"aggregate"))
            {
                j++; remaining--;
                if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                    aggregate = REDIS_AGGR_SUM;
                } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
                    aggregate = REDIS_AGGR_MIN;
                } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
                    aggregate = REDIS_AGGR_MAX;
                } else {
                    zfree(src);
                    addReplyErrorObject(c,shared.syntaxerr);
                    return;
                }
                j++; remaining--;
            } else if (remaining >= 1 &&
                       !dstkey && !cardinality_only &&
                       !strcasecmp(c->argv[j]->ptr,"withscores"))
            {
                /* WITHSCORES is only accepted when replying to the client. */
                j++; remaining--;
                withscores = 1;
            } else if (cardinality_only && remaining >= 2 &&
                       !strcasecmp(c->argv[j]->ptr, "limit"))
            {
                j++; remaining--;
                if (getPositiveLongFromObjectOrReply(c, c->argv[j], &limit,
                            "LIMIT can't be negative") != C_OK)
                {
                    zfree(src);
                    return;
                }
                j++; remaining--;
            } else {
                zfree(src);
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
        }
    }

    if (op != SET_OP_DIFF) {
        /* sort sets from the smallest to largest, this will improve our
         * algorithm's performance */
        qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
    }

    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    memset(&zval, 0, sizeof(zval));

    if (op == SET_OP_INTER) {
        /* Skip everything if the smallest input is empty. */
        if (zuiLength(&src[0]) > 0) {
            /* Precondition: as src[0] is non-empty and the inputs are ordered
             * by size, all src[i > 0] are non-empty too. */
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score, value;

                score = src[0].weight * zval.score;
                if (isnan(score)) score = 0;

                for (j = 1; j < setnum; j++) {
                    /* It is not safe to access the zset we are
                     * iterating, so explicitly check for equal object. */
                    if (src[j].subject == src[0].subject) {
                        value = zval.score*src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        value *= src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else {
                        break;
                    }
                }

                /* Only continue when present in every input. */
                if (j == setnum && cardinality_only) {
                    cardinality++;

                    /* We stop the searching after reaching the limit. */
                    if (limit && cardinality >= (unsigned long)limit) {
                        /* Cleanup before we break the zuiNext loop. */
                        zuiDiscardDirtyValue(&zval);
                        break;
                    }
                } else if (j == setnum) {
                    tmp = zuiNewSdsFromValue(&zval);
                    znode = zslInsert(dstzset->zsl,score,tmp);
                    dictAdd(dstzset->dict,tmp,&znode->score);
                    totelelen += sdslen(tmp);
                    if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                }
            }
            zuiClearIterator(&src[0]);
        }
    } else if (op == SET_OP_UNION) {
        dict *accumulator = dictCreate(&setAccumulatorDictType);
        dictIterator *di;
        dictEntry *de, *existing;
        double score;

        if (setnum) {
            /* Our union is at least as large as the largest set.
             * Resize the dictionary ASAP to avoid useless rehashing. */
            dictExpand(accumulator,zuiLength(&src[setnum-1]));
        }

        /* Step 1: Create a dictionary of elements -> aggregated-scores
         * by iterating one sorted set after the other. */
        for (i = 0; i < setnum; i++) {
            if (zuiLength(&src[i]) == 0) continue;

            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                /* Initialize value */
                score = src[i].weight * zval.score;
                if (isnan(score)) score = 0;

                /* Search for this element in the accumulating dictionary. */
                de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
                /* If we don't have it, we need to create a new entry. */
                if (!existing) {
                    tmp = zuiNewSdsFromValue(&zval);
                    /* Remember the longest single element encountered,
                     * to understand if it's possible to convert to listpack
                     * at the end. */
                     totelelen += sdslen(tmp);
                     if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                    /* Update the element with its initial score. */
                    dictSetKey(accumulator, de, tmp);
                    dictSetDoubleVal(de,score);
                } else {
                    /* Update the score with the score of the new instance
                     * of the element found in the current sorted set.
                     *
                     * Here we access directly the dictEntry double
                     * value inside the union as it is a big speedup
                     * compared to using the getDouble/setDouble API. */
                    zunionInterAggregate(&existing->v.d,score,aggregate);
                }
            }
            zuiClearIterator(&src[i]);
        }

        /* Step 2: convert the dictionary into the final sorted set. */
        di = dictGetIterator(accumulator);

        /* We now are aware of the final size of the resulting sorted set,
         * let's resize the dictionary embedded inside the sorted set to the
         * right size, in order to save rehashing time. */
        dictExpand(dstzset->dict,dictSize(accumulator));

        while((de = dictNext(di)) != NULL) {
            /* The same SDS string is inserted into the destination sorted
             * set: the accumulator dict type has no key destructor, so
             * releasing the accumulator below does not free it. */
            sds ele = dictGetKey(de);
            score = dictGetDoubleVal(de);
            znode = zslInsert(dstzset->zsl,score,ele);
            dictAdd(dstzset->dict,ele,&znode->score);
        }
        dictReleaseIterator(di);
        dictRelease(accumulator);
    } else if (op == SET_OP_DIFF) {
        zdiff(src, setnum, dstzset, &maxelelen, &totelelen);
    } else {
        serverPanic("Unknown operator");
    }

    if (dstkey) {
        /* *STORE variants: write the result, or delete the destination key
         * when the result is empty. */
        if (dstzset->zsl->length) {
            zsetConvertToListpackIfNeeded(dstobj, maxelelen, totelelen);
            setKey(c, c->db, dstkey, dstobj, 0);
            addReplyLongLong(c, zsetLength(dstobj));
            notifyKeyspaceEvent(NOTIFY_ZSET,
                                (op == SET_OP_UNION) ? "zunionstore" :
                                    (op == SET_OP_INTER ? "zinterstore" : "zdiffstore"),
                                dstkey, c->db->id);
            server.dirty++;
        } else {
            addReply(c, shared.czero);
            if (dbDelete(c->db, dstkey)) {
                signalModifiedKey(c, c->db, dstkey);
                notifyKeyspaceEvent(NOTIFY_GENERIC, "del", dstkey, c->db->id);
                server.dirty++;
            }
        }
    } else if (cardinality_only) {
        addReplyLongLong(c, cardinality);
    } else {
        unsigned long length = dstzset->zsl->length;
        zskiplist *zsl = dstzset->zsl;
        zskiplistNode *zn = zsl->header->level[0].forward;
        /* In case of WITHSCORES, respond with a single array in RESP2, and
         * nested arrays in RESP3. We can't use a map response type since the
         * client library needs to know to respect the order. */
        if (withscores && c->resp == 2)
            addReplyArrayLen(c, length*2);
        else
            addReplyArrayLen(c, length);

        while (zn != NULL) {
            if (withscores && c->resp > 2) addReplyArrayLen(c,2);
            addReplyBulkCBuffer(c,zn->ele,sdslen(zn->ele));
            if (withscores) addReplyDouble(c,zn->score);
            zn = zn->level[0].forward;
        }
    }
    /* Drop the local reference to the result object and free the inputs. */
    decrRefCount(dstobj);
    zfree(src);
}
2812
/* ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
 *
 * The destination key is argv[1] and the number of keys is at argv[2]. */
void zunionstoreCommand(client *c) {
    zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_UNION, 0);
}
2817
/* ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
 *
 * The destination key is argv[1] and the number of keys is at argv[2]. */
void zinterstoreCommand(client *c) {
    zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_INTER, 0);
}
2822
/* ZDIFFSTORE destination numkeys key [key ...]
 *
 * The destination key is argv[1] and the number of keys is at argv[2]. */
void zdiffstoreCommand(client *c) {
    zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_DIFF, 0);
}
2827
/* ZUNION numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] [WITHSCORES]
 *
 * No destination key: the result is sent to the client. The number of keys
 * is at argv[1]. */
void zunionCommand(client *c) {
    zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_UNION, 0);
}
2832
/* ZINTER numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] [WITHSCORES]
 *
 * No destination key: the result is sent to the client. The number of keys
 * is at argv[1]. */
void zinterCommand(client *c) {
    zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_INTER, 0);
}
2837
/* ZINTERCARD numkeys key [key ...] [LIMIT limit]
 *
 * Runs the intersection in cardinality-only mode: just the number of common
 * elements is sent to the client. The number of keys is at argv[1]. */
void zinterCardCommand(client *c) {
    zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_INTER, 1);
}
2842
/* ZDIFF numkeys key [key ...] [WITHSCORES]
 *
 * No destination key: the result is sent to the client. The number of keys
 * is at argv[1]. */
void zdiffCommand(client *c) {
    zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_DIFF, 0);
}
2847
/* Iteration direction requested from zrangeGenericCommand().
 * NOTE(review): AUTO presumably lets the generic command pick the direction
 * from its own arguments; confirm against zrangeGenericCommand(). */
typedef enum {
    ZRANGE_DIRECTION_AUTO = 0,
    ZRANGE_DIRECTION_FORWARD,
    ZRANGE_DIRECTION_REVERSE
} zrange_direction;
2853
/* Kind of consumer behind a zrange_result_handler.
 * NOTE(review): CLIENT presumably identifies the handler replying to the
 * client and INTERNAL the one storing into a key; confirm where the handler
 * is initialized. */
typedef enum {
    ZRANGE_CONSUMER_TYPE_CLIENT = 0,
    ZRANGE_CONSUMER_TYPE_INTERNAL
} zrange_consumer_type;
2858
typedef struct zrange_result_handler zrange_result_handler;

/* Callback invoked before the first result; 'length' is the expected number
 * of results. */
typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c, long length);
/* Callback invoked after the last result; 'result_count' is the number of
 * results that were emitted. */
typedef void (*zrangeResultFinalizeFunction)(
    zrange_result_handler *c, size_t result_count);
/* Callback emitting one result whose element is a buffer of 'len' bytes. */
typedef void (*zrangeResultEmitCBufferFunction)(
    zrange_result_handler *c, const void *p, size_t len, double score);
/* Callback emitting one result whose element is an integer. */
typedef void (*zrangeResultEmitLongLongFunction)(
    zrange_result_handler *c, long long ll, double score);

/* Forward declaration of the generic range implementation driving a handler. */
void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store,
                           zrange_type rangetype, zrange_direction direction);
2871
/* Interface struct for ZRANGE/ZRANGESTORE generic implementation.
 * There is one implementation of this interface that sends a RESP reply to clients,
 * and one implementation that stores the range result into a zset object. */
struct zrange_result_handler {
    zrange_consumer_type                 type;
    client                              *client;
    robj                                *dstkey;  /* Destination key (store implementation). */
    robj                                *dstobj;  /* Object receiving the results (store implementation). */
    void                                *userdata; /* Client implementation: deferred length placeholder,
                                                    * or NULL when the length was sent upfront. */
    int                                  withscores; /* Emit the score after each element. */
    int                                  should_emit_array_length; /* Client implementation: prefix each
                                                                    * result with an array header of 2. */
    zrangeResultBeginFunction            beginResultEmission;
    zrangeResultFinalizeFunction         finalizeResultEmission;
    zrangeResultEmitCBufferFunction      emitResultFromCBuffer;
    zrangeResultEmitLongLongFunction     emitResultFromLongLong;
};
2888
2889/* Result handler methods for responding the ZRANGE to clients.
2890 * length can be used to provide the result length in advance (avoids deferred reply overhead).
2891 * length can be set to -1 if the result length is not know in advance.
2892 */
2893static void zrangeResultBeginClient(zrange_result_handler *handler, long length) {
2894 if (length > 0) {
2895 /* In case of WITHSCORES, respond with a single array in RESP2, and
2896 * nested arrays in RESP3. We can't use a map response type since the
2897 * client library needs to know to respect the order. */
2898 if (handler->withscores && (handler->client->resp == 2)) {
2899 length *= 2;
2900 }
2901 addReplyArrayLen(handler->client, length);
2902 handler->userdata = NULL;
2903 return;
2904 }
2905 handler->userdata = addReplyDeferredLen(handler->client);
2906}
2907
2908static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler,
2909 const void *value, size_t value_length_in_bytes, double score)
2910{
2911 if (handler->should_emit_array_length) {
2912 addReplyArrayLen(handler->client, 2);
2913 }
2914
2915 addReplyBulkCBuffer(handler->client, value, value_length_in_bytes);
2916
2917 if (handler->withscores) {
2918 addReplyDouble(handler->client, score);
2919 }
2920}
2921
2922static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler,
2923 long long value, double score)
2924{
2925 if (handler->should_emit_array_length) {
2926 addReplyArrayLen(handler->client, 2);
2927 }
2928
2929 addReplyBulkLongLong(handler->client, value);
2930
2931 if (handler->withscores) {
2932 addReplyDouble(handler->client, score);
2933 }
2934}
2935
2936static void zrangeResultFinalizeClient(zrange_result_handler *handler,
2937 size_t result_count)
2938{
2939 /* If the reply size was know at start there's nothing left to do */
2940 if (!handler->userdata)
2941 return;
2942 /* In case of WITHSCORES, respond with a single array in RESP2, and
2943 * nested arrays in RESP3. We can't use a map response type since the
2944 * client library needs to know to respect the order. */
2945 if (handler->withscores && (handler->client->resp == 2)) {
2946 result_count *= 2;
2947 }
2948
2949 setDeferredArrayLen(handler->client, handler->userdata, result_count);
2950}
2951
2952/* Result handler methods for storing the ZRANGESTORE to a zset. */
2953static void zrangeResultBeginStore(zrange_result_handler *handler, long length)
2954{
2955 if (length > (long)server.zset_max_listpack_entries)
2956 handler->dstobj = createZsetObject();
2957 else
2958 handler->dstobj = createZsetListpackObject();
2959}
2960
2961static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
2962 const void *value, size_t value_length_in_bytes, double score)
2963{
2964 double newscore;
2965 int retflags = 0;
2966 sds ele = sdsnewlen(value, value_length_in_bytes);
2967 int retval = zsetAdd(handler->dstobj, score, ele, ZADD_IN_NONE, &retflags, &newscore);
2968 sdsfree(ele);
2969 serverAssert(retval);
2970}
2971
2972static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler,
2973 long long value, double score)
2974{
2975 double newscore;
2976 int retflags = 0;
2977 sds ele = sdsfromlonglong(value);
2978 int retval = zsetAdd(handler->dstobj, score, ele, ZADD_IN_NONE, &retflags, &newscore);
2979 sdsfree(ele);
2980 serverAssert(retval);
2981}
2982
2983static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count)
2984{
2985 if (result_count) {
2986 setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj, 0);
2987 addReplyLongLong(handler->client, result_count);
2988 notifyKeyspaceEvent(NOTIFY_ZSET, "zrangestore", handler->dstkey, handler->client->db->id);
2989 server.dirty++;
2990 } else {
2991 addReply(handler->client, shared.czero);
2992 if (dbDelete(handler->client->db, handler->dstkey)) {
2993 signalModifiedKey(handler->client, handler->client->db, handler->dstkey);
2994 notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id);
2995 server.dirty++;
2996 }
2997 }
2998 decrRefCount(handler->dstobj);
2999}
3000
3001/* Initialize the consumer interface type with the requested type. */
3002static void zrangeResultHandlerInit(zrange_result_handler *handler,
3003 client *client, zrange_consumer_type type)
3004{
3005 memset(handler, 0, sizeof(*handler));
3006
3007 handler->client = client;
3008
3009 switch (type) {
3010 case ZRANGE_CONSUMER_TYPE_CLIENT:
3011 handler->beginResultEmission = zrangeResultBeginClient;
3012 handler->finalizeResultEmission = zrangeResultFinalizeClient;
3013 handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient;
3014 handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient;
3015 break;
3016
3017 case ZRANGE_CONSUMER_TYPE_INTERNAL:
3018 handler->beginResultEmission = zrangeResultBeginStore;
3019 handler->finalizeResultEmission = zrangeResultFinalizeStore;
3020 handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore;
3021 handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore;
3022 break;
3023 }
3024}
3025
3026static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) {
3027 handler->withscores = 1;
3028 handler->should_emit_array_length = (handler->client->resp > 2);
3029}
3030
/* Record 'dstkey' as the destination key of 'handler'. Only the pointer is
 * stored; this function does not touch the object's reference count. */
static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler,
    robj *dstkey)
{
    handler->dstkey = dstkey;
}
3036
3037/* This command implements ZRANGE, ZREVRANGE. */
3038void genericZrangebyrankCommand(zrange_result_handler *handler,
3039 robj *zobj, long start, long end, int withscores, int reverse) {
3040
3041 client *c = handler->client;
3042 long llen;
3043 long rangelen;
3044 size_t result_cardinality;
3045
3046 /* Sanitize indexes. */
3047 llen = zsetLength(zobj);
3048 if (start < 0) start = llen+start;
3049 if (end < 0) end = llen+end;
3050 if (start < 0) start = 0;
3051
3052
3053 /* Invariant: start >= 0, so this test will be true when end < 0.
3054 * The range is empty when start > end or start >= length. */
3055 if (start > end || start >= llen) {
3056 handler->beginResultEmission(handler, 0);
3057 handler->finalizeResultEmission(handler, 0);
3058 return;
3059 }
3060 if (end >= llen) end = llen-1;
3061 rangelen = (end-start)+1;
3062 result_cardinality = rangelen;
3063
3064 handler->beginResultEmission(handler, rangelen);
3065 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
3066 unsigned char *zl = zobj->ptr;
3067 unsigned char *eptr, *sptr;
3068 unsigned char *vstr;
3069 unsigned int vlen;
3070 long long vlong;
3071 double score = 0.0;
3072
3073 if (reverse)
3074 eptr = lpSeek(zl,-2-(2*start));
3075 else
3076 eptr = lpSeek(zl,2*start);
3077
3078 serverAssertWithInfo(c,zobj,eptr != NULL);
3079 sptr = lpNext(zl,eptr);
3080
3081 while (rangelen--) {
3082 serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
3083 vstr = lpGetValue(eptr,&vlen,&vlong);
3084
3085 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3086 score = zzlGetScore(sptr);
3087
3088 if (vstr == NULL) {
3089 handler->emitResultFromLongLong(handler, vlong, score);
3090 } else {
3091 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3092 }
3093
3094 if (reverse)
3095 zzlPrev(zl,&eptr,&sptr);
3096 else
3097 zzlNext(zl,&eptr,&sptr);
3098 }
3099
3100 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3101 zset *zs = zobj->ptr;
3102 zskiplist *zsl = zs->zsl;
3103 zskiplistNode *ln;
3104
3105 /* Check if starting point is trivial, before doing log(N) lookup. */
3106 if (reverse) {
3107 ln = zsl->tail;
3108 if (start > 0)
3109 ln = zslGetElementByRank(zsl,llen-start);
3110 } else {
3111 ln = zsl->header->level[0].forward;
3112 if (start > 0)
3113 ln = zslGetElementByRank(zsl,start+1);
3114 }
3115
3116 while(rangelen--) {
3117 serverAssertWithInfo(c,zobj,ln != NULL);
3118 sds ele = ln->ele;
3119 handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score);
3120 ln = reverse ? ln->backward : ln->level[0].forward;
3121 }
3122 } else {
3123 serverPanic("Unknown sorted set encoding");
3124 }
3125
3126 handler->finalizeResultEmission(handler, result_cardinality);
3127}
3128
3129/* ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count] */
3130void zrangestoreCommand (client *c) {
3131 robj *dstkey = c->argv[1];
3132 zrange_result_handler handler;
3133 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL);
3134 zrangeResultHandlerDestinationKeySet(&handler, dstkey);
3135 zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3136}
3137
3138/* ZRANGE <key> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */
3139void zrangeCommand(client *c) {
3140 zrange_result_handler handler;
3141 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3142 zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3143}
3144
3145/* ZREVRANGE <key> <start> <stop> [WITHSCORES] */
3146void zrevrangeCommand(client *c) {
3147 zrange_result_handler handler;
3148 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3149 zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE);
3150}
3151
3152/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
3153void genericZrangebyscoreCommand(zrange_result_handler *handler,
3154 zrangespec *range, robj *zobj, long offset, long limit,
3155 int reverse) {
3156 unsigned long rangelen = 0;
3157
3158 handler->beginResultEmission(handler, -1);
3159
3160 /* For invalid offset, return directly. */
3161 if (offset > 0 && offset >= (long)zsetLength(zobj)) {
3162 handler->finalizeResultEmission(handler, 0);
3163 return;
3164 }
3165
3166 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
3167 unsigned char *zl = zobj->ptr;
3168 unsigned char *eptr, *sptr;
3169 unsigned char *vstr;
3170 unsigned int vlen;
3171 long long vlong;
3172
3173 /* If reversed, get the last node in range as starting point. */
3174 if (reverse) {
3175 eptr = zzlLastInRange(zl,range);
3176 } else {
3177 eptr = zzlFirstInRange(zl,range);
3178 }
3179
3180 /* Get score pointer for the first element. */
3181 if (eptr)
3182 sptr = lpNext(zl,eptr);
3183
3184 /* If there is an offset, just traverse the number of elements without
3185 * checking the score because that is done in the next loop. */
3186 while (eptr && offset--) {
3187 if (reverse) {
3188 zzlPrev(zl,&eptr,&sptr);
3189 } else {
3190 zzlNext(zl,&eptr,&sptr);
3191 }
3192 }
3193
3194 while (eptr && limit--) {
3195 double score = zzlGetScore(sptr);
3196
3197 /* Abort when the node is no longer in range. */
3198 if (reverse) {
3199 if (!zslValueGteMin(score,range)) break;
3200 } else {
3201 if (!zslValueLteMax(score,range)) break;
3202 }
3203
3204 vstr = lpGetValue(eptr,&vlen,&vlong);
3205 rangelen++;
3206 if (vstr == NULL) {
3207 handler->emitResultFromLongLong(handler, vlong, score);
3208 } else {
3209 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3210 }
3211
3212 /* Move to next node */
3213 if (reverse) {
3214 zzlPrev(zl,&eptr,&sptr);
3215 } else {
3216 zzlNext(zl,&eptr,&sptr);
3217 }
3218 }
3219 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3220 zset *zs = zobj->ptr;
3221 zskiplist *zsl = zs->zsl;
3222 zskiplistNode *ln;
3223
3224 /* If reversed, get the last node in range as starting point. */
3225 if (reverse) {
3226 ln = zslLastInRange(zsl,range);
3227 } else {
3228 ln = zslFirstInRange(zsl,range);
3229 }
3230
3231 /* If there is an offset, just traverse the number of elements without
3232 * checking the score because that is done in the next loop. */
3233 while (ln && offset--) {
3234 if (reverse) {
3235 ln = ln->backward;
3236 } else {
3237 ln = ln->level[0].forward;
3238 }
3239 }
3240
3241 while (ln && limit--) {
3242 /* Abort when the node is no longer in range. */
3243 if (reverse) {
3244 if (!zslValueGteMin(ln->score,range)) break;
3245 } else {
3246 if (!zslValueLteMax(ln->score,range)) break;
3247 }
3248
3249 rangelen++;
3250 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3251
3252 /* Move to next node */
3253 if (reverse) {
3254 ln = ln->backward;
3255 } else {
3256 ln = ln->level[0].forward;
3257 }
3258 }
3259 } else {
3260 serverPanic("Unknown sorted set encoding");
3261 }
3262
3263 handler->finalizeResultEmission(handler, rangelen);
3264}
3265
3266/* ZRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
3267void zrangebyscoreCommand(client *c) {
3268 zrange_result_handler handler;
3269 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3270 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD);
3271}
3272
3273/* ZREVRANGEBYSCORE <key> <max> <min> [WITHSCORES] [LIMIT offset count] */
3274void zrevrangebyscoreCommand(client *c) {
3275 zrange_result_handler handler;
3276 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3277 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE);
3278}
3279
3280void zcountCommand(client *c) {
3281 robj *key = c->argv[1];
3282 robj *zobj;
3283 zrangespec range;
3284 unsigned long count = 0;
3285
3286 /* Parse the range arguments */
3287 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
3288 addReplyError(c,"min or max is not a float");
3289 return;
3290 }
3291
3292 /* Lookup the sorted set */
3293 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
3294 checkType(c, zobj, OBJ_ZSET)) return;
3295
3296 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
3297 unsigned char *zl = zobj->ptr;
3298 unsigned char *eptr, *sptr;
3299 double score;
3300
3301 /* Use the first element in range as the starting point */
3302 eptr = zzlFirstInRange(zl,&range);
3303
3304 /* No "first" element */
3305 if (eptr == NULL) {
3306 addReply(c, shared.czero);
3307 return;
3308 }
3309
3310 /* First element is in range */
3311 sptr = lpNext(zl,eptr);
3312 score = zzlGetScore(sptr);
3313 serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
3314
3315 /* Iterate over elements in range */
3316 while (eptr) {
3317 score = zzlGetScore(sptr);
3318
3319 /* Abort when the node is no longer in range. */
3320 if (!zslValueLteMax(score,&range)) {
3321 break;
3322 } else {
3323 count++;
3324 zzlNext(zl,&eptr,&sptr);
3325 }
3326 }
3327 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3328 zset *zs = zobj->ptr;
3329 zskiplist *zsl = zs->zsl;
3330 zskiplistNode *zn;
3331 unsigned long rank;
3332
3333 /* Find first element in range */
3334 zn = zslFirstInRange(zsl, &range);
3335
3336 /* Use rank of first element, if any, to determine preliminary count */
3337 if (zn != NULL) {
3338 rank = zslGetRank(zsl, zn->score, zn->ele);
3339 count = (zsl->length - (rank - 1));
3340
3341 /* Find last element in range */
3342 zn = zslLastInRange(zsl, &range);
3343
3344 /* Use rank of last element, if any, to determine the actual count */
3345 if (zn != NULL) {
3346 rank = zslGetRank(zsl, zn->score, zn->ele);
3347 count -= (zsl->length - rank);
3348 }
3349 }
3350 } else {
3351 serverPanic("Unknown sorted set encoding");
3352 }
3353
3354 addReplyLongLong(c, count);
3355}
3356
3357void zlexcountCommand(client *c) {
3358 robj *key = c->argv[1];
3359 robj *zobj;
3360 zlexrangespec range;
3361 unsigned long count = 0;
3362
3363 /* Parse the range arguments */
3364 if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
3365 addReplyError(c,"min or max not valid string range item");
3366 return;
3367 }
3368
3369 /* Lookup the sorted set */
3370 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
3371 checkType(c, zobj, OBJ_ZSET))
3372 {
3373 zslFreeLexRange(&range);
3374 return;
3375 }
3376
3377 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
3378 unsigned char *zl = zobj->ptr;
3379 unsigned char *eptr, *sptr;
3380
3381 /* Use the first element in range as the starting point */
3382 eptr = zzlFirstInLexRange(zl,&range);
3383
3384 /* No "first" element */
3385 if (eptr == NULL) {
3386 zslFreeLexRange(&range);
3387 addReply(c, shared.czero);
3388 return;
3389 }
3390
3391 /* First element is in range */
3392 sptr = lpNext(zl,eptr);
3393 serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));
3394
3395 /* Iterate over elements in range */
3396 while (eptr) {
3397 /* Abort when the node is no longer in range. */
3398 if (!zzlLexValueLteMax(eptr,&range)) {
3399 break;
3400 } else {
3401 count++;
3402 zzlNext(zl,&eptr,&sptr);
3403 }
3404 }
3405 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3406 zset *zs = zobj->ptr;
3407 zskiplist *zsl = zs->zsl;
3408 zskiplistNode *zn;
3409 unsigned long rank;
3410
3411 /* Find first element in range */
3412 zn = zslFirstInLexRange(zsl, &range);
3413
3414 /* Use rank of first element, if any, to determine preliminary count */
3415 if (zn != NULL) {
3416 rank = zslGetRank(zsl, zn->score, zn->ele);
3417 count = (zsl->length - (rank - 1));
3418
3419 /* Find last element in range */
3420 zn = zslLastInLexRange(zsl, &range);
3421
3422 /* Use rank of last element, if any, to determine the actual count */
3423 if (zn != NULL) {
3424 rank = zslGetRank(zsl, zn->score, zn->ele);
3425 count -= (zsl->length - rank);
3426 }
3427 }
3428 } else {
3429 serverPanic("Unknown sorted set encoding");
3430 }
3431
3432 zslFreeLexRange(&range);
3433 addReplyLongLong(c, count);
3434}
3435
3436/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
3437void genericZrangebylexCommand(zrange_result_handler *handler,
3438 zlexrangespec *range, robj *zobj, int withscores, long offset, long limit,
3439 int reverse)
3440{
3441 unsigned long rangelen = 0;
3442
3443 handler->beginResultEmission(handler, -1);
3444
3445 if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
3446 unsigned char *zl = zobj->ptr;
3447 unsigned char *eptr, *sptr;
3448 unsigned char *vstr;
3449 unsigned int vlen;
3450 long long vlong;
3451
3452 /* If reversed, get the last node in range as starting point. */
3453 if (reverse) {
3454 eptr = zzlLastInLexRange(zl,range);
3455 } else {
3456 eptr = zzlFirstInLexRange(zl,range);
3457 }
3458
3459 /* Get score pointer for the first element. */
3460 if (eptr)
3461 sptr = lpNext(zl,eptr);
3462
3463 /* If there is an offset, just traverse the number of elements without
3464 * checking the score because that is done in the next loop. */
3465 while (eptr && offset--) {
3466 if (reverse) {
3467 zzlPrev(zl,&eptr,&sptr);
3468 } else {
3469 zzlNext(zl,&eptr,&sptr);
3470 }
3471 }
3472
3473 while (eptr && limit--) {
3474 double score = 0;
3475 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3476 score = zzlGetScore(sptr);
3477
3478 /* Abort when the node is no longer in range. */
3479 if (reverse) {
3480 if (!zzlLexValueGteMin(eptr,range)) break;
3481 } else {
3482 if (!zzlLexValueLteMax(eptr,range)) break;
3483 }
3484
3485 vstr = lpGetValue(eptr,&vlen,&vlong);
3486 rangelen++;
3487 if (vstr == NULL) {
3488 handler->emitResultFromLongLong(handler, vlong, score);
3489 } else {
3490 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3491 }
3492
3493 /* Move to next node */
3494 if (reverse) {
3495 zzlPrev(zl,&eptr,&sptr);
3496 } else {
3497 zzlNext(zl,&eptr,&sptr);
3498 }
3499 }
3500 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3501 zset *zs = zobj->ptr;
3502 zskiplist *zsl = zs->zsl;
3503 zskiplistNode *ln;
3504
3505 /* If reversed, get the last node in range as starting point. */
3506 if (reverse) {
3507 ln = zslLastInLexRange(zsl,range);
3508 } else {
3509 ln = zslFirstInLexRange(zsl,range);
3510 }
3511
3512 /* If there is an offset, just traverse the number of elements without
3513 * checking the score because that is done in the next loop. */
3514 while (ln && offset--) {
3515 if (reverse) {
3516 ln = ln->backward;
3517 } else {
3518 ln = ln->level[0].forward;
3519 }
3520 }
3521
3522 while (ln && limit--) {
3523 /* Abort when the node is no longer in range. */
3524 if (reverse) {
3525 if (!zslLexValueGteMin(ln->ele,range)) break;
3526 } else {
3527 if (!zslLexValueLteMax(ln->ele,range)) break;
3528 }
3529
3530 rangelen++;
3531 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3532
3533 /* Move to next node */
3534 if (reverse) {
3535 ln = ln->backward;
3536 } else {
3537 ln = ln->level[0].forward;
3538 }
3539 }
3540 } else {
3541 serverPanic("Unknown sorted set encoding");
3542 }
3543
3544 handler->finalizeResultEmission(handler, rangelen);
3545}
3546
3547/* ZRANGEBYLEX <key> <min> <max> [LIMIT offset count] */
3548void zrangebylexCommand(client *c) {
3549 zrange_result_handler handler;
3550 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3551 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD);
3552}
3553
3554/* ZREVRANGEBYLEX <key> <max> <min> [LIMIT offset count] */
3555void zrevrangebylexCommand(client *c) {
3556 zrange_result_handler handler;
3557 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3558 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE);
3559}
3560
/**
 * This function handles ZRANGE and ZRANGESTORE, and also the deprecated
 * Z[REV]RANGE, Z[REV]RANGEBYSCORE and Z[REV]RANGEBYLEX commands.
 *
 * The plain ZRANGE and ZRANGESTORE pass _AUTO as rangetype and direction, so
 * that both can be selected with the BYSCORE / BYLEX / REV options. The other
 * commands pass explicit values; since those options are only recognized
 * while the corresponding setting is still _AUTO, they are then rejected as
 * a syntax error.
 *
 * The argc_start points to the src key argument, so following syntax is like:
 * <src> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count]
 *
 * 'store' is true for ZRANGESTORE: the WITHSCORES option is then not
 * recognized, and score emission is always enabled on the handler.
 */
void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store,
    zrange_type rangetype, zrange_direction direction)
{
    client *c = handler->client;
    robj *key = c->argv[argc_start];
    robj *zobj;
    zrangespec range;
    zlexrangespec lexrange;
    int minidx = argc_start + 1;
    int maxidx = argc_start + 2;

    /* Options common to all */
    long opt_start = 0;
    long opt_end = 0;
    int opt_withscores = 0;
    long opt_offset = 0;
    long opt_limit = -1;

    /* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */
    for (int j=argc_start + 3; j < c->argc; j++) {
        /* Number of arguments still available after the current one. */
        int leftargs = c->argc-j-1;
        if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) {
            opt_withscores = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL) != C_OK) ||
                (getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL) != C_OK))
            {
                return;
            }
            /* Skip the two LIMIT arguments just consumed. */
            j += 2;
        } else if (direction == ZRANGE_DIRECTION_AUTO &&
                   !strcasecmp(c->argv[j]->ptr,"rev"))
        {
            direction = ZRANGE_DIRECTION_REVERSE;
        } else if (rangetype == ZRANGE_AUTO &&
                   !strcasecmp(c->argv[j]->ptr,"bylex"))
        {
            rangetype = ZRANGE_LEX;
        } else if (rangetype == ZRANGE_AUTO &&
                   !strcasecmp(c->argv[j]->ptr,"byscore"))
        {
            rangetype = ZRANGE_SCORE;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    /* Use defaults if not overridden by arguments. */
    if (direction == ZRANGE_DIRECTION_AUTO)
        direction = ZRANGE_DIRECTION_FORWARD;
    if (rangetype == ZRANGE_AUTO)
        rangetype = ZRANGE_RANK;

    /* Check for conflicting arguments. Note that LIMIT is detected through
     * opt_limit differing from its -1 default, so an explicit count of -1 is
     * indistinguishable from no LIMIT at all here. */
    if (opt_limit != -1 && rangetype == ZRANGE_RANK) {
        addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX");
        return;
    }
    if (opt_withscores && rangetype == ZRANGE_LEX) {
        addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX");
        return;
    }

    if (direction == ZRANGE_DIRECTION_REVERSE &&
        ((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype)))
    {
        /* Range is given as [max,min], so swap the argument indexes. Rank
         * ranges are left untouched. */
        int tmp = maxidx;
        maxidx = minidx;
        minidx = tmp;
    }

    /* Step 2: Parse the range. */
    switch (rangetype) {
    case ZRANGE_AUTO:
    case ZRANGE_RANK:
        /* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */
        if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL) != C_OK) ||
            (getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL) != C_OK))
        {
            return;
        }
        break;

    case ZRANGE_SCORE:
        /* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */
        if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
            addReplyError(c, "min or max is not a float");
            return;
        }
        break;

    case ZRANGE_LEX:
        /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */
        if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK) {
            addReplyError(c, "min or max not valid string range item");
            return;
        }
        break;
    }

    /* From here on a successfully parsed lex range must be released, so every
     * exit below goes through the cleanup label instead of returning. */

    if (opt_withscores || store) {
        zrangeResultHandlerScoreEmissionEnable(handler);
    }

    /* Step 3: Lookup the key and get the range. */
    zobj = lookupKeyRead(c->db, key);
    if (zobj == NULL) {
        if (store) {
            /* Run an empty emission so that the store handler produces its
             * own reply for a zero-length result. */
            handler->beginResultEmission(handler, -1);
            handler->finalizeResultEmission(handler, 0);
        } else {
            addReply(c, shared.emptyarray);
        }
        goto cleanup;
    }

    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    /* Step 4: Pass this to the command-specific handler. */
    switch (rangetype) {
    case ZRANGE_AUTO:
    case ZRANGE_RANK:
        genericZrangebyrankCommand(handler, zobj, opt_start, opt_end,
            opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE);
        break;

    case ZRANGE_SCORE:
        genericZrangebyscoreCommand(handler, &range, zobj, opt_offset,
            opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
        break;

    case ZRANGE_LEX:
        genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store,
            opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
        break;
    }

    /* Instead of returning here, we'll just fall-through the clean-up. */

cleanup:

    if (rangetype == ZRANGE_LEX) {
        zslFreeLexRange(&lexrange);
    }
}
3718
3719void zcardCommand(client *c) {
3720 robj *key = c->argv[1];
3721 robj *zobj;
3722
3723 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
3724 checkType(c,zobj,OBJ_ZSET)) return;
3725
3726 addReplyLongLong(c,zsetLength(zobj));
3727}
3728
3729void zscoreCommand(client *c) {
3730 robj *key = c->argv[1];
3731 robj *zobj;
3732 double score;
3733
3734 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
3735 checkType(c,zobj,OBJ_ZSET)) return;
3736
3737 if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
3738 addReplyNull(c);
3739 } else {
3740 addReplyDouble(c,score);
3741 }
3742}
3743
3744void zmscoreCommand(client *c) {
3745 robj *key = c->argv[1];
3746 robj *zobj;
3747 double score;
3748 zobj = lookupKeyRead(c->db,key);
3749 if (checkType(c,zobj,OBJ_ZSET)) return;
3750
3751 addReplyArrayLen(c,c->argc - 2);
3752 for (int j = 2; j < c->argc; j++) {
3753 /* Treat a missing set the same way as an empty set */
3754 if (zobj == NULL || zsetScore(zobj,c->argv[j]->ptr,&score) == C_ERR) {
3755 addReplyNull(c);
3756 } else {
3757 addReplyDouble(c,score);
3758 }
3759 }
3760}
3761
3762void zrankGenericCommand(client *c, int reverse) {
3763 robj *key = c->argv[1];
3764 robj *ele = c->argv[2];
3765 robj *zobj;
3766 long rank;
3767
3768 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
3769 checkType(c,zobj,OBJ_ZSET)) return;
3770
3771 serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
3772 rank = zsetRank(zobj,ele->ptr,reverse);
3773 if (rank >= 0) {
3774 addReplyLongLong(c,rank);
3775 } else {
3776 addReplyNull(c);
3777 }
3778}
3779
/* ZRANK entry point: zrankGenericCommand() with 'reverse' set to 0. */
void zrankCommand(client *c) {
    zrankGenericCommand(c, 0);
}
3783
/* ZREVRANK entry point: zrankGenericCommand() with 'reverse' set to 1. */
void zrevrankCommand(client *c) {
    zrankGenericCommand(c, 1);
}
3787
3788void zscanCommand(client *c) {
3789 robj *o;
3790 unsigned long cursor;
3791
3792 if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
3793 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
3794 checkType(c,o,OBJ_ZSET)) return;
3795 scanGenericCommand(c,o,cursor);
3796}
3797
/* This command implements the generic zpop operation, used by:
 * ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX and ZMPOP. This function is also used
 * inside blocked.c in the unblocking stage of BZPOPMIN, BZPOPMAX and BZMPOP.
 *
 * 'keyv' / 'keyc' are the candidate keys: the first one that exists is the
 * one popped from. If it is not a sorted set, checkType() is left to reply
 * and the function returns without popping anything.
 *
 * 'where' selects the end to pop from (ZSET_MIN or ZSET_MAX).
 *
 * If 'emitkey' is true also the key name is emitted, useful for the blocking
 * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
 * Or in ZMPOP/BZMPOP, because we also can take multiple keys.
 *
 * 'count' is the number of elements requested to pop, or -1 for plain single pop.
 * At most the current length of the set is popped.
 *
 * 'use_nested_array' when false it generates a flat array (with or without key name).
 * When true, it generates a nested 2 level array of field + score pairs, or 3 level when emitkey is set.
 *
 * 'reply_nil_when_empty' when true we reply a NIL if we are not able to pop up any elements.
 * Like in ZMPOP/BZMPOP we reply with a structured nested array containing key name
 * and member + score pairs. In these commands, we reply with null when we have no result.
 * Otherwise in ZPOPMIN/ZPOPMAX we reply an empty array by default.
 *
 * 'deleted' is an optional output argument to get an indication
 * if the key got deleted by this function.
 * */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey,
                        long count, int use_nested_array, int reply_nil_when_empty, int *deleted) {
    int idx;
    robj *key = NULL;
    robj *zobj = NULL;
    sds ele;
    double score;

    if (deleted) *deleted = 0;

    /* Check type and break on the first error, otherwise identify candidate. */
    idx = 0;
    while (idx < keyc) {
        key = keyv[idx++];
        zobj = lookupKeyWrite(c->db,key);
        if (!zobj) continue;
        if (checkType(c,zobj,OBJ_ZSET)) return;
        break;
    }

    /* No candidate for zpopping, return empty. */
    if (!zobj) {
        if (reply_nil_when_empty) {
            addReplyNullArray(c);
        } else {
            addReply(c,shared.emptyarray);
        }
        return;
    }

    if (count == 0) {
        /* ZPOPMIN/ZPOPMAX with count 0. */
        addReply(c, shared.emptyarray);
        return;
    }

    long result_count = 0;

    /* When count is -1, we need to correct it to 1 for plain single pop. */
    if (count == -1) count = 1;

    /* Never pop more elements than the set currently holds. */
    long llen = zsetLength(zobj);
    long rangelen = (count > llen) ? llen : count;

    /* Emit the reply header(s); the popped elements follow in the loop. */
    if (!use_nested_array && !emitkey) {
        /* ZPOPMIN/ZPOPMAX with or without COUNT option in RESP2. */
        addReplyArrayLen(c, rangelen * 2);
    } else if (use_nested_array && !emitkey) {
        /* ZPOPMIN/ZPOPMAX with COUNT option in RESP3. */
        addReplyArrayLen(c, rangelen);
    } else if (!use_nested_array && emitkey) {
        /* BZPOPMIN/BZPOPMAX in RESP2 and RESP3. */
        addReplyArrayLen(c, rangelen * 2 + 1);
        addReplyBulk(c, key);
    } else if (use_nested_array && emitkey) {
        /* ZMPOP/BZMPOP in RESP2 and RESP3. */
        addReplyArrayLen(c, 2);
        addReplyBulk(c, key);
        addReplyArrayLen(c, rangelen);
    }

    /* Remove the element.
     * NOTE(review): the do/while runs its body before testing rangelen, so
     * this assumes rangelen >= 1, i.e. the set found above is not empty and
     * count is positive here -- confirm against callers. */
    do {
        if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
            unsigned char *zl = zobj->ptr;
            unsigned char *eptr, *sptr;
            unsigned char *vstr;
            unsigned int vlen;
            long long vlong;

            /* Get the first or last element in the sorted set. */
            eptr = lpSeek(zl,where == ZSET_MAX ? -2 : 0);
            serverAssertWithInfo(c,zobj,eptr != NULL);
            vstr = lpGetValue(eptr,&vlen,&vlong);
            /* Copy the member into a private sds: it is still needed for
             * the reply after the element is deleted below. */
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen(vstr,vlen);

            /* Get the score. */
            sptr = lpNext(zl,eptr);
            serverAssertWithInfo(c,zobj,sptr != NULL);
            score = zzlGetScore(sptr);
        } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplist *zsl = zs->zsl;
            zskiplistNode *zln;

            /* Get the first or last element in the sorted set. */
            zln = (where == ZSET_MAX ? zsl->tail :
                                       zsl->header->level[0].forward);

            /* There must be an element in the sorted set. */
            serverAssertWithInfo(c,zobj,zln != NULL);
            /* Duplicate the member: it is still needed for the reply after
             * the element is deleted below. */
            ele = sdsdup(zln->ele);
            score = zln->score;
        } else {
            serverPanic("Unknown sorted set encoding");
        }

        serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
        server.dirty++;

        if (result_count == 0) { /* Do this only for the first iteration. */
            /* NOTE(review): indexing by 'where' assumes ZSET_MIN == 0 and
             * ZSET_MAX == 1 -- confirm against their definitions. */
            char *events[2] = {"zpopmin","zpopmax"};
            notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
            signalModifiedKey(c,c->db,key);
        }

        if (use_nested_array) {
            addReplyArrayLen(c,2);
        }
        addReplyBulkCBuffer(c,ele,sdslen(ele));
        addReplyDouble(c,score);
        sdsfree(ele);
        ++result_count;
    } while(--rangelen);

    /* Remove the key, if indeed needed. */
    if (zsetLength(zobj) == 0) {
        if (deleted) *deleted = 1;

        dbDelete(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
    }

    if (c->cmd->proc == zmpopCommand) {
        /* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */
        robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
        rewriteClientCommandVector(c, 3,
                                   (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
                                   key, count_obj);
        decrRefCount(count_obj);
    }
}
3954
3955/* ZPOPMIN/ZPOPMAX key [<count>] */
3956void zpopMinMaxCommand(client *c, int where) {
3957 if (c->argc > 3) {
3958 addReplyErrorObject(c,shared.syntaxerr);
3959 return;
3960 }
3961
3962 long count = -1; /* -1 for plain single pop. */
3963 if (c->argc == 3 && getPositiveLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK)
3964 return;
3965
3966 /* Respond with a single (flat) array in RESP2 or if count is -1
3967 * (returning a single element). In RESP3, when count > 0 use nested array. */
3968 int use_nested_array = (c->resp > 2 && count != -1);
3969
3970 genericZpopCommand(c, &c->argv[1], 1, where, 0, count, use_nested_array, 0, NULL);
3971}
3972
3973/* ZPOPMIN key [<count>] */
3974void zpopminCommand(client *c) {
3975 zpopMinMaxCommand(c, ZSET_MIN);
3976}
3977
3978/* ZPOPMAX key [<count>] */
3979void zpopmaxCommand(client *c) {
3980 zpopMinMaxCommand(c, ZSET_MAX);
3981}
3982
3983/* BZPOPMIN, BZPOPMAX, BZMPOP actual implementation.
3984 *
3985 * 'numkeys' is the number of keys.
3986 *
3987 * 'timeout_idx' parameter position of block timeout.
3988 *
3989 * 'where' ZSET_MIN or ZSET_MAX.
3990 *
3991 * 'count' is the number of elements requested to pop, or -1 for plain single pop.
3992 *
3993 * 'use_nested_array' when false it generates a flat array (with or without key name).
3994 * When true, it generates a nested 3 level array of keyname, field + score pairs.
3995 * */
3996void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where,
3997 int timeout_idx, long count, int use_nested_array, int reply_nil_when_empty) {
3998 robj *o;
3999 robj *key;
4000 mstime_t timeout;
4001 int j;
4002
4003 if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
4004 != C_OK) return;
4005
4006 for (j = 0; j < numkeys; j++) {
4007 key = keys[j];
4008 o = lookupKeyWrite(c->db,key);
4009 /* Non-existing key, move to next key. */
4010 if (o == NULL) continue;
4011
4012 if (checkType(c,o,OBJ_ZSET)) return;
4013
4014 long llen = zsetLength(o);
4015 /* Empty zset, move to next key. */
4016 if (llen == 0) continue;
4017
4018 /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
4019 genericZpopCommand(c, &key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, NULL);
4020
4021 if (count == -1) {
4022 /* Replicate it as ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
4023 rewriteClientCommandVector(c,2,
4024 (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
4025 key);
4026 } else {
4027 /* Replicate it as ZPOP[MIN|MAX] with COUNT option. */
4028 robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
4029 rewriteClientCommandVector(c, 3,
4030 (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
4031 key, count_obj);
4032 decrRefCount(count_obj);
4033 }
4034
4035 return;
4036 }
4037
4038 /* If we are not allowed to block the client and the zset is empty the only thing
4039 * we can do is treating it as a timeout (even with timeout 0). */
4040 if (c->flags & CLIENT_DENY_BLOCKING) {
4041 addReplyNullArray(c);
4042 return;
4043 }
4044
4045 /* If the keys do not exist we must block */
4046 struct blockPos pos = {where};
4047 blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL);
4048}
4049
4050// BZPOPMIN key [key ...] timeout
4051void bzpopminCommand(client *c) {
4052 blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, -1, 0, 0);
4053}
4054
4055// BZPOPMAX key [key ...] timeout
4056void bzpopmaxCommand(client *c) {
4057 blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, -1, 0, 0);
4058}
4059
4060static void zrandmemberReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
4061 for (unsigned long i = 0; i < count; i++) {
4062 if (vals && c->resp > 2)
4063 addReplyArrayLen(c,2);
4064 if (keys[i].sval)
4065 addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
4066 else
4067 addReplyBulkLongLong(c, keys[i].lval);
4068 if (vals) {
4069 if (vals[i].sval) {
4070 addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen));
4071 } else
4072 addReplyDouble(c, vals[i].lval);
4073 }
4074 }
4075}
4076
/* How many times bigger than the requested count must the zset be for us
 * to skip the "remove elements" strategy? See the implementation below
 * for more info. */
4080#define ZRANDMEMBER_SUB_STRATEGY_MUL 3
4081
/* If the client asks for a very large number of random elements, queuing
 * them all at once may consume an unbounded amount of memory, so we limit
 * the number of random elements sampled per batch. */
4085#define ZRANDMEMBER_RANDOM_SAMPLE_LIMIT 1000
4086
4087void zrandmemberWithCountCommand(client *c, long l, int withscores) {
4088 unsigned long count, size;
4089 int uniq = 1;
4090 robj *zsetobj;
4091
4092 if ((zsetobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray))
4093 == NULL || checkType(c, zsetobj, OBJ_ZSET)) return;
4094 size = zsetLength(zsetobj);
4095
4096 if(l >= 0) {
4097 count = (unsigned long) l;
4098 } else {
4099 count = -l;
4100 uniq = 0;
4101 }
4102
4103 /* If count is zero, serve it ASAP to avoid special cases later. */
4104 if (count == 0) {
4105 addReply(c,shared.emptyarray);
4106 return;
4107 }
4108
4109 /* CASE 1: The count was negative, so the extraction method is just:
4110 * "return N random elements" sampling the whole set every time.
4111 * This case is trivial and can be served without auxiliary data
4112 * structures. This case is the only one that also needs to return the
4113 * elements in random order. */
4114 if (!uniq || count == 1) {
4115 if (withscores && c->resp == 2)
4116 addReplyArrayLen(c, count*2);
4117 else
4118 addReplyArrayLen(c, count);
4119 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST) {
4120 zset *zs = zsetobj->ptr;
4121 while (count--) {
4122 dictEntry *de = dictGetFairRandomKey(zs->dict);
4123 sds key = dictGetKey(de);
4124 if (withscores && c->resp > 2)
4125 addReplyArrayLen(c,2);
4126 addReplyBulkCBuffer(c, key, sdslen(key));
4127 if (withscores)
4128 addReplyDouble(c, *(double*)dictGetVal(de));
4129 }
4130 } else if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) {
4131 listpackEntry *keys, *vals = NULL;
4132 unsigned long limit, sample_count;
4133 limit = count > ZRANDMEMBER_RANDOM_SAMPLE_LIMIT ? ZRANDMEMBER_RANDOM_SAMPLE_LIMIT : count;
4134 keys = zmalloc(sizeof(listpackEntry)*limit);
4135 if (withscores)
4136 vals = zmalloc(sizeof(listpackEntry)*limit);
4137 while (count) {
4138 sample_count = count > limit ? limit : count;
4139 count -= sample_count;
4140 lpRandomPairs(zsetobj->ptr, sample_count, keys, vals);
4141 zrandmemberReplyWithListpack(c, sample_count, keys, vals);
4142 }
4143 zfree(keys);
4144 zfree(vals);
4145 }
4146 return;
4147 }
4148
4149 zsetopsrc src;
4150 zsetopval zval;
4151 src.subject = zsetobj;
4152 src.type = zsetobj->type;
4153 src.encoding = zsetobj->encoding;
4154 zuiInitIterator(&src);
4155 memset(&zval, 0, sizeof(zval));
4156
4157 /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
4158 long reply_size = count < size ? count : size;
4159 if (withscores && c->resp == 2)
4160 addReplyArrayLen(c, reply_size*2);
4161 else
4162 addReplyArrayLen(c, reply_size);
4163
4164 /* CASE 2:
4165 * The number of requested elements is greater than the number of
4166 * elements inside the zset: simply return the whole zset. */
4167 if (count >= size) {
4168 while (zuiNext(&src, &zval)) {
4169 if (withscores && c->resp > 2)
4170 addReplyArrayLen(c,2);
4171 addReplyBulkSds(c, zuiNewSdsFromValue(&zval));
4172 if (withscores)
4173 addReplyDouble(c, zval.score);
4174 }
4175 zuiClearIterator(&src);
4176 return;
4177 }
4178
4179 /* CASE 3:
4180 * The number of elements inside the zset is not greater than
4181 * ZRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
4182 * In this case we create a dict from scratch with all the elements, and
4183 * subtract random elements to reach the requested number of elements.
4184 *
4185 * This is done because if the number of requested elements is just
4186 * a bit less than the number of elements in the set, the natural approach
4187 * used into CASE 4 is highly inefficient. */
4188 if (count*ZRANDMEMBER_SUB_STRATEGY_MUL > size) {
4189 dict *d = dictCreate(&sdsReplyDictType);
4190 dictExpand(d, size);
4191 /* Add all the elements into the temporary dictionary. */
4192 while (zuiNext(&src, &zval)) {
4193 sds key = zuiNewSdsFromValue(&zval);
4194 dictEntry *de = dictAddRaw(d, key, NULL);
4195 serverAssert(de);
4196 if (withscores)
4197 dictSetDoubleVal(de, zval.score);
4198 }
4199 serverAssert(dictSize(d) == size);
4200
4201 /* Remove random elements to reach the right count. */
4202 while (size > count) {
4203 dictEntry *de;
4204 de = dictGetFairRandomKey(d);
4205 dictUnlink(d,dictGetKey(de));
4206 sdsfree(dictGetKey(de));
4207 dictFreeUnlinkedEntry(d,de);
4208 size--;
4209 }
4210
4211 /* Reply with what's in the dict and release memory */
4212 dictIterator *di;
4213 dictEntry *de;
4214 di = dictGetIterator(d);
4215 while ((de = dictNext(di)) != NULL) {
4216 if (withscores && c->resp > 2)
4217 addReplyArrayLen(c,2);
4218 addReplyBulkSds(c, dictGetKey(de));
4219 if (withscores)
4220 addReplyDouble(c, dictGetDoubleVal(de));
4221 }
4222
4223 dictReleaseIterator(di);
4224 dictRelease(d);
4225 }
4226
4227 /* CASE 4: We have a big zset compared to the requested number of elements.
4228 * In this case we can simply get random elements from the zset and add
4229 * to the temporary set, trying to eventually get enough unique elements
4230 * to reach the specified count. */
4231 else {
4232 if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) {
4233 /* it is inefficient to repeatedly pick one random element from a
4234 * listpack. so we use this instead: */
4235 listpackEntry *keys, *vals = NULL;
4236 keys = zmalloc(sizeof(listpackEntry)*count);
4237 if (withscores)
4238 vals = zmalloc(sizeof(listpackEntry)*count);
4239 serverAssert(lpRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count);
4240 zrandmemberReplyWithListpack(c, count, keys, vals);
4241 zfree(keys);
4242 zfree(vals);
4243 zuiClearIterator(&src);
4244 return;
4245 }
4246
4247 /* Hashtable encoding (generic implementation) */
4248 unsigned long added = 0;
4249 dict *d = dictCreate(&hashDictType);
4250 dictExpand(d, count);
4251
4252 while (added < count) {
4253 listpackEntry key;
4254 double score;
4255 zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score: NULL);
4256
4257 /* Try to add the object to the dictionary. If it already exists
4258 * free it, otherwise increment the number of objects we have
4259 * in the result dictionary. */
4260 sds skey = zsetSdsFromListpackEntry(&key);
4261 if (dictAdd(d,skey,NULL) != DICT_OK) {
4262 sdsfree(skey);
4263 continue;
4264 }
4265 added++;
4266
4267 if (withscores && c->resp > 2)
4268 addReplyArrayLen(c,2);
4269 zsetReplyFromListpackEntry(c, &key);
4270 if (withscores)
4271 addReplyDouble(c, score);
4272 }
4273
4274 /* Release memory */
4275 dictRelease(d);
4276 }
4277 zuiClearIterator(&src);
4278}
4279
4280/* ZRANDMEMBER key [<count> [WITHSCORES]] */
4281void zrandmemberCommand(client *c) {
4282 long l;
4283 int withscores = 0;
4284 robj *zset;
4285 listpackEntry ele;
4286
4287 if (c->argc >= 3) {
4288 if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
4289 if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withscores"))) {
4290 addReplyErrorObject(c,shared.syntaxerr);
4291 return;
4292 } else if (c->argc == 4)
4293 withscores = 1;
4294 zrandmemberWithCountCommand(c, l, withscores);
4295 return;
4296 }
4297
4298 /* Handle variant without <count> argument. Reply with simple bulk string */
4299 if ((zset = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL ||
4300 checkType(c,zset,OBJ_ZSET)) {
4301 return;
4302 }
4303
4304 zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL);
4305 zsetReplyFromListpackEntry(c,&ele);
4306}
4307
4308/* ZMPOP/BZMPOP
4309 *
4310 * 'numkeys_idx' parameter position of key number.
4311 * 'is_block' this indicates whether it is a blocking variant. */
4312void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
4313 long j;
4314 long numkeys = 0; /* Number of keys. */
4315 int where = 0; /* ZSET_MIN or ZSET_MAX. */
4316 long count = -1; /* Reply will consist of up to count elements, depending on the zset's length. */
4317
4318 /* Parse the numkeys. */
4319 if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
4320 &numkeys, "numkeys should be greater than 0") != C_OK)
4321 return;
4322
4323 /* Parse the where. where_idx: the index of where in the c->argv. */
4324 long where_idx = numkeys_idx + numkeys + 1;
4325 if (where_idx >= c->argc) {
4326 addReplyErrorObject(c, shared.syntaxerr);
4327 return;
4328 }
4329 if (!strcasecmp(c->argv[where_idx]->ptr, "MIN")) {
4330 where = ZSET_MIN;
4331 } else if (!strcasecmp(c->argv[where_idx]->ptr, "MAX")) {
4332 where = ZSET_MAX;
4333 } else {
4334 addReplyErrorObject(c, shared.syntaxerr);
4335 return;
4336 }
4337
4338 /* Parse the optional arguments. */
4339 for (j = where_idx + 1; j < c->argc; j++) {
4340 char *opt = c->argv[j]->ptr;
4341 int moreargs = (c->argc - 1) - j;
4342
4343 if (count == -1 && !strcasecmp(opt, "COUNT") && moreargs) {
4344 j++;
4345 if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
4346 &count,"count should be greater than 0") != C_OK)
4347 return;
4348 } else {
4349 addReplyErrorObject(c, shared.syntaxerr);
4350 return;
4351 }
4352 }
4353
4354 if (count == -1) count = 1;
4355
4356 if (is_block) {
4357 /* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingGenericZpopCommand. */
4358 blockingGenericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1);
4359 } else {
4360 /* NON-BLOCK */
4361 genericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1, NULL);
4362 }
4363}
4364
4365/* ZMPOP numkeys key [<key> ...] MIN|MAX [COUNT count] */
4366void zmpopCommand(client *c) {
4367 zmpopGenericCommand(c, 1, 0);
4368}
4369
4370/* BZMPOP timeout numkeys key [<key> ...] MIN|MAX [COUNT count] */
4371void bzmpopCommand(client *c) {
4372 zmpopGenericCommand(c, 2, 1);
4373}
4374