1 | #include "server.h" |
2 | #include "bio.h" |
3 | #include "atomicvar.h" |
4 | #include "functions.h" |
5 | |
/* Number of logical elements currently queued for release by the lazyfree
 * background thread: incremented when a job is submitted, decremented by the
 * job callback once the memory is actually reclaimed. Read via
 * lazyfreeGetPendingObjectsCount(). */
static redisAtomic size_t lazyfree_objects = 0;
/* Running total of elements already freed asynchronously. Read via
 * lazyfreeGetFreedObjectsCount() and reset by lazyfreeResetStats(). */
static redisAtomic size_t lazyfreed_objects = 0;
8 | |
9 | /* Release objects from the lazyfree thread. It's just decrRefCount() |
10 | * updating the count of objects to release. */ |
11 | void lazyfreeFreeObject(void *args[]) { |
12 | robj *o = (robj *) args[0]; |
13 | decrRefCount(o); |
14 | atomicDecr(lazyfree_objects,1); |
15 | atomicIncr(lazyfreed_objects,1); |
16 | } |
17 | |
18 | /* Release a database from the lazyfree thread. The 'db' pointer is the |
19 | * database which was substituted with a fresh one in the main thread |
20 | * when the database was logically deleted. */ |
21 | void lazyfreeFreeDatabase(void *args[]) { |
22 | dict *ht1 = (dict *) args[0]; |
23 | dict *ht2 = (dict *) args[1]; |
24 | |
25 | size_t numkeys = dictSize(ht1); |
26 | dictRelease(ht1); |
27 | dictRelease(ht2); |
28 | atomicDecr(lazyfree_objects,numkeys); |
29 | atomicIncr(lazyfreed_objects,numkeys); |
30 | } |
31 | |
32 | /* Release the key tracking table. */ |
33 | void lazyFreeTrackingTable(void *args[]) { |
34 | rax *rt = args[0]; |
35 | size_t len = rt->numele; |
36 | freeTrackingRadixTree(rt); |
37 | atomicDecr(lazyfree_objects,len); |
38 | atomicIncr(lazyfreed_objects,len); |
39 | } |
40 | |
41 | /* Release the lua_scripts dict. */ |
42 | void lazyFreeLuaScripts(void *args[]) { |
43 | dict *lua_scripts = args[0]; |
44 | long long len = dictSize(lua_scripts); |
45 | dictRelease(lua_scripts); |
46 | atomicDecr(lazyfree_objects,len); |
47 | atomicIncr(lazyfreed_objects,len); |
48 | } |
49 | |
50 | /* Release the functions ctx. */ |
51 | void lazyFreeFunctionsCtx(void *args[]) { |
52 | functionsLibCtx *functions_lib_ctx = args[0]; |
53 | size_t len = functionsLibCtxfunctionsLen(functions_lib_ctx); |
54 | functionsLibCtxFree(functions_lib_ctx); |
55 | atomicDecr(lazyfree_objects,len); |
56 | atomicIncr(lazyfreed_objects,len); |
57 | } |
58 | |
59 | /* Release replication backlog referencing memory. */ |
60 | void lazyFreeReplicationBacklogRefMem(void *args[]) { |
61 | list *blocks = args[0]; |
62 | rax *index = args[1]; |
63 | long long len = listLength(blocks); |
64 | len += raxSize(index); |
65 | listRelease(blocks); |
66 | raxFree(index); |
67 | atomicDecr(lazyfree_objects,len); |
68 | atomicIncr(lazyfreed_objects,len); |
69 | } |
70 | |
71 | /* Return the number of currently pending objects to free. */ |
72 | size_t lazyfreeGetPendingObjectsCount(void) { |
73 | size_t aux; |
74 | atomicGet(lazyfree_objects,aux); |
75 | return aux; |
76 | } |
77 | |
78 | /* Return the number of objects that have been freed. */ |
79 | size_t lazyfreeGetFreedObjectsCount(void) { |
80 | size_t aux; |
81 | atomicGet(lazyfreed_objects,aux); |
82 | return aux; |
83 | } |
84 | |
85 | void lazyfreeResetStats() { |
86 | atomicSet(lazyfreed_objects,0); |
87 | } |
88 | |
89 | /* Return the amount of work needed in order to free an object. |
90 | * The return value is not always the actual number of allocations the |
91 | * object is composed of, but a number proportional to it. |
92 | * |
93 | * For strings the function always returns 1. |
94 | * |
95 | * For aggregated objects represented by hash tables or other data structures |
96 | * the function just returns the number of elements the object is composed of. |
97 | * |
98 | * Objects composed of single allocations are always reported as having a |
99 | * single item even if they are actually logical composed of multiple |
100 | * elements. |
101 | * |
102 | * For lists the function returns the number of elements in the quicklist |
103 | * representing the list. */ |
104 | size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) { |
105 | if (obj->type == OBJ_LIST) { |
106 | quicklist *ql = obj->ptr; |
107 | return ql->len; |
108 | } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) { |
109 | dict *ht = obj->ptr; |
110 | return dictSize(ht); |
111 | } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){ |
112 | zset *zs = obj->ptr; |
113 | return zs->zsl->length; |
114 | } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) { |
115 | dict *ht = obj->ptr; |
116 | return dictSize(ht); |
117 | } else if (obj->type == OBJ_STREAM) { |
118 | size_t effort = 0; |
119 | stream *s = obj->ptr; |
120 | |
121 | /* Make a best effort estimate to maintain constant runtime. Every macro |
122 | * node in the Stream is one allocation. */ |
123 | effort += s->rax->numnodes; |
124 | |
125 | /* Every consumer group is an allocation and so are the entries in its |
126 | * PEL. We use size of the first group's PEL as an estimate for all |
127 | * others. */ |
128 | if (s->cgroups && raxSize(s->cgroups)) { |
129 | raxIterator ri; |
130 | streamCG *cg; |
131 | raxStart(&ri,s->cgroups); |
132 | raxSeek(&ri,"^" ,NULL,0); |
133 | /* There must be at least one group so the following should always |
134 | * work. */ |
135 | serverAssert(raxNext(&ri)); |
136 | cg = ri.data; |
137 | effort += raxSize(s->cgroups)*(1+raxSize(cg->pel)); |
138 | raxStop(&ri); |
139 | } |
140 | return effort; |
141 | } else if (obj->type == OBJ_MODULE) { |
142 | size_t effort = moduleGetFreeEffort(key, obj, dbid); |
143 | /* If the module's free_effort returns 0, we will use asynchronous free |
144 | * memory by default. */ |
145 | return effort == 0 ? ULONG_MAX : effort; |
146 | } else { |
147 | return 1; /* Everything else is a single allocation. */ |
148 | } |
149 | } |
150 | |
/* If there are enough allocations to free the value object asynchronously, it
 * may be put into a lazy free list instead of being freed synchronously. The
 * lazy free list will be reclaimed in a different bio.c thread. If the value is
 * composed of a few allocations, to free in a lazy way is actually just
 * slower... So under a certain limit we just free the object synchronously.
 * The limit is expressed in "effort" units as computed by
 * lazyfreeGetFreeEffort(), a value proportional to the allocation count. */
#define LAZYFREE_THRESHOLD 64
157 | |
158 | /* Free an object, if the object is huge enough, free it in async way. */ |
159 | void freeObjAsync(robj *key, robj *obj, int dbid) { |
160 | size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid); |
161 | /* Note that if the object is shared, to reclaim it now it is not |
162 | * possible. This rarely happens, however sometimes the implementation |
163 | * of parts of the Redis core may call incrRefCount() to protect |
164 | * objects, and then call dbDelete(). */ |
165 | if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { |
166 | atomicIncr(lazyfree_objects,1); |
167 | bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); |
168 | } else { |
169 | decrRefCount(obj); |
170 | } |
171 | } |
172 | |
173 | /* Empty a Redis DB asynchronously. What the function does actually is to |
174 | * create a new empty set of hash tables and scheduling the old ones for |
175 | * lazy freeing. */ |
176 | void emptyDbAsync(redisDb *db) { |
177 | dict *oldht1 = db->dict, *oldht2 = db->expires; |
178 | db->dict = dictCreate(&dbDictType); |
179 | db->expires = dictCreate(&dbExpiresDictType); |
180 | atomicIncr(lazyfree_objects,dictSize(oldht1)); |
181 | bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2); |
182 | } |
183 | |
184 | /* Free the key tracking table. |
185 | * If the table is huge enough, free it in async way. */ |
186 | void freeTrackingRadixTreeAsync(rax *tracking) { |
187 | /* Because this rax has only keys and no values so we use numnodes. */ |
188 | if (tracking->numnodes > LAZYFREE_THRESHOLD) { |
189 | atomicIncr(lazyfree_objects,tracking->numele); |
190 | bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); |
191 | } else { |
192 | freeTrackingRadixTree(tracking); |
193 | } |
194 | } |
195 | |
196 | /* Free lua_scripts dict, if the dict is huge enough, free it in async way. */ |
197 | void freeLuaScriptsAsync(dict *lua_scripts) { |
198 | if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) { |
199 | atomicIncr(lazyfree_objects,dictSize(lua_scripts)); |
200 | bioCreateLazyFreeJob(lazyFreeLuaScripts,1,lua_scripts); |
201 | } else { |
202 | dictRelease(lua_scripts); |
203 | } |
204 | } |
205 | |
206 | /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ |
207 | void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { |
208 | if (functionsLibCtxfunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { |
209 | atomicIncr(lazyfree_objects,functionsLibCtxfunctionsLen(functions_lib_ctx)); |
210 | bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx); |
211 | } else { |
212 | functionsLibCtxFree(functions_lib_ctx); |
213 | } |
214 | } |
215 | |
216 | /* Free replication backlog referencing buffer blocks and rax index. */ |
217 | void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { |
218 | if (listLength(blocks) > LAZYFREE_THRESHOLD || |
219 | raxSize(index) > LAZYFREE_THRESHOLD) |
220 | { |
221 | atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index)); |
222 | bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); |
223 | } else { |
224 | listRelease(blocks); |
225 | raxFree(index); |
226 | } |
227 | } |
228 | |