1/* tracking.c - Client side caching: keys tracking and invalidation
2 *
3 * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include "server.h"
32
33/* The tracking table is constituted by a radix tree of keys, each pointing
34 * to a radix tree of client IDs, used to track the clients that may have
35 * certain keys in their local, client side, cache.
36 *
37 * When a client enables tracking with "CLIENT TRACKING on", each key served to
38 * the client is remembered in the table mapping the keys to the client IDs.
39 * Later, when a key is modified, all the clients that may have local copy
40 * of such key will receive an invalidation message.
41 *
42 * Clients will normally take frequently requested objects in memory, removing
43 * them when invalidation messages are received. */
44rax *TrackingTable = NULL;
45rax *PrefixTable = NULL;
46uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
47 the whole tracking table. This gives
48 an hint about the total memory we
49 are using server side for CSC. */
50robj *TrackingChannelName;
51
52/* This is the structure that we have as value of the PrefixTable, and
53 * represents the list of keys modified, and the list of clients that need
54 * to be notified, for a given prefix. */
55typedef struct bcastState {
56 rax *keys; /* Keys modified in the current event loop cycle. */
57 rax *clients; /* Clients subscribed to the notification events for this
58 prefix. */
59} bcastState;
60
61/* Remove the tracking state from the client 'c'. Note that there is not much
62 * to do for us here, if not to decrement the counter of the clients in
63 * tracking mode, because we just store the ID of the client in the tracking
64 * table, so we'll remove the ID reference in a lazy way. Otherwise when a
65 * client with many entries in the table is removed, it would cost a lot of
66 * time to do the cleanup. */
67void disableTracking(client *c) {
68 /* If this client is in broadcasting mode, we need to unsubscribe it
69 * from all the prefixes it is registered to. */
70 if (c->flags & CLIENT_TRACKING_BCAST) {
71 raxIterator ri;
72 raxStart(&ri,c->client_tracking_prefixes);
73 raxSeek(&ri,"^",NULL,0);
74 while(raxNext(&ri)) {
75 bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
76 serverAssert(bs != raxNotFound);
77 raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
78 /* Was it the last client? Remove the prefix from the
79 * table. */
80 if (raxSize(bs->clients) == 0) {
81 raxFree(bs->clients);
82 raxFree(bs->keys);
83 zfree(bs);
84 raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
85 }
86 }
87 raxStop(&ri);
88 raxFree(c->client_tracking_prefixes);
89 c->client_tracking_prefixes = NULL;
90 }
91
92 /* Clear flags and adjust the count. */
93 if (c->flags & CLIENT_TRACKING) {
94 server.tracking_clients--;
95 c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
96 CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
97 CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING|
98 CLIENT_TRACKING_NOLOOP);
99 }
100}
101
/* Return 1 if one of the two buffers is a prefix of the other (that is,
 * they are equal for the whole length of the shorter one), otherwise 0.
 * An empty buffer is a prefix of anything. */
static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2, size_t s2_len) {
    size_t shorter = s2_len;
    if (s1_len < s2_len) shorter = s1_len;
    if (memcmp(s1,s2,shorter) != 0) return 0;
    return 1;
}
106
107/* Check if any of the provided prefixes collide with one another or
108 * with an existing prefix for the client. A collision is defined as two
109 * prefixes that will emit an invalidation for the same key. If no prefix
110 * collision is found, 1 is return, otherwise 0 is returned and the client
111 * has an error emitted describing the error. */
112int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) {
113 for (size_t i = 0; i < numprefix; i++) {
114 /* Check input list has no overlap with existing prefixes. */
115 if (c->client_tracking_prefixes) {
116 raxIterator ri;
117 raxStart(&ri,c->client_tracking_prefixes);
118 raxSeek(&ri,"^",NULL,0);
119 while(raxNext(&ri)) {
120 if (stringCheckPrefix(ri.key,ri.key_len,
121 prefixes[i]->ptr,sdslen(prefixes[i]->ptr)))
122 {
123 sds collision = sdsnewlen(ri.key,ri.key_len);
124 addReplyErrorFormat(c,
125 "Prefix '%s' overlaps with an existing prefix '%s'. "
126 "Prefixes for a single client must not overlap.",
127 (unsigned char *)prefixes[i]->ptr,
128 (unsigned char *)collision);
129 sdsfree(collision);
130 raxStop(&ri);
131 return 0;
132 }
133 }
134 raxStop(&ri);
135 }
136 /* Check input has no overlap with itself. */
137 for (size_t j = i + 1; j < numprefix; j++) {
138 if (stringCheckPrefix(prefixes[i]->ptr,sdslen(prefixes[i]->ptr),
139 prefixes[j]->ptr,sdslen(prefixes[j]->ptr)))
140 {
141 addReplyErrorFormat(c,
142 "Prefix '%s' overlaps with another provided prefix '%s'. "
143 "Prefixes for a single client must not overlap.",
144 (unsigned char *)prefixes[i]->ptr,
145 (unsigned char *)prefixes[j]->ptr);
146 return i;
147 }
148 }
149 }
150 return 1;
151}
152
153/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
154 * already registered for the specified prefix, no operation is performed. */
155void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
156 bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,plen);
157 /* If this is the first client subscribing to such prefix, create
158 * the prefix in the table. */
159 if (bs == raxNotFound) {
160 bs = zmalloc(sizeof(*bs));
161 bs->keys = raxNew();
162 bs->clients = raxNew();
163 raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
164 }
165 if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
166 if (c->client_tracking_prefixes == NULL)
167 c->client_tracking_prefixes = raxNew();
168 raxInsert(c->client_tracking_prefixes,
169 (unsigned char*)prefix,plen,NULL,NULL);
170 }
171}
172
173/* Enable the tracking state for the client 'c', and as a side effect allocates
174 * the tracking table if needed. If the 'redirect_to' argument is non zero, the
175 * invalidation messages for this client will be sent to the client ID
176 * specified by the 'redirect_to' argument. Note that if such client will
177 * eventually get freed, we'll send a message to the original client to
178 * inform it of the condition. Multiple clients can redirect the invalidation
179 * messages to the same client ID. */
180void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
181 if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
182 c->flags |= CLIENT_TRACKING;
183 c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
184 CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
185 CLIENT_TRACKING_NOLOOP);
186 c->client_tracking_redirection = redirect_to;
187
188 /* This may be the first client we ever enable. Create the tracking
189 * table if it does not exist. */
190 if (TrackingTable == NULL) {
191 TrackingTable = raxNew();
192 PrefixTable = raxNew();
193 TrackingChannelName = createStringObject("__redis__:invalidate",20);
194 }
195
196 /* For broadcasting, set the list of prefixes in the client. */
197 if (options & CLIENT_TRACKING_BCAST) {
198 c->flags |= CLIENT_TRACKING_BCAST;
199 if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
200 for (size_t j = 0; j < numprefix; j++) {
201 sds sdsprefix = prefix[j]->ptr;
202 enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
203 }
204 }
205
206 /* Set the remaining flags that don't need any special handling. */
207 c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
208 CLIENT_TRACKING_NOLOOP);
209}
210
211/* This function is called after the execution of a readonly command in the
212 * case the client 'c' has keys tracking enabled and the tracking is not
213 * in BCAST mode. It will populate the tracking invalidation table according
214 * to the keys the user fetched, so that Redis will know what are the clients
215 * that should receive an invalidation message with certain groups of keys
216 * are modified. */
217void trackingRememberKeys(client *c) {
218 /* Return if we are in optin/out mode and the right CACHING command
219 * was/wasn't given in order to modify the default behavior. */
220 uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
221 uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
222 uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
223 if ((optin && !caching_given) || (optout && caching_given)) return;
224
225 getKeysResult result = GETKEYS_RESULT_INIT;
226 int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
227 if (!numkeys) {
228 getKeysFreeResult(&result);
229 return;
230 }
231 /* Shard channels are treated as special keys for client
232 * library to rely on `COMMAND` command to discover the node
233 * to connect to. These channels doesn't need to be tracked. */
234 if (c->cmd->flags & CMD_PUBSUB) {
235 return;
236 }
237
238 keyReference *keys = result.keys;
239
240 for(int j = 0; j < numkeys; j++) {
241 int idx = keys[j].pos;
242 sds sdskey = c->argv[idx]->ptr;
243 rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
244 if (ids == raxNotFound) {
245 ids = raxNew();
246 int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
247 sdslen(sdskey),ids, NULL);
248 serverAssert(inserted == 1);
249 }
250 if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
251 TrackingTableTotalItems++;
252 }
253 getKeysFreeResult(&result);
254}
255
256/* Given a key name, this function sends an invalidation message in the
257 * proper channel (depending on RESP version: PubSub or Push message) and
258 * to the proper client (in case of redirection), in the context of the
259 * client 'c' with tracking enabled.
260 *
261 * In case the 'proto' argument is non zero, the function will assume that
262 * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
263 * form of Redis RESP protocol. This is used for:
264 * - In BCAST mode, to send an array of invalidated keys to all
265 * applicable clients
266 * - Following a flush command, to send a single RESP NULL to indicate
267 * that all keys are now invalid. */
268void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
269 int using_redirection = 0;
270 if (c->client_tracking_redirection) {
271 client *redir = lookupClientByID(c->client_tracking_redirection);
272 if (!redir) {
273 c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
274 /* We need to signal to the original connection that we
275 * are unable to send invalidation messages to the redirected
276 * connection, because the client no longer exist. */
277 if (c->resp > 2) {
278 addReplyPushLen(c,2);
279 addReplyBulkCBuffer(c,"tracking-redir-broken",21);
280 addReplyLongLong(c,c->client_tracking_redirection);
281 }
282 return;
283 }
284 c = redir;
285 using_redirection = 1;
286 }
287
288 /* Only send such info for clients in RESP version 3 or more. However
289 * if redirection is active, and the connection we redirect to is
290 * in Pub/Sub mode, we can support the feature with RESP 2 as well,
291 * by sending Pub/Sub messages in the __redis__:invalidate channel. */
292 if (c->resp > 2) {
293 addReplyPushLen(c,2);
294 addReplyBulkCBuffer(c,"invalidate",10);
295 } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
296 /* We use a static object to speedup things, however we assume
297 * that addReplyPubsubMessage() will not take a reference. */
298 addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk);
299 } else {
300 /* If are here, the client is not using RESP3, nor is
301 * redirecting to another client. We can't send anything to
302 * it since RESP2 does not support push messages in the same
303 * connection. */
304 return;
305 }
306
307 /* Send the "value" part, which is the array of keys. */
308 if (proto) {
309 addReplyProto(c,keyname,keylen);
310 } else {
311 addReplyArrayLen(c,1);
312 addReplyBulkCBuffer(c,keyname,keylen);
313 }
314 updateClientMemUsage(c);
315}
316
317/* This function is called when a key is modified in Redis and in the case
318 * we have at least one client with the BCAST mode enabled.
319 * Its goal is to set the key in the right broadcast state if the key
320 * matches one or more prefixes in the prefix table. Later when we
321 * return to the event loop, we'll send invalidation messages to the
322 * clients subscribed to each prefix. */
323void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
324 raxIterator ri;
325 raxStart(&ri,PrefixTable);
326 raxSeek(&ri,"^",NULL,0);
327 while(raxNext(&ri)) {
328 if (ri.key_len > keylen) continue;
329 if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
330 continue;
331 bcastState *bs = ri.data;
332 /* We insert the client pointer as associated value in the radix
333 * tree. This way we know who was the client that did the last
334 * change to the key, and can avoid sending the notification in the
335 * case the client is in NOLOOP mode. */
336 raxInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL);
337 }
338 raxStop(&ri);
339}
340
341/* This function is called from signalModifiedKey() or other places in Redis
342 * when a key changes value. In the context of keys tracking, our task here is
343 * to send a notification to every client that may have keys about such caching
344 * slot.
345 *
346 * Note that 'c' may be NULL in case the operation was performed outside the
347 * context of a client modifying the database (for instance when we delete a
348 * key because of expire).
349 *
350 * The last argument 'bcast' tells the function if it should also schedule
351 * the key for broadcasting to clients in BCAST mode. This is the case when
352 * the function is called from the Redis core once a key is modified, however
353 * we also call the function in order to evict keys in the key table in case
354 * of memory pressure: in that case the key didn't really change, so we want
355 * just to notify the clients that are in the table for this key, that would
356 * otherwise miss the fact we are no longer tracking the key for them. */
357void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
358 if (TrackingTable == NULL) return;
359
360 unsigned char *key = (unsigned char*)keyobj->ptr;
361 size_t keylen = sdslen(keyobj->ptr);
362
363 if (bcast && raxSize(PrefixTable) > 0)
364 trackingRememberKeyToBroadcast(c,(char *)key,keylen);
365
366 rax *ids = raxFind(TrackingTable,key,keylen);
367 if (ids == raxNotFound) return;
368
369 raxIterator ri;
370 raxStart(&ri,ids);
371 raxSeek(&ri,"^",NULL,0);
372 while(raxNext(&ri)) {
373 uint64_t id;
374 memcpy(&id,ri.key,sizeof(id));
375 client *target = lookupClientByID(id);
376 /* Note that if the client is in BCAST mode, we don't want to
377 * send invalidation messages that were pending in the case
378 * previously the client was not in BCAST mode. This can happen if
379 * TRACKING is enabled normally, and then the client switches to
380 * BCAST mode. */
381 if (target == NULL ||
382 !(target->flags & CLIENT_TRACKING)||
383 target->flags & CLIENT_TRACKING_BCAST)
384 {
385 continue;
386 }
387
388 /* If the client enabled the NOLOOP mode, don't send notifications
389 * about keys changed by the client itself. */
390 if (target->flags & CLIENT_TRACKING_NOLOOP &&
391 target == c)
392 {
393 continue;
394 }
395
396 /* If target is current client, we need schedule key invalidation.
397 * As the invalidation messages may be interleaved with command
398 * response and should after command response */
399 if (target == server.current_client){
400 incrRefCount(keyobj);
401 listAddNodeTail(server.tracking_pending_keys, keyobj);
402 } else {
403 sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0);
404 }
405 }
406 raxStop(&ri);
407
408 /* Free the tracking table: we'll create the radix tree and populate it
409 * again if more keys will be modified in this caching slot. */
410 TrackingTableTotalItems -= raxSize(ids);
411 raxFree(ids);
412 raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
413}
414
415void trackingHandlePendingKeyInvalidations() {
416 if (!listLength(server.tracking_pending_keys)) return;
417
418 listNode *ln;
419 listIter li;
420
421 listRewind(server.tracking_pending_keys,&li);
422 while ((ln = listNext(&li)) != NULL) {
423 robj *key = listNodeValue(ln);
424 /* current_client maybe freed, so we need to send invalidation
425 * message only when current_client is still alive */
426 if (server.current_client != NULL)
427 sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0);
428 decrRefCount(key);
429 }
430 listEmpty(server.tracking_pending_keys);
431}
432
/* The following functions handle the case of one or all the Redis
 * databases being flushed (see trackingInvalidateKeysOnFlush() below).
 * Caching keys are not specific for each DB but are global: currently
 * what we do is to send a special notification to the clients with
 * tracking enabled, sending a RESP NULL, which means "all the keys",
 * in order to avoid flooding clients with many invalidation messages
 * for all the keys they may hold. The tracking table is then released
 * using the helpers defined right here.
 */
440void freeTrackingRadixTreeCallback(void *rt) {
441 raxFree(rt);
442}
443
444void freeTrackingRadixTree(rax *rt) {
445 raxFreeWithCallback(rt,freeTrackingRadixTreeCallback);
446}
447
448/* A RESP NULL is sent to indicate that all keys are invalid */
449void trackingInvalidateKeysOnFlush(int async) {
450 if (server.tracking_clients) {
451 listNode *ln;
452 listIter li;
453 listRewind(server.clients,&li);
454 while ((ln = listNext(&li)) != NULL) {
455 client *c = listNodeValue(ln);
456 if (c->flags & CLIENT_TRACKING) {
457 sendTrackingMessage(c,shared.null[c->resp]->ptr,sdslen(shared.null[c->resp]->ptr),1);
458 }
459 }
460 }
461
462 /* In case of FLUSHALL, reclaim all the memory used by tracking. */
463 if (TrackingTable) {
464 if (async) {
465 freeTrackingRadixTreeAsync(TrackingTable);
466 } else {
467 freeTrackingRadixTree(TrackingTable);
468 }
469 TrackingTable = raxNew();
470 TrackingTableTotalItems = 0;
471 }
472}
473
474/* Tracking forces Redis to remember information about which client may have
475 * certain keys. In workloads where there are a lot of reads, but keys are
476 * hardly modified, the amount of information we have to remember server side
477 * could be a lot, with the number of keys being totally not bound.
478 *
479 * So Redis allows the user to configure a maximum number of keys for the
480 * invalidation table. This function makes sure that we don't go over the
481 * specified fill rate: if we are over, we can just evict information about
482 * a random key, and send invalidation messages to clients like if the key was
483 * modified. */
484void trackingLimitUsedSlots(void) {
485 static unsigned int timeout_counter = 0;
486 if (TrackingTable == NULL) return;
487 if (server.tracking_table_max_keys == 0) return; /* No limits set. */
488 size_t max_keys = server.tracking_table_max_keys;
489 if (raxSize(TrackingTable) <= max_keys) {
490 timeout_counter = 0;
491 return; /* Limit not reached. */
492 }
493
494 /* We have to invalidate a few keys to reach the limit again. The effort
495 * we do here is proportional to the number of times we entered this
496 * function and found that we are still over the limit. */
497 int effort = 100 * (timeout_counter+1);
498
499 /* We just remove one key after another by using a random walk. */
500 raxIterator ri;
501 raxStart(&ri,TrackingTable);
502 while(effort > 0) {
503 effort--;
504 raxSeek(&ri,"^",NULL,0);
505 raxRandomWalk(&ri,0);
506 if (raxEOF(&ri)) break;
507 robj *keyobj = createStringObject((char*)ri.key,ri.key_len);
508 trackingInvalidateKey(NULL,keyobj,0);
509 decrRefCount(keyobj);
510 if (raxSize(TrackingTable) <= max_keys) {
511 timeout_counter = 0;
512 raxStop(&ri);
513 return; /* Return ASAP: we are again under the limit. */
514 }
515 }
516
517 /* If we reach this point, we were not able to go under the configured
518 * limit using the maximum effort we had for this run. */
519 raxStop(&ri);
520 timeout_counter++;
521}
522
523/* Generate Redis protocol for an array containing all the key names
524 * in the 'keys' radix tree. If the client is not NULL, the list will not
525 * include keys that were modified the last time by this client, in order
526 * to implement the NOLOOP option.
527 *
528 * If the resulting array would be empty, NULL is returned instead. */
529sds trackingBuildBroadcastReply(client *c, rax *keys) {
530 raxIterator ri;
531 uint64_t count;
532
533 if (c == NULL) {
534 count = raxSize(keys);
535 } else {
536 count = 0;
537 raxStart(&ri,keys);
538 raxSeek(&ri,"^",NULL,0);
539 while(raxNext(&ri)) {
540 if (ri.data != c) count++;
541 }
542 raxStop(&ri);
543
544 if (count == 0) return NULL;
545 }
546
547 /* Create the array reply with the list of keys once, then send
548 * it to all the clients subscribed to this prefix. */
549 char buf[32];
550 size_t len = ll2string(buf,sizeof(buf),count);
551 sds proto = sdsempty();
552 proto = sdsMakeRoomFor(proto,count*15);
553 proto = sdscatlen(proto,"*",1);
554 proto = sdscatlen(proto,buf,len);
555 proto = sdscatlen(proto,"\r\n",2);
556 raxStart(&ri,keys);
557 raxSeek(&ri,"^",NULL,0);
558 while(raxNext(&ri)) {
559 if (c && ri.data == c) continue;
560 len = ll2string(buf,sizeof(buf),ri.key_len);
561 proto = sdscatlen(proto,"$",1);
562 proto = sdscatlen(proto,buf,len);
563 proto = sdscatlen(proto,"\r\n",2);
564 proto = sdscatlen(proto,ri.key,ri.key_len);
565 proto = sdscatlen(proto,"\r\n",2);
566 }
567 raxStop(&ri);
568 return proto;
569}
570
571/* This function will run the prefixes of clients in BCAST mode and
572 * keys that were modified about each prefix, and will send the
573 * notifications to each client in each prefix. */
574void trackingBroadcastInvalidationMessages(void) {
575 raxIterator ri, ri2;
576
577 /* Return ASAP if there is nothing to do here. */
578 if (TrackingTable == NULL || !server.tracking_clients) return;
579
580 raxStart(&ri,PrefixTable);
581 raxSeek(&ri,"^",NULL,0);
582
583 /* For each prefix... */
584 while(raxNext(&ri)) {
585 bcastState *bs = ri.data;
586
587 if (raxSize(bs->keys)) {
588 /* Generate the common protocol for all the clients that are
589 * not using the NOLOOP option. */
590 sds proto = trackingBuildBroadcastReply(NULL,bs->keys);
591
592 /* Send this array of keys to every client in the list. */
593 raxStart(&ri2,bs->clients);
594 raxSeek(&ri2,"^",NULL,0);
595 while(raxNext(&ri2)) {
596 client *c;
597 memcpy(&c,ri2.key,sizeof(c));
598 if (c->flags & CLIENT_TRACKING_NOLOOP) {
599 /* This client may have certain keys excluded. */
600 sds adhoc = trackingBuildBroadcastReply(c,bs->keys);
601 if (adhoc) {
602 sendTrackingMessage(c,adhoc,sdslen(adhoc),1);
603 sdsfree(adhoc);
604 }
605 } else {
606 sendTrackingMessage(c,proto,sdslen(proto),1);
607 }
608 }
609 raxStop(&ri2);
610
611 /* Clean up: we can remove everything from this state, because we
612 * want to only track the new keys that will be accumulated starting
613 * from now. */
614 sdsfree(proto);
615 }
616 raxFree(bs->keys);
617 bs->keys = raxNew();
618 }
619 raxStop(&ri);
620}
621
622/* This is just used in order to access the amount of used slots in the
623 * tracking table. */
624uint64_t trackingGetTotalItems(void) {
625 return TrackingTableTotalItems;
626}
627
628uint64_t trackingGetTotalKeys(void) {
629 if (TrackingTable == NULL) return 0;
630 return raxSize(TrackingTable);
631}
632
633uint64_t trackingGetTotalPrefixes(void) {
634 if (PrefixTable == NULL) return 0;
635 return raxSize(PrefixTable);
636}
637