1/*
2 * Active memory defragmentation
3 * Try to find key / value allocations that need to be re-allocated in order
4 * to reduce external fragmentation.
5 * We do that by scanning the keyspace and for each pointer we have, we can try to
6 * ask the allocator if moving it to a new address will help reduce fragmentation.
7 *
8 * Copyright (c) 2020, Redis Labs, Inc
9 * All rights reserved.
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * * Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * * Redistributions in binary form must reproduce the above copyright
17 * notice, this list of conditions and the following disclaimer in the
18 * documentation and/or other materials provided with the distribution.
19 * * Neither the name of Redis nor the names of its contributors may be used
20 * to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "server.h"
37#include "cluster.h"
38#include <time.h>
39#include <assert.h>
40#include <stddef.h>
41
42#ifdef HAVE_DEFRAG
43
44/* this method was added to jemalloc in order to help us understand which
45 * pointers are worthwhile moving and which aren't */
46int je_get_defrag_hint(void* ptr);
47
/* forward declarations */
49void defragDictBucketCallback(dict *d, dictEntry **bucketref);
50dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
51
52/* Defrag helper for generic allocations.
53 *
54 * returns NULL in case the allocation wasn't moved.
55 * when it returns a non-null value, the old pointer was already released
56 * and should NOT be accessed. */
57void* activeDefragAlloc(void *ptr) {
58 size_t size;
59 void *newptr;
60 if(!je_get_defrag_hint(ptr)) {
61 server.stat_active_defrag_misses++;
62 return NULL;
63 }
64 /* move this allocation to a new allocation.
65 * make sure not to use the thread cache. so that we don't get back the same
66 * pointers we try to free */
67 size = zmalloc_size(ptr);
68 newptr = zmalloc_no_tcache(size);
69 memcpy(newptr, ptr, size);
70 zfree_no_tcache(ptr);
71 return newptr;
72}
73
74/*Defrag helper for sds strings
75 *
76 * returns NULL in case the allocation wasn't moved.
77 * when it returns a non-null value, the old pointer was already released
78 * and should NOT be accessed. */
79sds activeDefragSds(sds sdsptr) {
80 void* ptr = sdsAllocPtr(sdsptr);
81 void* newptr = activeDefragAlloc(ptr);
82 if (newptr) {
83 size_t offset = sdsptr - (char*)ptr;
84 sdsptr = (char*)newptr + offset;
85 return sdsptr;
86 }
87 return NULL;
88}
89
/* Defrag helper for robj and/or string objects
 *
 * returns NULL in case the allocation wasn't moved.
 * when it returns a non-null value, the old pointer was already released
 * and should NOT be accessed. */
robj *activeDefragStringOb(robj* ob, long *defragged) {
    robj *ret = NULL;
    /* A shared object (refcount != 1) cannot be moved: other references
     * would be left dangling. */
    if (ob->refcount!=1)
        return NULL;

    /* try to defrag robj (only if not an EMBSTR type (handled below). */
    if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
        if ((ret = activeDefragAlloc(ob))) {
            ob = ret;
            (*defragged)++;
        }
    }

    /* try to defrag string object */
    if (ob->type == OBJ_STRING) {
        if(ob->encoding==OBJ_ENCODING_RAW) {
            /* RAW string: the sds lives in its own allocation. */
            sds newsds = activeDefragSds((sds)ob->ptr);
            if (newsds) {
                ob->ptr = newsds;
                (*defragged)++;
            }
        } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
            /* The sds is embedded in the object allocation, calculate the
             * offset and update the pointer in the new allocation. */
            long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
            if ((ret = activeDefragAlloc(ob))) {
                ret->ptr = (void*)((intptr_t)ret + ofs);
                (*defragged)++;
            }
        } else if (ob->encoding!=OBJ_ENCODING_INT) {
            serverPanic("Unknown string encoding");
        }
    }
    return ret;
}
130
131/* Defrag helper for lua scripts
132 *
133 * returns NULL in case the allocation wasn't moved.
134 * when it returns a non-null value, the old pointer was already released
135 * and should NOT be accessed. */
136luaScript *activeDefragLuaScript(luaScript *script, long *defragged) {
137 luaScript *ret = NULL;
138
139 /* try to defrag script struct */
140 if ((ret = activeDefragAlloc(script))) {
141 script = ret;
142 (*defragged)++;
143 }
144
145 /* try to defrag actual script object */
146 robj *ob = activeDefragStringOb(script->body, defragged);
147 if (ob) script->body = ob;
148
149 return ret;
150}
151
/* Defrag helper for dictEntries to be used during dict iteration (called on
 * each step). Returns a stat of how many pointers were moved. */
long dictIterDefragEntry(dictIterator *iter) {
    /* This function is a little bit dirty since it messes with the internals
     * of the dict and it's iterator, but the benefit is that it is very easy
     * to use, and require no other changes in the dict. */
    long defragged = 0;
    /* Handle the next entry (if there is one), and update the pointer in the
     * current entry. */
    if (iter->nextEntry) {
        dictEntry *newde = activeDefragAlloc(iter->nextEntry);
        if (newde) {
            defragged++;
            iter->nextEntry = newde;
            /* relink the current entry to its moved successor */
            iter->entry->next = newde;
        }
    }
    /* handle the case of the first entry in the hash bucket. */
    if (iter->d->ht_table[iter->table][iter->index] == iter->entry) {
        dictEntry *newde = activeDefragAlloc(iter->entry);
        if (newde) {
            iter->entry = newde;
            /* the bucket head pointer must follow the moved entry */
            iter->d->ht_table[iter->table][iter->index] = newde;
            defragged++;
        }
    }
    return defragged;
}
180
181/* Defrag helper for dict main allocations (dict struct, and hash tables).
182 * receives a pointer to the dict* and implicitly updates it when the dict
183 * struct itself was moved. Returns a stat of how many pointers were moved. */
184long dictDefragTables(dict* d) {
185 dictEntry **newtable;
186 long defragged = 0;
187 /* handle the first hash table */
188 newtable = activeDefragAlloc(d->ht_table[0]);
189 if (newtable)
190 defragged++, d->ht_table[0] = newtable;
191 /* handle the second hash table */
192 if (d->ht_table[1]) {
193 newtable = activeDefragAlloc(d->ht_table[1]);
194 if (newtable)
195 defragged++, d->ht_table[1] = newtable;
196 }
197 return defragged;
198}
199
/* Internal function used by zslDefrag: after a skiplist node was moved from
 * 'oldnode' to 'newnode', fix every pointer that still refers to the old
 * (already freed) node: the per-level forward pointers collected in 'update',
 * the backward pointer of the successor, and the list tail. */
void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == oldnode)
            update[i]->level[i].forward = newnode;
    }
    /* the header node is never the one being defragged here */
    serverAssert(zsl->header!=oldnode);
    if (newnode->level[0].forward) {
        serverAssert(newnode->level[0].forward->backward==oldnode);
        newnode->level[0].forward->backward = newnode;
    } else {
        /* no successor: the moved node was the tail */
        serverAssert(zsl->tail==oldnode);
        zsl->tail = newnode;
    }
}
216
/* Defrag helper for sorted set.
 * Update the robj pointer, defrag the skiplist struct and return the new score
 * reference. We may not access oldele pointer (not even the pointer stored in
 * the skiplist), as it was already freed. Newele may be null, in which case we
 * only need to defrag the skiplist, but not update the obj pointer.
 * When return value is non-NULL, it is the score reference that must be updated
 * in the dict record. */
double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
    int i;
    /* 'oldele' may be a dead pointer: only compare it by address, never
     * dereference it. Use 'newele' (when available) for string compares. */
    sds ele = newele? newele: oldele;

    /* find the skiplist node referring to the object that was moved,
     * and all pointers that need to be updated if we'll end up moving the skiplist node. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            x->level[i].forward->ele != oldele && /* make sure not to access the
                                                     ->obj pointer if it matches
                                                     oldele */
            (x->level[i].forward->score < score ||
            (x->level[i].forward->score == score &&
            sdscmp(x->level[i].forward->ele,ele) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }

    /* update the robj pointer inside the skip list record. */
    x = x->level[0].forward;
    serverAssert(x && score == x->score && x->ele==oldele);
    if (newele)
        x->ele = newele;

    /* try to defrag the skiplist record itself */
    newx = activeDefragAlloc(x);
    if (newx) {
        /* fix all pointers (forward, backward, tail) that referred to the
         * old node location */
        zslUpdateNode(zsl, x, newx, update);
        return &newx->score;
    }
    return NULL;
}
258
/* Defrag helper for sorted set.
 * Defrag a single dict entry key name, and corresponding skiplist struct */
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
    sds newsds;
    double* newscore;
    long defragged = 0;
    sds sdsele = dictGetKey(de);
    if ((newsds = activeDefragSds(sdsele)))
        defragged++, de->key = newsds;
    /* NOTE: 'sdsele' may already be freed here; zslDefrag only compares it
     * by pointer value and never dereferences the old element. */
    newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
    if (newscore) {
        /* the skiplist node moved, so the dict value must point at the new
         * score location */
        dictSetVal(zs->dict, de, newscore);
        defragged++;
    }
    return defragged;
}
275
276#define DEFRAG_SDS_DICT_NO_VAL 0
277#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
278#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
279#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
280#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
281
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
long activeDefragSdsDict(dict* d, int val_type) {
    dictIterator *di;
    dictEntry *de;
    long defragged = 0;
    di = dictGetIterator(d);
    while((de = dictNext(di)) != NULL) {
        /* defrag the key (always an sds) */
        sds sdsele = dictGetKey(de), newsds;
        if ((newsds = activeDefragSds(sdsele)))
            de->key = newsds, defragged++;
        /* defrag the value */
        if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
            sdsele = dictGetVal(de);
            if ((newsds = activeDefragSds(sdsele)))
                de->v.val = newsds, defragged++;
        } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
            robj *newele, *ele = dictGetVal(de);
            if ((newele = activeDefragStringOb(ele, &defragged)))
                de->v.val = newele;
        } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
            void *newptr, *ptr = dictGetVal(de);
            if ((newptr = activeDefragAlloc(ptr)))
                de->v.val = newptr, defragged++;
        } else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) {
            void *newptr, *ptr = dictGetVal(de);
            if ((newptr = activeDefragLuaScript(ptr, &defragged)))
                de->v.val = newptr;
        }
        /* also defrag the dictEntry itself (and its successor in the bucket) */
        defragged += dictIterDefragEntry(di);
    }
    dictReleaseIterator(di);
    return defragged;
}
315
/* Defrag a list of ptr, sds or robj string values */
long activeDefragList(list *l, int val_type) {
    long defragged = 0;
    listNode *ln, *newln;
    for (ln = l->head; ln; ln = ln->next) {
        /* defrag the list node itself, relinking neighbors and the list
         * head/tail when it moved */
        if ((newln = activeDefragAlloc(ln))) {
            if (newln->prev)
                newln->prev->next = newln;
            else
                l->head = newln;
            if (newln->next)
                newln->next->prev = newln;
            else
                l->tail = newln;
            ln = newln;
            defragged++;
        }
        /* defrag the node's value according to its declared type */
        if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
            sds newsds, sdsele = ln->value;
            if ((newsds = activeDefragSds(sdsele)))
                ln->value = newsds, defragged++;
        } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
            robj *newele, *ele = ln->value;
            if ((newele = activeDefragStringOb(ele, &defragged)))
                ln->value = newele;
        } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
            void *newptr, *ptr = ln->value;
            if ((newptr = activeDefragAlloc(ptr)))
                ln->value = newptr, defragged++;
        }
    }
    return defragged;
}
349
/* Defrag a list of sds values and a dict with the same sds keys */
long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
    long defragged = 0;
    sds newsds, sdsele;
    listNode *ln, *newln;
    dictIterator *di;
    dictEntry *de;
    /* Defrag the list and it's sds values */
    for (ln = l->head; ln; ln = ln->next) {
        /* defrag the list node itself, relinking neighbors and the list
         * head/tail when it moved */
        if ((newln = activeDefragAlloc(ln))) {
            if (newln->prev)
                newln->prev->next = newln;
            else
                l->head = newln;
            if (newln->next)
                newln->next->prev = newln;
            else
                l->tail = newln;
            ln = newln;
            defragged++;
        }
        sdsele = ln->value;
        if ((newsds = activeDefragSds(sdsele))) {
            /* When defragging an sds value, we need to update the dict key */
            uint64_t hash = dictGetHash(d, newsds);
            /* look up by the old pointer + precomputed hash; the old sds is
             * already freed so it can't be compared by content */
            dictEntry **deref = dictFindEntryRefByPtrAndHash(d, sdsele, hash);
            if (deref)
                (*deref)->key = newsds;
            ln->value = newsds;
            defragged++;
        }
    }

    /* Defrag the dict values (keys were already handled) */
    di = dictGetIterator(d);
    while((de = dictNext(di)) != NULL) {
        if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
            sds newsds, sdsele = dictGetVal(de);
            if ((newsds = activeDefragSds(sdsele)))
                de->v.val = newsds, defragged++;
        } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
            robj *newele, *ele = dictGetVal(de);
            if ((newele = activeDefragStringOb(ele, &defragged)))
                de->v.val = newele;
        } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
            void *newptr, *ptr = dictGetVal(de);
            if ((newptr = activeDefragAlloc(ptr)))
                de->v.val = newptr, defragged++;
        }
        /* also defrag the dictEntry itself */
        defragged += dictIterDefragEntry(di);
    }
    dictReleaseIterator(di);

    return defragged;
}
405
/* Utility function that replaces an old key pointer in the dictionary with a
 * new pointer. Additionally, we try to defrag the dictEntry in that dict.
 * Oldkey may be a dead pointer and should not be accessed (we get a
 * pre-calculated hash value). Newkey may be null if the key pointer wasn't
 * moved. Return value is the dictEntry if found, or NULL if not found.
 * NOTE: this is very ugly code, but it lets us avoid the complication of
 * doing a scan on another dict. */
dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) {
    /* find by pointer identity + precomputed hash, since oldkey can't be
     * dereferenced for a content compare */
    dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
    if (deref) {
        dictEntry *de = *deref;
        dictEntry *newde = activeDefragAlloc(de);
        if (newde) {
            de = *deref = newde;
            (*defragged)++;
        }
        if (newkey)
            de->key = newkey;
        return de;
    }
    return NULL;
}
428
/* Defrag a single quicklist node and its entry payload. When the node struct
 * itself moves, the neighbor links and the quicklist head/tail are fixed and
 * '*node_ref' is updated in place. Returns a stat of how many pointers were
 * moved. */
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
    quicklistNode *newnode, *node = *node_ref;
    long defragged = 0;
    unsigned char *newzl;
    if ((newnode = activeDefragAlloc(node))) {
        if (newnode->prev)
            newnode->prev->next = newnode;
        else
            ql->head = newnode;
        if (newnode->next)
            newnode->next->prev = newnode;
        else
            ql->tail = newnode;
        *node_ref = node = newnode;
        defragged++;
    }
    /* defrag the node's entry buffer as well */
    if ((newzl = activeDefragAlloc(node->entry)))
        defragged++, node->entry = newzl;
    return defragged;
}
449
450long activeDefragQuickListNodes(quicklist *ql) {
451 quicklistNode *node = ql->head;
452 long defragged = 0;
453 while (node) {
454 defragged += activeDefragQuickListNode(ql, &node);
455 node = node->next;
456 }
457 return defragged;
458}
459
460/* when the value has lots of elements, we want to handle it later and not as
461 * part of the main dictionary scan. this is needed in order to prevent latency
462 * spikes when handling large items */
463void defragLater(redisDb *db, dictEntry *kde) {
464 sds key = sdsdup(dictGetKey(kde));
465 listAddNodeTail(db->defrag_later, key);
466}
467
/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
    quicklist *ql = ob->ptr;
    quicklistNode *node;
    long iterations = 0;
    int bookmark_failed = 0;
    if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
        return 0;

    if (*cursor == 0) {
        /* if cursor is 0, we start new iteration */
        node = ql->head;
    } else {
        /* resume from the "_AD" (active defrag) bookmark left by a previous
         * time-limited pass */
        node = quicklistBookmarkFind(ql, "_AD");
        if (!node) {
            /* if the bookmark was deleted, it means we reached the end. */
            *cursor = 0;
            return 0;
        }
        node = node->next;
    }

    (*cursor)++;
    while (node) {
        (*defragged) += activeDefragQuickListNode(ql, &node);
        server.stat_active_defrag_scanned++;
        /* only check the clock every 128 nodes to keep ustime() overhead low */
        if (++iterations > 128 && !bookmark_failed) {
            if (ustime() > endtime) {
                if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
                    /* can't leave a bookmark; finish the whole list in this
                     * pass instead of resuming later */
                    bookmark_failed = 1;
                } else {
                    ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
                    return 1;
                }
            }
            iterations = 0;
        }
        node = node->next;
    }
    quicklistBookmarkDelete(ql, "_AD");
    *cursor = 0;
    return bookmark_failed? 1: 0;
}
511
512typedef struct {
513 zset *zs;
514 long defragged;
515} scanLaterZsetData;
516
517void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
518 dictEntry *de = (dictEntry*)_de;
519 scanLaterZsetData *data = privdata;
520 data->defragged += activeDefragZsetEntry(data->zs, de);
521 server.stat_active_defrag_scanned++;
522}
523
524long scanLaterZset(robj *ob, unsigned long *cursor) {
525 if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
526 return 0;
527 zset *zs = (zset*)ob->ptr;
528 dict *d = zs->dict;
529 scanLaterZsetData data = {zs, 0};
530 *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
531 return data.defragged;
532}
533
534void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
535 dictEntry *de = (dictEntry*)_de;
536 long *defragged = privdata;
537 sds sdsele = dictGetKey(de), newsds;
538 if ((newsds = activeDefragSds(sdsele)))
539 (*defragged)++, de->key = newsds;
540 server.stat_active_defrag_scanned++;
541}
542
543long scanLaterSet(robj *ob, unsigned long *cursor) {
544 long defragged = 0;
545 if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
546 return 0;
547 dict *d = ob->ptr;
548 *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged);
549 return defragged;
550}
551
552void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
553 dictEntry *de = (dictEntry*)_de;
554 long *defragged = privdata;
555 sds sdsele = dictGetKey(de), newsds;
556 if ((newsds = activeDefragSds(sdsele)))
557 (*defragged)++, de->key = newsds;
558 sdsele = dictGetVal(de);
559 if ((newsds = activeDefragSds(sdsele)))
560 (*defragged)++, de->v.val = newsds;
561 server.stat_active_defrag_scanned++;
562}
563
564long scanLaterHash(robj *ob, unsigned long *cursor) {
565 long defragged = 0;
566 if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
567 return 0;
568 dict *d = ob->ptr;
569 *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged);
570 return defragged;
571}
572
/* Defrag a quicklist-encoded list value: the quicklist struct itself, and —
 * unless the list is large enough to be deferred — all of its nodes.
 * Returns a stat of how many pointers were moved. */
long defragQuicklist(redisDb *db, dictEntry *kde) {
    robj *ob = dictGetVal(kde);
    long defragged = 0;
    quicklist *ql = ob->ptr, *newql;
    serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
    if ((newql = activeDefragAlloc(ql)))
        defragged++, ob->ptr = ql = newql;
    /* large lists are deferred to scanLaterList to avoid latency spikes */
    if (ql->len > server.active_defrag_max_scan_fields)
        defragLater(db, kde);
    else
        defragged += activeDefragQuickListNodes(ql);
    return defragged;
}
586
/* Defrag a skiplist-encoded zset value: the zset struct, skiplist struct,
 * skiplist header, and — unless the zset is large enough to be deferred —
 * every entry; finally the member dict struct and its tables.
 * Returns a stat of how many pointers were moved. */
long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
    robj *ob = dictGetVal(kde);
    long defragged = 0;
    zset *zs = (zset*)ob->ptr;
    zset *newzs;
    zskiplist *newzsl;
    dict *newdict;
    dictEntry *de;
    struct zskiplistNode *newheader;
    serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
    if ((newzs = activeDefragAlloc(zs)))
        defragged++, ob->ptr = zs = newzs;
    if ((newzsl = activeDefragAlloc(zs->zsl)))
        defragged++, zs->zsl = newzsl;
    if ((newheader = activeDefragAlloc(zs->zsl->header)))
        defragged++, zs->zsl->header = newheader;
    /* large zsets are deferred to scanLaterZset to avoid latency spikes */
    if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
        defragLater(db, kde);
    else {
        dictIterator *di = dictGetIterator(zs->dict);
        while((de = dictNext(di)) != NULL) {
            defragged += activeDefragZsetEntry(zs, de);
        }
        dictReleaseIterator(di);
    }
    /* handle the dict struct */
    if ((newdict = activeDefragAlloc(zs->dict)))
        defragged++, zs->dict = newdict;
    /* defrag the dict tables */
    defragged += dictDefragTables(zs->dict);
    return defragged;
}
619
/* Defrag a hashtable-encoded hash value: its field/value sds pairs (deferred
 * when large), then the dict struct and tables.
 * Returns a stat of how many pointers were moved. */
long defragHash(redisDb *db, dictEntry *kde) {
    long defragged = 0;
    robj *ob = dictGetVal(kde);
    dict *d, *newd;
    serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
    d = ob->ptr;
    /* large hashes are deferred to scanLaterHash to avoid latency spikes */
    if (dictSize(d) > server.active_defrag_max_scan_fields)
        defragLater(db, kde);
    else
        defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
    /* handle the dict struct */
    if ((newd = activeDefragAlloc(ob->ptr)))
        defragged++, ob->ptr = newd;
    /* defrag the dict tables */
    defragged += dictDefragTables(ob->ptr);
    return defragged;
}
637
/* Defrag a hashtable-encoded set value: its member sds keys (deferred when
 * large), then the dict struct and tables.
 * Returns a stat of how many pointers were moved. */
long defragSet(redisDb *db, dictEntry *kde) {
    long defragged = 0;
    robj *ob = dictGetVal(kde);
    dict *d, *newd;
    serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
    d = ob->ptr;
    /* large sets are deferred to scanLaterSet to avoid latency spikes */
    if (dictSize(d) > server.active_defrag_max_scan_fields)
        defragLater(db, kde);
    else
        defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
    /* handle the dict struct */
    if ((newd = activeDefragAlloc(ob->ptr)))
        defragged++, ob->ptr = newd;
    /* defrag the dict tables */
    defragged += dictDefragTables(ob->ptr);
    return defragged;
}
655
656/* Defrag callback for radix tree iterator, called for each node,
657 * used in order to defrag the nodes allocations. */
658int defragRaxNode(raxNode **noderef) {
659 raxNode *newnode = activeDefragAlloc(*noderef);
660 if (newnode) {
661 *noderef = newnode;
662 return 1;
663 }
664 return 0;
665}
666
/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
    /* NOTE: the resume position is kept in a static buffer, so only one
     * stream can be incrementally scanned at a time (which matches how the
     * defrag-later queue is drained). */
    static unsigned char last[sizeof(streamID)];
    raxIterator ri;
    long iterations = 0;
    if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
        *cursor = 0;
        return 0;
    }

    stream *s = ob->ptr;
    raxStart(&ri,s->rax);
    if (*cursor == 0) {
        /* if cursor is 0, we start new iteration */
        defragRaxNode(&s->rax->head);
        /* assign the iterator node callback before the seek, so that the
         * initial nodes that are processed till the first item are covered */
        ri.node_cb = defragRaxNode;
        raxSeek(&ri,"^",NULL,0);
    } else {
        /* if cursor is non-zero, we seek to the static 'last' */
        if (!raxSeek(&ri,">", last, sizeof(last))) {
            *cursor = 0;
            raxStop(&ri);
            return 0;
        }
        /* assign the iterator node callback after the seek, so that the
         * initial nodes that are processed till now aren't covered */
        ri.node_cb = defragRaxNode;
    }

    (*cursor)++;
    while (raxNext(&ri)) {
        /* defrag the listpack (entry data) attached to this rax key */
        void *newdata = activeDefragAlloc(ri.data);
        if (newdata)
            raxSetData(ri.node, ri.data=newdata), (*defragged)++;
        server.stat_active_defrag_scanned++;
        /* only check the clock every 128 entries to keep ustime() cheap */
        if (++iterations > 128) {
            if (ustime() > endtime) {
                /* remember where to resume on the next call */
                serverAssert(ri.key_len==sizeof(last));
                memcpy(last,ri.key,ri.key_len);
                raxStop(&ri);
                return 1;
            }
            iterations = 0;
        }
    }
    raxStop(&ri);
    *cursor = 0;
    return 0;
}
718
719/* optional callback used defrag each rax element (not including the element pointer itself) */
720typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
721
/* defrag radix tree including:
 * 1) rax struct
 * 2) rax nodes
 * 3) rax entry data (only if defrag_data is specified)
 * 4) call a callback per element, and allow the callback to return a new pointer for the element */
long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
    long defragged = 0;
    raxIterator ri;
    rax* rax; /* local shadows the type name; used only as a temp here */
    if ((rax = activeDefragAlloc(*raxref)))
        defragged++, *raxref = rax;
    rax = *raxref;
    raxStart(&ri,rax);
    /* the node callback defrags every rax node the iterator traverses,
     * including the head which the seek below starts from */
    ri.node_cb = defragRaxNode;
    defragRaxNode(&rax->head);
    raxSeek(&ri,"^",NULL,0);
    while (raxNext(&ri)) {
        void *newdata = NULL;
        /* the element callback may replace the element's data pointer; only
         * fall back to a raw realloc when it didn't and defrag_data is set */
        if (element_cb)
            newdata = element_cb(&ri, element_cb_data, &defragged);
        if (defrag_data && !newdata)
            newdata = activeDefragAlloc(ri.data);
        if (newdata)
            raxSetData(ri.node, ri.data=newdata), defragged++;
    }
    raxStop(&ri);
    return defragged;
}
750
751typedef struct {
752 streamCG *cg;
753 streamConsumer *c;
754} PendingEntryContext;
755
/* Per-element callback (raxDefragFunction) for a consumer's PEL: refresh the
 * NACK's consumer pointer, defrag the NACK struct, and update the consumer
 * group's global PEL to the moved NACK. Returns the new NACK pointer or NULL
 * when it wasn't moved. */
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
    UNUSED(defragged);
    PendingEntryContext *ctx = privdata;
    streamNACK *nack = ri->data, *newnack;
    nack->consumer = ctx->c; /* update nack pointer to consumer */
    newnack = activeDefragAlloc(nack);
    if (newnack) {
        /* update consumer group pointer to the nack */
        void *prev;
        /* same key already exists, so this replaces the value in cg->pel */
        raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
        serverAssert(prev==nack);
        /* note: we don't increment 'defragged' that's done by the caller */
    }
    return newnack;
}
771
/* Per-element callback (raxDefragFunction) for a group's consumers rax:
 * defrag the streamConsumer struct, its name sds, and its pending-entries
 * list (PEL). 'privdata' is the owning streamCG. Returns the new consumer
 * pointer or NULL when the struct wasn't moved. */
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
    streamConsumer *c = ri->data;
    streamCG *cg = privdata;
    void *newc = activeDefragAlloc(c);
    if (newc) {
        /* note: we don't increment 'defragged' that's done by the caller */
        c = newc;
    }
    sds newsds = activeDefragSds(c->name);
    if (newsds)
        (*defragged)++, c->name = newsds;
    if (c->pel) {
        /* each PEL entry also needs its back-pointer to 'c' refreshed */
        PendingEntryContext pel_ctx = {cg, c};
        *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
    }
    return newc; /* returns NULL if c was not defragged */
}
789
790void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
791 streamCG *cg = ri->data;
792 UNUSED(privdata);
793 if (cg->consumers)
794 *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
795 if (cg->pel)
796 *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
797 return NULL;
798}
799
/* Defrag a stream value: the stream struct, its entries rax (deferred when
 * large), and the consumer groups rax. Returns a stat of how many pointers
 * were moved. */
long defragStream(redisDb *db, dictEntry *kde) {
    long defragged = 0;
    robj *ob = dictGetVal(kde);
    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
    stream *s = ob->ptr, *news;

    /* handle the main struct */
    if ((news = activeDefragAlloc(s)))
        defragged++, ob->ptr = s = news;

    if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
        /* too many entries for one pass: defrag only the rax struct now and
         * defer the nodes/listpacks to scanLaterStreamListpacks */
        rax *newrax = activeDefragAlloc(s->rax);
        if (newrax)
            defragged++, s->rax = newrax;
        defragLater(db, kde);
    } else
        defragged += defragRadixTree(&s->rax, 1, NULL, NULL);

    if (s->cgroups)
        defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
    return defragged;
}
822
823/* Defrag a module key. This is either done immediately or scheduled
824 * for later. Returns then number of pointers defragged.
825 */
826long defragModule(redisDb *db, dictEntry *kde) {
827 robj *obj = dictGetVal(kde);
828 serverAssert(obj->type == OBJ_MODULE);
829 long defragged = 0;
830
831 if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id))
832 defragLater(db, kde);
833
834 return defragged;
835}
836
/* for each key we scan in the main dict, this function will attempt to defrag
 * all the various pointers it has. Returns a stat of how many pointers were
 * moved. */
long defragKey(redisDb *db, dictEntry *de) {
    sds keysds = dictGetKey(de);
    robj *newob, *ob;
    unsigned char *newzl;
    long defragged = 0;
    sds newsds;

    /* Try to defrag the key name. */
    newsds = activeDefragSds(keysds);
    if (newsds)
        defragged++, de->key = newsds;
    if (dictSize(db->expires)) {
        /* Dirty code:
         * I can't search in db->expires for that key after i already released
         * the pointer it holds it won't be able to do the string compare */
        uint64_t hash = dictGetHash(db->dict, de->key);
        replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
    }

    /* Try to defrag robj and / or string value. */
    ob = dictGetVal(de);
    if ((newob = activeDefragStringOb(ob, &defragged))) {
        de->v.val = newob;
        ob = newob;
    }

    /* Dispatch on object type/encoding for the value's internal pointers. */
    if (ob->type == OBJ_STRING) {
        /* Already handled in activeDefragStringOb. */
    } else if (ob->type == OBJ_LIST) {
        if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
            defragged += defragQuicklist(db, de);
        } else {
            serverPanic("Unknown list encoding");
        }
    } else if (ob->type == OBJ_SET) {
        if (ob->encoding == OBJ_ENCODING_HT) {
            defragged += defragSet(db, de);
        } else if (ob->encoding == OBJ_ENCODING_INTSET) {
            /* intset is a single flat allocation */
            intset *newis, *is = ob->ptr;
            if ((newis = activeDefragAlloc(is)))
                defragged++, ob->ptr = newis;
        } else {
            serverPanic("Unknown set encoding");
        }
    } else if (ob->type == OBJ_ZSET) {
        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            /* listpack is a single flat allocation */
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
            defragged += defragZsetSkiplist(db, de);
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    } else if (ob->type == OBJ_HASH) {
        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            /* listpack is a single flat allocation */
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_HT) {
            defragged += defragHash(db, de);
        } else {
            serverPanic("Unknown hash encoding");
        }
    } else if (ob->type == OBJ_STREAM) {
        defragged += defragStream(db, de);
    } else if (ob->type == OBJ_MODULE) {
        defragged += defragModule(db, de);
    } else {
        serverPanic("Unknown object type");
    }
    return defragged;
}
911
912/* Defrag scan callback for the main db dictionary. */
913void defragScanCallback(void *privdata, const dictEntry *de) {
914 long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
915 server.stat_active_defrag_hits += defragged;
916 if(defragged)
917 server.stat_active_defrag_key_hits++;
918 else
919 server.stat_active_defrag_key_misses++;
920 server.stat_active_defrag_scanned++;
921}
922
/* Defrag scan callback for each hash table bucket,
 * used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(dict *d, dictEntry **bucketref) {
    /* walk the bucket's chain, replacing each entry that gets moved */
    while(*bucketref) {
        dictEntry *de = *bucketref, *newde;
        if ((newde = activeDefragAlloc(de))) {
            *bucketref = newde;
            if (server.cluster_enabled && d == server.db[0].dict) {
                /* Cluster keyspace dict. Update slot-to-entries mapping. */
                slotToKeyReplaceEntry(newde, server.db);
            }
        }
        bucketref = &(*bucketref)->next;
    }
}
938
/* Utility function to get the fragmentation ratio from jemalloc.
 * It is critical to do that by comparing only heap maps that belong to
 * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
 * fragmentation ratio in order to decide if a defrag action should be taken
 * or not, a false detection can cause the defragmenter to waste a lot of CPU
 * without the possibility of getting any results.
 *
 * Returns the fragmentation percentage (active vs allocated); when
 * 'out_frag_bytes' is non-NULL it receives the absolute byte difference. */
float getAllocatorFragmentation(size_t *out_frag_bytes) {
    size_t resident, active, allocated;
    zmalloc_get_allocator_info(&allocated, &active, &resident);
    /* NOTE(review): assumes active >= allocated and allocated > 0 (as
     * reported by jemalloc); otherwise these unsigned subtractions would
     * wrap — confirm against zmalloc_get_allocator_info's guarantees. */
    float frag_pct = ((float)active / allocated)*100 - 100;
    size_t frag_bytes = active - allocated;
    float rss_pct = ((float)resident / allocated)*100 - 100;
    size_t rss_bytes = resident - allocated;
    if(out_frag_bytes)
        *out_frag_bytes = frag_bytes;
    serverLog(LL_DEBUG,
        "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)",
        allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
    return frag_pct;
}
959
960/* We may need to defrag other globals, one small allocation can hold a full allocator run.
961 * so although small, it is still important to defrag these */
962long defragOtherGlobals() {
963 long defragged = 0;
964
965 /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.
966 * but we assume most of these are short lived, we only need to defrag allocations
967 * that remain static for a long time */
968 defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
969 defragged += moduleDefragGlobals();
970 return defragged;
971}
972
973/* returns 0 more work may or may not be needed (see non-zero cursor),
974 * and 1 if time is up and more work is needed. */
975int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) {
976 if (de) {
977 robj *ob = dictGetVal(de);
978 if (ob->type == OBJ_LIST) {
979 return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
980 } else if (ob->type == OBJ_SET) {
981 server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
982 } else if (ob->type == OBJ_ZSET) {
983 server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
984 } else if (ob->type == OBJ_HASH) {
985 server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
986 } else if (ob->type == OBJ_STREAM) {
987 return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
988 } else if (ob->type == OBJ_MODULE) {
989 return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid);
990 } else {
991 *cursor = 0; /* object type may have changed since we schedule it for later */
992 }
993 } else {
994 *cursor = 0; /* object may have been deleted already */
995 }
996 return 0;
997}
998
/* Static variables serving defragLaterStep to continue scanning a key from where we stopped last time. */
static sds defrag_later_current_key = NULL; /* key currently being processed from db->defrag_later (NULL = none) */
static unsigned long defrag_later_cursor = 0; /* scan cursor within that key; 0 means start fresh */
1002
/* Incrementally process the db->defrag_later list of big keys, resuming from
 * the static defrag_later_current_key/defrag_later_cursor state.
 * Returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
int defragLaterStep(redisDb *db, long long endtime) {
    unsigned int iterations = 0;
    unsigned long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    long long key_defragged;

    do {
        /* if we're not continuing a scan from the last call or loop, start a new one */
        if (!defrag_later_cursor) {
            listNode *head = listFirst(db->defrag_later);

            /* Move on to next key: the finished key is always at the head of
             * the list (this code relies on head being non-NULL whenever
             * defrag_later_current_key is set). */
            if (defrag_later_current_key) {
                serverAssert(defrag_later_current_key == head->value);
                listDelNode(db->defrag_later, head);
                defrag_later_cursor = 0;
                defrag_later_current_key = NULL;
            }

            /* stop if we reached the last one. */
            head = listFirst(db->defrag_later);
            if (!head)
                return 0;

            /* start a new key */
            defrag_later_current_key = head->value;
            defrag_later_cursor = 0;
        }

        /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
        dictEntry *de = dictFind(db->dict, defrag_later_current_key);
        /* snapshot the hit counter so we can tell below if this key produced any hits */
        key_defragged = server.stat_active_defrag_hits;
        do {
            int quit = 0;
            if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id))
                quit = 1; /* time is up, we didn't finish all the work */

            /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
             * (if we have a lot of pointers in one hash bucket, or rehashing),
             * check if we reached the time limit. */
            if (quit || (++iterations > 16 ||
                         server.stat_active_defrag_hits - prev_defragged > 512 ||
                         server.stat_active_defrag_scanned - prev_scanned > 64)) {
                if (quit || ustime() > endtime) {
                    /* account this key as a hit or a miss before bailing out */
                    if(key_defragged != server.stat_active_defrag_hits)
                        server.stat_active_defrag_key_hits++;
                    else
                        server.stat_active_defrag_key_misses++;
                    return 1;
                }
                iterations = 0;
                prev_defragged = server.stat_active_defrag_hits;
                prev_scanned = server.stat_active_defrag_scanned;
            }
        } while(defrag_later_cursor);
        /* key fully processed within budget: record hit/miss and loop to the next key */
        if(key_defragged != server.stat_active_defrag_hits)
            server.stat_active_defrag_key_hits++;
        else
            server.stat_active_defrag_key_misses++;
    } while(1);
}
1065
/* Linearly map x from the range [x1,x2] onto [y1,y2] (no clamping;
 * extrapolates when x is outside [x1,x2]). */
#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
/* Clamp y into [min,max]. Every argument is parenthesized in the expansion
 * (CERT PRE01-C) so expression arguments can't change precedence. */
#define LIMIT(y, min, max) ((y)<(min)? (min): ((y)>(max)? (max): (y)))
1068
1069/* decide if defrag is needed, and at what CPU effort to invest in it */
1070void computeDefragCycles() {
1071 size_t frag_bytes;
1072 float frag_pct = getAllocatorFragmentation(&frag_bytes);
1073 /* If we're not already running, and below the threshold, exit. */
1074 if (!server.active_defrag_running) {
1075 if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
1076 return;
1077 }
1078
1079 /* Calculate the adaptive aggressiveness of the defrag */
1080 int cpu_pct = INTERPOLATE(frag_pct,
1081 server.active_defrag_threshold_lower,
1082 server.active_defrag_threshold_upper,
1083 server.active_defrag_cycle_min,
1084 server.active_defrag_cycle_max);
1085 cpu_pct = LIMIT(cpu_pct,
1086 server.active_defrag_cycle_min,
1087 server.active_defrag_cycle_max);
1088 /* We allow increasing the aggressiveness during a scan, but don't
1089 * reduce it. */
1090 if (cpu_pct > server.active_defrag_running) {
1091 server.active_defrag_running = cpu_pct;
1092 serverLog(LL_VERBOSE,
1093 "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
1094 frag_pct, frag_bytes, cpu_pct);
1095 }
1096}
1097
/* Perform incremental defragmentation work from the serverCron.
 * This works in a similar way to activeExpireCycle, in the sense that
 * we do incremental work across calls. */
void activeDefragCycle(void) {
    /* Scan position preserved across cron invocations. */
    static int current_db = -1;
    static unsigned long cursor = 0;
    static redisDb *db = NULL;
    static long long start_scan, start_stat;
    unsigned int iterations = 0;
    unsigned long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    long long start, timelimit, endtime;
    mstime_t latency;
    int quit = 0;

    if (!server.active_defrag_enabled) {
        if (server.active_defrag_running) {
            /* if active defrag was disabled mid-run, start from fresh next time. */
            server.active_defrag_running = 0;
            if (db)
                listEmpty(db->defrag_later);
            defrag_later_current_key = NULL;
            defrag_later_cursor = 0;
            current_db = -1;
            cursor = 0;
            db = NULL;
            goto update_metrics;
        }
        return;
    }

    if (hasActiveChildProcess())
        return; /* Defragging memory while there's a fork will just do damage. */

    /* Once a second, check if the fragmentation justifies starting a scan
     * or making it more aggressive. */
    run_with_period(1000) {
        computeDefragCycles();
    }
    if (!server.active_defrag_running)
        return;

    /* See activeExpireCycle for how timelimit is handled: a slice of each
     * second proportional to the effort (active_defrag_running percent). */
    start = ustime();
    timelimit = 1000000*server.active_defrag_running/server.hz/100;
    if (timelimit <= 0) timelimit = 1;
    endtime = start + timelimit;
    latencyStartMonitor(latency);

    do {
        /* if we're not continuing a scan from the last call or loop, start a new one */
        if (!cursor) {
            /* finish any leftovers from previous db before moving to the next one */
            if (db && defragLaterStep(db, endtime)) {
                quit = 1; /* time is up, we didn't finish all the work */
                break; /* this will exit the function and we'll continue on the next cycle */
            }

            /* Move on to next database, and stop if we reached the last one. */
            if (++current_db >= server.dbnum) {
                /* defrag other items not part of the db / keys */
                server.stat_active_defrag_hits += defragOtherGlobals();

                long long now = ustime();
                size_t frag_bytes;
                float frag_pct = getAllocatorFragmentation(&frag_bytes);
                serverLog(LL_VERBOSE,
                    "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
                    (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);

                /* Reset the scan state; a new full scan starts at db 0. */
                start_scan = now;
                current_db = -1;
                cursor = 0;
                db = NULL;
                server.active_defrag_running = 0;

                computeDefragCycles(); /* if another scan is needed, start it right away */
                if (server.active_defrag_running != 0 && ustime() < endtime)
                    continue;
                break;
            }
            else if (current_db==0) {
                /* Start a scan from the first database. */
                start_scan = ustime();
                start_stat = server.stat_active_defrag_hits;
            }

            db = &server.db[current_db];
            cursor = 0;
        }

        do {
            /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
            if (defragLaterStep(db, endtime)) {
                quit = 1; /* time is up, we didn't finish all the work */
                break; /* this will exit the function and we'll continue on the next cycle */
            }

            cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);

            /* Once in 16 scan iterations, 512 pointer reallocations, or 64 keys
             * (if we have a lot of pointers in one hash bucket or rehashing),
             * check if we reached the time limit.
             * But regardless, don't start a new db in this loop, this is because after
             * the last db we call defragOtherGlobals, which must be done in one cycle */
            if (!cursor || (++iterations > 16 ||
                            server.stat_active_defrag_hits - prev_defragged > 512 ||
                            server.stat_active_defrag_scanned - prev_scanned > 64)) {
                if (!cursor || ustime() > endtime) {
                    quit = 1;
                    break;
                }
                iterations = 0;
                prev_defragged = server.stat_active_defrag_hits;
                prev_scanned = server.stat_active_defrag_scanned;
            }
        } while(cursor && !quit);
    } while(!quit);

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("active-defrag-cycle",latency);

update_metrics:
    /* Maintain cumulative active-defrag time stats (start/stop stopwatch). */
    if (server.active_defrag_running > 0) {
        if (server.stat_last_active_defrag_time == 0)
            elapsedStart(&server.stat_last_active_defrag_time);
    } else if (server.stat_last_active_defrag_time != 0) {
        server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time);
        server.stat_last_active_defrag_time = 0;
    }
}
1229
1230#else /* HAVE_DEFRAG */
1231
/* No-op stub: this build's allocator has no defrag support (HAVE_DEFRAG unset). */
void activeDefragCycle(void) {
    /* Not implemented yet. */
}
1235
/* Stub for builds without defrag support: never relocates an allocation,
 * always returns NULL ("no move performed"). */
void *activeDefragAlloc(void *ptr) {
    UNUSED(ptr);
    return NULL;
}
1240
/* Stub for builds without defrag support: never relocates a string object,
 * always returns NULL ("no move performed"). */
robj *activeDefragStringOb(robj *ob, long *defragged) {
    UNUSED(ob);
    UNUSED(defragged);
    return NULL;
}
1246
1247#endif
1248