1 | /* |
2 | * Active memory defragmentation |
3 | * Try to find key / value allocations that need to be re-allocated in order |
4 | * to reduce external fragmentation. |
5 | * We do that by scanning the keyspace and for each pointer we have, we can try to |
6 | * ask the allocator if moving it to a new address will help reduce fragmentation. |
7 | * |
8 | * Copyright (c) 2020, Redis Labs, Inc |
9 | * All rights reserved. |
10 | * |
11 | * Redistribution and use in source and binary forms, with or without |
12 | * modification, are permitted provided that the following conditions are met: |
13 | * |
14 | * * Redistributions of source code must retain the above copyright notice, |
15 | * this list of conditions and the following disclaimer. |
16 | * * Redistributions in binary form must reproduce the above copyright |
17 | * notice, this list of conditions and the following disclaimer in the |
18 | * documentation and/or other materials provided with the distribution. |
19 | * * Neither the name of Redis nor the names of its contributors may be used |
20 | * to endorse or promote products derived from this software without |
21 | * specific prior written permission. |
22 | * |
23 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
24 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
25 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
26 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
27 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
28 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
29 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
30 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
31 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
32 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
33 | * POSSIBILITY OF SUCH DAMAGE. |
34 | */ |
35 | |
36 | #include "server.h" |
37 | #include "cluster.h" |
38 | #include <time.h> |
39 | #include <assert.h> |
40 | #include <stddef.h> |
41 | |
42 | #ifdef HAVE_DEFRAG |
43 | |
44 | /* this method was added to jemalloc in order to help us understand which |
45 | * pointers are worthwhile moving and which aren't */ |
46 | int je_get_defrag_hint(void* ptr); |
47 | |
48 | /* forward declarations*/ |
49 | void defragDictBucketCallback(dict *d, dictEntry **bucketref); |
50 | dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); |
51 | |
52 | /* Defrag helper for generic allocations. |
53 | * |
54 | * returns NULL in case the allocation wasn't moved. |
55 | * when it returns a non-null value, the old pointer was already released |
56 | * and should NOT be accessed. */ |
57 | void* activeDefragAlloc(void *ptr) { |
58 | size_t size; |
59 | void *newptr; |
60 | if(!je_get_defrag_hint(ptr)) { |
61 | server.stat_active_defrag_misses++; |
62 | return NULL; |
63 | } |
64 | /* move this allocation to a new allocation. |
65 | * make sure not to use the thread cache. so that we don't get back the same |
66 | * pointers we try to free */ |
67 | size = zmalloc_size(ptr); |
68 | newptr = zmalloc_no_tcache(size); |
69 | memcpy(newptr, ptr, size); |
70 | zfree_no_tcache(ptr); |
71 | return newptr; |
72 | } |
73 | |
74 | /*Defrag helper for sds strings |
75 | * |
76 | * returns NULL in case the allocation wasn't moved. |
77 | * when it returns a non-null value, the old pointer was already released |
78 | * and should NOT be accessed. */ |
79 | sds activeDefragSds(sds sdsptr) { |
80 | void* ptr = sdsAllocPtr(sdsptr); |
81 | void* newptr = activeDefragAlloc(ptr); |
82 | if (newptr) { |
83 | size_t offset = sdsptr - (char*)ptr; |
84 | sdsptr = (char*)newptr + offset; |
85 | return sdsptr; |
86 | } |
87 | return NULL; |
88 | } |
89 | |
/* Defrag helper for robj and/or string objects
 *
 * returns NULL in case the allocation wasn't moved.
 * when it returns a non-null value, the old pointer was already released
 * and should NOT be accessed. */
robj *activeDefragStringOb(robj* ob, long *defragged) {
    robj *ret = NULL;
    /* Shared objects (refcount > 1) can't be moved: other references would
     * be left dangling. */
    if (ob->refcount!=1)
        return NULL;

    /* try to defrag robj (only if not an EMBSTR type (handled below). */
    if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
        if ((ret = activeDefragAlloc(ob))) {
            /* From here on 'ob' aliases the new allocation; the old robj
             * is already freed. */
            ob = ret;
            (*defragged)++;
        }
    }

    /* try to defrag string object */
    if (ob->type == OBJ_STRING) {
        if(ob->encoding==OBJ_ENCODING_RAW) {
            /* RAW: the sds is a separate allocation; move it independently. */
            sds newsds = activeDefragSds((sds)ob->ptr);
            if (newsds) {
                ob->ptr = newsds;
                (*defragged)++;
            }
        } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
            /* The sds is embedded in the object allocation, calculate the
             * offset and update the pointer in the new allocation. */
            long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
            if ((ret = activeDefragAlloc(ob))) {
                ret->ptr = (void*)((intptr_t)ret + ofs);
                (*defragged)++;
            }
        } else if (ob->encoding!=OBJ_ENCODING_INT) {
            /* OBJ_ENCODING_INT stores the value inline: nothing to move. */
            serverPanic("Unknown string encoding");
        }
    }
    return ret;
}
130 | |
131 | /* Defrag helper for lua scripts |
132 | * |
133 | * returns NULL in case the allocation wasn't moved. |
134 | * when it returns a non-null value, the old pointer was already released |
135 | * and should NOT be accessed. */ |
136 | luaScript *activeDefragLuaScript(luaScript *script, long *defragged) { |
137 | luaScript *ret = NULL; |
138 | |
139 | /* try to defrag script struct */ |
140 | if ((ret = activeDefragAlloc(script))) { |
141 | script = ret; |
142 | (*defragged)++; |
143 | } |
144 | |
145 | /* try to defrag actual script object */ |
146 | robj *ob = activeDefragStringOb(script->body, defragged); |
147 | if (ob) script->body = ob; |
148 | |
149 | return ret; |
150 | } |
151 | |
/* Defrag helper for dictEntries to be used during dict iteration (called on
 * each step). Returns a stat of how many pointers were moved. */
long dictIterDefragEntry(dictIterator *iter) {
    /* This function is a little bit dirty since it messes with the internals
     * of the dict and it's iterator, but the benefit is that it is very easy
     * to use, and require no other changes in the dict. */
    long defragged = 0;
    /* Handle the next entry (if there is one), and update the pointer in the
     * current entry. */
    if (iter->nextEntry) {
        dictEntry *newde = activeDefragAlloc(iter->nextEntry);
        if (newde) {
            defragged++;
            iter->nextEntry = newde;
            /* The current entry links to the one we moved; patch its 'next'
             * pointer so the chain stays intact. */
            iter->entry->next = newde;
        }
    }
    /* handle the case of the first entry in the hash bucket. */
    if (iter->d->ht_table[iter->table][iter->index] == iter->entry) {
        dictEntry *newde = activeDefragAlloc(iter->entry);
        if (newde) {
            iter->entry = newde;
            /* The bucket head pointed at the entry we just moved; update it. */
            iter->d->ht_table[iter->table][iter->index] = newde;
            defragged++;
        }
    }
    return defragged;
}
180 | |
181 | /* Defrag helper for dict main allocations (dict struct, and hash tables). |
182 | * receives a pointer to the dict* and implicitly updates it when the dict |
183 | * struct itself was moved. Returns a stat of how many pointers were moved. */ |
184 | long dictDefragTables(dict* d) { |
185 | dictEntry **newtable; |
186 | long defragged = 0; |
187 | /* handle the first hash table */ |
188 | newtable = activeDefragAlloc(d->ht_table[0]); |
189 | if (newtable) |
190 | defragged++, d->ht_table[0] = newtable; |
191 | /* handle the second hash table */ |
192 | if (d->ht_table[1]) { |
193 | newtable = activeDefragAlloc(d->ht_table[1]); |
194 | if (newtable) |
195 | defragged++, d->ht_table[1] = newtable; |
196 | } |
197 | return defragged; |
198 | } |
199 | |
200 | /* Internal function used by zslDefrag */ |
201 | void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) { |
202 | int i; |
203 | for (i = 0; i < zsl->level; i++) { |
204 | if (update[i]->level[i].forward == oldnode) |
205 | update[i]->level[i].forward = newnode; |
206 | } |
207 | serverAssert(zsl->header!=oldnode); |
208 | if (newnode->level[0].forward) { |
209 | serverAssert(newnode->level[0].forward->backward==oldnode); |
210 | newnode->level[0].forward->backward = newnode; |
211 | } else { |
212 | serverAssert(zsl->tail==oldnode); |
213 | zsl->tail = newnode; |
214 | } |
215 | } |
216 | |
/* Defrag helper for sorted set.
 * Update the robj pointer, defrag the skiplist struct and return the new score
 * reference. We may not access oldele pointer (not even the pointer stored in
 * the skiplist), as it was already freed. Newele may be null, in which case we
 * only need to defrag the skiplist, but not update the obj pointer.
 * When return value is non-NULL, it is the score reference that must be updated
 * in the dict record. */
double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
    int i;
    /* 'ele' is the live pointer used for string comparison; 'oldele' is only
     * ever compared by address, never dereferenced. */
    sds ele = newele? newele: oldele;

    /* find the skiplist node referring to the object that was moved,
     * and all pointers that need to be updated if we'll end up moving the skiplist node. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            x->level[i].forward->ele != oldele && /* make sure not to access the
                                                     ->obj pointer if it matches
                                                     oldele */
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) < 0)))
            x = x->level[i].forward;
        /* Remember the rightmost node on this level that precedes the target;
         * zslUpdateNode needs it to patch forward pointers. */
        update[i] = x;
    }

    /* update the robj pointer inside the skip list record. */
    x = x->level[0].forward;
    serverAssert(x && score == x->score && x->ele==oldele);
    if (newele)
        x->ele = newele;

    /* try to defrag the skiplist record itself */
    newx = activeDefragAlloc(x);
    if (newx) {
        zslUpdateNode(zsl, x, newx, update);
        /* The score lives inside the moved node; the caller must store this
         * new address in the zset dict value. */
        return &newx->score;
    }
    return NULL;
}
258 | |
/* Defrag helper for sorted set.
 * Defrag a single dict entry key name, and corresponding skiplist struct */
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
    sds newsds;
    double* newscore;
    long defragged = 0;
    sds sdsele = dictGetKey(de);
    if ((newsds = activeDefragSds(sdsele)))
        defragged++, de->key = newsds;
    /* NOTE: when newsds is non-NULL, 'sdsele' is already freed. zslDefrag
     * uses it only for pointer-identity comparison and never dereferences
     * it, which is why passing the dead pointer here is safe. */
    newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
    if (newscore) {
        /* The skiplist node moved; point the dict value at the new score. */
        dictSetVal(zs->dict, de, newscore);
        defragged++;
    }
    return defragged;
}
275 | |
276 | #define DEFRAG_SDS_DICT_NO_VAL 0 |
277 | #define DEFRAG_SDS_DICT_VAL_IS_SDS 1 |
278 | #define DEFRAG_SDS_DICT_VAL_IS_STROB 2 |
279 | #define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 |
280 | #define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4 |
281 | |
282 | /* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ |
283 | long activeDefragSdsDict(dict* d, int val_type) { |
284 | dictIterator *di; |
285 | dictEntry *de; |
286 | long defragged = 0; |
287 | di = dictGetIterator(d); |
288 | while((de = dictNext(di)) != NULL) { |
289 | sds sdsele = dictGetKey(de), newsds; |
290 | if ((newsds = activeDefragSds(sdsele))) |
291 | de->key = newsds, defragged++; |
292 | /* defrag the value */ |
293 | if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { |
294 | sdsele = dictGetVal(de); |
295 | if ((newsds = activeDefragSds(sdsele))) |
296 | de->v.val = newsds, defragged++; |
297 | } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { |
298 | robj *newele, *ele = dictGetVal(de); |
299 | if ((newele = activeDefragStringOb(ele, &defragged))) |
300 | de->v.val = newele; |
301 | } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { |
302 | void *newptr, *ptr = dictGetVal(de); |
303 | if ((newptr = activeDefragAlloc(ptr))) |
304 | de->v.val = newptr, defragged++; |
305 | } else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) { |
306 | void *newptr, *ptr = dictGetVal(de); |
307 | if ((newptr = activeDefragLuaScript(ptr, &defragged))) |
308 | de->v.val = newptr; |
309 | } |
310 | defragged += dictIterDefragEntry(di); |
311 | } |
312 | dictReleaseIterator(di); |
313 | return defragged; |
314 | } |
315 | |
316 | /* Defrag a list of ptr, sds or robj string values */ |
317 | long activeDefragList(list *l, int val_type) { |
318 | long defragged = 0; |
319 | listNode *ln, *newln; |
320 | for (ln = l->head; ln; ln = ln->next) { |
321 | if ((newln = activeDefragAlloc(ln))) { |
322 | if (newln->prev) |
323 | newln->prev->next = newln; |
324 | else |
325 | l->head = newln; |
326 | if (newln->next) |
327 | newln->next->prev = newln; |
328 | else |
329 | l->tail = newln; |
330 | ln = newln; |
331 | defragged++; |
332 | } |
333 | if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { |
334 | sds newsds, sdsele = ln->value; |
335 | if ((newsds = activeDefragSds(sdsele))) |
336 | ln->value = newsds, defragged++; |
337 | } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { |
338 | robj *newele, *ele = ln->value; |
339 | if ((newele = activeDefragStringOb(ele, &defragged))) |
340 | ln->value = newele; |
341 | } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { |
342 | void *newptr, *ptr = ln->value; |
343 | if ((newptr = activeDefragAlloc(ptr))) |
344 | ln->value = newptr, defragged++; |
345 | } |
346 | } |
347 | return defragged; |
348 | } |
349 | |
/* Defrag a list of sds values and a dict with the same sds keys */
long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
    long defragged = 0;
    sds newsds, sdsele;
    listNode *ln, *newln;
    dictIterator *di;
    dictEntry *de;
    /* Defrag the list and it's sds values */
    for (ln = l->head; ln; ln = ln->next) {
        /* Relocate the list node struct, patching neighbours or the list
         * head/tail as needed. */
        if ((newln = activeDefragAlloc(ln))) {
            if (newln->prev)
                newln->prev->next = newln;
            else
                l->head = newln;
            if (newln->next)
                newln->next->prev = newln;
            else
                l->tail = newln;
            ln = newln;
            defragged++;
        }
        sdsele = ln->value;
        if ((newsds = activeDefragSds(sdsele))) {
            /* When defragging an sds value, we need to update the dict key */
            /* The old pointer 'sdsele' is freed now, so we locate the dict
             * entry by precomputed hash + pointer identity, not by string
             * comparison. */
            uint64_t hash = dictGetHash(d, newsds);
            dictEntry **deref = dictFindEntryRefByPtrAndHash(d, sdsele, hash);
            if (deref)
                (*deref)->key = newsds;
            ln->value = newsds;
            defragged++;
        }
    }

    /* Defrag the dict values (keys were already handled) */
    di = dictGetIterator(d);
    while((de = dictNext(di)) != NULL) {
        if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
            sds newsds, sdsele = dictGetVal(de);
            if ((newsds = activeDefragSds(sdsele)))
                de->v.val = newsds, defragged++;
        } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
            robj *newele, *ele = dictGetVal(de);
            if ((newele = activeDefragStringOb(ele, &defragged)))
                de->v.val = newele;
        } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
            void *newptr, *ptr = dictGetVal(de);
            if ((newptr = activeDefragAlloc(ptr)))
                de->v.val = newptr, defragged++;
        }
        /* Also try to move the dictEntry struct itself. */
        defragged += dictIterDefragEntry(di);
    }
    dictReleaseIterator(di);

    return defragged;
}
405 | |
406 | /* Utility function that replaces an old key pointer in the dictionary with a |
407 | * new pointer. Additionally, we try to defrag the dictEntry in that dict. |
408 | * Oldkey mey be a dead pointer and should not be accessed (we get a |
409 | * pre-calculated hash value). Newkey may be null if the key pointer wasn't |
410 | * moved. Return value is the dictEntry if found, or NULL if not found. |
411 | * NOTE: this is very ugly code, but it let's us avoid the complication of |
412 | * doing a scan on another dict. */ |
413 | dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { |
414 | dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); |
415 | if (deref) { |
416 | dictEntry *de = *deref; |
417 | dictEntry *newde = activeDefragAlloc(de); |
418 | if (newde) { |
419 | de = *deref = newde; |
420 | (*defragged)++; |
421 | } |
422 | if (newkey) |
423 | de->key = newkey; |
424 | return de; |
425 | } |
426 | return NULL; |
427 | } |
428 | |
429 | long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { |
430 | quicklistNode *newnode, *node = *node_ref; |
431 | long defragged = 0; |
432 | unsigned char *newzl; |
433 | if ((newnode = activeDefragAlloc(node))) { |
434 | if (newnode->prev) |
435 | newnode->prev->next = newnode; |
436 | else |
437 | ql->head = newnode; |
438 | if (newnode->next) |
439 | newnode->next->prev = newnode; |
440 | else |
441 | ql->tail = newnode; |
442 | *node_ref = node = newnode; |
443 | defragged++; |
444 | } |
445 | if ((newzl = activeDefragAlloc(node->entry))) |
446 | defragged++, node->entry = newzl; |
447 | return defragged; |
448 | } |
449 | |
450 | long activeDefragQuickListNodes(quicklist *ql) { |
451 | quicklistNode *node = ql->head; |
452 | long defragged = 0; |
453 | while (node) { |
454 | defragged += activeDefragQuickListNode(ql, &node); |
455 | node = node->next; |
456 | } |
457 | return defragged; |
458 | } |
459 | |
460 | /* when the value has lots of elements, we want to handle it later and not as |
461 | * part of the main dictionary scan. this is needed in order to prevent latency |
462 | * spikes when handling large items */ |
463 | void defragLater(redisDb *db, dictEntry *kde) { |
464 | sds key = sdsdup(dictGetKey(kde)); |
465 | listAddNodeTail(db->defrag_later, key); |
466 | } |
467 | |
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
    quicklist *ql = ob->ptr;
    quicklistNode *node;
    long iterations = 0;
    int bookmark_failed = 0;
    /* The key may have been replaced by a different type since it was
     * queued; in that case there is nothing to do. */
    if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
        return 0;

    if (*cursor == 0) {
        /* if cursor is 0, we start new iteration */
        node = ql->head;
    } else {
        /* Resume from the "_AD" (Active Defrag) bookmark left by the
         * previous invocation. */
        node = quicklistBookmarkFind(ql, "_AD");
        if (!node) {
            /* if the bookmark was deleted, it means we reached the end. */
            *cursor = 0;
            return 0;
        }
        node = node->next;
    }

    (*cursor)++;
    while (node) {
        (*defragged) += activeDefragQuickListNode(ql, &node);
        server.stat_active_defrag_scanned++;
        /* Check the clock only every 128 nodes to keep ustime() overhead
         * low. Once bookmark creation has failed we stop trying and just
         * finish the whole list in this pass. */
        if (++iterations > 128 && !bookmark_failed) {
            if (ustime() > endtime) {
                if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
                    bookmark_failed = 1;
                } else {
                    ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
                    return 1;
                }
            }
            iterations = 0;
        }
        node = node->next;
    }
    quicklistBookmarkDelete(ql, "_AD");
    *cursor = 0;
    /* If we couldn't create a bookmark we may have exceeded 'endtime';
     * report 1 so the caller re-checks the clock. */
    return bookmark_failed? 1: 0;
}
511 | |
/* Context passed through dictScan to scanLaterZsetCallback. */
typedef struct {
    zset *zs;       /* the sorted set being defragged */
    long defragged; /* accumulated count of moved pointers */
} scanLaterZsetData;
516 | |
517 | void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { |
518 | dictEntry *de = (dictEntry*)_de; |
519 | scanLaterZsetData *data = privdata; |
520 | data->defragged += activeDefragZsetEntry(data->zs, de); |
521 | server.stat_active_defrag_scanned++; |
522 | } |
523 | |
524 | long scanLaterZset(robj *ob, unsigned long *cursor) { |
525 | if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) |
526 | return 0; |
527 | zset *zs = (zset*)ob->ptr; |
528 | dict *d = zs->dict; |
529 | scanLaterZsetData data = {zs, 0}; |
530 | *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); |
531 | return data.defragged; |
532 | } |
533 | |
534 | void scanLaterSetCallback(void *privdata, const dictEntry *_de) { |
535 | dictEntry *de = (dictEntry*)_de; |
536 | long *defragged = privdata; |
537 | sds sdsele = dictGetKey(de), newsds; |
538 | if ((newsds = activeDefragSds(sdsele))) |
539 | (*defragged)++, de->key = newsds; |
540 | server.stat_active_defrag_scanned++; |
541 | } |
542 | |
543 | long scanLaterSet(robj *ob, unsigned long *cursor) { |
544 | long defragged = 0; |
545 | if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) |
546 | return 0; |
547 | dict *d = ob->ptr; |
548 | *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged); |
549 | return defragged; |
550 | } |
551 | |
552 | void scanLaterHashCallback(void *privdata, const dictEntry *_de) { |
553 | dictEntry *de = (dictEntry*)_de; |
554 | long *defragged = privdata; |
555 | sds sdsele = dictGetKey(de), newsds; |
556 | if ((newsds = activeDefragSds(sdsele))) |
557 | (*defragged)++, de->key = newsds; |
558 | sdsele = dictGetVal(de); |
559 | if ((newsds = activeDefragSds(sdsele))) |
560 | (*defragged)++, de->v.val = newsds; |
561 | server.stat_active_defrag_scanned++; |
562 | } |
563 | |
564 | long scanLaterHash(robj *ob, unsigned long *cursor) { |
565 | long defragged = 0; |
566 | if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) |
567 | return 0; |
568 | dict *d = ob->ptr; |
569 | *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged); |
570 | return defragged; |
571 | } |
572 | |
573 | long defragQuicklist(redisDb *db, dictEntry *kde) { |
574 | robj *ob = dictGetVal(kde); |
575 | long defragged = 0; |
576 | quicklist *ql = ob->ptr, *newql; |
577 | serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); |
578 | if ((newql = activeDefragAlloc(ql))) |
579 | defragged++, ob->ptr = ql = newql; |
580 | if (ql->len > server.active_defrag_max_scan_fields) |
581 | defragLater(db, kde); |
582 | else |
583 | defragged += activeDefragQuickListNodes(ql); |
584 | return defragged; |
585 | } |
586 | |
587 | long defragZsetSkiplist(redisDb *db, dictEntry *kde) { |
588 | robj *ob = dictGetVal(kde); |
589 | long defragged = 0; |
590 | zset *zs = (zset*)ob->ptr; |
591 | zset *newzs; |
592 | zskiplist *newzsl; |
593 | dict *newdict; |
594 | dictEntry *de; |
595 | struct zskiplistNode *; |
596 | serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST); |
597 | if ((newzs = activeDefragAlloc(zs))) |
598 | defragged++, ob->ptr = zs = newzs; |
599 | if ((newzsl = activeDefragAlloc(zs->zsl))) |
600 | defragged++, zs->zsl = newzsl; |
601 | if ((newheader = activeDefragAlloc(zs->zsl->header))) |
602 | defragged++, zs->zsl->header = newheader; |
603 | if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) |
604 | defragLater(db, kde); |
605 | else { |
606 | dictIterator *di = dictGetIterator(zs->dict); |
607 | while((de = dictNext(di)) != NULL) { |
608 | defragged += activeDefragZsetEntry(zs, de); |
609 | } |
610 | dictReleaseIterator(di); |
611 | } |
612 | /* handle the dict struct */ |
613 | if ((newdict = activeDefragAlloc(zs->dict))) |
614 | defragged++, zs->dict = newdict; |
615 | /* defrag the dict tables */ |
616 | defragged += dictDefragTables(zs->dict); |
617 | return defragged; |
618 | } |
619 | |
620 | long defragHash(redisDb *db, dictEntry *kde) { |
621 | long defragged = 0; |
622 | robj *ob = dictGetVal(kde); |
623 | dict *d, *newd; |
624 | serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); |
625 | d = ob->ptr; |
626 | if (dictSize(d) > server.active_defrag_max_scan_fields) |
627 | defragLater(db, kde); |
628 | else |
629 | defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); |
630 | /* handle the dict struct */ |
631 | if ((newd = activeDefragAlloc(ob->ptr))) |
632 | defragged++, ob->ptr = newd; |
633 | /* defrag the dict tables */ |
634 | defragged += dictDefragTables(ob->ptr); |
635 | return defragged; |
636 | } |
637 | |
638 | long defragSet(redisDb *db, dictEntry *kde) { |
639 | long defragged = 0; |
640 | robj *ob = dictGetVal(kde); |
641 | dict *d, *newd; |
642 | serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); |
643 | d = ob->ptr; |
644 | if (dictSize(d) > server.active_defrag_max_scan_fields) |
645 | defragLater(db, kde); |
646 | else |
647 | defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); |
648 | /* handle the dict struct */ |
649 | if ((newd = activeDefragAlloc(ob->ptr))) |
650 | defragged++, ob->ptr = newd; |
651 | /* defrag the dict tables */ |
652 | defragged += dictDefragTables(ob->ptr); |
653 | return defragged; |
654 | } |
655 | |
656 | /* Defrag callback for radix tree iterator, called for each node, |
657 | * used in order to defrag the nodes allocations. */ |
658 | int defragRaxNode(raxNode **noderef) { |
659 | raxNode *newnode = activeDefragAlloc(*noderef); |
660 | if (newnode) { |
661 | *noderef = newnode; |
662 | return 1; |
663 | } |
664 | return 0; |
665 | } |
666 | |
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
    /* Static resume key: holds the last stream ID processed when time ran
     * out, so the next invocation can seek past it. Safe because only one
     * stream is incrementally defragged at a time. */
    static unsigned char last[sizeof(streamID)];
    raxIterator ri;
    long iterations = 0;
    /* The key may have been replaced by a different type since it was
     * queued; in that case there is nothing to do. */
    if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
        *cursor = 0;
        return 0;
    }

    stream *s = ob->ptr;
    raxStart(&ri,s->rax);
    if (*cursor == 0) {
        /* if cursor is 0, we start new iteration */
        defragRaxNode(&s->rax->head);
        /* assign the iterator node callback before the seek, so that the
         * initial nodes that are processed till the first item are covered */
        ri.node_cb = defragRaxNode;
        raxSeek(&ri,"^",NULL,0);
    } else {
        /* if cursor is non-zero, we seek to the static 'last' */
        if (!raxSeek(&ri,">", last, sizeof(last))) {
            *cursor = 0;
            raxStop(&ri);
            return 0;
        }
        /* assign the iterator node callback after the seek, so that the
         * initial nodes that are processed till now aren't covered */
        ri.node_cb = defragRaxNode;
    }

    (*cursor)++;
    while (raxNext(&ri)) {
        /* Relocate the entry's listpack payload. */
        void *newdata = activeDefragAlloc(ri.data);
        if (newdata)
            raxSetData(ri.node, ri.data=newdata), (*defragged)++;
        server.stat_active_defrag_scanned++;
        /* Check the clock only every 128 entries to keep ustime() cheap. */
        if (++iterations > 128) {
            if (ustime() > endtime) {
                /* Time's up: remember where we stopped and ask to resume. */
                serverAssert(ri.key_len==sizeof(last));
                memcpy(last,ri.key,ri.key_len);
                raxStop(&ri);
                return 1;
            }
            iterations = 0;
        }
    }
    raxStop(&ri);
    *cursor = 0;
    return 0;
}
718 | |
719 | /* optional callback used defrag each rax element (not including the element pointer itself) */ |
720 | typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); |
721 | |
722 | /* defrag radix tree including: |
723 | * 1) rax struct |
724 | * 2) rax nodes |
725 | * 3) rax entry data (only if defrag_data is specified) |
726 | * 4) call a callback per element, and allow the callback to return a new pointer for the element */ |
727 | long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { |
728 | long defragged = 0; |
729 | raxIterator ri; |
730 | rax* rax; |
731 | if ((rax = activeDefragAlloc(*raxref))) |
732 | defragged++, *raxref = rax; |
733 | rax = *raxref; |
734 | raxStart(&ri,rax); |
735 | ri.node_cb = defragRaxNode; |
736 | defragRaxNode(&rax->head); |
737 | raxSeek(&ri,"^" ,NULL,0); |
738 | while (raxNext(&ri)) { |
739 | void *newdata = NULL; |
740 | if (element_cb) |
741 | newdata = element_cb(&ri, element_cb_data, &defragged); |
742 | if (defrag_data && !newdata) |
743 | newdata = activeDefragAlloc(ri.data); |
744 | if (newdata) |
745 | raxSetData(ri.node, ri.data=newdata), defragged++; |
746 | } |
747 | raxStop(&ri); |
748 | return defragged; |
749 | } |
750 | |
/* Context for defragStreamConsumerPendingEntry: the consumer group whose
 * global PEL mirrors the entries, and the (possibly already relocated)
 * consumer that owns them. */
typedef struct {
    streamCG *cg;
    streamConsumer *c;
} PendingEntryContext;

/* rax element callback: defrag one NACK in a consumer's PEL, keeping the
 * consumer pointer inside it and the group-level PEL mapping in sync. */
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
    UNUSED(defragged);
    PendingEntryContext *ctx = privdata;
    streamNACK *nack = ri->data, *newnack;
    nack->consumer = ctx->c; /* update nack pointer to consumer */
    newnack = activeDefragAlloc(nack);
    if (newnack) {
        /* update consumer group pointer to the nack */
        void *prev;
        /* raxInsert on an existing key overwrites the value in place and
         * reports the previous one, which must be the old NACK. */
        raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
        serverAssert(prev==nack);
        /* note: we don't increment 'defragged' that's done by the caller */
    }
    return newnack;
}
771 | |
/* rax element callback: defrag one stream consumer — the struct itself, its
 * name, and its pending-entries list. Returns the new consumer pointer when
 * the struct moved (the caller updates the rax and the stat counter). */
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
    streamConsumer *c = ri->data;
    streamCG *cg = privdata;
    void *newc = activeDefragAlloc(c);
    if (newc) {
        /* note: we don't increment 'defragged' that's done by the caller */
        c = newc;
    }
    sds newsds = activeDefragSds(c->name);
    if (newsds)
        (*defragged)++, c->name = newsds;
    if (c->pel) {
        /* The PEL entries point back at this (possibly moved) consumer and
         * mirror into the group PEL; the context carries both. */
        PendingEntryContext pel_ctx = {cg, c};
        *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
    }
    return newc; /* returns NULL if c was not defragged */
}
789 | |
/* rax element callback: defrag one consumer group's consumers rax (with a
 * per-consumer callback) and its global PEL. The NACKs themselves were
 * already handled via each consumer's PEL, so the group PEL only needs its
 * nodes defragged (defrag_data=0). Always returns NULL: the streamCG struct
 * itself is moved by the caller (defragRadixTree with defrag_data=1). */
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
    streamCG *cg = ri->data;
    UNUSED(privdata);
    if (cg->consumers)
        *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
    if (cg->pel)
        *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
    return NULL;
}
799 | |
800 | long defragStream(redisDb *db, dictEntry *kde) { |
801 | long defragged = 0; |
802 | robj *ob = dictGetVal(kde); |
803 | serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); |
804 | stream *s = ob->ptr, *news; |
805 | |
806 | /* handle the main struct */ |
807 | if ((news = activeDefragAlloc(s))) |
808 | defragged++, ob->ptr = s = news; |
809 | |
810 | if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { |
811 | rax *newrax = activeDefragAlloc(s->rax); |
812 | if (newrax) |
813 | defragged++, s->rax = newrax; |
814 | defragLater(db, kde); |
815 | } else |
816 | defragged += defragRadixTree(&s->rax, 1, NULL, NULL); |
817 | |
818 | if (s->cgroups) |
819 | defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); |
820 | return defragged; |
821 | } |
822 | |
823 | /* Defrag a module key. This is either done immediately or scheduled |
824 | * for later. Returns then number of pointers defragged. |
825 | */ |
826 | long defragModule(redisDb *db, dictEntry *kde) { |
827 | robj *obj = dictGetVal(kde); |
828 | serverAssert(obj->type == OBJ_MODULE); |
829 | long defragged = 0; |
830 | |
831 | if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id)) |
832 | defragLater(db, kde); |
833 | |
834 | return defragged; |
835 | } |
836 | |
/* for each key we scan in the main dict, this function will attempt to defrag
 * all the various pointers it has. Returns a stat of how many pointers were
 * moved. */
long defragKey(redisDb *db, dictEntry *de) {
    sds keysds = dictGetKey(de);
    robj *newob, *ob;
    unsigned char *newzl;
    long defragged = 0;
    sds newsds;

    /* Try to defrag the key name. */
    newsds = activeDefragSds(keysds);
    if (newsds)
        defragged++, de->key = newsds;
    if (dictSize(db->expires)) {
        /* The expires dict shares the key sds pointer with the main dict, so
         * once the old key allocation is released we can no longer look it up
         * by string compare. Find it by the precomputed hash and the old
         * pointer instead, and patch in the new pointer (newsds may be NULL,
         * in which case only the dict entry itself is considered for defrag). */
        uint64_t hash = dictGetHash(db->dict, de->key);
        replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
    }

    /* Try to defrag robj and / or string value. */
    ob = dictGetVal(de);
    if ((newob = activeDefragStringOb(ob, &defragged))) {
        de->v.val = newob;
        ob = newob;
    }

    /* Dispatch on type/encoding; compact encodings are a single allocation,
     * complex ones get a dedicated helper (which may schedule work for later). */
    if (ob->type == OBJ_STRING) {
        /* Already handled in activeDefragStringOb. */
    } else if (ob->type == OBJ_LIST) {
        if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
            defragged += defragQuicklist(db, de);
        } else {
            serverPanic("Unknown list encoding" );
        }
    } else if (ob->type == OBJ_SET) {
        if (ob->encoding == OBJ_ENCODING_HT) {
            defragged += defragSet(db, de);
        } else if (ob->encoding == OBJ_ENCODING_INTSET) {
            /* An intset is one contiguous allocation; just move it. */
            intset *newis, *is = ob->ptr;
            if ((newis = activeDefragAlloc(is)))
                defragged++, ob->ptr = newis;
        } else {
            serverPanic("Unknown set encoding" );
        }
    } else if (ob->type == OBJ_ZSET) {
        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
            defragged += defragZsetSkiplist(db, de);
        } else {
            serverPanic("Unknown sorted set encoding" );
        }
    } else if (ob->type == OBJ_HASH) {
        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_HT) {
            defragged += defragHash(db, de);
        } else {
            serverPanic("Unknown hash encoding" );
        }
    } else if (ob->type == OBJ_STREAM) {
        defragged += defragStream(db, de);
    } else if (ob->type == OBJ_MODULE) {
        defragged += defragModule(db, de);
    } else {
        serverPanic("Unknown object type" );
    }
    return defragged;
}
911 | |
912 | /* Defrag scan callback for the main db dictionary. */ |
913 | void defragScanCallback(void *privdata, const dictEntry *de) { |
914 | long defragged = defragKey((redisDb*)privdata, (dictEntry*)de); |
915 | server.stat_active_defrag_hits += defragged; |
916 | if(defragged) |
917 | server.stat_active_defrag_key_hits++; |
918 | else |
919 | server.stat_active_defrag_key_misses++; |
920 | server.stat_active_defrag_scanned++; |
921 | } |
922 | |
923 | /* Defrag scan callback for each hash table bucket, |
924 | * used in order to defrag the dictEntry allocations. */ |
925 | void defragDictBucketCallback(dict *d, dictEntry **bucketref) { |
926 | while(*bucketref) { |
927 | dictEntry *de = *bucketref, *newde; |
928 | if ((newde = activeDefragAlloc(de))) { |
929 | *bucketref = newde; |
930 | if (server.cluster_enabled && d == server.db[0].dict) { |
931 | /* Cluster keyspace dict. Update slot-to-entries mapping. */ |
932 | slotToKeyReplaceEntry(newde, server.db); |
933 | } |
934 | } |
935 | bucketref = &(*bucketref)->next; |
936 | } |
937 | } |
938 | |
939 | /* Utility function to get the fragmentation ratio from jemalloc. |
940 | * It is critical to do that by comparing only heap maps that belong to |
941 | * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this |
942 | * fragmentation ratio in order to decide if a defrag action should be taken |
943 | * or not, a false detection can cause the defragmenter to waste a lot of CPU |
944 | * without the possibility of getting any results. */ |
945 | float getAllocatorFragmentation(size_t *out_frag_bytes) { |
946 | size_t resident, active, allocated; |
947 | zmalloc_get_allocator_info(&allocated, &active, &resident); |
948 | float frag_pct = ((float)active / allocated)*100 - 100; |
949 | size_t frag_bytes = active - allocated; |
950 | float = ((float)resident / allocated)*100 - 100; |
951 | size_t = resident - allocated; |
952 | if(out_frag_bytes) |
953 | *out_frag_bytes = frag_bytes; |
954 | serverLog(LL_DEBUG, |
955 | "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)" , |
956 | allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes); |
957 | return frag_pct; |
958 | } |
959 | |
960 | /* We may need to defrag other globals, one small allocation can hold a full allocator run. |
961 | * so although small, it is still important to defrag these */ |
962 | long defragOtherGlobals() { |
963 | long defragged = 0; |
964 | |
965 | /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. |
966 | * but we assume most of these are short lived, we only need to defrag allocations |
967 | * that remain static for a long time */ |
968 | defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); |
969 | defragged += moduleDefragGlobals(); |
970 | return defragged; |
971 | } |
972 | |
973 | /* returns 0 more work may or may not be needed (see non-zero cursor), |
974 | * and 1 if time is up and more work is needed. */ |
975 | int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { |
976 | if (de) { |
977 | robj *ob = dictGetVal(de); |
978 | if (ob->type == OBJ_LIST) { |
979 | return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits); |
980 | } else if (ob->type == OBJ_SET) { |
981 | server.stat_active_defrag_hits += scanLaterSet(ob, cursor); |
982 | } else if (ob->type == OBJ_ZSET) { |
983 | server.stat_active_defrag_hits += scanLaterZset(ob, cursor); |
984 | } else if (ob->type == OBJ_HASH) { |
985 | server.stat_active_defrag_hits += scanLaterHash(ob, cursor); |
986 | } else if (ob->type == OBJ_STREAM) { |
987 | return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); |
988 | } else if (ob->type == OBJ_MODULE) { |
989 | return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid); |
990 | } else { |
991 | *cursor = 0; /* object type may have changed since we schedule it for later */ |
992 | } |
993 | } else { |
994 | *cursor = 0; /* object may have been deleted already */ |
995 | } |
996 | return 0; |
997 | } |
998 | |
/* Static variables serving defragLaterStep to continue scanning a key from
 * where we stopped last time (they persist across serverCron invocations). */
static sds defrag_later_current_key = NULL;
static unsigned long defrag_later_cursor = 0;
1002 | |
/* Process the db->defrag_later queue of big keys, resuming from the static
 * defrag_later_current_key / defrag_later_cursor state left by the last call.
 * Returns 0 if no more work needs to be done, and 1 if time is up and more
 * work is needed. */
int defragLaterStep(redisDb *db, long long endtime) {
    unsigned int iterations = 0;
    unsigned long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    long long key_defragged;

    do {
        /* if we're not continuing a scan from the last call or loop, start a new one */
        if (!defrag_later_cursor) {
            listNode *head = listFirst(db->defrag_later);

            /* Move on to next key: the finished key must be the list head. */
            if (defrag_later_current_key) {
                serverAssert(defrag_later_current_key == head->value);
                listDelNode(db->defrag_later, head);
                defrag_later_cursor = 0;
                defrag_later_current_key = NULL;
            }

            /* stop if we reached the last one. */
            head = listFirst(db->defrag_later);
            if (!head)
                return 0;

            /* start a new key */
            defrag_later_current_key = head->value;
            defrag_later_cursor = 0;
        }

        /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
        dictEntry *de = dictFind(db->dict, defrag_later_current_key);
        key_defragged = server.stat_active_defrag_hits;
        do {
            int quit = 0;
            if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id))
                quit = 1; /* time is up, we didn't finish all the work */

            /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
             * (if we have a lot of pointers in one hash bucket, or rehashing),
             * check if we reached the time limit. */
            if (quit || (++iterations > 16 ||
                            server.stat_active_defrag_hits - prev_defragged > 512 ||
                            server.stat_active_defrag_scanned - prev_scanned > 64)) {
                if (quit || ustime() > endtime) {
                    /* Account key hit/miss before bailing out mid-key. */
                    if(key_defragged != server.stat_active_defrag_hits)
                        server.stat_active_defrag_key_hits++;
                    else
                        server.stat_active_defrag_key_misses++;
                    return 1;
                }
                iterations = 0;
                prev_defragged = server.stat_active_defrag_hits;
                prev_scanned = server.stat_active_defrag_scanned;
            }
        } while(defrag_later_cursor);
        /* Key fully processed: count it as a hit if any pointer moved. */
        if(key_defragged != server.stat_active_defrag_hits)
            server.stat_active_defrag_key_hits++;
        else
            server.stat_active_defrag_key_misses++;
    } while(1);
}
1065 | |
/* Linear interpolation: map x from the range [x1,x2] onto [y1,y2]. */
#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
/* Clamp y into [min, max]. */
#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
1068 | |
1069 | /* decide if defrag is needed, and at what CPU effort to invest in it */ |
1070 | void computeDefragCycles() { |
1071 | size_t frag_bytes; |
1072 | float frag_pct = getAllocatorFragmentation(&frag_bytes); |
1073 | /* If we're not already running, and below the threshold, exit. */ |
1074 | if (!server.active_defrag_running) { |
1075 | if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) |
1076 | return; |
1077 | } |
1078 | |
1079 | /* Calculate the adaptive aggressiveness of the defrag */ |
1080 | int cpu_pct = INTERPOLATE(frag_pct, |
1081 | server.active_defrag_threshold_lower, |
1082 | server.active_defrag_threshold_upper, |
1083 | server.active_defrag_cycle_min, |
1084 | server.active_defrag_cycle_max); |
1085 | cpu_pct = LIMIT(cpu_pct, |
1086 | server.active_defrag_cycle_min, |
1087 | server.active_defrag_cycle_max); |
1088 | /* We allow increasing the aggressiveness during a scan, but don't |
1089 | * reduce it. */ |
1090 | if (cpu_pct > server.active_defrag_running) { |
1091 | server.active_defrag_running = cpu_pct; |
1092 | serverLog(LL_VERBOSE, |
1093 | "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%" , |
1094 | frag_pct, frag_bytes, cpu_pct); |
1095 | } |
1096 | } |
1097 | |
/* Perform incremental defragmentation work from the serverCron.
 * This works in a similar way to activeExpireCycle, in the sense that
 * we do incremental work across calls: the static variables below carry the
 * current db index, dictScan cursor, and scan-wide stats between calls. */
void activeDefragCycle(void) {
    static int current_db = -1;
    static unsigned long cursor = 0;
    static redisDb *db = NULL;
    static long long start_scan, start_stat;
    unsigned int iterations = 0;
    unsigned long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    long long start, timelimit, endtime;
    mstime_t latency;
    int quit = 0;

    if (!server.active_defrag_enabled) {
        if (server.active_defrag_running) {
            /* if active defrag was disabled mid-run, start from fresh next time. */
            server.active_defrag_running = 0;
            if (db)
                listEmpty(db->defrag_later);
            defrag_later_current_key = NULL;
            defrag_later_cursor = 0;
            current_db = -1;
            cursor = 0;
            db = NULL;
            goto update_metrics;
        }
        return;
    }

    if (hasActiveChildProcess())
        return; /* Defragging memory while there's a fork will just do damage. */

    /* Once a second, check if the fragmentation justifies starting a scan
     * or making it more aggressive. */
    run_with_period(1000) {
        computeDefragCycles();
    }
    if (!server.active_defrag_running)
        return;

    /* See activeExpireCycle for how timelimit is handled: a slice of each
     * 1/hz period proportional to the current defrag effort. */
    start = ustime();
    timelimit = 1000000*server.active_defrag_running/server.hz/100;
    if (timelimit <= 0) timelimit = 1;
    endtime = start + timelimit;
    latencyStartMonitor(latency);

    do {
        /* if we're not continuing a scan from the last call or loop, start a new one */
        if (!cursor) {
            /* finish any leftovers from previous db before moving to the next one */
            if (db && defragLaterStep(db, endtime)) {
                quit = 1; /* time is up, we didn't finish all the work */
                break; /* this will exit the function and we'll continue on the next cycle */
            }

            /* Move on to next database, and stop if we reached the last one. */
            if (++current_db >= server.dbnum) {
                /* defrag other items not part of the db / keys */
                server.stat_active_defrag_hits += defragOtherGlobals();

                long long now = ustime();
                size_t frag_bytes;
                float frag_pct = getAllocatorFragmentation(&frag_bytes);
                serverLog(LL_VERBOSE,
                    "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu" ,
                    (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);

                start_scan = now;
                current_db = -1;
                cursor = 0;
                db = NULL;
                server.active_defrag_running = 0;

                computeDefragCycles(); /* if another scan is needed, start it right away */
                if (server.active_defrag_running != 0 && ustime() < endtime)
                    continue;
                break;
            }
            else if (current_db==0) {
                /* Start a scan from the first database. */
                start_scan = ustime();
                start_stat = server.stat_active_defrag_hits;
            }

            db = &server.db[current_db];
            cursor = 0;
        }

        do {
            /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
            if (defragLaterStep(db, endtime)) {
                quit = 1; /* time is up, we didn't finish all the work */
                break; /* this will exit the function and we'll continue on the next cycle */
            }

            cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);

            /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
             * (if we have a lot of pointers in one hash bucket or rehashing),
             * check if we reached the time limit.
             * But regardless, don't start a new db in this loop, this is because after
             * the last db we call defragOtherGlobals, which must be done in one cycle */
            if (!cursor || (++iterations > 16 ||
                            server.stat_active_defrag_hits - prev_defragged > 512 ||
                            server.stat_active_defrag_scanned - prev_scanned > 64)) {
                if (!cursor || ustime() > endtime) {
                    quit = 1;
                    break;
                }
                iterations = 0;
                prev_defragged = server.stat_active_defrag_hits;
                prev_scanned = server.stat_active_defrag_scanned;
            }
        } while(cursor && !quit);
    } while(!quit);

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("active-defrag-cycle" ,latency);

update_metrics:
    /* Track total time spent in active-defrag runs (for INFO stats). */
    if (server.active_defrag_running > 0) {
        if (server.stat_last_active_defrag_time == 0)
            elapsedStart(&server.stat_last_active_defrag_time);
    } else if (server.stat_last_active_defrag_time != 0) {
        server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time);
        server.stat_last_active_defrag_time = 0;
    }
}
1229 | |
1230 | #else /* HAVE_DEFRAG */ |
1231 | |
/* Stub used when the allocator doesn't support defrag (no HAVE_DEFRAG). */
void activeDefragCycle(void) {
    /* Not implemented yet. */
}
1235 | |
/* Stub: without allocator support, no pointer is ever relocated. */
void *activeDefragAlloc(void *ptr) {
    UNUSED(ptr);
    return NULL;
}
1240 | |
/* Stub: without allocator support, string objects are never relocated. */
robj *activeDefragStringOb(robj *ob, long *defragged) {
    UNUSED(ob);
    UNUSED(defragged);
    return NULL;
}
1246 | |
1247 | #endif |
1248 | |