1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "cluster.h"
32
33/* Structure to hold the pubsub related metadata. Currently used
34 * for pubsub and pubsubshard feature. */
35typedef struct pubsubtype {
36 int shard;
37 dict *(*clientPubSubChannels)(client*);
38 int (*subscriptionCount)(client*);
39 dict **serverPubSubChannels;
40 robj **subscribeMsg;
41 robj **unsubscribeMsg;
42 robj **messageBulk;
43}pubsubtype;
44
45/*
46 * Get client's global Pub/Sub channels subscription count.
47 */
48int clientSubscriptionsCount(client *c);
49
50/*
51 * Get client's shard level Pub/Sub channels subscription count.
52 */
53int clientShardSubscriptionsCount(client *c);
54
55/*
56 * Get client's global Pub/Sub channels dict.
57 */
58dict* getClientPubSubChannels(client *c);
59
60/*
61 * Get client's shard level Pub/Sub channels dict.
62 */
63dict* getClientPubSubShardChannels(client *c);
64
65/*
66 * Get list of channels client is subscribed to.
67 * If a pattern is provided, the subset of channels is returned
68 * matching the pattern.
69 */
70void channelList(client *c, sds pat, dict* pubsub_channels);
71
72/*
73 * Pub/Sub type for global channels.
74 */
75pubsubtype pubSubType = {
76 .shard = 0,
77 .clientPubSubChannels = getClientPubSubChannels,
78 .subscriptionCount = clientSubscriptionsCount,
79 .serverPubSubChannels = &server.pubsub_channels,
80 .subscribeMsg = &shared.subscribebulk,
81 .unsubscribeMsg = &shared.unsubscribebulk,
82 .messageBulk = &shared.messagebulk,
83};
84
85/*
86 * Pub/Sub type for shard level channels bounded to a slot.
87 */
88pubsubtype pubSubShardType = {
89 .shard = 1,
90 .clientPubSubChannels = getClientPubSubShardChannels,
91 .subscriptionCount = clientShardSubscriptionsCount,
92 .serverPubSubChannels = &server.pubsubshard_channels,
93 .subscribeMsg = &shared.ssubscribebulk,
94 .unsubscribeMsg = &shared.sunsubscribebulk,
95 .messageBulk = &shared.smessagebulk,
96};
97
98/*-----------------------------------------------------------------------------
99 * Pubsub client replies API
100 *----------------------------------------------------------------------------*/
101
102/* Send a pubsub message of type "message" to the client.
103 * Normally 'msg' is a Redis object containing the string to send as
104 * message. However if the caller sets 'msg' as NULL, it will be able
105 * to send a special message (for instance an Array type) by using the
106 * addReply*() API family. */
107void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
108 if (c->resp == 2)
109 addReply(c,shared.mbulkhdr[3]);
110 else
111 addReplyPushLen(c,3);
112 addReply(c,message_bulk);
113 addReplyBulk(c,channel);
114 if (msg) addReplyBulk(c,msg);
115}
116
117/* Send a pubsub message of type "pmessage" to the client. The difference
118 * with the "message" type delivered by addReplyPubsubMessage() is that
119 * this message format also includes the pattern that matched the message. */
120void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
121 if (c->resp == 2)
122 addReply(c,shared.mbulkhdr[4]);
123 else
124 addReplyPushLen(c,4);
125 addReply(c,shared.pmessagebulk);
126 addReplyBulk(c,pat);
127 addReplyBulk(c,channel);
128 addReplyBulk(c,msg);
129}
130
131/* Send the pubsub subscription notification to the client. */
132void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
133 if (c->resp == 2)
134 addReply(c,shared.mbulkhdr[3]);
135 else
136 addReplyPushLen(c,3);
137 addReply(c,*type.subscribeMsg);
138 addReplyBulk(c,channel);
139 addReplyLongLong(c,type.subscriptionCount(c));
140}
141
142/* Send the pubsub unsubscription notification to the client.
143 * Channel can be NULL: this is useful when the client sends a mass
144 * unsubscribe command but there are no channels to unsubscribe from: we
145 * still send a notification. */
146void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
147 if (c->resp == 2)
148 addReply(c,shared.mbulkhdr[3]);
149 else
150 addReplyPushLen(c,3);
151 addReply(c, *type.unsubscribeMsg);
152 if (channel)
153 addReplyBulk(c,channel);
154 else
155 addReplyNull(c);
156 addReplyLongLong(c,type.subscriptionCount(c));
157}
158
159/* Send the pubsub pattern subscription notification to the client. */
160void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
161 if (c->resp == 2)
162 addReply(c,shared.mbulkhdr[3]);
163 else
164 addReplyPushLen(c,3);
165 addReply(c,shared.psubscribebulk);
166 addReplyBulk(c,pattern);
167 addReplyLongLong(c,clientSubscriptionsCount(c));
168}
169
170/* Send the pubsub pattern unsubscription notification to the client.
171 * Pattern can be NULL: this is useful when the client sends a mass
172 * punsubscribe command but there are no pattern to unsubscribe from: we
173 * still send a notification. */
174void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
175 if (c->resp == 2)
176 addReply(c,shared.mbulkhdr[3]);
177 else
178 addReplyPushLen(c,3);
179 addReply(c,shared.punsubscribebulk);
180 if (pattern)
181 addReplyBulk(c,pattern);
182 else
183 addReplyNull(c);
184 addReplyLongLong(c,clientSubscriptionsCount(c));
185}
186
187/*-----------------------------------------------------------------------------
188 * Pubsub low level API
189 *----------------------------------------------------------------------------*/
190
191/* Return the number of pubsub channels + patterns is handled. */
192int serverPubsubSubscriptionCount() {
193 return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
194}
195
196/* Return the number of pubsub shard level channels is handled. */
197int serverPubsubShardSubscriptionCount() {
198 return dictSize(server.pubsubshard_channels);
199}
200
201
202/* Return the number of channels + patterns a client is subscribed to. */
203int clientSubscriptionsCount(client *c) {
204 return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
205}
206
207/* Return the number of shard level channels a client is subscribed to. */
208int clientShardSubscriptionsCount(client *c) {
209 return dictSize(c->pubsubshard_channels);
210}
211
212dict* getClientPubSubChannels(client *c) {
213 return c->pubsub_channels;
214}
215
216dict* getClientPubSubShardChannels(client *c) {
217 return c->pubsubshard_channels;
218}
219
220/* Return the number of pubsub + pubsub shard level channels
221 * a client is subscribed to. */
222int clientTotalPubSubSubscriptionCount(client *c) {
223 return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
224}
225
226/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
227 * 0 if the client was already subscribed to that channel. */
228int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
229 dictEntry *de;
230 list *clients = NULL;
231 int retval = 0;
232
233 /* Add the channel to the client -> channels hash table */
234 if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
235 retval = 1;
236 incrRefCount(channel);
237 /* Add the client to the channel -> list of clients hash table */
238 de = dictFind(*type.serverPubSubChannels, channel);
239 if (de == NULL) {
240 clients = listCreate();
241 dictAdd(*type.serverPubSubChannels, channel, clients);
242 incrRefCount(channel);
243 } else {
244 clients = dictGetVal(de);
245 }
246 listAddNodeTail(clients,c);
247 }
248 /* Notify the client */
249 addReplyPubsubSubscribed(c,channel,type);
250 return retval;
251}
252
253/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
254 * 0 if the client was not subscribed to the specified channel. */
255int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
256 dictEntry *de;
257 list *clients;
258 listNode *ln;
259 int retval = 0;
260
261 /* Remove the channel from the client -> channels hash table */
262 incrRefCount(channel); /* channel may be just a pointer to the same object
263 we have in the hash tables. Protect it... */
264 if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
265 retval = 1;
266 /* Remove the client from the channel -> clients list hash table */
267 de = dictFind(*type.serverPubSubChannels, channel);
268 serverAssertWithInfo(c,NULL,de != NULL);
269 clients = dictGetVal(de);
270 ln = listSearchKey(clients,c);
271 serverAssertWithInfo(c,NULL,ln != NULL);
272 listDelNode(clients,ln);
273 if (listLength(clients) == 0) {
274 /* Free the list and associated hash entry at all if this was
275 * the latest client, so that it will be possible to abuse
276 * Redis PUBSUB creating millions of channels. */
277 dictDelete(*type.serverPubSubChannels, channel);
278 /* As this channel isn't subscribed by anyone, it's safe
279 * to remove the channel from the slot. */
280 if (server.cluster_enabled & type.shard) {
281 slotToChannelDel(channel->ptr);
282 }
283 }
284 }
285 /* Notify the client */
286 if (notify) {
287 addReplyPubsubUnsubscribed(c,channel,type);
288 }
289 decrRefCount(channel); /* it is finally safe to release it */
290 return retval;
291}
292
293void pubsubShardUnsubscribeAllClients(robj *channel) {
294 int retval;
295 dictEntry *de = dictFind(server.pubsubshard_channels, channel);
296 serverAssertWithInfo(NULL,channel,de != NULL);
297 list *clients = dictGetVal(de);
298 if (listLength(clients) > 0) {
299 /* For each client subscribed to the channel, unsubscribe it. */
300 listIter li;
301 listNode *ln;
302 listRewind(clients, &li);
303 while ((ln = listNext(&li)) != NULL) {
304 client *c = listNodeValue(ln);
305 retval = dictDelete(c->pubsubshard_channels, channel);
306 serverAssertWithInfo(c,channel,retval == DICT_OK);
307 addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
308 /* If the client has no other pubsub subscription,
309 * move out of pubsub mode. */
310 if (clientTotalPubSubSubscriptionCount(c) == 0) {
311 c->flags &= ~CLIENT_PUBSUB;
312 }
313 }
314 }
315 /* Delete the channel from server pubsubshard channels hash table. */
316 retval = dictDelete(server.pubsubshard_channels, channel);
317 /* Delete the channel from slots_to_channel mapping. */
318 slotToChannelDel(channel->ptr);
319 serverAssertWithInfo(NULL,channel,retval == DICT_OK);
320 decrRefCount(channel); /* it is finally safe to release it */
321}
322
323
324/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
325int pubsubSubscribePattern(client *c, robj *pattern) {
326 dictEntry *de;
327 list *clients;
328 int retval = 0;
329
330 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
331 retval = 1;
332 listAddNodeTail(c->pubsub_patterns,pattern);
333 incrRefCount(pattern);
334 /* Add the client to the pattern -> list of clients hash table */
335 de = dictFind(server.pubsub_patterns,pattern);
336 if (de == NULL) {
337 clients = listCreate();
338 dictAdd(server.pubsub_patterns,pattern,clients);
339 incrRefCount(pattern);
340 } else {
341 clients = dictGetVal(de);
342 }
343 listAddNodeTail(clients,c);
344 }
345 /* Notify the client */
346 addReplyPubsubPatSubscribed(c,pattern);
347 return retval;
348}
349
350/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
351 * 0 if the client was not subscribed to the specified channel. */
352int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
353 dictEntry *de;
354 list *clients;
355 listNode *ln;
356 int retval = 0;
357
358 incrRefCount(pattern); /* Protect the object. May be the same we remove */
359 if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
360 retval = 1;
361 listDelNode(c->pubsub_patterns,ln);
362 /* Remove the client from the pattern -> clients list hash table */
363 de = dictFind(server.pubsub_patterns,pattern);
364 serverAssertWithInfo(c,NULL,de != NULL);
365 clients = dictGetVal(de);
366 ln = listSearchKey(clients,c);
367 serverAssertWithInfo(c,NULL,ln != NULL);
368 listDelNode(clients,ln);
369 if (listLength(clients) == 0) {
370 /* Free the list and associated hash entry at all if this was
371 * the latest client. */
372 dictDelete(server.pubsub_patterns,pattern);
373 }
374 }
375 /* Notify the client */
376 if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
377 decrRefCount(pattern);
378 return retval;
379}
380
381/* Unsubscribe from all the channels. Return the number of channels the
382 * client was subscribed to. */
383int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) {
384 int count = 0;
385 if (dictSize(type.clientPubSubChannels(c)) > 0) {
386 dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c));
387 dictEntry *de;
388
389 while((de = dictNext(di)) != NULL) {
390 robj *channel = dictGetKey(de);
391
392 count += pubsubUnsubscribeChannel(c,channel,notify,type);
393 }
394 dictReleaseIterator(di);
395 }
396 /* We were subscribed to nothing? Still reply to the client. */
397 if (notify && count == 0) {
398 addReplyPubsubUnsubscribed(c,NULL,type);
399 }
400 return count;
401}
402
403/*
404 * Unsubscribe a client from all global channels.
405 */
406int pubsubUnsubscribeAllChannels(client *c, int notify) {
407 int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType);
408 return count;
409}
410
411/*
412 * Unsubscribe a client from all shard subscribed channels.
413 */
414int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
415 int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType);
416 return count;
417}
418
419/*
420 * Unsubscribe a client from provided shard subscribed channel(s).
421 */
422void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) {
423 for (unsigned int j = 0; j < count; j++) {
424 /* Remove the channel from server and from the clients
425 * subscribed to it as well as notify them. */
426 pubsubShardUnsubscribeAllClients(channels[j]);
427 }
428}
429
430/* Unsubscribe from all the patterns. Return the number of patterns the
431 * client was subscribed from. */
432int pubsubUnsubscribeAllPatterns(client *c, int notify) {
433 listNode *ln;
434 listIter li;
435 int count = 0;
436
437 listRewind(c->pubsub_patterns,&li);
438 while ((ln = listNext(&li)) != NULL) {
439 robj *pattern = ln->value;
440
441 count += pubsubUnsubscribePattern(c,pattern,notify);
442 }
443 if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
444 return count;
445}
446
447/*
448 * Publish a message to all the subscribers.
449 */
450int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
451 int receivers = 0;
452 dictEntry *de;
453 dictIterator *di;
454 listNode *ln;
455 listIter li;
456
457 /* Send to clients listening for that channel */
458 de = dictFind(*type.serverPubSubChannels, channel);
459 if (de) {
460 list *list = dictGetVal(de);
461 listNode *ln;
462 listIter li;
463
464 listRewind(list,&li);
465 while ((ln = listNext(&li)) != NULL) {
466 client *c = ln->value;
467 addReplyPubsubMessage(c,channel,message,*type.messageBulk);
468 updateClientMemUsage(c);
469 receivers++;
470 }
471 }
472
473 if (type.shard) {
474 /* Shard pubsub ignores patterns. */
475 return receivers;
476 }
477
478 /* Send to clients listening to matching channels */
479 di = dictGetIterator(server.pubsub_patterns);
480 if (di) {
481 channel = getDecodedObject(channel);
482 while((de = dictNext(di)) != NULL) {
483 robj *pattern = dictGetKey(de);
484 list *clients = dictGetVal(de);
485 if (!stringmatchlen((char*)pattern->ptr,
486 sdslen(pattern->ptr),
487 (char*)channel->ptr,
488 sdslen(channel->ptr),0)) continue;
489
490 listRewind(clients,&li);
491 while ((ln = listNext(&li)) != NULL) {
492 client *c = listNodeValue(ln);
493 addReplyPubsubPatMessage(c,pattern,channel,message);
494 updateClientMemUsage(c);
495 receivers++;
496 }
497 }
498 decrRefCount(channel);
499 dictReleaseIterator(di);
500 }
501 return receivers;
502}
503
504/* Publish a message to all the subscribers. */
505int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
506 return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
507}
508
509/*-----------------------------------------------------------------------------
510 * Pubsub commands implementation
511 *----------------------------------------------------------------------------*/
512
513/* SUBSCRIBE channel [channel ...] */
514void subscribeCommand(client *c) {
515 int j;
516 if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
517 /**
518 * A client that has CLIENT_DENY_BLOCKING flag on
519 * expect a reply per command and so can not execute subscribe.
520 *
521 * Notice that we have a special treatment for multi because of
522 * backward compatibility
523 */
524 addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
525 return;
526 }
527 for (j = 1; j < c->argc; j++)
528 pubsubSubscribeChannel(c,c->argv[j],pubSubType);
529 c->flags |= CLIENT_PUBSUB;
530}
531
532/* UNSUBSCRIBE [channel ...] */
533void unsubscribeCommand(client *c) {
534 if (c->argc == 1) {
535 pubsubUnsubscribeAllChannels(c,1);
536 } else {
537 int j;
538
539 for (j = 1; j < c->argc; j++)
540 pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
541 }
542 if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
543}
544
545/* PSUBSCRIBE pattern [pattern ...] */
546void psubscribeCommand(client *c) {
547 int j;
548 if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
549 /**
550 * A client that has CLIENT_DENY_BLOCKING flag on
551 * expect a reply per command and so can not execute subscribe.
552 *
553 * Notice that we have a special treatment for multi because of
554 * backward compatibility
555 */
556 addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
557 return;
558 }
559
560 for (j = 1; j < c->argc; j++)
561 pubsubSubscribePattern(c,c->argv[j]);
562 c->flags |= CLIENT_PUBSUB;
563}
564
565/* PUNSUBSCRIBE [pattern [pattern ...]] */
566void punsubscribeCommand(client *c) {
567 if (c->argc == 1) {
568 pubsubUnsubscribeAllPatterns(c,1);
569 } else {
570 int j;
571
572 for (j = 1; j < c->argc; j++)
573 pubsubUnsubscribePattern(c,c->argv[j],1);
574 }
575 if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
576}
577
578/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
579 * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
580int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
581 int receivers = pubsubPublishMessage(channel, message, sharded);
582 if (server.cluster_enabled)
583 clusterPropagatePublish(channel, message, sharded);
584 return receivers;
585}
586
587/* PUBLISH <channel> <message> */
588void publishCommand(client *c) {
589 if (server.sentinel_mode) {
590 sentinelPublishCommand(c);
591 return;
592 }
593
594 int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
595 if (!server.cluster_enabled)
596 forceCommandPropagation(c,PROPAGATE_REPL);
597 addReplyLongLong(c,receivers);
598}
599
600/* PUBSUB command for Pub/Sub introspection. */
601void pubsubCommand(client *c) {
602 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
603 const char *help[] = {
604"CHANNELS [<pattern>]",
605" Return the currently active channels matching a <pattern> (default: '*').",
606"NUMPAT",
607" Return number of subscriptions to patterns.",
608"NUMSUB [<channel> ...]",
609" Return the number of subscribers for the specified channels, excluding",
610" pattern subscriptions(default: no channels).",
611"SHARDCHANNELS [<pattern>]",
612" Return the currently active shard level channels matching a <pattern> (default: '*').",
613"SHARDNUMSUB [<shardchannel> ...]",
614" Return the number of subscribers for the specified shard level channel(s)",
615NULL
616 };
617 addReplyHelp(c, help);
618 } else if (!strcasecmp(c->argv[1]->ptr,"channels") &&
619 (c->argc == 2 || c->argc == 3))
620 {
621 /* PUBSUB CHANNELS [<pattern>] */
622 sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
623 channelList(c, pat, server.pubsub_channels);
624 } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
625 /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
626 int j;
627
628 addReplyArrayLen(c,(c->argc-2)*2);
629 for (j = 2; j < c->argc; j++) {
630 list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
631
632 addReplyBulk(c,c->argv[j]);
633 addReplyLongLong(c,l ? listLength(l) : 0);
634 }
635 } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
636 /* PUBSUB NUMPAT */
637 addReplyLongLong(c,dictSize(server.pubsub_patterns));
638 } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") &&
639 (c->argc == 2 || c->argc == 3))
640 {
641 /* PUBSUB SHARDCHANNELS */
642 sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
643 channelList(c,pat,server.pubsubshard_channels);
644 } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
645 /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */
646 int j;
647
648 addReplyArrayLen(c, (c->argc-2)*2);
649 for (j = 2; j < c->argc; j++) {
650 list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]);
651
652 addReplyBulk(c,c->argv[j]);
653 addReplyLongLong(c,l ? listLength(l) : 0);
654 }
655 } else {
656 addReplySubcommandSyntaxError(c);
657 }
658}
659
660void channelList(client *c, sds pat, dict *pubsub_channels) {
661 dictIterator *di = dictGetIterator(pubsub_channels);
662 dictEntry *de;
663 long mblen = 0;
664 void *replylen;
665
666 replylen = addReplyDeferredLen(c);
667 while((de = dictNext(di)) != NULL) {
668 robj *cobj = dictGetKey(de);
669 sds channel = cobj->ptr;
670
671 if (!pat || stringmatchlen(pat, sdslen(pat),
672 channel, sdslen(channel),0))
673 {
674 addReplyBulk(c,cobj);
675 mblen++;
676 }
677 }
678 dictReleaseIterator(di);
679 setDeferredArrayLen(c,replylen,mblen);
680}
681
682/* SPUBLISH <shardchannel> <message> */
683void spublishCommand(client *c) {
684 int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
685 if (!server.cluster_enabled)
686 forceCommandPropagation(c,PROPAGATE_REPL);
687 addReplyLongLong(c,receivers);
688}
689
690/* SSUBSCRIBE shardchannel [shardchannel ...] */
691void ssubscribeCommand(client *c) {
692 if (c->flags & CLIENT_DENY_BLOCKING) {
693 /* A client that has CLIENT_DENY_BLOCKING flag on
694 * expect a reply per command and so can not execute subscribe. */
695 addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
696 return;
697 }
698
699 for (int j = 1; j < c->argc; j++) {
700 /* A channel is only considered to be added, if a
701 * subscriber exists for it. And if a subscriber
702 * already exists the slotToChannel doesn't needs
703 * to be incremented. */
704 if (server.cluster_enabled &
705 (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
706 slotToChannelAdd(c->argv[j]->ptr);
707 }
708 pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
709 }
710 c->flags |= CLIENT_PUBSUB;
711}
712
713
714/* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */
715void sunsubscribeCommand(client *c) {
716 if (c->argc == 1) {
717 pubsubUnsubscribeShardAllChannels(c, 1);
718 } else {
719 for (int j = 1; j < c->argc; j++) {
720 pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
721 }
722 }
723 if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
724}
725
726size_t pubsubMemOverhead(client *c) {
727 /* PubSub patterns */
728 size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode);
729 /* Global PubSub channels */
730 mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
731 dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
732 /* Sharded PubSub channels */
733 mem += dictSize(c->pubsubshard_channels) * sizeof(dictEntry) +
734 dictSlots(c->pubsubshard_channels) * sizeof(dictEntry*);
735 return mem;
736}
737