1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "cluster.h" |
32 | |
33 | /* Structure to hold the pubsub related metadata. Currently used |
34 | * for pubsub and pubsubshard feature. */ |
35 | typedef struct pubsubtype { |
36 | int shard; |
37 | dict *(*clientPubSubChannels)(client*); |
38 | int (*subscriptionCount)(client*); |
39 | dict **serverPubSubChannels; |
40 | robj **subscribeMsg; |
41 | robj **unsubscribeMsg; |
42 | robj **messageBulk; |
43 | }pubsubtype; |
44 | |
45 | /* |
46 | * Get client's global Pub/Sub channels subscription count. |
47 | */ |
48 | int clientSubscriptionsCount(client *c); |
49 | |
50 | /* |
51 | * Get client's shard level Pub/Sub channels subscription count. |
52 | */ |
53 | int clientShardSubscriptionsCount(client *c); |
54 | |
55 | /* |
56 | * Get client's global Pub/Sub channels dict. |
57 | */ |
58 | dict* getClientPubSubChannels(client *c); |
59 | |
60 | /* |
61 | * Get client's shard level Pub/Sub channels dict. |
62 | */ |
63 | dict* getClientPubSubShardChannels(client *c); |
64 | |
65 | /* |
66 | * Get list of channels client is subscribed to. |
67 | * If a pattern is provided, the subset of channels is returned |
68 | * matching the pattern. |
69 | */ |
70 | void channelList(client *c, sds pat, dict* pubsub_channels); |
71 | |
72 | /* |
73 | * Pub/Sub type for global channels. |
74 | */ |
75 | pubsubtype pubSubType = { |
76 | .shard = 0, |
77 | .clientPubSubChannels = getClientPubSubChannels, |
78 | .subscriptionCount = clientSubscriptionsCount, |
79 | .serverPubSubChannels = &server.pubsub_channels, |
80 | .subscribeMsg = &shared.subscribebulk, |
81 | .unsubscribeMsg = &shared.unsubscribebulk, |
82 | .messageBulk = &shared.messagebulk, |
83 | }; |
84 | |
85 | /* |
86 | * Pub/Sub type for shard level channels bounded to a slot. |
87 | */ |
88 | pubsubtype pubSubShardType = { |
89 | .shard = 1, |
90 | .clientPubSubChannels = getClientPubSubShardChannels, |
91 | .subscriptionCount = clientShardSubscriptionsCount, |
92 | .serverPubSubChannels = &server.pubsubshard_channels, |
93 | .subscribeMsg = &shared.ssubscribebulk, |
94 | .unsubscribeMsg = &shared.sunsubscribebulk, |
95 | .messageBulk = &shared.smessagebulk, |
96 | }; |
97 | |
98 | /*----------------------------------------------------------------------------- |
99 | * Pubsub client replies API |
100 | *----------------------------------------------------------------------------*/ |
101 | |
102 | /* Send a pubsub message of type "message" to the client. |
103 | * Normally 'msg' is a Redis object containing the string to send as |
104 | * message. However if the caller sets 'msg' as NULL, it will be able |
105 | * to send a special message (for instance an Array type) by using the |
106 | * addReply*() API family. */ |
107 | void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { |
108 | if (c->resp == 2) |
109 | addReply(c,shared.mbulkhdr[3]); |
110 | else |
111 | addReplyPushLen(c,3); |
112 | addReply(c,message_bulk); |
113 | addReplyBulk(c,channel); |
114 | if (msg) addReplyBulk(c,msg); |
115 | } |
116 | |
117 | /* Send a pubsub message of type "pmessage" to the client. The difference |
118 | * with the "message" type delivered by addReplyPubsubMessage() is that |
119 | * this message format also includes the pattern that matched the message. */ |
120 | void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { |
121 | if (c->resp == 2) |
122 | addReply(c,shared.mbulkhdr[4]); |
123 | else |
124 | addReplyPushLen(c,4); |
125 | addReply(c,shared.pmessagebulk); |
126 | addReplyBulk(c,pat); |
127 | addReplyBulk(c,channel); |
128 | addReplyBulk(c,msg); |
129 | } |
130 | |
131 | /* Send the pubsub subscription notification to the client. */ |
132 | void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) { |
133 | if (c->resp == 2) |
134 | addReply(c,shared.mbulkhdr[3]); |
135 | else |
136 | addReplyPushLen(c,3); |
137 | addReply(c,*type.subscribeMsg); |
138 | addReplyBulk(c,channel); |
139 | addReplyLongLong(c,type.subscriptionCount(c)); |
140 | } |
141 | |
142 | /* Send the pubsub unsubscription notification to the client. |
143 | * Channel can be NULL: this is useful when the client sends a mass |
144 | * unsubscribe command but there are no channels to unsubscribe from: we |
145 | * still send a notification. */ |
146 | void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) { |
147 | if (c->resp == 2) |
148 | addReply(c,shared.mbulkhdr[3]); |
149 | else |
150 | addReplyPushLen(c,3); |
151 | addReply(c, *type.unsubscribeMsg); |
152 | if (channel) |
153 | addReplyBulk(c,channel); |
154 | else |
155 | addReplyNull(c); |
156 | addReplyLongLong(c,type.subscriptionCount(c)); |
157 | } |
158 | |
159 | /* Send the pubsub pattern subscription notification to the client. */ |
160 | void addReplyPubsubPatSubscribed(client *c, robj *pattern) { |
161 | if (c->resp == 2) |
162 | addReply(c,shared.mbulkhdr[3]); |
163 | else |
164 | addReplyPushLen(c,3); |
165 | addReply(c,shared.psubscribebulk); |
166 | addReplyBulk(c,pattern); |
167 | addReplyLongLong(c,clientSubscriptionsCount(c)); |
168 | } |
169 | |
170 | /* Send the pubsub pattern unsubscription notification to the client. |
171 | * Pattern can be NULL: this is useful when the client sends a mass |
172 | * punsubscribe command but there are no pattern to unsubscribe from: we |
173 | * still send a notification. */ |
174 | void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { |
175 | if (c->resp == 2) |
176 | addReply(c,shared.mbulkhdr[3]); |
177 | else |
178 | addReplyPushLen(c,3); |
179 | addReply(c,shared.punsubscribebulk); |
180 | if (pattern) |
181 | addReplyBulk(c,pattern); |
182 | else |
183 | addReplyNull(c); |
184 | addReplyLongLong(c,clientSubscriptionsCount(c)); |
185 | } |
186 | |
187 | /*----------------------------------------------------------------------------- |
188 | * Pubsub low level API |
189 | *----------------------------------------------------------------------------*/ |
190 | |
191 | /* Return the number of pubsub channels + patterns is handled. */ |
192 | int serverPubsubSubscriptionCount() { |
193 | return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns); |
194 | } |
195 | |
196 | /* Return the number of pubsub shard level channels is handled. */ |
197 | int serverPubsubShardSubscriptionCount() { |
198 | return dictSize(server.pubsubshard_channels); |
199 | } |
200 | |
201 | |
202 | /* Return the number of channels + patterns a client is subscribed to. */ |
203 | int clientSubscriptionsCount(client *c) { |
204 | return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns); |
205 | } |
206 | |
207 | /* Return the number of shard level channels a client is subscribed to. */ |
208 | int clientShardSubscriptionsCount(client *c) { |
209 | return dictSize(c->pubsubshard_channels); |
210 | } |
211 | |
212 | dict* getClientPubSubChannels(client *c) { |
213 | return c->pubsub_channels; |
214 | } |
215 | |
216 | dict* getClientPubSubShardChannels(client *c) { |
217 | return c->pubsubshard_channels; |
218 | } |
219 | |
220 | /* Return the number of pubsub + pubsub shard level channels |
221 | * a client is subscribed to. */ |
222 | int clientTotalPubSubSubscriptionCount(client *c) { |
223 | return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c); |
224 | } |
225 | |
226 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or |
227 | * 0 if the client was already subscribed to that channel. */ |
228 | int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { |
229 | dictEntry *de; |
230 | list *clients = NULL; |
231 | int retval = 0; |
232 | |
233 | /* Add the channel to the client -> channels hash table */ |
234 | if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) { |
235 | retval = 1; |
236 | incrRefCount(channel); |
237 | /* Add the client to the channel -> list of clients hash table */ |
238 | de = dictFind(*type.serverPubSubChannels, channel); |
239 | if (de == NULL) { |
240 | clients = listCreate(); |
241 | dictAdd(*type.serverPubSubChannels, channel, clients); |
242 | incrRefCount(channel); |
243 | } else { |
244 | clients = dictGetVal(de); |
245 | } |
246 | listAddNodeTail(clients,c); |
247 | } |
248 | /* Notify the client */ |
249 | addReplyPubsubSubscribed(c,channel,type); |
250 | return retval; |
251 | } |
252 | |
253 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
254 | * 0 if the client was not subscribed to the specified channel. */ |
255 | int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) { |
256 | dictEntry *de; |
257 | list *clients; |
258 | listNode *ln; |
259 | int retval = 0; |
260 | |
261 | /* Remove the channel from the client -> channels hash table */ |
262 | incrRefCount(channel); /* channel may be just a pointer to the same object |
263 | we have in the hash tables. Protect it... */ |
264 | if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) { |
265 | retval = 1; |
266 | /* Remove the client from the channel -> clients list hash table */ |
267 | de = dictFind(*type.serverPubSubChannels, channel); |
268 | serverAssertWithInfo(c,NULL,de != NULL); |
269 | clients = dictGetVal(de); |
270 | ln = listSearchKey(clients,c); |
271 | serverAssertWithInfo(c,NULL,ln != NULL); |
272 | listDelNode(clients,ln); |
273 | if (listLength(clients) == 0) { |
274 | /* Free the list and associated hash entry at all if this was |
275 | * the latest client, so that it will be possible to abuse |
276 | * Redis PUBSUB creating millions of channels. */ |
277 | dictDelete(*type.serverPubSubChannels, channel); |
278 | /* As this channel isn't subscribed by anyone, it's safe |
279 | * to remove the channel from the slot. */ |
280 | if (server.cluster_enabled & type.shard) { |
281 | slotToChannelDel(channel->ptr); |
282 | } |
283 | } |
284 | } |
285 | /* Notify the client */ |
286 | if (notify) { |
287 | addReplyPubsubUnsubscribed(c,channel,type); |
288 | } |
289 | decrRefCount(channel); /* it is finally safe to release it */ |
290 | return retval; |
291 | } |
292 | |
293 | void pubsubShardUnsubscribeAllClients(robj *channel) { |
294 | int retval; |
295 | dictEntry *de = dictFind(server.pubsubshard_channels, channel); |
296 | serverAssertWithInfo(NULL,channel,de != NULL); |
297 | list *clients = dictGetVal(de); |
298 | if (listLength(clients) > 0) { |
299 | /* For each client subscribed to the channel, unsubscribe it. */ |
300 | listIter li; |
301 | listNode *ln; |
302 | listRewind(clients, &li); |
303 | while ((ln = listNext(&li)) != NULL) { |
304 | client *c = listNodeValue(ln); |
305 | retval = dictDelete(c->pubsubshard_channels, channel); |
306 | serverAssertWithInfo(c,channel,retval == DICT_OK); |
307 | addReplyPubsubUnsubscribed(c, channel, pubSubShardType); |
308 | /* If the client has no other pubsub subscription, |
309 | * move out of pubsub mode. */ |
310 | if (clientTotalPubSubSubscriptionCount(c) == 0) { |
311 | c->flags &= ~CLIENT_PUBSUB; |
312 | } |
313 | } |
314 | } |
315 | /* Delete the channel from server pubsubshard channels hash table. */ |
316 | retval = dictDelete(server.pubsubshard_channels, channel); |
317 | /* Delete the channel from slots_to_channel mapping. */ |
318 | slotToChannelDel(channel->ptr); |
319 | serverAssertWithInfo(NULL,channel,retval == DICT_OK); |
320 | decrRefCount(channel); /* it is finally safe to release it */ |
321 | } |
322 | |
323 | |
324 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ |
325 | int pubsubSubscribePattern(client *c, robj *pattern) { |
326 | dictEntry *de; |
327 | list *clients; |
328 | int retval = 0; |
329 | |
330 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { |
331 | retval = 1; |
332 | listAddNodeTail(c->pubsub_patterns,pattern); |
333 | incrRefCount(pattern); |
334 | /* Add the client to the pattern -> list of clients hash table */ |
335 | de = dictFind(server.pubsub_patterns,pattern); |
336 | if (de == NULL) { |
337 | clients = listCreate(); |
338 | dictAdd(server.pubsub_patterns,pattern,clients); |
339 | incrRefCount(pattern); |
340 | } else { |
341 | clients = dictGetVal(de); |
342 | } |
343 | listAddNodeTail(clients,c); |
344 | } |
345 | /* Notify the client */ |
346 | addReplyPubsubPatSubscribed(c,pattern); |
347 | return retval; |
348 | } |
349 | |
350 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
351 | * 0 if the client was not subscribed to the specified channel. */ |
352 | int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { |
353 | dictEntry *de; |
354 | list *clients; |
355 | listNode *ln; |
356 | int retval = 0; |
357 | |
358 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ |
359 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { |
360 | retval = 1; |
361 | listDelNode(c->pubsub_patterns,ln); |
362 | /* Remove the client from the pattern -> clients list hash table */ |
363 | de = dictFind(server.pubsub_patterns,pattern); |
364 | serverAssertWithInfo(c,NULL,de != NULL); |
365 | clients = dictGetVal(de); |
366 | ln = listSearchKey(clients,c); |
367 | serverAssertWithInfo(c,NULL,ln != NULL); |
368 | listDelNode(clients,ln); |
369 | if (listLength(clients) == 0) { |
370 | /* Free the list and associated hash entry at all if this was |
371 | * the latest client. */ |
372 | dictDelete(server.pubsub_patterns,pattern); |
373 | } |
374 | } |
375 | /* Notify the client */ |
376 | if (notify) addReplyPubsubPatUnsubscribed(c,pattern); |
377 | decrRefCount(pattern); |
378 | return retval; |
379 | } |
380 | |
381 | /* Unsubscribe from all the channels. Return the number of channels the |
382 | * client was subscribed to. */ |
383 | int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) { |
384 | int count = 0; |
385 | if (dictSize(type.clientPubSubChannels(c)) > 0) { |
386 | dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c)); |
387 | dictEntry *de; |
388 | |
389 | while((de = dictNext(di)) != NULL) { |
390 | robj *channel = dictGetKey(de); |
391 | |
392 | count += pubsubUnsubscribeChannel(c,channel,notify,type); |
393 | } |
394 | dictReleaseIterator(di); |
395 | } |
396 | /* We were subscribed to nothing? Still reply to the client. */ |
397 | if (notify && count == 0) { |
398 | addReplyPubsubUnsubscribed(c,NULL,type); |
399 | } |
400 | return count; |
401 | } |
402 | |
403 | /* |
404 | * Unsubscribe a client from all global channels. |
405 | */ |
406 | int pubsubUnsubscribeAllChannels(client *c, int notify) { |
407 | int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType); |
408 | return count; |
409 | } |
410 | |
411 | /* |
412 | * Unsubscribe a client from all shard subscribed channels. |
413 | */ |
414 | int pubsubUnsubscribeShardAllChannels(client *c, int notify) { |
415 | int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType); |
416 | return count; |
417 | } |
418 | |
419 | /* |
420 | * Unsubscribe a client from provided shard subscribed channel(s). |
421 | */ |
422 | void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) { |
423 | for (unsigned int j = 0; j < count; j++) { |
424 | /* Remove the channel from server and from the clients |
425 | * subscribed to it as well as notify them. */ |
426 | pubsubShardUnsubscribeAllClients(channels[j]); |
427 | } |
428 | } |
429 | |
430 | /* Unsubscribe from all the patterns. Return the number of patterns the |
431 | * client was subscribed from. */ |
432 | int pubsubUnsubscribeAllPatterns(client *c, int notify) { |
433 | listNode *ln; |
434 | listIter li; |
435 | int count = 0; |
436 | |
437 | listRewind(c->pubsub_patterns,&li); |
438 | while ((ln = listNext(&li)) != NULL) { |
439 | robj *pattern = ln->value; |
440 | |
441 | count += pubsubUnsubscribePattern(c,pattern,notify); |
442 | } |
443 | if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL); |
444 | return count; |
445 | } |
446 | |
447 | /* |
448 | * Publish a message to all the subscribers. |
449 | */ |
450 | int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) { |
451 | int receivers = 0; |
452 | dictEntry *de; |
453 | dictIterator *di; |
454 | listNode *ln; |
455 | listIter li; |
456 | |
457 | /* Send to clients listening for that channel */ |
458 | de = dictFind(*type.serverPubSubChannels, channel); |
459 | if (de) { |
460 | list *list = dictGetVal(de); |
461 | listNode *ln; |
462 | listIter li; |
463 | |
464 | listRewind(list,&li); |
465 | while ((ln = listNext(&li)) != NULL) { |
466 | client *c = ln->value; |
467 | addReplyPubsubMessage(c,channel,message,*type.messageBulk); |
468 | updateClientMemUsage(c); |
469 | receivers++; |
470 | } |
471 | } |
472 | |
473 | if (type.shard) { |
474 | /* Shard pubsub ignores patterns. */ |
475 | return receivers; |
476 | } |
477 | |
478 | /* Send to clients listening to matching channels */ |
479 | di = dictGetIterator(server.pubsub_patterns); |
480 | if (di) { |
481 | channel = getDecodedObject(channel); |
482 | while((de = dictNext(di)) != NULL) { |
483 | robj *pattern = dictGetKey(de); |
484 | list *clients = dictGetVal(de); |
485 | if (!stringmatchlen((char*)pattern->ptr, |
486 | sdslen(pattern->ptr), |
487 | (char*)channel->ptr, |
488 | sdslen(channel->ptr),0)) continue; |
489 | |
490 | listRewind(clients,&li); |
491 | while ((ln = listNext(&li)) != NULL) { |
492 | client *c = listNodeValue(ln); |
493 | addReplyPubsubPatMessage(c,pattern,channel,message); |
494 | updateClientMemUsage(c); |
495 | receivers++; |
496 | } |
497 | } |
498 | decrRefCount(channel); |
499 | dictReleaseIterator(di); |
500 | } |
501 | return receivers; |
502 | } |
503 | |
504 | /* Publish a message to all the subscribers. */ |
505 | int pubsubPublishMessage(robj *channel, robj *message, int sharded) { |
506 | return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType); |
507 | } |
508 | |
509 | /*----------------------------------------------------------------------------- |
510 | * Pubsub commands implementation |
511 | *----------------------------------------------------------------------------*/ |
512 | |
513 | /* SUBSCRIBE channel [channel ...] */ |
514 | void subscribeCommand(client *c) { |
515 | int j; |
516 | if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { |
517 | /** |
518 | * A client that has CLIENT_DENY_BLOCKING flag on |
519 | * expect a reply per command and so can not execute subscribe. |
520 | * |
521 | * Notice that we have a special treatment for multi because of |
522 | * backward compatibility |
523 | */ |
524 | addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client" ); |
525 | return; |
526 | } |
527 | for (j = 1; j < c->argc; j++) |
528 | pubsubSubscribeChannel(c,c->argv[j],pubSubType); |
529 | c->flags |= CLIENT_PUBSUB; |
530 | } |
531 | |
532 | /* UNSUBSCRIBE [channel ...] */ |
533 | void unsubscribeCommand(client *c) { |
534 | if (c->argc == 1) { |
535 | pubsubUnsubscribeAllChannels(c,1); |
536 | } else { |
537 | int j; |
538 | |
539 | for (j = 1; j < c->argc; j++) |
540 | pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType); |
541 | } |
542 | if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; |
543 | } |
544 | |
545 | /* PSUBSCRIBE pattern [pattern ...] */ |
546 | void psubscribeCommand(client *c) { |
547 | int j; |
548 | if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { |
549 | /** |
550 | * A client that has CLIENT_DENY_BLOCKING flag on |
551 | * expect a reply per command and so can not execute subscribe. |
552 | * |
553 | * Notice that we have a special treatment for multi because of |
554 | * backward compatibility |
555 | */ |
556 | addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client" ); |
557 | return; |
558 | } |
559 | |
560 | for (j = 1; j < c->argc; j++) |
561 | pubsubSubscribePattern(c,c->argv[j]); |
562 | c->flags |= CLIENT_PUBSUB; |
563 | } |
564 | |
565 | /* PUNSUBSCRIBE [pattern [pattern ...]] */ |
566 | void punsubscribeCommand(client *c) { |
567 | if (c->argc == 1) { |
568 | pubsubUnsubscribeAllPatterns(c,1); |
569 | } else { |
570 | int j; |
571 | |
572 | for (j = 1; j < c->argc; j++) |
573 | pubsubUnsubscribePattern(c,c->argv[j],1); |
574 | } |
575 | if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; |
576 | } |
577 | |
578 | /* This function wraps pubsubPublishMessage and also propagates the message to cluster. |
579 | * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/ |
580 | int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) { |
581 | int receivers = pubsubPublishMessage(channel, message, sharded); |
582 | if (server.cluster_enabled) |
583 | clusterPropagatePublish(channel, message, sharded); |
584 | return receivers; |
585 | } |
586 | |
587 | /* PUBLISH <channel> <message> */ |
588 | void publishCommand(client *c) { |
589 | if (server.sentinel_mode) { |
590 | sentinelPublishCommand(c); |
591 | return; |
592 | } |
593 | |
594 | int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0); |
595 | if (!server.cluster_enabled) |
596 | forceCommandPropagation(c,PROPAGATE_REPL); |
597 | addReplyLongLong(c,receivers); |
598 | } |
599 | |
600 | /* PUBSUB command for Pub/Sub introspection. */ |
601 | void pubsubCommand(client *c) { |
602 | if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help" )) { |
603 | const char *help[] = { |
604 | "CHANNELS [<pattern>]" , |
605 | " Return the currently active channels matching a <pattern> (default: '*')." , |
606 | "NUMPAT" , |
607 | " Return number of subscriptions to patterns." , |
608 | "NUMSUB [<channel> ...]" , |
609 | " Return the number of subscribers for the specified channels, excluding" , |
610 | " pattern subscriptions(default: no channels)." , |
611 | "SHARDCHANNELS [<pattern>]" , |
612 | " Return the currently active shard level channels matching a <pattern> (default: '*')." , |
613 | "SHARDNUMSUB [<shardchannel> ...]" , |
614 | " Return the number of subscribers for the specified shard level channel(s)" , |
615 | NULL |
616 | }; |
617 | addReplyHelp(c, help); |
618 | } else if (!strcasecmp(c->argv[1]->ptr,"channels" ) && |
619 | (c->argc == 2 || c->argc == 3)) |
620 | { |
621 | /* PUBSUB CHANNELS [<pattern>] */ |
622 | sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; |
623 | channelList(c, pat, server.pubsub_channels); |
624 | } else if (!strcasecmp(c->argv[1]->ptr,"numsub" ) && c->argc >= 2) { |
625 | /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */ |
626 | int j; |
627 | |
628 | addReplyArrayLen(c,(c->argc-2)*2); |
629 | for (j = 2; j < c->argc; j++) { |
630 | list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); |
631 | |
632 | addReplyBulk(c,c->argv[j]); |
633 | addReplyLongLong(c,l ? listLength(l) : 0); |
634 | } |
635 | } else if (!strcasecmp(c->argv[1]->ptr,"numpat" ) && c->argc == 2) { |
636 | /* PUBSUB NUMPAT */ |
637 | addReplyLongLong(c,dictSize(server.pubsub_patterns)); |
638 | } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels" ) && |
639 | (c->argc == 2 || c->argc == 3)) |
640 | { |
641 | /* PUBSUB SHARDCHANNELS */ |
642 | sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; |
643 | channelList(c,pat,server.pubsubshard_channels); |
644 | } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub" ) && c->argc >= 2) { |
645 | /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */ |
646 | int j; |
647 | |
648 | addReplyArrayLen(c, (c->argc-2)*2); |
649 | for (j = 2; j < c->argc; j++) { |
650 | list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]); |
651 | |
652 | addReplyBulk(c,c->argv[j]); |
653 | addReplyLongLong(c,l ? listLength(l) : 0); |
654 | } |
655 | } else { |
656 | addReplySubcommandSyntaxError(c); |
657 | } |
658 | } |
659 | |
660 | void channelList(client *c, sds pat, dict *pubsub_channels) { |
661 | dictIterator *di = dictGetIterator(pubsub_channels); |
662 | dictEntry *de; |
663 | long mblen = 0; |
664 | void *replylen; |
665 | |
666 | replylen = addReplyDeferredLen(c); |
667 | while((de = dictNext(di)) != NULL) { |
668 | robj *cobj = dictGetKey(de); |
669 | sds channel = cobj->ptr; |
670 | |
671 | if (!pat || stringmatchlen(pat, sdslen(pat), |
672 | channel, sdslen(channel),0)) |
673 | { |
674 | addReplyBulk(c,cobj); |
675 | mblen++; |
676 | } |
677 | } |
678 | dictReleaseIterator(di); |
679 | setDeferredArrayLen(c,replylen,mblen); |
680 | } |
681 | |
682 | /* SPUBLISH <shardchannel> <message> */ |
683 | void spublishCommand(client *c) { |
684 | int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); |
685 | if (!server.cluster_enabled) |
686 | forceCommandPropagation(c,PROPAGATE_REPL); |
687 | addReplyLongLong(c,receivers); |
688 | } |
689 | |
690 | /* SSUBSCRIBE shardchannel [shardchannel ...] */ |
691 | void ssubscribeCommand(client *c) { |
692 | if (c->flags & CLIENT_DENY_BLOCKING) { |
693 | /* A client that has CLIENT_DENY_BLOCKING flag on |
694 | * expect a reply per command and so can not execute subscribe. */ |
695 | addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client" ); |
696 | return; |
697 | } |
698 | |
699 | for (int j = 1; j < c->argc; j++) { |
700 | /* A channel is only considered to be added, if a |
701 | * subscriber exists for it. And if a subscriber |
702 | * already exists the slotToChannel doesn't needs |
703 | * to be incremented. */ |
704 | if (server.cluster_enabled & |
705 | (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) { |
706 | slotToChannelAdd(c->argv[j]->ptr); |
707 | } |
708 | pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); |
709 | } |
710 | c->flags |= CLIENT_PUBSUB; |
711 | } |
712 | |
713 | |
714 | /* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */ |
715 | void sunsubscribeCommand(client *c) { |
716 | if (c->argc == 1) { |
717 | pubsubUnsubscribeShardAllChannels(c, 1); |
718 | } else { |
719 | for (int j = 1; j < c->argc; j++) { |
720 | pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType); |
721 | } |
722 | } |
723 | if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; |
724 | } |
725 | |
726 | size_t pubsubMemOverhead(client *c) { |
727 | /* PubSub patterns */ |
728 | size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode); |
729 | /* Global PubSub channels */ |
730 | mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) + |
731 | dictSlots(c->pubsub_channels) * sizeof(dictEntry*); |
732 | /* Sharded PubSub channels */ |
733 | mem += dictSize(c->pubsubshard_channels) * sizeof(dictEntry) + |
734 | dictSlots(c->pubsubshard_channels) * sizeof(dictEntry*); |
735 | return mem; |
736 | } |
737 | |