1 | /* tracking.c - Client side caching: keys tracking and invalidation |
2 | * |
3 | * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com> |
4 | * All rights reserved. |
5 | * |
6 | * Redistribution and use in source and binary forms, with or without |
7 | * modification, are permitted provided that the following conditions are met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright notice, |
10 | * this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above copyright |
12 | * notice, this list of conditions and the following disclaimer in the |
13 | * documentation and/or other materials provided with the distribution. |
14 | * * Neither the name of Redis nor the names of its contributors may be used |
15 | * to endorse or promote products derived from this software without |
16 | * specific prior written permission. |
17 | * |
18 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
19 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
20 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
21 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
22 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
23 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
24 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
25 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
26 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
27 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 | * POSSIBILITY OF SUCH DAMAGE. |
29 | */ |
30 | |
31 | #include "server.h" |
32 | |
33 | /* The tracking table is constituted by a radix tree of keys, each pointing |
34 | * to a radix tree of client IDs, used to track the clients that may have |
35 | * certain keys in their local, client side, cache. |
36 | * |
37 | * When a client enables tracking with "CLIENT TRACKING on", each key served to |
38 | * the client is remembered in the table mapping the keys to the client IDs. |
39 | * Later, when a key is modified, all the clients that may have local copy |
40 | * of such key will receive an invalidation message. |
41 | * |
42 | * Clients will normally take frequently requested objects in memory, removing |
43 | * them when invalidation messages are received. */ |
44 | rax *TrackingTable = NULL; |
45 | rax *PrefixTable = NULL; |
46 | uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across |
47 | the whole tracking table. This gives |
48 | an hint about the total memory we |
49 | are using server side for CSC. */ |
50 | robj *TrackingChannelName; |
51 | |
52 | /* This is the structure that we have as value of the PrefixTable, and |
53 | * represents the list of keys modified, and the list of clients that need |
54 | * to be notified, for a given prefix. */ |
55 | typedef struct bcastState { |
56 | rax *keys; /* Keys modified in the current event loop cycle. */ |
57 | rax *clients; /* Clients subscribed to the notification events for this |
58 | prefix. */ |
59 | } bcastState; |
60 | |
61 | /* Remove the tracking state from the client 'c'. Note that there is not much |
62 | * to do for us here, if not to decrement the counter of the clients in |
63 | * tracking mode, because we just store the ID of the client in the tracking |
64 | * table, so we'll remove the ID reference in a lazy way. Otherwise when a |
65 | * client with many entries in the table is removed, it would cost a lot of |
66 | * time to do the cleanup. */ |
67 | void disableTracking(client *c) { |
68 | /* If this client is in broadcasting mode, we need to unsubscribe it |
69 | * from all the prefixes it is registered to. */ |
70 | if (c->flags & CLIENT_TRACKING_BCAST) { |
71 | raxIterator ri; |
72 | raxStart(&ri,c->client_tracking_prefixes); |
73 | raxSeek(&ri,"^" ,NULL,0); |
74 | while(raxNext(&ri)) { |
75 | bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); |
76 | serverAssert(bs != raxNotFound); |
77 | raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); |
78 | /* Was it the last client? Remove the prefix from the |
79 | * table. */ |
80 | if (raxSize(bs->clients) == 0) { |
81 | raxFree(bs->clients); |
82 | raxFree(bs->keys); |
83 | zfree(bs); |
84 | raxRemove(PrefixTable,ri.key,ri.key_len,NULL); |
85 | } |
86 | } |
87 | raxStop(&ri); |
88 | raxFree(c->client_tracking_prefixes); |
89 | c->client_tracking_prefixes = NULL; |
90 | } |
91 | |
92 | /* Clear flags and adjust the count. */ |
93 | if (c->flags & CLIENT_TRACKING) { |
94 | server.tracking_clients--; |
95 | c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| |
96 | CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| |
97 | CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING| |
98 | CLIENT_TRACKING_NOLOOP); |
99 | } |
100 | } |
101 | |
/* Return 1 if the shorter of the two byte strings is a prefix of the longer
 * one (strings of equal length must be identical), 0 otherwise. A zero
 * length string is a prefix of any string. */
static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2, size_t s2_len) {
    size_t common = s2_len;
    if (s1_len < s2_len) common = s1_len;
    if (memcmp(s1,s2,common) != 0) return 0;
    return 1;
}
106 | |
107 | /* Check if any of the provided prefixes collide with one another or |
108 | * with an existing prefix for the client. A collision is defined as two |
109 | * prefixes that will emit an invalidation for the same key. If no prefix |
110 | * collision is found, 1 is return, otherwise 0 is returned and the client |
111 | * has an error emitted describing the error. */ |
112 | int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) { |
113 | for (size_t i = 0; i < numprefix; i++) { |
114 | /* Check input list has no overlap with existing prefixes. */ |
115 | if (c->client_tracking_prefixes) { |
116 | raxIterator ri; |
117 | raxStart(&ri,c->client_tracking_prefixes); |
118 | raxSeek(&ri,"^" ,NULL,0); |
119 | while(raxNext(&ri)) { |
120 | if (stringCheckPrefix(ri.key,ri.key_len, |
121 | prefixes[i]->ptr,sdslen(prefixes[i]->ptr))) |
122 | { |
123 | sds collision = sdsnewlen(ri.key,ri.key_len); |
124 | addReplyErrorFormat(c, |
125 | "Prefix '%s' overlaps with an existing prefix '%s'. " |
126 | "Prefixes for a single client must not overlap." , |
127 | (unsigned char *)prefixes[i]->ptr, |
128 | (unsigned char *)collision); |
129 | sdsfree(collision); |
130 | raxStop(&ri); |
131 | return 0; |
132 | } |
133 | } |
134 | raxStop(&ri); |
135 | } |
136 | /* Check input has no overlap with itself. */ |
137 | for (size_t j = i + 1; j < numprefix; j++) { |
138 | if (stringCheckPrefix(prefixes[i]->ptr,sdslen(prefixes[i]->ptr), |
139 | prefixes[j]->ptr,sdslen(prefixes[j]->ptr))) |
140 | { |
141 | addReplyErrorFormat(c, |
142 | "Prefix '%s' overlaps with another provided prefix '%s'. " |
143 | "Prefixes for a single client must not overlap." , |
144 | (unsigned char *)prefixes[i]->ptr, |
145 | (unsigned char *)prefixes[j]->ptr); |
146 | return i; |
147 | } |
148 | } |
149 | } |
150 | return 1; |
151 | } |
152 | |
153 | /* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is |
154 | * already registered for the specified prefix, no operation is performed. */ |
155 | void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { |
156 | bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,plen); |
157 | /* If this is the first client subscribing to such prefix, create |
158 | * the prefix in the table. */ |
159 | if (bs == raxNotFound) { |
160 | bs = zmalloc(sizeof(*bs)); |
161 | bs->keys = raxNew(); |
162 | bs->clients = raxNew(); |
163 | raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); |
164 | } |
165 | if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { |
166 | if (c->client_tracking_prefixes == NULL) |
167 | c->client_tracking_prefixes = raxNew(); |
168 | raxInsert(c->client_tracking_prefixes, |
169 | (unsigned char*)prefix,plen,NULL,NULL); |
170 | } |
171 | } |
172 | |
173 | /* Enable the tracking state for the client 'c', and as a side effect allocates |
174 | * the tracking table if needed. If the 'redirect_to' argument is non zero, the |
175 | * invalidation messages for this client will be sent to the client ID |
176 | * specified by the 'redirect_to' argument. Note that if such client will |
177 | * eventually get freed, we'll send a message to the original client to |
178 | * inform it of the condition. Multiple clients can redirect the invalidation |
179 | * messages to the same client ID. */ |
180 | void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) { |
181 | if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; |
182 | c->flags |= CLIENT_TRACKING; |
183 | c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST| |
184 | CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT| |
185 | CLIENT_TRACKING_NOLOOP); |
186 | c->client_tracking_redirection = redirect_to; |
187 | |
188 | /* This may be the first client we ever enable. Create the tracking |
189 | * table if it does not exist. */ |
190 | if (TrackingTable == NULL) { |
191 | TrackingTable = raxNew(); |
192 | PrefixTable = raxNew(); |
193 | TrackingChannelName = createStringObject("__redis__:invalidate" ,20); |
194 | } |
195 | |
196 | /* For broadcasting, set the list of prefixes in the client. */ |
197 | if (options & CLIENT_TRACKING_BCAST) { |
198 | c->flags |= CLIENT_TRACKING_BCAST; |
199 | if (numprefix == 0) enableBcastTrackingForPrefix(c,"" ,0); |
200 | for (size_t j = 0; j < numprefix; j++) { |
201 | sds sdsprefix = prefix[j]->ptr; |
202 | enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); |
203 | } |
204 | } |
205 | |
206 | /* Set the remaining flags that don't need any special handling. */ |
207 | c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT| |
208 | CLIENT_TRACKING_NOLOOP); |
209 | } |
210 | |
211 | /* This function is called after the execution of a readonly command in the |
212 | * case the client 'c' has keys tracking enabled and the tracking is not |
213 | * in BCAST mode. It will populate the tracking invalidation table according |
214 | * to the keys the user fetched, so that Redis will know what are the clients |
215 | * that should receive an invalidation message with certain groups of keys |
216 | * are modified. */ |
217 | void trackingRememberKeys(client *c) { |
218 | /* Return if we are in optin/out mode and the right CACHING command |
219 | * was/wasn't given in order to modify the default behavior. */ |
220 | uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN; |
221 | uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT; |
222 | uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING; |
223 | if ((optin && !caching_given) || (optout && caching_given)) return; |
224 | |
225 | getKeysResult result = GETKEYS_RESULT_INIT; |
226 | int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result); |
227 | if (!numkeys) { |
228 | getKeysFreeResult(&result); |
229 | return; |
230 | } |
231 | /* Shard channels are treated as special keys for client |
232 | * library to rely on `COMMAND` command to discover the node |
233 | * to connect to. These channels doesn't need to be tracked. */ |
234 | if (c->cmd->flags & CMD_PUBSUB) { |
235 | return; |
236 | } |
237 | |
238 | keyReference *keys = result.keys; |
239 | |
240 | for(int j = 0; j < numkeys; j++) { |
241 | int idx = keys[j].pos; |
242 | sds sdskey = c->argv[idx]->ptr; |
243 | rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); |
244 | if (ids == raxNotFound) { |
245 | ids = raxNew(); |
246 | int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, |
247 | sdslen(sdskey),ids, NULL); |
248 | serverAssert(inserted == 1); |
249 | } |
250 | if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL)) |
251 | TrackingTableTotalItems++; |
252 | } |
253 | getKeysFreeResult(&result); |
254 | } |
255 | |
256 | /* Given a key name, this function sends an invalidation message in the |
257 | * proper channel (depending on RESP version: PubSub or Push message) and |
258 | * to the proper client (in case of redirection), in the context of the |
259 | * client 'c' with tracking enabled. |
260 | * |
261 | * In case the 'proto' argument is non zero, the function will assume that |
262 | * 'keyname' points to a buffer of 'keylen' bytes already expressed in the |
263 | * form of Redis RESP protocol. This is used for: |
264 | * - In BCAST mode, to send an array of invalidated keys to all |
265 | * applicable clients |
266 | * - Following a flush command, to send a single RESP NULL to indicate |
267 | * that all keys are now invalid. */ |
268 | void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { |
269 | int using_redirection = 0; |
270 | if (c->client_tracking_redirection) { |
271 | client *redir = lookupClientByID(c->client_tracking_redirection); |
272 | if (!redir) { |
273 | c->flags |= CLIENT_TRACKING_BROKEN_REDIR; |
274 | /* We need to signal to the original connection that we |
275 | * are unable to send invalidation messages to the redirected |
276 | * connection, because the client no longer exist. */ |
277 | if (c->resp > 2) { |
278 | addReplyPushLen(c,2); |
279 | addReplyBulkCBuffer(c,"tracking-redir-broken" ,21); |
280 | addReplyLongLong(c,c->client_tracking_redirection); |
281 | } |
282 | return; |
283 | } |
284 | c = redir; |
285 | using_redirection = 1; |
286 | } |
287 | |
288 | /* Only send such info for clients in RESP version 3 or more. However |
289 | * if redirection is active, and the connection we redirect to is |
290 | * in Pub/Sub mode, we can support the feature with RESP 2 as well, |
291 | * by sending Pub/Sub messages in the __redis__:invalidate channel. */ |
292 | if (c->resp > 2) { |
293 | addReplyPushLen(c,2); |
294 | addReplyBulkCBuffer(c,"invalidate" ,10); |
295 | } else if (using_redirection && c->flags & CLIENT_PUBSUB) { |
296 | /* We use a static object to speedup things, however we assume |
297 | * that addReplyPubsubMessage() will not take a reference. */ |
298 | addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk); |
299 | } else { |
300 | /* If are here, the client is not using RESP3, nor is |
301 | * redirecting to another client. We can't send anything to |
302 | * it since RESP2 does not support push messages in the same |
303 | * connection. */ |
304 | return; |
305 | } |
306 | |
307 | /* Send the "value" part, which is the array of keys. */ |
308 | if (proto) { |
309 | addReplyProto(c,keyname,keylen); |
310 | } else { |
311 | addReplyArrayLen(c,1); |
312 | addReplyBulkCBuffer(c,keyname,keylen); |
313 | } |
314 | updateClientMemUsage(c); |
315 | } |
316 | |
317 | /* This function is called when a key is modified in Redis and in the case |
318 | * we have at least one client with the BCAST mode enabled. |
319 | * Its goal is to set the key in the right broadcast state if the key |
320 | * matches one or more prefixes in the prefix table. Later when we |
321 | * return to the event loop, we'll send invalidation messages to the |
322 | * clients subscribed to each prefix. */ |
323 | void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { |
324 | raxIterator ri; |
325 | raxStart(&ri,PrefixTable); |
326 | raxSeek(&ri,"^" ,NULL,0); |
327 | while(raxNext(&ri)) { |
328 | if (ri.key_len > keylen) continue; |
329 | if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) |
330 | continue; |
331 | bcastState *bs = ri.data; |
332 | /* We insert the client pointer as associated value in the radix |
333 | * tree. This way we know who was the client that did the last |
334 | * change to the key, and can avoid sending the notification in the |
335 | * case the client is in NOLOOP mode. */ |
336 | raxInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL); |
337 | } |
338 | raxStop(&ri); |
339 | } |
340 | |
341 | /* This function is called from signalModifiedKey() or other places in Redis |
342 | * when a key changes value. In the context of keys tracking, our task here is |
343 | * to send a notification to every client that may have keys about such caching |
344 | * slot. |
345 | * |
346 | * Note that 'c' may be NULL in case the operation was performed outside the |
347 | * context of a client modifying the database (for instance when we delete a |
348 | * key because of expire). |
349 | * |
350 | * The last argument 'bcast' tells the function if it should also schedule |
351 | * the key for broadcasting to clients in BCAST mode. This is the case when |
352 | * the function is called from the Redis core once a key is modified, however |
353 | * we also call the function in order to evict keys in the key table in case |
354 | * of memory pressure: in that case the key didn't really change, so we want |
355 | * just to notify the clients that are in the table for this key, that would |
356 | * otherwise miss the fact we are no longer tracking the key for them. */ |
357 | void trackingInvalidateKey(client *c, robj *keyobj, int bcast) { |
358 | if (TrackingTable == NULL) return; |
359 | |
360 | unsigned char *key = (unsigned char*)keyobj->ptr; |
361 | size_t keylen = sdslen(keyobj->ptr); |
362 | |
363 | if (bcast && raxSize(PrefixTable) > 0) |
364 | trackingRememberKeyToBroadcast(c,(char *)key,keylen); |
365 | |
366 | rax *ids = raxFind(TrackingTable,key,keylen); |
367 | if (ids == raxNotFound) return; |
368 | |
369 | raxIterator ri; |
370 | raxStart(&ri,ids); |
371 | raxSeek(&ri,"^" ,NULL,0); |
372 | while(raxNext(&ri)) { |
373 | uint64_t id; |
374 | memcpy(&id,ri.key,sizeof(id)); |
375 | client *target = lookupClientByID(id); |
376 | /* Note that if the client is in BCAST mode, we don't want to |
377 | * send invalidation messages that were pending in the case |
378 | * previously the client was not in BCAST mode. This can happen if |
379 | * TRACKING is enabled normally, and then the client switches to |
380 | * BCAST mode. */ |
381 | if (target == NULL || |
382 | !(target->flags & CLIENT_TRACKING)|| |
383 | target->flags & CLIENT_TRACKING_BCAST) |
384 | { |
385 | continue; |
386 | } |
387 | |
388 | /* If the client enabled the NOLOOP mode, don't send notifications |
389 | * about keys changed by the client itself. */ |
390 | if (target->flags & CLIENT_TRACKING_NOLOOP && |
391 | target == c) |
392 | { |
393 | continue; |
394 | } |
395 | |
396 | /* If target is current client, we need schedule key invalidation. |
397 | * As the invalidation messages may be interleaved with command |
398 | * response and should after command response */ |
399 | if (target == server.current_client){ |
400 | incrRefCount(keyobj); |
401 | listAddNodeTail(server.tracking_pending_keys, keyobj); |
402 | } else { |
403 | sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0); |
404 | } |
405 | } |
406 | raxStop(&ri); |
407 | |
408 | /* Free the tracking table: we'll create the radix tree and populate it |
409 | * again if more keys will be modified in this caching slot. */ |
410 | TrackingTableTotalItems -= raxSize(ids); |
411 | raxFree(ids); |
412 | raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL); |
413 | } |
414 | |
415 | void trackingHandlePendingKeyInvalidations() { |
416 | if (!listLength(server.tracking_pending_keys)) return; |
417 | |
418 | listNode *ln; |
419 | listIter li; |
420 | |
421 | listRewind(server.tracking_pending_keys,&li); |
422 | while ((ln = listNext(&li)) != NULL) { |
423 | robj *key = listNodeValue(ln); |
424 | /* current_client maybe freed, so we need to send invalidation |
425 | * message only when current_client is still alive */ |
426 | if (server.current_client != NULL) |
427 | sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0); |
428 | decrRefCount(key); |
429 | } |
430 | listEmpty(server.tracking_pending_keys); |
431 | } |
432 | |
/* The two helpers below free the tracking table on behalf of
 * trackingInvalidateKeysOnFlush(), the function that is called when one or
 * all the Redis databases are flushed. Caching keys are not specific for
 * each DB but are global: currently what we do is send a special
 * notification to clients with tracking enabled, sending a RESP NULL, which
 * means "all the keys", in order to avoid flooding clients with many
 * invalidation messages for all the keys they may hold.
 */
440 | void freeTrackingRadixTreeCallback(void *rt) { |
441 | raxFree(rt); |
442 | } |
443 | |
/* Free the radix tree 'rt' via raxFreeWithCallback(), passing
 * freeTrackingRadixTreeCallback() as the callback. When 'rt' is the
 * tracking table its values are themselves radix trees of client IDs,
 * which the callback releases with raxFree() (presumably rax invokes the
 * callback once per stored value -- confirm against the rax API). */
void freeTrackingRadixTree(rax *rt) {
    raxFreeWithCallback(rt,freeTrackingRadixTreeCallback);
}
447 | |
448 | /* A RESP NULL is sent to indicate that all keys are invalid */ |
449 | void trackingInvalidateKeysOnFlush(int async) { |
450 | if (server.tracking_clients) { |
451 | listNode *ln; |
452 | listIter li; |
453 | listRewind(server.clients,&li); |
454 | while ((ln = listNext(&li)) != NULL) { |
455 | client *c = listNodeValue(ln); |
456 | if (c->flags & CLIENT_TRACKING) { |
457 | sendTrackingMessage(c,shared.null[c->resp]->ptr,sdslen(shared.null[c->resp]->ptr),1); |
458 | } |
459 | } |
460 | } |
461 | |
462 | /* In case of FLUSHALL, reclaim all the memory used by tracking. */ |
463 | if (TrackingTable) { |
464 | if (async) { |
465 | freeTrackingRadixTreeAsync(TrackingTable); |
466 | } else { |
467 | freeTrackingRadixTree(TrackingTable); |
468 | } |
469 | TrackingTable = raxNew(); |
470 | TrackingTableTotalItems = 0; |
471 | } |
472 | } |
473 | |
474 | /* Tracking forces Redis to remember information about which client may have |
475 | * certain keys. In workloads where there are a lot of reads, but keys are |
476 | * hardly modified, the amount of information we have to remember server side |
477 | * could be a lot, with the number of keys being totally not bound. |
478 | * |
479 | * So Redis allows the user to configure a maximum number of keys for the |
480 | * invalidation table. This function makes sure that we don't go over the |
481 | * specified fill rate: if we are over, we can just evict information about |
482 | * a random key, and send invalidation messages to clients like if the key was |
483 | * modified. */ |
484 | void trackingLimitUsedSlots(void) { |
485 | static unsigned int timeout_counter = 0; |
486 | if (TrackingTable == NULL) return; |
487 | if (server.tracking_table_max_keys == 0) return; /* No limits set. */ |
488 | size_t max_keys = server.tracking_table_max_keys; |
489 | if (raxSize(TrackingTable) <= max_keys) { |
490 | timeout_counter = 0; |
491 | return; /* Limit not reached. */ |
492 | } |
493 | |
494 | /* We have to invalidate a few keys to reach the limit again. The effort |
495 | * we do here is proportional to the number of times we entered this |
496 | * function and found that we are still over the limit. */ |
497 | int effort = 100 * (timeout_counter+1); |
498 | |
499 | /* We just remove one key after another by using a random walk. */ |
500 | raxIterator ri; |
501 | raxStart(&ri,TrackingTable); |
502 | while(effort > 0) { |
503 | effort--; |
504 | raxSeek(&ri,"^" ,NULL,0); |
505 | raxRandomWalk(&ri,0); |
506 | if (raxEOF(&ri)) break; |
507 | robj *keyobj = createStringObject((char*)ri.key,ri.key_len); |
508 | trackingInvalidateKey(NULL,keyobj,0); |
509 | decrRefCount(keyobj); |
510 | if (raxSize(TrackingTable) <= max_keys) { |
511 | timeout_counter = 0; |
512 | raxStop(&ri); |
513 | return; /* Return ASAP: we are again under the limit. */ |
514 | } |
515 | } |
516 | |
517 | /* If we reach this point, we were not able to go under the configured |
518 | * limit using the maximum effort we had for this run. */ |
519 | raxStop(&ri); |
520 | timeout_counter++; |
521 | } |
522 | |
523 | /* Generate Redis protocol for an array containing all the key names |
524 | * in the 'keys' radix tree. If the client is not NULL, the list will not |
525 | * include keys that were modified the last time by this client, in order |
526 | * to implement the NOLOOP option. |
527 | * |
528 | * If the resulting array would be empty, NULL is returned instead. */ |
529 | sds trackingBuildBroadcastReply(client *c, rax *keys) { |
530 | raxIterator ri; |
531 | uint64_t count; |
532 | |
533 | if (c == NULL) { |
534 | count = raxSize(keys); |
535 | } else { |
536 | count = 0; |
537 | raxStart(&ri,keys); |
538 | raxSeek(&ri,"^" ,NULL,0); |
539 | while(raxNext(&ri)) { |
540 | if (ri.data != c) count++; |
541 | } |
542 | raxStop(&ri); |
543 | |
544 | if (count == 0) return NULL; |
545 | } |
546 | |
547 | /* Create the array reply with the list of keys once, then send |
548 | * it to all the clients subscribed to this prefix. */ |
549 | char buf[32]; |
550 | size_t len = ll2string(buf,sizeof(buf),count); |
551 | sds proto = sdsempty(); |
552 | proto = sdsMakeRoomFor(proto,count*15); |
553 | proto = sdscatlen(proto,"*" ,1); |
554 | proto = sdscatlen(proto,buf,len); |
555 | proto = sdscatlen(proto,"\r\n" ,2); |
556 | raxStart(&ri,keys); |
557 | raxSeek(&ri,"^" ,NULL,0); |
558 | while(raxNext(&ri)) { |
559 | if (c && ri.data == c) continue; |
560 | len = ll2string(buf,sizeof(buf),ri.key_len); |
561 | proto = sdscatlen(proto,"$" ,1); |
562 | proto = sdscatlen(proto,buf,len); |
563 | proto = sdscatlen(proto,"\r\n" ,2); |
564 | proto = sdscatlen(proto,ri.key,ri.key_len); |
565 | proto = sdscatlen(proto,"\r\n" ,2); |
566 | } |
567 | raxStop(&ri); |
568 | return proto; |
569 | } |
570 | |
571 | /* This function will run the prefixes of clients in BCAST mode and |
572 | * keys that were modified about each prefix, and will send the |
573 | * notifications to each client in each prefix. */ |
574 | void trackingBroadcastInvalidationMessages(void) { |
575 | raxIterator ri, ri2; |
576 | |
577 | /* Return ASAP if there is nothing to do here. */ |
578 | if (TrackingTable == NULL || !server.tracking_clients) return; |
579 | |
580 | raxStart(&ri,PrefixTable); |
581 | raxSeek(&ri,"^" ,NULL,0); |
582 | |
583 | /* For each prefix... */ |
584 | while(raxNext(&ri)) { |
585 | bcastState *bs = ri.data; |
586 | |
587 | if (raxSize(bs->keys)) { |
588 | /* Generate the common protocol for all the clients that are |
589 | * not using the NOLOOP option. */ |
590 | sds proto = trackingBuildBroadcastReply(NULL,bs->keys); |
591 | |
592 | /* Send this array of keys to every client in the list. */ |
593 | raxStart(&ri2,bs->clients); |
594 | raxSeek(&ri2,"^" ,NULL,0); |
595 | while(raxNext(&ri2)) { |
596 | client *c; |
597 | memcpy(&c,ri2.key,sizeof(c)); |
598 | if (c->flags & CLIENT_TRACKING_NOLOOP) { |
599 | /* This client may have certain keys excluded. */ |
600 | sds adhoc = trackingBuildBroadcastReply(c,bs->keys); |
601 | if (adhoc) { |
602 | sendTrackingMessage(c,adhoc,sdslen(adhoc),1); |
603 | sdsfree(adhoc); |
604 | } |
605 | } else { |
606 | sendTrackingMessage(c,proto,sdslen(proto),1); |
607 | } |
608 | } |
609 | raxStop(&ri2); |
610 | |
611 | /* Clean up: we can remove everything from this state, because we |
612 | * want to only track the new keys that will be accumulated starting |
613 | * from now. */ |
614 | sdsfree(proto); |
615 | } |
616 | raxFree(bs->keys); |
617 | bs->keys = raxNew(); |
618 | } |
619 | raxStop(&ri); |
620 | } |
621 | |
/* Return the amount of used slots in the tracking table, that is the value
 * of TrackingTableTotalItems: the total number of client IDs stored across
 * all the keys of the table. */
uint64_t trackingGetTotalItems(void) {
    return TrackingTableTotalItems;
}
627 | |
628 | uint64_t trackingGetTotalKeys(void) { |
629 | if (TrackingTable == NULL) return 0; |
630 | return raxSize(TrackingTable); |
631 | } |
632 | |
633 | uint64_t trackingGetTotalPrefixes(void) { |
634 | if (PrefixTable == NULL) return 0; |
635 | return raxSize(PrefixTable); |
636 | } |
637 | |