1 | /* Background I/O service for Redis. |
2 | * |
3 | * This file implements operations that we need to perform in the background. |
4 | * There are currently three job types: close(2) of a file descriptor, fsync |
5 | * of the AOF file, and lazy-free jobs that run a caller-supplied free function. |
6 | * Background close is needed because, when the process holds the last reference |
7 | * to a file, closing it means unlinking it, and that deletion is slow, blocking the server. |
8 | * |
9 | * In the future we'll either continue implementing new things we need or |
10 | * we'll switch to libeio. However there are probably long term uses for this |
11 | * file as we may want to put here Redis specific background tasks (for instance |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL |
13 | * implementation). |
14 | * |
15 | * DESIGN |
16 | * ------ |
17 | * |
18 | * The design is trivial, we have a structure representing a job to perform |
19 | * and a different thread and job queue for every job type. |
20 | * Every thread waits for new jobs in its queue, and processes every job |
21 | * sequentially. |
22 | * |
23 | * Jobs of the same type are guaranteed to be processed from the least |
24 | * recently inserted to the most recently inserted (older jobs processed |
25 | * first). |
26 | * |
27 | * Currently there is no way for the creator of the job to be notified about |
28 | * the completion of the operation, this will only be added when/if needed. |
29 | * |
30 | * ---------------------------------------------------------------------------- |
31 | * |
32 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
33 | * All rights reserved. |
34 | * |
35 | * Redistribution and use in source and binary forms, with or without |
36 | * modification, are permitted provided that the following conditions are met: |
37 | * |
38 | * * Redistributions of source code must retain the above copyright notice, |
39 | * this list of conditions and the following disclaimer. |
40 | * * Redistributions in binary form must reproduce the above copyright |
41 | * notice, this list of conditions and the following disclaimer in the |
42 | * documentation and/or other materials provided with the distribution. |
43 | * * Neither the name of Redis nor the names of its contributors may be used |
44 | * to endorse or promote products derived from this software without |
45 | * specific prior written permission. |
46 | * |
47 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
48 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
49 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
50 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
51 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
52 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
53 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
54 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
55 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
56 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
57 | * POSSIBILITY OF SUCH DAMAGE. |
58 | */ |
59 | |
60 | |
61 | #include "server.h" |
62 | #include "bio.h" |
63 | |
64 | static pthread_t bio_threads[BIO_NUM_OPS]; |
65 | static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; |
66 | static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; |
67 | static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; |
68 | static list *bio_jobs[BIO_NUM_OPS]; |
69 | /* The following array is used to hold the number of pending jobs for every |
70 | * OP type. This allows us to export the bioPendingJobsOfType() API that is |
71 | * useful when the main thread wants to perform some operation that may involve |
72 | * objects shared with the background thread. The main thread will just wait |
73 | * that there are no longer jobs of this type to be executed before performing |
74 | * the sensible operation. This data is also useful for reporting. */ |
75 | static unsigned long long bio_pending[BIO_NUM_OPS]; |
76 | |
77 | /* This structure represents a background Job. It is only used locally to this |
78 | * file as the API does not expose the internals at all. */ |
79 | struct bio_job { |
80 | /* Job specific arguments.*/ |
81 | int fd; /* Fd for file based background jobs */ |
82 | lazy_free_fn *free_fn; /* Function that will free the provided arguments */ |
83 | void *free_args[]; /* List of arguments to be passed to the free function */ |
84 | }; |
85 | |
/* Worker thread entry point. It is forward-declared because bioInit() passes
 * it to pthread_create() before its definition appears in this file. */
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size (4 MiB) requested for every bio worker thread.
 * bioInit() starts from the platform's default stack size and keeps doubling
 * it until it reaches at least this value, because the default may be too
 * small on some systems. */
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
91 | |
92 | /* Initialize the background system, spawning the thread. */ |
93 | void bioInit(void) { |
94 | pthread_attr_t attr; |
95 | pthread_t thread; |
96 | size_t stacksize; |
97 | int j; |
98 | |
99 | /* Initialization of state vars and objects */ |
100 | for (j = 0; j < BIO_NUM_OPS; j++) { |
101 | pthread_mutex_init(&bio_mutex[j],NULL); |
102 | pthread_cond_init(&bio_newjob_cond[j],NULL); |
103 | pthread_cond_init(&bio_step_cond[j],NULL); |
104 | bio_jobs[j] = listCreate(); |
105 | bio_pending[j] = 0; |
106 | } |
107 | |
108 | /* Set the stack size as by default it may be small in some system */ |
109 | pthread_attr_init(&attr); |
110 | pthread_attr_getstacksize(&attr,&stacksize); |
111 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ |
112 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; |
113 | pthread_attr_setstacksize(&attr, stacksize); |
114 | |
115 | /* Ready to spawn our threads. We use the single argument the thread |
116 | * function accepts in order to pass the job ID the thread is |
117 | * responsible of. */ |
118 | for (j = 0; j < BIO_NUM_OPS; j++) { |
119 | void *arg = (void*)(unsigned long) j; |
120 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { |
121 | serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs." ); |
122 | exit(1); |
123 | } |
124 | bio_threads[j] = thread; |
125 | } |
126 | } |
127 | |
128 | void bioSubmitJob(int type, struct bio_job *job) { |
129 | pthread_mutex_lock(&bio_mutex[type]); |
130 | listAddNodeTail(bio_jobs[type],job); |
131 | bio_pending[type]++; |
132 | pthread_cond_signal(&bio_newjob_cond[type]); |
133 | pthread_mutex_unlock(&bio_mutex[type]); |
134 | } |
135 | |
136 | void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { |
137 | va_list valist; |
138 | /* Allocate memory for the job structure and all required |
139 | * arguments */ |
140 | struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); |
141 | job->free_fn = free_fn; |
142 | |
143 | va_start(valist, arg_count); |
144 | for (int i = 0; i < arg_count; i++) { |
145 | job->free_args[i] = va_arg(valist, void *); |
146 | } |
147 | va_end(valist); |
148 | bioSubmitJob(BIO_LAZY_FREE, job); |
149 | } |
150 | |
151 | void bioCreateCloseJob(int fd) { |
152 | struct bio_job *job = zmalloc(sizeof(*job)); |
153 | job->fd = fd; |
154 | |
155 | bioSubmitJob(BIO_CLOSE_FILE, job); |
156 | } |
157 | |
158 | void bioCreateFsyncJob(int fd) { |
159 | struct bio_job *job = zmalloc(sizeof(*job)); |
160 | job->fd = fd; |
161 | |
162 | bioSubmitJob(BIO_AOF_FSYNC, job); |
163 | } |
164 | |
165 | void *bioProcessBackgroundJobs(void *arg) { |
166 | struct bio_job *job; |
167 | unsigned long type = (unsigned long) arg; |
168 | sigset_t sigset; |
169 | |
170 | /* Check that the type is within the right interval. */ |
171 | if (type >= BIO_NUM_OPS) { |
172 | serverLog(LL_WARNING, |
173 | "Warning: bio thread started with wrong type %lu" ,type); |
174 | return NULL; |
175 | } |
176 | |
177 | switch (type) { |
178 | case BIO_CLOSE_FILE: |
179 | redis_set_thread_title("bio_close_file" ); |
180 | break; |
181 | case BIO_AOF_FSYNC: |
182 | redis_set_thread_title("bio_aof_fsync" ); |
183 | break; |
184 | case BIO_LAZY_FREE: |
185 | redis_set_thread_title("bio_lazy_free" ); |
186 | break; |
187 | } |
188 | |
189 | redisSetCpuAffinity(server.bio_cpulist); |
190 | |
191 | makeThreadKillable(); |
192 | |
193 | pthread_mutex_lock(&bio_mutex[type]); |
194 | /* Block SIGALRM so we are sure that only the main thread will |
195 | * receive the watchdog signal. */ |
196 | sigemptyset(&sigset); |
197 | sigaddset(&sigset, SIGALRM); |
198 | if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) |
199 | serverLog(LL_WARNING, |
200 | "Warning: can't mask SIGALRM in bio.c thread: %s" , strerror(errno)); |
201 | |
202 | while(1) { |
203 | listNode *ln; |
204 | |
205 | /* The loop always starts with the lock hold. */ |
206 | if (listLength(bio_jobs[type]) == 0) { |
207 | pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); |
208 | continue; |
209 | } |
210 | /* Pop the job from the queue. */ |
211 | ln = listFirst(bio_jobs[type]); |
212 | job = ln->value; |
213 | /* It is now possible to unlock the background system as we know have |
214 | * a stand alone job structure to process.*/ |
215 | pthread_mutex_unlock(&bio_mutex[type]); |
216 | |
217 | /* Process the job accordingly to its type. */ |
218 | if (type == BIO_CLOSE_FILE) { |
219 | close(job->fd); |
220 | } else if (type == BIO_AOF_FSYNC) { |
221 | /* The fd may be closed by main thread and reused for another |
222 | * socket, pipe, or file. We just ignore these errno because |
223 | * aof fsync did not really fail. */ |
224 | if (redis_fsync(job->fd) == -1 && |
225 | errno != EBADF && errno != EINVAL) |
226 | { |
227 | int last_status; |
228 | atomicGet(server.aof_bio_fsync_status,last_status); |
229 | atomicSet(server.aof_bio_fsync_status,C_ERR); |
230 | atomicSet(server.aof_bio_fsync_errno,errno); |
231 | if (last_status == C_OK) { |
232 | serverLog(LL_WARNING, |
233 | "Fail to fsync the AOF file: %s" ,strerror(errno)); |
234 | } |
235 | } else { |
236 | atomicSet(server.aof_bio_fsync_status,C_OK); |
237 | } |
238 | } else if (type == BIO_LAZY_FREE) { |
239 | job->free_fn(job->free_args); |
240 | } else { |
241 | serverPanic("Wrong job type in bioProcessBackgroundJobs()." ); |
242 | } |
243 | zfree(job); |
244 | |
245 | /* Lock again before reiterating the loop, if there are no longer |
246 | * jobs to process we'll block again in pthread_cond_wait(). */ |
247 | pthread_mutex_lock(&bio_mutex[type]); |
248 | listDelNode(bio_jobs[type],ln); |
249 | bio_pending[type]--; |
250 | |
251 | /* Unblock threads blocked on bioWaitStepOfType() if any. */ |
252 | pthread_cond_broadcast(&bio_step_cond[type]); |
253 | } |
254 | } |
255 | |
256 | /* Return the number of pending jobs of the specified type. */ |
257 | unsigned long long bioPendingJobsOfType(int type) { |
258 | unsigned long long val; |
259 | pthread_mutex_lock(&bio_mutex[type]); |
260 | val = bio_pending[type]; |
261 | pthread_mutex_unlock(&bio_mutex[type]); |
262 | return val; |
263 | } |
264 | |
265 | /* If there are pending jobs for the specified type, the function blocks |
266 | * and waits that the next job was processed. Otherwise the function |
267 | * does not block and returns ASAP. |
268 | * |
269 | * The function returns the number of jobs still to process of the |
270 | * requested type. |
271 | * |
272 | * This function is useful when from another thread, we want to wait |
273 | * a bio.c thread to do more work in a blocking way. |
274 | */ |
275 | unsigned long long bioWaitStepOfType(int type) { |
276 | unsigned long long val; |
277 | pthread_mutex_lock(&bio_mutex[type]); |
278 | val = bio_pending[type]; |
279 | if (val != 0) { |
280 | pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); |
281 | val = bio_pending[type]; |
282 | } |
283 | pthread_mutex_unlock(&bio_mutex[type]); |
284 | return val; |
285 | } |
286 | |
287 | /* Kill the running bio threads in an unclean way. This function should be |
288 | * used only when it's critical to stop the threads for some reason. |
289 | * Currently Redis does this only on crash (for instance on SIGSEGV) in order |
290 | * to perform a fast memory check without other threads messing with memory. */ |
291 | void bioKillThreads(void) { |
292 | int err, j; |
293 | |
294 | for (j = 0; j < BIO_NUM_OPS; j++) { |
295 | if (bio_threads[j] == pthread_self()) continue; |
296 | if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) { |
297 | if ((err = pthread_join(bio_threads[j],NULL)) != 0) { |
298 | serverLog(LL_WARNING, |
299 | "Bio thread for job type #%d can not be joined: %s" , |
300 | j, strerror(err)); |
301 | } else { |
302 | serverLog(LL_WARNING, |
303 | "Bio thread for job type #%d terminated" ,j); |
304 | } |
305 | } |
306 | } |
307 | } |
308 | |