1#include "server.h"
2#include "bio.h"
3#include "atomicvar.h"
4#include "functions.h"
5
/* Number of objects currently queued for release by the bio lazy-free
 * thread (incremented when a job is submitted, decremented as items are
 * actually freed). */
static redisAtomic size_t lazyfree_objects = 0;
/* Total number of objects freed by the lazy-free thread since startup
 * (or since lazyfreeResetStats()). */
static redisAtomic size_t lazyfreed_objects = 0;
8
9/* Release objects from the lazyfree thread. It's just decrRefCount()
10 * updating the count of objects to release. */
11void lazyfreeFreeObject(void *args[]) {
12 robj *o = (robj *) args[0];
13 decrRefCount(o);
14 atomicDecr(lazyfree_objects,1);
15 atomicIncr(lazyfreed_objects,1);
16}
17
18/* Release a database from the lazyfree thread. The 'db' pointer is the
19 * database which was substituted with a fresh one in the main thread
20 * when the database was logically deleted. */
21void lazyfreeFreeDatabase(void *args[]) {
22 dict *ht1 = (dict *) args[0];
23 dict *ht2 = (dict *) args[1];
24
25 size_t numkeys = dictSize(ht1);
26 dictRelease(ht1);
27 dictRelease(ht2);
28 atomicDecr(lazyfree_objects,numkeys);
29 atomicIncr(lazyfreed_objects,numkeys);
30}
31
32/* Release the key tracking table. */
33void lazyFreeTrackingTable(void *args[]) {
34 rax *rt = args[0];
35 size_t len = rt->numele;
36 freeTrackingRadixTree(rt);
37 atomicDecr(lazyfree_objects,len);
38 atomicIncr(lazyfreed_objects,len);
39}
40
41/* Release the lua_scripts dict. */
42void lazyFreeLuaScripts(void *args[]) {
43 dict *lua_scripts = args[0];
44 long long len = dictSize(lua_scripts);
45 dictRelease(lua_scripts);
46 atomicDecr(lazyfree_objects,len);
47 atomicIncr(lazyfreed_objects,len);
48}
49
50/* Release the functions ctx. */
51void lazyFreeFunctionsCtx(void *args[]) {
52 functionsLibCtx *functions_lib_ctx = args[0];
53 size_t len = functionsLibCtxfunctionsLen(functions_lib_ctx);
54 functionsLibCtxFree(functions_lib_ctx);
55 atomicDecr(lazyfree_objects,len);
56 atomicIncr(lazyfreed_objects,len);
57}
58
59/* Release replication backlog referencing memory. */
60void lazyFreeReplicationBacklogRefMem(void *args[]) {
61 list *blocks = args[0];
62 rax *index = args[1];
63 long long len = listLength(blocks);
64 len += raxSize(index);
65 listRelease(blocks);
66 raxFree(index);
67 atomicDecr(lazyfree_objects,len);
68 atomicIncr(lazyfreed_objects,len);
69}
70
71/* Return the number of currently pending objects to free. */
72size_t lazyfreeGetPendingObjectsCount(void) {
73 size_t aux;
74 atomicGet(lazyfree_objects,aux);
75 return aux;
76}
77
78/* Return the number of objects that have been freed. */
79size_t lazyfreeGetFreedObjectsCount(void) {
80 size_t aux;
81 atomicGet(lazyfreed_objects,aux);
82 return aux;
83}
84
85void lazyfreeResetStats() {
86 atomicSet(lazyfreed_objects,0);
87}
88
89/* Return the amount of work needed in order to free an object.
90 * The return value is not always the actual number of allocations the
91 * object is composed of, but a number proportional to it.
92 *
93 * For strings the function always returns 1.
94 *
95 * For aggregated objects represented by hash tables or other data structures
96 * the function just returns the number of elements the object is composed of.
97 *
98 * Objects composed of single allocations are always reported as having a
99 * single item even if they are actually logical composed of multiple
100 * elements.
101 *
102 * For lists the function returns the number of elements in the quicklist
103 * representing the list. */
104size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
105 if (obj->type == OBJ_LIST) {
106 quicklist *ql = obj->ptr;
107 return ql->len;
108 } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
109 dict *ht = obj->ptr;
110 return dictSize(ht);
111 } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
112 zset *zs = obj->ptr;
113 return zs->zsl->length;
114 } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
115 dict *ht = obj->ptr;
116 return dictSize(ht);
117 } else if (obj->type == OBJ_STREAM) {
118 size_t effort = 0;
119 stream *s = obj->ptr;
120
121 /* Make a best effort estimate to maintain constant runtime. Every macro
122 * node in the Stream is one allocation. */
123 effort += s->rax->numnodes;
124
125 /* Every consumer group is an allocation and so are the entries in its
126 * PEL. We use size of the first group's PEL as an estimate for all
127 * others. */
128 if (s->cgroups && raxSize(s->cgroups)) {
129 raxIterator ri;
130 streamCG *cg;
131 raxStart(&ri,s->cgroups);
132 raxSeek(&ri,"^",NULL,0);
133 /* There must be at least one group so the following should always
134 * work. */
135 serverAssert(raxNext(&ri));
136 cg = ri.data;
137 effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
138 raxStop(&ri);
139 }
140 return effort;
141 } else if (obj->type == OBJ_MODULE) {
142 size_t effort = moduleGetFreeEffort(key, obj, dbid);
143 /* If the module's free_effort returns 0, we will use asynchronous free
144 * memory by default. */
145 return effort == 0 ? ULONG_MAX : effort;
146 } else {
147 return 1; /* Everything else is a single allocation. */
148 }
149}
150
151/* If there are enough allocations to free the value object asynchronously, it
152 * may be put into a lazy free list instead of being freed synchronously. The
153 * lazy free list will be reclaimed in a different bio.c thread. If the value is
154 * composed of a few allocations, to free in a lazy way is actually just
155 * slower... So under a certain limit we just free the object synchronously. */
156#define LAZYFREE_THRESHOLD 64
157
158/* Free an object, if the object is huge enough, free it in async way. */
159void freeObjAsync(robj *key, robj *obj, int dbid) {
160 size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
161 /* Note that if the object is shared, to reclaim it now it is not
162 * possible. This rarely happens, however sometimes the implementation
163 * of parts of the Redis core may call incrRefCount() to protect
164 * objects, and then call dbDelete(). */
165 if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
166 atomicIncr(lazyfree_objects,1);
167 bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
168 } else {
169 decrRefCount(obj);
170 }
171}
172
173/* Empty a Redis DB asynchronously. What the function does actually is to
174 * create a new empty set of hash tables and scheduling the old ones for
175 * lazy freeing. */
176void emptyDbAsync(redisDb *db) {
177 dict *oldht1 = db->dict, *oldht2 = db->expires;
178 db->dict = dictCreate(&dbDictType);
179 db->expires = dictCreate(&dbExpiresDictType);
180 atomicIncr(lazyfree_objects,dictSize(oldht1));
181 bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
182}
183
184/* Free the key tracking table.
185 * If the table is huge enough, free it in async way. */
186void freeTrackingRadixTreeAsync(rax *tracking) {
187 /* Because this rax has only keys and no values so we use numnodes. */
188 if (tracking->numnodes > LAZYFREE_THRESHOLD) {
189 atomicIncr(lazyfree_objects,tracking->numele);
190 bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
191 } else {
192 freeTrackingRadixTree(tracking);
193 }
194}
195
196/* Free lua_scripts dict, if the dict is huge enough, free it in async way. */
197void freeLuaScriptsAsync(dict *lua_scripts) {
198 if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
199 atomicIncr(lazyfree_objects,dictSize(lua_scripts));
200 bioCreateLazyFreeJob(lazyFreeLuaScripts,1,lua_scripts);
201 } else {
202 dictRelease(lua_scripts);
203 }
204}
205
206/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
207void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
208 if (functionsLibCtxfunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
209 atomicIncr(lazyfree_objects,functionsLibCtxfunctionsLen(functions_lib_ctx));
210 bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx);
211 } else {
212 functionsLibCtxFree(functions_lib_ctx);
213 }
214}
215
216/* Free replication backlog referencing buffer blocks and rax index. */
217void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
218 if (listLength(blocks) > LAZYFREE_THRESHOLD ||
219 raxSize(index) > LAZYFREE_THRESHOLD)
220 {
221 atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
222 bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
223 } else {
224 listRelease(blocks);
225 raxFree(index);
226 }
227}
228