1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | |
32 | /*----------------------------------------------------------------------------- |
33 | * Set Commands |
34 | *----------------------------------------------------------------------------*/ |
35 | |
36 | void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, |
37 | robj *dstkey, int op); |
38 | |
39 | /* Factory method to return a set that *can* hold "value". When the object has |
40 | * an integer-encodable value, an intset will be returned. Otherwise a regular |
41 | * hash table. */ |
42 | robj *setTypeCreate(sds value) { |
43 | if (isSdsRepresentableAsLongLong(value,NULL) == C_OK) |
44 | return createIntsetObject(); |
45 | return createSetObject(); |
46 | } |
47 | |
48 | /* Add the specified value into a set. |
49 | * |
50 | * If the value was already member of the set, nothing is done and 0 is |
51 | * returned, otherwise the new element is added and 1 is returned. */ |
52 | int setTypeAdd(robj *subject, sds value) { |
53 | long long llval; |
54 | if (subject->encoding == OBJ_ENCODING_HT) { |
55 | dict *ht = subject->ptr; |
56 | dictEntry *de = dictAddRaw(ht,value,NULL); |
57 | if (de) { |
58 | dictSetKey(ht,de,sdsdup(value)); |
59 | dictSetVal(ht,de,NULL); |
60 | return 1; |
61 | } |
62 | } else if (subject->encoding == OBJ_ENCODING_INTSET) { |
63 | if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { |
64 | uint8_t success = 0; |
65 | subject->ptr = intsetAdd(subject->ptr,llval,&success); |
66 | if (success) { |
67 | /* Convert to regular set when the intset contains |
68 | * too many entries. */ |
69 | size_t max_entries = server.set_max_intset_entries; |
70 | /* limit to 1G entries due to intset internals. */ |
71 | if (max_entries >= 1<<30) max_entries = 1<<30; |
72 | if (intsetLen(subject->ptr) > max_entries) |
73 | setTypeConvert(subject,OBJ_ENCODING_HT); |
74 | return 1; |
75 | } |
76 | } else { |
77 | /* Failed to get integer from object, convert to regular set. */ |
78 | setTypeConvert(subject,OBJ_ENCODING_HT); |
79 | |
80 | /* The set *was* an intset and this value is not integer |
81 | * encodable, so dictAdd should always work. */ |
82 | serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK); |
83 | return 1; |
84 | } |
85 | } else { |
86 | serverPanic("Unknown set encoding" ); |
87 | } |
88 | return 0; |
89 | } |
90 | |
91 | int setTypeRemove(robj *setobj, sds value) { |
92 | long long llval; |
93 | if (setobj->encoding == OBJ_ENCODING_HT) { |
94 | if (dictDelete(setobj->ptr,value) == DICT_OK) { |
95 | if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); |
96 | return 1; |
97 | } |
98 | } else if (setobj->encoding == OBJ_ENCODING_INTSET) { |
99 | if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { |
100 | int success; |
101 | setobj->ptr = intsetRemove(setobj->ptr,llval,&success); |
102 | if (success) return 1; |
103 | } |
104 | } else { |
105 | serverPanic("Unknown set encoding" ); |
106 | } |
107 | return 0; |
108 | } |
109 | |
110 | int setTypeIsMember(robj *subject, sds value) { |
111 | long long llval; |
112 | if (subject->encoding == OBJ_ENCODING_HT) { |
113 | return dictFind((dict*)subject->ptr,value) != NULL; |
114 | } else if (subject->encoding == OBJ_ENCODING_INTSET) { |
115 | if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { |
116 | return intsetFind((intset*)subject->ptr,llval); |
117 | } |
118 | } else { |
119 | serverPanic("Unknown set encoding" ); |
120 | } |
121 | return 0; |
122 | } |
123 | |
124 | setTypeIterator *setTypeInitIterator(robj *subject) { |
125 | setTypeIterator *si = zmalloc(sizeof(setTypeIterator)); |
126 | si->subject = subject; |
127 | si->encoding = subject->encoding; |
128 | if (si->encoding == OBJ_ENCODING_HT) { |
129 | si->di = dictGetIterator(subject->ptr); |
130 | } else if (si->encoding == OBJ_ENCODING_INTSET) { |
131 | si->ii = 0; |
132 | } else { |
133 | serverPanic("Unknown set encoding" ); |
134 | } |
135 | return si; |
136 | } |
137 | |
138 | void setTypeReleaseIterator(setTypeIterator *si) { |
139 | if (si->encoding == OBJ_ENCODING_HT) |
140 | dictReleaseIterator(si->di); |
141 | zfree(si); |
142 | } |
143 | |
144 | /* Move to the next entry in the set. Returns the object at the current |
145 | * position. |
146 | * |
147 | * Since set elements can be internally be stored as SDS strings or |
148 | * simple arrays of integers, setTypeNext returns the encoding of the |
149 | * set object you are iterating, and will populate the appropriate pointer |
150 | * (sdsele) or (llele) accordingly. |
151 | * |
152 | * Note that both the sdsele and llele pointers should be passed and cannot |
153 | * be NULL since the function will try to defensively populate the non |
154 | * used field with values which are easy to trap if misused. |
155 | * |
156 | * When there are no longer elements -1 is returned. */ |
157 | int setTypeNext(setTypeIterator *si, sds *sdsele, int64_t *llele) { |
158 | if (si->encoding == OBJ_ENCODING_HT) { |
159 | dictEntry *de = dictNext(si->di); |
160 | if (de == NULL) return -1; |
161 | *sdsele = dictGetKey(de); |
162 | *llele = -123456789; /* Not needed. Defensive. */ |
163 | } else if (si->encoding == OBJ_ENCODING_INTSET) { |
164 | if (!intsetGet(si->subject->ptr,si->ii++,llele)) |
165 | return -1; |
166 | *sdsele = NULL; /* Not needed. Defensive. */ |
167 | } else { |
168 | serverPanic("Wrong set encoding in setTypeNext" ); |
169 | } |
170 | return si->encoding; |
171 | } |
172 | |
173 | /* The not copy on write friendly version but easy to use version |
174 | * of setTypeNext() is setTypeNextObject(), returning new SDS |
175 | * strings. So if you don't retain a pointer to this object you should call |
176 | * sdsfree() against it. |
177 | * |
178 | * This function is the way to go for write operations where COW is not |
179 | * an issue. */ |
180 | sds setTypeNextObject(setTypeIterator *si) { |
181 | int64_t intele; |
182 | sds sdsele; |
183 | int encoding; |
184 | |
185 | encoding = setTypeNext(si,&sdsele,&intele); |
186 | switch(encoding) { |
187 | case -1: return NULL; |
188 | case OBJ_ENCODING_INTSET: |
189 | return sdsfromlonglong(intele); |
190 | case OBJ_ENCODING_HT: |
191 | return sdsdup(sdsele); |
192 | default: |
193 | serverPanic("Unsupported encoding" ); |
194 | } |
195 | return NULL; /* just to suppress warnings */ |
196 | } |
197 | |
198 | /* Return random element from a non empty set. |
199 | * The returned element can be an int64_t value if the set is encoded |
200 | * as an "intset" blob of integers, or an SDS string if the set |
201 | * is a regular set. |
202 | * |
203 | * The caller provides both pointers to be populated with the right |
204 | * object. The return value of the function is the object->encoding |
205 | * field of the object and is used by the caller to check if the |
206 | * int64_t pointer or the sds pointer was populated. |
207 | * |
208 | * Note that both the sdsele and llele pointers should be passed and cannot |
209 | * be NULL since the function will try to defensively populate the non |
210 | * used field with values which are easy to trap if misused. */ |
211 | int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele) { |
212 | if (setobj->encoding == OBJ_ENCODING_HT) { |
213 | dictEntry *de = dictGetFairRandomKey(setobj->ptr); |
214 | *sdsele = dictGetKey(de); |
215 | *llele = -123456789; /* Not needed. Defensive. */ |
216 | } else if (setobj->encoding == OBJ_ENCODING_INTSET) { |
217 | *llele = intsetRandom(setobj->ptr); |
218 | *sdsele = NULL; /* Not needed. Defensive. */ |
219 | } else { |
220 | serverPanic("Unknown set encoding" ); |
221 | } |
222 | return setobj->encoding; |
223 | } |
224 | |
225 | unsigned long setTypeSize(const robj *subject) { |
226 | if (subject->encoding == OBJ_ENCODING_HT) { |
227 | return dictSize((const dict*)subject->ptr); |
228 | } else if (subject->encoding == OBJ_ENCODING_INTSET) { |
229 | return intsetLen((const intset*)subject->ptr); |
230 | } else { |
231 | serverPanic("Unknown set encoding" ); |
232 | } |
233 | } |
234 | |
235 | /* Convert the set to specified encoding. The resulting dict (when converting |
236 | * to a hash table) is presized to hold the number of elements in the original |
237 | * set. */ |
238 | void setTypeConvert(robj *setobj, int enc) { |
239 | setTypeIterator *si; |
240 | serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET && |
241 | setobj->encoding == OBJ_ENCODING_INTSET); |
242 | |
243 | if (enc == OBJ_ENCODING_HT) { |
244 | int64_t intele; |
245 | dict *d = dictCreate(&setDictType); |
246 | sds element; |
247 | |
248 | /* Presize the dict to avoid rehashing */ |
249 | dictExpand(d,intsetLen(setobj->ptr)); |
250 | |
251 | /* To add the elements we extract integers and create redis objects */ |
252 | si = setTypeInitIterator(setobj); |
253 | while (setTypeNext(si,&element,&intele) != -1) { |
254 | element = sdsfromlonglong(intele); |
255 | serverAssert(dictAdd(d,element,NULL) == DICT_OK); |
256 | } |
257 | setTypeReleaseIterator(si); |
258 | |
259 | setobj->encoding = OBJ_ENCODING_HT; |
260 | zfree(setobj->ptr); |
261 | setobj->ptr = d; |
262 | } else { |
263 | serverPanic("Unsupported set conversion" ); |
264 | } |
265 | } |
266 | |
267 | /* This is a helper function for the COPY command. |
268 | * Duplicate a set object, with the guarantee that the returned object |
269 | * has the same encoding as the original one. |
270 | * |
271 | * The resulting object always has refcount set to 1 */ |
272 | robj *setTypeDup(robj *o) { |
273 | robj *set; |
274 | setTypeIterator *si; |
275 | sds elesds; |
276 | int64_t intobj; |
277 | |
278 | serverAssert(o->type == OBJ_SET); |
279 | |
280 | /* Create a new set object that have the same encoding as the original object's encoding */ |
281 | if (o->encoding == OBJ_ENCODING_INTSET) { |
282 | intset *is = o->ptr; |
283 | size_t size = intsetBlobLen(is); |
284 | intset *newis = zmalloc(size); |
285 | memcpy(newis,is,size); |
286 | set = createObject(OBJ_SET, newis); |
287 | set->encoding = OBJ_ENCODING_INTSET; |
288 | } else if (o->encoding == OBJ_ENCODING_HT) { |
289 | set = createSetObject(); |
290 | dict *d = o->ptr; |
291 | dictExpand(set->ptr, dictSize(d)); |
292 | si = setTypeInitIterator(o); |
293 | while (setTypeNext(si, &elesds, &intobj) != -1) { |
294 | setTypeAdd(set, elesds); |
295 | } |
296 | setTypeReleaseIterator(si); |
297 | } else { |
298 | serverPanic("Unknown set encoding" ); |
299 | } |
300 | return set; |
301 | } |
302 | |
303 | void saddCommand(client *c) { |
304 | robj *set; |
305 | int j, added = 0; |
306 | |
307 | set = lookupKeyWrite(c->db,c->argv[1]); |
308 | if (checkType(c,set,OBJ_SET)) return; |
309 | |
310 | if (set == NULL) { |
311 | set = setTypeCreate(c->argv[2]->ptr); |
312 | dbAdd(c->db,c->argv[1],set); |
313 | } |
314 | |
315 | for (j = 2; j < c->argc; j++) { |
316 | if (setTypeAdd(set,c->argv[j]->ptr)) added++; |
317 | } |
318 | if (added) { |
319 | signalModifiedKey(c,c->db,c->argv[1]); |
320 | notifyKeyspaceEvent(NOTIFY_SET,"sadd" ,c->argv[1],c->db->id); |
321 | } |
322 | server.dirty += added; |
323 | addReplyLongLong(c,added); |
324 | } |
325 | |
326 | void sremCommand(client *c) { |
327 | robj *set; |
328 | int j, deleted = 0, keyremoved = 0; |
329 | |
330 | if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || |
331 | checkType(c,set,OBJ_SET)) return; |
332 | |
333 | for (j = 2; j < c->argc; j++) { |
334 | if (setTypeRemove(set,c->argv[j]->ptr)) { |
335 | deleted++; |
336 | if (setTypeSize(set) == 0) { |
337 | dbDelete(c->db,c->argv[1]); |
338 | keyremoved = 1; |
339 | break; |
340 | } |
341 | } |
342 | } |
343 | if (deleted) { |
344 | signalModifiedKey(c,c->db,c->argv[1]); |
345 | notifyKeyspaceEvent(NOTIFY_SET,"srem" ,c->argv[1],c->db->id); |
346 | if (keyremoved) |
347 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,c->argv[1], |
348 | c->db->id); |
349 | server.dirty += deleted; |
350 | } |
351 | addReplyLongLong(c,deleted); |
352 | } |
353 | |
354 | void smoveCommand(client *c) { |
355 | robj *srcset, *dstset, *ele; |
356 | srcset = lookupKeyWrite(c->db,c->argv[1]); |
357 | dstset = lookupKeyWrite(c->db,c->argv[2]); |
358 | ele = c->argv[3]; |
359 | |
360 | /* If the source key does not exist return 0 */ |
361 | if (srcset == NULL) { |
362 | addReply(c,shared.czero); |
363 | return; |
364 | } |
365 | |
366 | /* If the source key has the wrong type, or the destination key |
367 | * is set and has the wrong type, return with an error. */ |
368 | if (checkType(c,srcset,OBJ_SET) || |
369 | checkType(c,dstset,OBJ_SET)) return; |
370 | |
371 | /* If srcset and dstset are equal, SMOVE is a no-op */ |
372 | if (srcset == dstset) { |
373 | addReply(c,setTypeIsMember(srcset,ele->ptr) ? |
374 | shared.cone : shared.czero); |
375 | return; |
376 | } |
377 | |
378 | /* If the element cannot be removed from the src set, return 0. */ |
379 | if (!setTypeRemove(srcset,ele->ptr)) { |
380 | addReply(c,shared.czero); |
381 | return; |
382 | } |
383 | notifyKeyspaceEvent(NOTIFY_SET,"srem" ,c->argv[1],c->db->id); |
384 | |
385 | /* Remove the src set from the database when empty */ |
386 | if (setTypeSize(srcset) == 0) { |
387 | dbDelete(c->db,c->argv[1]); |
388 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,c->argv[1],c->db->id); |
389 | } |
390 | |
391 | /* Create the destination set when it doesn't exist */ |
392 | if (!dstset) { |
393 | dstset = setTypeCreate(ele->ptr); |
394 | dbAdd(c->db,c->argv[2],dstset); |
395 | } |
396 | |
397 | signalModifiedKey(c,c->db,c->argv[1]); |
398 | server.dirty++; |
399 | |
400 | /* An extra key has changed when ele was successfully added to dstset */ |
401 | if (setTypeAdd(dstset,ele->ptr)) { |
402 | server.dirty++; |
403 | signalModifiedKey(c,c->db,c->argv[2]); |
404 | notifyKeyspaceEvent(NOTIFY_SET,"sadd" ,c->argv[2],c->db->id); |
405 | } |
406 | addReply(c,shared.cone); |
407 | } |
408 | |
409 | void sismemberCommand(client *c) { |
410 | robj *set; |
411 | |
412 | if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || |
413 | checkType(c,set,OBJ_SET)) return; |
414 | |
415 | if (setTypeIsMember(set,c->argv[2]->ptr)) |
416 | addReply(c,shared.cone); |
417 | else |
418 | addReply(c,shared.czero); |
419 | } |
420 | |
421 | void smismemberCommand(client *c) { |
422 | robj *set; |
423 | int j; |
424 | |
425 | /* Don't abort when the key cannot be found. Non-existing keys are empty |
426 | * sets, where SMISMEMBER should respond with a series of zeros. */ |
427 | set = lookupKeyRead(c->db,c->argv[1]); |
428 | if (set && checkType(c,set,OBJ_SET)) return; |
429 | |
430 | addReplyArrayLen(c,c->argc - 2); |
431 | |
432 | for (j = 2; j < c->argc; j++) { |
433 | if (set && setTypeIsMember(set,c->argv[j]->ptr)) |
434 | addReply(c,shared.cone); |
435 | else |
436 | addReply(c,shared.czero); |
437 | } |
438 | } |
439 | |
440 | void scardCommand(client *c) { |
441 | robj *o; |
442 | |
443 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || |
444 | checkType(c,o,OBJ_SET)) return; |
445 | |
446 | addReplyLongLong(c,setTypeSize(o)); |
447 | } |
448 | |
449 | /* Handle the "SPOP key <count>" variant. The normal version of the |
450 | * command is handled by the spopCommand() function itself. */ |
451 | |
452 | /* How many times bigger should be the set compared to the remaining size |
453 | * for us to use the "create new set" strategy? Read later in the |
454 | * implementation for more info. */ |
455 | #define SPOP_MOVE_STRATEGY_MUL 5 |
456 | |
457 | void spopWithCountCommand(client *c) { |
458 | long l; |
459 | unsigned long count, size; |
460 | robj *set; |
461 | |
462 | /* Get the count argument */ |
463 | if (getPositiveLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return; |
464 | count = (unsigned long) l; |
465 | |
466 | /* Make sure a key with the name inputted exists, and that it's type is |
467 | * indeed a set. Otherwise, return nil */ |
468 | if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.emptyset[c->resp])) |
469 | == NULL || checkType(c,set,OBJ_SET)) return; |
470 | |
471 | /* If count is zero, serve an empty set ASAP to avoid special |
472 | * cases later. */ |
473 | if (count == 0) { |
474 | addReply(c,shared.emptyset[c->resp]); |
475 | return; |
476 | } |
477 | |
478 | size = setTypeSize(set); |
479 | |
480 | /* Generate an SPOP keyspace notification */ |
481 | notifyKeyspaceEvent(NOTIFY_SET,"spop" ,c->argv[1],c->db->id); |
482 | server.dirty += (count >= size) ? size : count; |
483 | |
484 | /* CASE 1: |
485 | * The number of requested elements is greater than or equal to |
486 | * the number of elements inside the set: simply return the whole set. */ |
487 | if (count >= size) { |
488 | /* We just return the entire set */ |
489 | sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION); |
490 | |
491 | /* Delete the set as it is now empty */ |
492 | dbDelete(c->db,c->argv[1]); |
493 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,c->argv[1],c->db->id); |
494 | |
495 | /* Propagate this command as a DEL operation */ |
496 | rewriteClientCommandVector(c,2,shared.del,c->argv[1]); |
497 | signalModifiedKey(c,c->db,c->argv[1]); |
498 | return; |
499 | } |
500 | |
501 | /* Case 2 and 3 require to replicate SPOP as a set of SREM commands. |
502 | * Prepare our replication argument vector. Also send the array length |
503 | * which is common to both the code paths. */ |
504 | robj *propargv[3]; |
505 | propargv[0] = shared.srem; |
506 | propargv[1] = c->argv[1]; |
507 | addReplySetLen(c,count); |
508 | |
509 | /* Common iteration vars. */ |
510 | sds sdsele; |
511 | robj *objele; |
512 | int encoding; |
513 | int64_t llele; |
514 | unsigned long remaining = size-count; /* Elements left after SPOP. */ |
515 | |
516 | /* If we are here, the number of requested elements is less than the |
517 | * number of elements inside the set. Also we are sure that count < size. |
518 | * Use two different strategies. |
519 | * |
520 | * CASE 2: The number of elements to return is small compared to the |
521 | * set size. We can just extract random elements and return them to |
522 | * the set. */ |
523 | if (remaining*SPOP_MOVE_STRATEGY_MUL > count) { |
524 | while(count--) { |
525 | /* Emit and remove. */ |
526 | encoding = setTypeRandomElement(set,&sdsele,&llele); |
527 | if (encoding == OBJ_ENCODING_INTSET) { |
528 | addReplyBulkLongLong(c,llele); |
529 | objele = createStringObjectFromLongLong(llele); |
530 | set->ptr = intsetRemove(set->ptr,llele,NULL); |
531 | } else { |
532 | addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); |
533 | objele = createStringObject(sdsele,sdslen(sdsele)); |
534 | setTypeRemove(set,sdsele); |
535 | } |
536 | |
537 | /* Replicate/AOF this command as an SREM operation */ |
538 | propargv[2] = objele; |
539 | alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL); |
540 | decrRefCount(objele); |
541 | } |
542 | } else { |
543 | /* CASE 3: The number of elements to return is very big, approaching |
544 | * the size of the set itself. After some time extracting random elements |
545 | * from such a set becomes computationally expensive, so we use |
546 | * a different strategy, we extract random elements that we don't |
547 | * want to return (the elements that will remain part of the set), |
548 | * creating a new set as we do this (that will be stored as the original |
549 | * set). Then we return the elements left in the original set and |
550 | * release it. */ |
551 | robj *newset = NULL; |
552 | |
553 | /* Create a new set with just the remaining elements. */ |
554 | while(remaining--) { |
555 | encoding = setTypeRandomElement(set,&sdsele,&llele); |
556 | if (encoding == OBJ_ENCODING_INTSET) { |
557 | sdsele = sdsfromlonglong(llele); |
558 | } else { |
559 | sdsele = sdsdup(sdsele); |
560 | } |
561 | if (!newset) newset = setTypeCreate(sdsele); |
562 | setTypeAdd(newset,sdsele); |
563 | setTypeRemove(set,sdsele); |
564 | sdsfree(sdsele); |
565 | } |
566 | |
567 | /* Transfer the old set to the client. */ |
568 | setTypeIterator *si; |
569 | si = setTypeInitIterator(set); |
570 | while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) { |
571 | if (encoding == OBJ_ENCODING_INTSET) { |
572 | addReplyBulkLongLong(c,llele); |
573 | objele = createStringObjectFromLongLong(llele); |
574 | } else { |
575 | addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); |
576 | objele = createStringObject(sdsele,sdslen(sdsele)); |
577 | } |
578 | |
579 | /* Replicate/AOF this command as an SREM operation */ |
580 | propargv[2] = objele; |
581 | alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL); |
582 | decrRefCount(objele); |
583 | } |
584 | setTypeReleaseIterator(si); |
585 | |
586 | /* Assign the new set as the key value. */ |
587 | dbOverwrite(c->db,c->argv[1],newset); |
588 | } |
589 | |
590 | /* Don't propagate the command itself even if we incremented the |
591 | * dirty counter. We don't want to propagate an SPOP command since |
592 | * we propagated the command as a set of SREMs operations using |
593 | * the alsoPropagate() API. */ |
594 | preventCommandPropagation(c); |
595 | signalModifiedKey(c,c->db,c->argv[1]); |
596 | } |
597 | |
598 | void spopCommand(client *c) { |
599 | robj *set, *ele; |
600 | sds sdsele; |
601 | int64_t llele; |
602 | int encoding; |
603 | |
604 | if (c->argc == 3) { |
605 | spopWithCountCommand(c); |
606 | return; |
607 | } else if (c->argc > 3) { |
608 | addReplyErrorObject(c,shared.syntaxerr); |
609 | return; |
610 | } |
611 | |
612 | /* Make sure a key with the name inputted exists, and that it's type is |
613 | * indeed a set */ |
614 | if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) |
615 | == NULL || checkType(c,set,OBJ_SET)) return; |
616 | |
617 | /* Get a random element from the set */ |
618 | encoding = setTypeRandomElement(set,&sdsele,&llele); |
619 | |
620 | /* Remove the element from the set */ |
621 | if (encoding == OBJ_ENCODING_INTSET) { |
622 | ele = createStringObjectFromLongLong(llele); |
623 | set->ptr = intsetRemove(set->ptr,llele,NULL); |
624 | } else { |
625 | ele = createStringObject(sdsele,sdslen(sdsele)); |
626 | setTypeRemove(set,ele->ptr); |
627 | } |
628 | |
629 | notifyKeyspaceEvent(NOTIFY_SET,"spop" ,c->argv[1],c->db->id); |
630 | |
631 | /* Replicate/AOF this command as an SREM operation */ |
632 | rewriteClientCommandVector(c,3,shared.srem,c->argv[1],ele); |
633 | |
634 | /* Add the element to the reply */ |
635 | addReplyBulk(c,ele); |
636 | decrRefCount(ele); |
637 | |
638 | /* Delete the set if it's empty */ |
639 | if (setTypeSize(set) == 0) { |
640 | dbDelete(c->db,c->argv[1]); |
641 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,c->argv[1],c->db->id); |
642 | } |
643 | |
644 | /* Set has been modified */ |
645 | signalModifiedKey(c,c->db,c->argv[1]); |
646 | server.dirty++; |
647 | } |
648 | |
649 | /* handle the "SRANDMEMBER key <count>" variant. The normal version of the |
650 | * command is handled by the srandmemberCommand() function itself. */ |
651 | |
652 | /* How many times bigger should be the set compared to the requested size |
653 | * for us to don't use the "remove elements" strategy? Read later in the |
654 | * implementation for more info. */ |
655 | #define SRANDMEMBER_SUB_STRATEGY_MUL 3 |
656 | |
657 | void srandmemberWithCountCommand(client *c) { |
658 | long l; |
659 | unsigned long count, size; |
660 | int uniq = 1; |
661 | robj *set; |
662 | sds ele; |
663 | int64_t llele; |
664 | int encoding; |
665 | |
666 | dict *d; |
667 | |
668 | if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return; |
669 | if (l >= 0) { |
670 | count = (unsigned long) l; |
671 | } else { |
672 | /* A negative count means: return the same elements multiple times |
673 | * (i.e. don't remove the extracted element after every extraction). */ |
674 | count = -l; |
675 | uniq = 0; |
676 | } |
677 | |
678 | if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) |
679 | == NULL || checkType(c,set,OBJ_SET)) return; |
680 | size = setTypeSize(set); |
681 | |
682 | /* If count is zero, serve it ASAP to avoid special cases later. */ |
683 | if (count == 0) { |
684 | addReply(c,shared.emptyarray); |
685 | return; |
686 | } |
687 | |
688 | /* CASE 1: The count was negative, so the extraction method is just: |
689 | * "return N random elements" sampling the whole set every time. |
690 | * This case is trivial and can be served without auxiliary data |
691 | * structures. This case is the only one that also needs to return the |
692 | * elements in random order. */ |
693 | if (!uniq || count == 1) { |
694 | addReplyArrayLen(c,count); |
695 | while(count--) { |
696 | encoding = setTypeRandomElement(set,&ele,&llele); |
697 | if (encoding == OBJ_ENCODING_INTSET) { |
698 | addReplyBulkLongLong(c,llele); |
699 | } else { |
700 | addReplyBulkCBuffer(c,ele,sdslen(ele)); |
701 | } |
702 | } |
703 | return; |
704 | } |
705 | |
706 | /* CASE 2: |
707 | * The number of requested elements is greater than the number of |
708 | * elements inside the set: simply return the whole set. */ |
709 | if (count >= size) { |
710 | setTypeIterator *si; |
711 | addReplyArrayLen(c,size); |
712 | si = setTypeInitIterator(set); |
713 | while ((encoding = setTypeNext(si,&ele,&llele)) != -1) { |
714 | if (encoding == OBJ_ENCODING_INTSET) { |
715 | addReplyBulkLongLong(c,llele); |
716 | } else { |
717 | addReplyBulkCBuffer(c,ele,sdslen(ele)); |
718 | } |
719 | size--; |
720 | } |
721 | setTypeReleaseIterator(si); |
722 | serverAssert(size==0); |
723 | return; |
724 | } |
725 | |
726 | /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */ |
727 | d = dictCreate(&sdsReplyDictType); |
728 | |
729 | /* CASE 3: |
730 | * The number of elements inside the set is not greater than |
731 | * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements. |
732 | * In this case we create a set from scratch with all the elements, and |
733 | * subtract random elements to reach the requested number of elements. |
734 | * |
735 | * This is done because if the number of requested elements is just |
736 | * a bit less than the number of elements in the set, the natural approach |
737 | * used into CASE 4 is highly inefficient. */ |
738 | if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) { |
739 | setTypeIterator *si; |
740 | |
741 | /* Add all the elements into the temporary dictionary. */ |
742 | si = setTypeInitIterator(set); |
743 | dictExpand(d, size); |
744 | while ((encoding = setTypeNext(si,&ele,&llele)) != -1) { |
745 | int retval = DICT_ERR; |
746 | |
747 | if (encoding == OBJ_ENCODING_INTSET) { |
748 | retval = dictAdd(d,sdsfromlonglong(llele),NULL); |
749 | } else { |
750 | retval = dictAdd(d,sdsdup(ele),NULL); |
751 | } |
752 | serverAssert(retval == DICT_OK); |
753 | } |
754 | setTypeReleaseIterator(si); |
755 | serverAssert(dictSize(d) == size); |
756 | |
757 | /* Remove random elements to reach the right count. */ |
758 | while (size > count) { |
759 | dictEntry *de; |
760 | de = dictGetFairRandomKey(d); |
761 | dictUnlink(d,dictGetKey(de)); |
762 | sdsfree(dictGetKey(de)); |
763 | dictFreeUnlinkedEntry(d,de); |
764 | size--; |
765 | } |
766 | } |
767 | |
768 | /* CASE 4: We have a big set compared to the requested number of elements. |
769 | * In this case we can simply get random elements from the set and add |
770 | * to the temporary set, trying to eventually get enough unique elements |
771 | * to reach the specified count. */ |
772 | else { |
773 | unsigned long added = 0; |
774 | sds sdsele; |
775 | |
776 | dictExpand(d, count); |
777 | while (added < count) { |
778 | encoding = setTypeRandomElement(set,&ele,&llele); |
779 | if (encoding == OBJ_ENCODING_INTSET) { |
780 | sdsele = sdsfromlonglong(llele); |
781 | } else { |
782 | sdsele = sdsdup(ele); |
783 | } |
784 | /* Try to add the object to the dictionary. If it already exists |
785 | * free it, otherwise increment the number of objects we have |
786 | * in the result dictionary. */ |
787 | if (dictAdd(d,sdsele,NULL) == DICT_OK) |
788 | added++; |
789 | else |
790 | sdsfree(sdsele); |
791 | } |
792 | } |
793 | |
794 | /* CASE 3 & 4: send the result to the user. */ |
795 | { |
796 | dictIterator *di; |
797 | dictEntry *de; |
798 | |
799 | addReplyArrayLen(c,count); |
800 | di = dictGetIterator(d); |
801 | while((de = dictNext(di)) != NULL) |
802 | addReplyBulkSds(c,dictGetKey(de)); |
803 | dictReleaseIterator(di); |
804 | dictRelease(d); |
805 | } |
806 | } |
807 | |
808 | /* SRANDMEMBER <key> [<count>] */ |
809 | void srandmemberCommand(client *c) { |
810 | robj *set; |
811 | sds ele; |
812 | int64_t llele; |
813 | int encoding; |
814 | |
815 | if (c->argc == 3) { |
816 | srandmemberWithCountCommand(c); |
817 | return; |
818 | } else if (c->argc > 3) { |
819 | addReplyErrorObject(c,shared.syntaxerr); |
820 | return; |
821 | } |
822 | |
823 | /* Handle variant without <count> argument. Reply with simple bulk string */ |
824 | if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) |
825 | == NULL || checkType(c,set,OBJ_SET)) return; |
826 | |
827 | encoding = setTypeRandomElement(set,&ele,&llele); |
828 | if (encoding == OBJ_ENCODING_INTSET) { |
829 | addReplyBulkLongLong(c,llele); |
830 | } else { |
831 | addReplyBulkCBuffer(c,ele,sdslen(ele)); |
832 | } |
833 | } |
834 | |
835 | int qsortCompareSetsByCardinality(const void *s1, const void *s2) { |
836 | if (setTypeSize(*(robj**)s1) > setTypeSize(*(robj**)s2)) return 1; |
837 | if (setTypeSize(*(robj**)s1) < setTypeSize(*(robj**)s2)) return -1; |
838 | return 0; |
839 | } |
840 | |
841 | /* This is used by SDIFF and in this case we can receive NULL that should |
842 | * be handled as empty sets. */ |
843 | int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) { |
844 | robj *o1 = *(robj**)s1, *o2 = *(robj**)s2; |
845 | unsigned long first = o1 ? setTypeSize(o1) : 0; |
846 | unsigned long second = o2 ? setTypeSize(o2) : 0; |
847 | |
848 | if (first < second) return 1; |
849 | if (first > second) return -1; |
850 | return 0; |
851 | } |
852 | |
853 | /* SINTER / SMEMBERS / SINTERSTORE / SINTERCARD |
854 | * |
855 | * 'cardinality_only' work for SINTERCARD, only return the cardinality |
856 | * with minimum processing and memory overheads. |
857 | * |
858 | * 'limit' work for SINTERCARD, stop searching after reaching the limit. |
859 | * Passing a 0 means unlimited. |
860 | */ |
861 | void sinterGenericCommand(client *c, robj **setkeys, |
862 | unsigned long setnum, robj *dstkey, |
863 | int cardinality_only, unsigned long limit) { |
864 | robj **sets = zmalloc(sizeof(robj*)*setnum); |
865 | setTypeIterator *si; |
866 | robj *dstset = NULL; |
867 | sds elesds; |
868 | int64_t intobj; |
869 | void *replylen = NULL; |
870 | unsigned long j, cardinality = 0; |
871 | int encoding, empty = 0; |
872 | |
873 | for (j = 0; j < setnum; j++) { |
874 | robj *setobj = lookupKeyRead(c->db, setkeys[j]); |
875 | if (!setobj) { |
876 | /* A NULL is considered an empty set */ |
877 | empty += 1; |
878 | sets[j] = NULL; |
879 | continue; |
880 | } |
881 | if (checkType(c,setobj,OBJ_SET)) { |
882 | zfree(sets); |
883 | return; |
884 | } |
885 | sets[j] = setobj; |
886 | } |
887 | |
888 | /* Set intersection with an empty set always results in an empty set. |
889 | * Return ASAP if there is an empty set. */ |
890 | if (empty > 0) { |
891 | zfree(sets); |
892 | if (dstkey) { |
893 | if (dbDelete(c->db,dstkey)) { |
894 | signalModifiedKey(c,c->db,dstkey); |
895 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,dstkey,c->db->id); |
896 | server.dirty++; |
897 | } |
898 | addReply(c,shared.czero); |
899 | } else if (cardinality_only) { |
900 | addReplyLongLong(c,cardinality); |
901 | } else { |
902 | addReply(c,shared.emptyset[c->resp]); |
903 | } |
904 | return; |
905 | } |
906 | |
907 | /* Sort sets from the smallest to largest, this will improve our |
908 | * algorithm's performance */ |
909 | qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); |
910 | |
911 | /* The first thing we should output is the total number of elements... |
912 | * since this is a multi-bulk write, but at this stage we don't know |
913 | * the intersection set size, so we use a trick, append an empty object |
914 | * to the output list and save the pointer to later modify it with the |
915 | * right length */ |
916 | if (dstkey) { |
917 | /* If we have a target key where to store the resulting set |
918 | * create this key with an empty set inside */ |
919 | dstset = createIntsetObject(); |
920 | } else if (!cardinality_only) { |
921 | replylen = addReplyDeferredLen(c); |
922 | } |
923 | |
924 | /* Iterate all the elements of the first (smallest) set, and test |
925 | * the element against all the other sets, if at least one set does |
926 | * not include the element it is discarded */ |
927 | si = setTypeInitIterator(sets[0]); |
928 | while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) { |
929 | for (j = 1; j < setnum; j++) { |
930 | if (sets[j] == sets[0]) continue; |
931 | if (encoding == OBJ_ENCODING_INTSET) { |
932 | /* intset with intset is simple... and fast */ |
933 | if (sets[j]->encoding == OBJ_ENCODING_INTSET && |
934 | !intsetFind((intset*)sets[j]->ptr,intobj)) |
935 | { |
936 | break; |
937 | /* in order to compare an integer with an object we |
938 | * have to use the generic function, creating an object |
939 | * for this */ |
940 | } else if (sets[j]->encoding == OBJ_ENCODING_HT) { |
941 | elesds = sdsfromlonglong(intobj); |
942 | if (!setTypeIsMember(sets[j],elesds)) { |
943 | sdsfree(elesds); |
944 | break; |
945 | } |
946 | sdsfree(elesds); |
947 | } |
948 | } else if (encoding == OBJ_ENCODING_HT) { |
949 | if (!setTypeIsMember(sets[j],elesds)) { |
950 | break; |
951 | } |
952 | } |
953 | } |
954 | |
955 | /* Only take action when all sets contain the member */ |
956 | if (j == setnum) { |
957 | if (cardinality_only) { |
958 | cardinality++; |
959 | |
960 | /* We stop the searching after reaching the limit. */ |
961 | if (limit && cardinality >= limit) |
962 | break; |
963 | } else if (!dstkey) { |
964 | if (encoding == OBJ_ENCODING_HT) |
965 | addReplyBulkCBuffer(c,elesds,sdslen(elesds)); |
966 | else |
967 | addReplyBulkLongLong(c,intobj); |
968 | cardinality++; |
969 | } else { |
970 | if (encoding == OBJ_ENCODING_INTSET) { |
971 | elesds = sdsfromlonglong(intobj); |
972 | setTypeAdd(dstset,elesds); |
973 | sdsfree(elesds); |
974 | } else { |
975 | setTypeAdd(dstset,elesds); |
976 | } |
977 | } |
978 | } |
979 | } |
980 | setTypeReleaseIterator(si); |
981 | |
982 | if (cardinality_only) { |
983 | addReplyLongLong(c,cardinality); |
984 | } else if (dstkey) { |
985 | /* Store the resulting set into the target, if the intersection |
986 | * is not an empty set. */ |
987 | if (setTypeSize(dstset) > 0) { |
988 | setKey(c,c->db,dstkey,dstset,0); |
989 | addReplyLongLong(c,setTypeSize(dstset)); |
990 | notifyKeyspaceEvent(NOTIFY_SET,"sinterstore" , |
991 | dstkey,c->db->id); |
992 | server.dirty++; |
993 | } else { |
994 | addReply(c,shared.czero); |
995 | if (dbDelete(c->db,dstkey)) { |
996 | server.dirty++; |
997 | signalModifiedKey(c,c->db,dstkey); |
998 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,dstkey,c->db->id); |
999 | } |
1000 | } |
1001 | decrRefCount(dstset); |
1002 | } else { |
1003 | setDeferredSetLen(c,replylen,cardinality); |
1004 | } |
1005 | zfree(sets); |
1006 | } |
1007 | |
1008 | /* SINTER key [key ...] */ |
1009 | void sinterCommand(client *c) { |
1010 | sinterGenericCommand(c, c->argv+1, c->argc-1, NULL, 0, 0); |
1011 | } |
1012 | |
1013 | /* SINTERCARD numkeys key [key ...] [LIMIT limit] */ |
1014 | void sinterCardCommand(client *c) { |
1015 | long j; |
1016 | long numkeys = 0; /* Number of keys. */ |
1017 | long limit = 0; /* 0 means not limit. */ |
1018 | |
1019 | if (getRangeLongFromObjectOrReply(c, c->argv[1], 1, LONG_MAX, |
1020 | &numkeys, "numkeys should be greater than 0" ) != C_OK) |
1021 | return; |
1022 | if (numkeys > (c->argc - 2)) { |
1023 | addReplyError(c, "Number of keys can't be greater than number of args" ); |
1024 | return; |
1025 | } |
1026 | |
1027 | for (j = 2 + numkeys; j < c->argc; j++) { |
1028 | char *opt = c->argv[j]->ptr; |
1029 | int moreargs = (c->argc - 1) - j; |
1030 | |
1031 | if (!strcasecmp(opt, "LIMIT" ) && moreargs) { |
1032 | j++; |
1033 | if (getPositiveLongFromObjectOrReply(c, c->argv[j], &limit, |
1034 | "LIMIT can't be negative" ) != C_OK) |
1035 | return; |
1036 | } else { |
1037 | addReplyErrorObject(c, shared.syntaxerr); |
1038 | return; |
1039 | } |
1040 | } |
1041 | |
1042 | sinterGenericCommand(c, c->argv+2, numkeys, NULL, 1, limit); |
1043 | } |
1044 | |
1045 | /* SINTERSTORE destination key [key ...] */ |
1046 | void sinterstoreCommand(client *c) { |
1047 | sinterGenericCommand(c, c->argv+2, c->argc-2, c->argv[1], 0, 0); |
1048 | } |
1049 | |
1050 | void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, |
1051 | robj *dstkey, int op) { |
1052 | robj **sets = zmalloc(sizeof(robj*)*setnum); |
1053 | setTypeIterator *si; |
1054 | robj *dstset = NULL; |
1055 | sds ele; |
1056 | int j, cardinality = 0; |
1057 | int diff_algo = 1; |
1058 | int sameset = 0; |
1059 | |
1060 | for (j = 0; j < setnum; j++) { |
1061 | robj *setobj = lookupKeyRead(c->db, setkeys[j]); |
1062 | if (!setobj) { |
1063 | sets[j] = NULL; |
1064 | continue; |
1065 | } |
1066 | if (checkType(c,setobj,OBJ_SET)) { |
1067 | zfree(sets); |
1068 | return; |
1069 | } |
1070 | sets[j] = setobj; |
1071 | if (j > 0 && sets[0] == sets[j]) { |
1072 | sameset = 1; |
1073 | } |
1074 | } |
1075 | |
1076 | /* Select what DIFF algorithm to use. |
1077 | * |
1078 | * Algorithm 1 is O(N*M) where N is the size of the element first set |
1079 | * and M the total number of sets. |
1080 | * |
1081 | * Algorithm 2 is O(N) where N is the total number of elements in all |
1082 | * the sets. |
1083 | * |
1084 | * We compute what is the best bet with the current input here. */ |
1085 | if (op == SET_OP_DIFF && sets[0] && !sameset) { |
1086 | long long algo_one_work = 0, algo_two_work = 0; |
1087 | |
1088 | for (j = 0; j < setnum; j++) { |
1089 | if (sets[j] == NULL) continue; |
1090 | |
1091 | algo_one_work += setTypeSize(sets[0]); |
1092 | algo_two_work += setTypeSize(sets[j]); |
1093 | } |
1094 | |
1095 | /* Algorithm 1 has better constant times and performs less operations |
1096 | * if there are elements in common. Give it some advantage. */ |
1097 | algo_one_work /= 2; |
1098 | diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2; |
1099 | |
1100 | if (diff_algo == 1 && setnum > 1) { |
1101 | /* With algorithm 1 it is better to order the sets to subtract |
1102 | * by decreasing size, so that we are more likely to find |
1103 | * duplicated elements ASAP. */ |
1104 | qsort(sets+1,setnum-1,sizeof(robj*), |
1105 | qsortCompareSetsByRevCardinality); |
1106 | } |
1107 | } |
1108 | |
1109 | /* We need a temp set object to store our union. If the dstkey |
1110 | * is not NULL (that is, we are inside an SUNIONSTORE operation) then |
1111 | * this set object will be the resulting object to set into the target key*/ |
1112 | dstset = createIntsetObject(); |
1113 | |
1114 | if (op == SET_OP_UNION) { |
1115 | /* Union is trivial, just add every element of every set to the |
1116 | * temporary set. */ |
1117 | for (j = 0; j < setnum; j++) { |
1118 | if (!sets[j]) continue; /* non existing keys are like empty sets */ |
1119 | |
1120 | si = setTypeInitIterator(sets[j]); |
1121 | while((ele = setTypeNextObject(si)) != NULL) { |
1122 | if (setTypeAdd(dstset,ele)) cardinality++; |
1123 | sdsfree(ele); |
1124 | } |
1125 | setTypeReleaseIterator(si); |
1126 | } |
1127 | } else if (op == SET_OP_DIFF && sameset) { |
1128 | /* At least one of the sets is the same one (same key) as the first one, result must be empty. */ |
1129 | } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) { |
1130 | /* DIFF Algorithm 1: |
1131 | * |
1132 | * We perform the diff by iterating all the elements of the first set, |
1133 | * and only adding it to the target set if the element does not exist |
1134 | * into all the other sets. |
1135 | * |
1136 | * This way we perform at max N*M operations, where N is the size of |
1137 | * the first set, and M the number of sets. */ |
1138 | si = setTypeInitIterator(sets[0]); |
1139 | while((ele = setTypeNextObject(si)) != NULL) { |
1140 | for (j = 1; j < setnum; j++) { |
1141 | if (!sets[j]) continue; /* no key is an empty set. */ |
1142 | if (sets[j] == sets[0]) break; /* same set! */ |
1143 | if (setTypeIsMember(sets[j],ele)) break; |
1144 | } |
1145 | if (j == setnum) { |
1146 | /* There is no other set with this element. Add it. */ |
1147 | setTypeAdd(dstset,ele); |
1148 | cardinality++; |
1149 | } |
1150 | sdsfree(ele); |
1151 | } |
1152 | setTypeReleaseIterator(si); |
1153 | } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) { |
1154 | /* DIFF Algorithm 2: |
1155 | * |
1156 | * Add all the elements of the first set to the auxiliary set. |
1157 | * Then remove all the elements of all the next sets from it. |
1158 | * |
1159 | * This is O(N) where N is the sum of all the elements in every |
1160 | * set. */ |
1161 | for (j = 0; j < setnum; j++) { |
1162 | if (!sets[j]) continue; /* non existing keys are like empty sets */ |
1163 | |
1164 | si = setTypeInitIterator(sets[j]); |
1165 | while((ele = setTypeNextObject(si)) != NULL) { |
1166 | if (j == 0) { |
1167 | if (setTypeAdd(dstset,ele)) cardinality++; |
1168 | } else { |
1169 | if (setTypeRemove(dstset,ele)) cardinality--; |
1170 | } |
1171 | sdsfree(ele); |
1172 | } |
1173 | setTypeReleaseIterator(si); |
1174 | |
1175 | /* Exit if result set is empty as any additional removal |
1176 | * of elements will have no effect. */ |
1177 | if (cardinality == 0) break; |
1178 | } |
1179 | } |
1180 | |
1181 | /* Output the content of the resulting set, if not in STORE mode */ |
1182 | if (!dstkey) { |
1183 | addReplySetLen(c,cardinality); |
1184 | si = setTypeInitIterator(dstset); |
1185 | while((ele = setTypeNextObject(si)) != NULL) { |
1186 | addReplyBulkCBuffer(c,ele,sdslen(ele)); |
1187 | sdsfree(ele); |
1188 | } |
1189 | setTypeReleaseIterator(si); |
1190 | server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) : |
1191 | decrRefCount(dstset); |
1192 | } else { |
1193 | /* If we have a target key where to store the resulting set |
1194 | * create this key with the result set inside */ |
1195 | if (setTypeSize(dstset) > 0) { |
1196 | setKey(c,c->db,dstkey,dstset,0); |
1197 | addReplyLongLong(c,setTypeSize(dstset)); |
1198 | notifyKeyspaceEvent(NOTIFY_SET, |
1199 | op == SET_OP_UNION ? "sunionstore" : "sdiffstore" , |
1200 | dstkey,c->db->id); |
1201 | server.dirty++; |
1202 | } else { |
1203 | addReply(c,shared.czero); |
1204 | if (dbDelete(c->db,dstkey)) { |
1205 | server.dirty++; |
1206 | signalModifiedKey(c,c->db,dstkey); |
1207 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del" ,dstkey,c->db->id); |
1208 | } |
1209 | } |
1210 | decrRefCount(dstset); |
1211 | } |
1212 | zfree(sets); |
1213 | } |
1214 | |
1215 | /* SUNION key [key ...] */ |
1216 | void sunionCommand(client *c) { |
1217 | sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION); |
1218 | } |
1219 | |
1220 | /* SUNIONSTORE destination key [key ...] */ |
1221 | void sunionstoreCommand(client *c) { |
1222 | sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION); |
1223 | } |
1224 | |
1225 | /* SDIFF key [key ...] */ |
1226 | void sdiffCommand(client *c) { |
1227 | sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF); |
1228 | } |
1229 | |
1230 | /* SDIFFSTORE destination key [key ...] */ |
1231 | void sdiffstoreCommand(client *c) { |
1232 | sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF); |
1233 | } |
1234 | |
1235 | void sscanCommand(client *c) { |
1236 | robj *set; |
1237 | unsigned long cursor; |
1238 | |
1239 | if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; |
1240 | if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || |
1241 | checkType(c,set,OBJ_SET)) return; |
1242 | scanGenericCommand(c,set,cursor); |
1243 | } |
1244 | |