1/* Background I/O service for Redis.
2 *
 * This file implements operations that we need to perform in the background.
 * Currently there are three kinds of operations: a background close(2)
 * system call, a background fsync(2) of the AOF file, and lazy-free jobs
 * that run a caller-supplied free function. The background close(2) is
 * needed because, when the process is the last owner of a reference to a
 * file, closing it means unlinking it, and the deletion of the file is slow,
 * blocking the server.
8 *
9 * In the future we'll either continue implementing new things we need or
10 * we'll switch to libeio. However there are probably long term uses for this
11 * file as we may want to put here Redis specific background tasks (for instance
12 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
13 * implementation).
14 *
15 * DESIGN
16 * ------
17 *
 * The design is trivial: we have a structure representing a job to perform
 * and a different thread and job queue for every job type.
 * Every thread waits for new jobs in its queue, and processes every job
 * sequentially.
22 *
23 * Jobs of the same type are guaranteed to be processed from the least
24 * recently inserted to the most recently inserted (older jobs processed
25 * first).
26 *
27 * Currently there is no way for the creator of the job to be notified about
28 * the completion of the operation, this will only be added when/if needed.
29 *
30 * ----------------------------------------------------------------------------
31 *
32 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
33 * All rights reserved.
34 *
35 * Redistribution and use in source and binary forms, with or without
36 * modification, are permitted provided that the following conditions are met:
37 *
38 * * Redistributions of source code must retain the above copyright notice,
39 * this list of conditions and the following disclaimer.
40 * * Redistributions in binary form must reproduce the above copyright
41 * notice, this list of conditions and the following disclaimer in the
42 * documentation and/or other materials provided with the distribution.
43 * * Neither the name of Redis nor the names of its contributors may be used
44 * to endorse or promote products derived from this software without
45 * specific prior written permission.
46 *
47 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
48 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
49 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
50 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
51 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
52 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
53 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
54 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
55 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
56 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
57 * POSSIBILITY OF SUCH DAMAGE.
58 */
59
60
61#include "server.h"
62#include "bio.h"
63
64static pthread_t bio_threads[BIO_NUM_OPS];
65static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
66static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
67static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
68static list *bio_jobs[BIO_NUM_OPS];
69/* The following array is used to hold the number of pending jobs for every
70 * OP type. This allows us to export the bioPendingJobsOfType() API that is
71 * useful when the main thread wants to perform some operation that may involve
72 * objects shared with the background thread. The main thread will just wait
73 * that there are no longer jobs of this type to be executed before performing
74 * the sensible operation. This data is also useful for reporting. */
75static unsigned long long bio_pending[BIO_NUM_OPS];
76
77/* This structure represents a background Job. It is only used locally to this
78 * file as the API does not expose the internals at all. */
79struct bio_job {
80 /* Job specific arguments.*/
81 int fd; /* Fd for file based background jobs */
82 lazy_free_fn *free_fn; /* Function that will free the provided arguments */
83 void *free_args[]; /* List of arguments to be passed to the free function */
84};
85
86void *bioProcessBackgroundJobs(void *arg);
87
88/* Make sure we have enough stack to perform all the things we do in the
89 * main thread. */
90#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
91
92/* Initialize the background system, spawning the thread. */
93void bioInit(void) {
94 pthread_attr_t attr;
95 pthread_t thread;
96 size_t stacksize;
97 int j;
98
99 /* Initialization of state vars and objects */
100 for (j = 0; j < BIO_NUM_OPS; j++) {
101 pthread_mutex_init(&bio_mutex[j],NULL);
102 pthread_cond_init(&bio_newjob_cond[j],NULL);
103 pthread_cond_init(&bio_step_cond[j],NULL);
104 bio_jobs[j] = listCreate();
105 bio_pending[j] = 0;
106 }
107
108 /* Set the stack size as by default it may be small in some system */
109 pthread_attr_init(&attr);
110 pthread_attr_getstacksize(&attr,&stacksize);
111 if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
112 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
113 pthread_attr_setstacksize(&attr, stacksize);
114
115 /* Ready to spawn our threads. We use the single argument the thread
116 * function accepts in order to pass the job ID the thread is
117 * responsible of. */
118 for (j = 0; j < BIO_NUM_OPS; j++) {
119 void *arg = (void*)(unsigned long) j;
120 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
121 serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
122 exit(1);
123 }
124 bio_threads[j] = thread;
125 }
126}
127
128void bioSubmitJob(int type, struct bio_job *job) {
129 pthread_mutex_lock(&bio_mutex[type]);
130 listAddNodeTail(bio_jobs[type],job);
131 bio_pending[type]++;
132 pthread_cond_signal(&bio_newjob_cond[type]);
133 pthread_mutex_unlock(&bio_mutex[type]);
134}
135
136void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
137 va_list valist;
138 /* Allocate memory for the job structure and all required
139 * arguments */
140 struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
141 job->free_fn = free_fn;
142
143 va_start(valist, arg_count);
144 for (int i = 0; i < arg_count; i++) {
145 job->free_args[i] = va_arg(valist, void *);
146 }
147 va_end(valist);
148 bioSubmitJob(BIO_LAZY_FREE, job);
149}
150
151void bioCreateCloseJob(int fd) {
152 struct bio_job *job = zmalloc(sizeof(*job));
153 job->fd = fd;
154
155 bioSubmitJob(BIO_CLOSE_FILE, job);
156}
157
158void bioCreateFsyncJob(int fd) {
159 struct bio_job *job = zmalloc(sizeof(*job));
160 job->fd = fd;
161
162 bioSubmitJob(BIO_AOF_FSYNC, job);
163}
164
165void *bioProcessBackgroundJobs(void *arg) {
166 struct bio_job *job;
167 unsigned long type = (unsigned long) arg;
168 sigset_t sigset;
169
170 /* Check that the type is within the right interval. */
171 if (type >= BIO_NUM_OPS) {
172 serverLog(LL_WARNING,
173 "Warning: bio thread started with wrong type %lu",type);
174 return NULL;
175 }
176
177 switch (type) {
178 case BIO_CLOSE_FILE:
179 redis_set_thread_title("bio_close_file");
180 break;
181 case BIO_AOF_FSYNC:
182 redis_set_thread_title("bio_aof_fsync");
183 break;
184 case BIO_LAZY_FREE:
185 redis_set_thread_title("bio_lazy_free");
186 break;
187 }
188
189 redisSetCpuAffinity(server.bio_cpulist);
190
191 makeThreadKillable();
192
193 pthread_mutex_lock(&bio_mutex[type]);
194 /* Block SIGALRM so we are sure that only the main thread will
195 * receive the watchdog signal. */
196 sigemptyset(&sigset);
197 sigaddset(&sigset, SIGALRM);
198 if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
199 serverLog(LL_WARNING,
200 "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
201
202 while(1) {
203 listNode *ln;
204
205 /* The loop always starts with the lock hold. */
206 if (listLength(bio_jobs[type]) == 0) {
207 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
208 continue;
209 }
210 /* Pop the job from the queue. */
211 ln = listFirst(bio_jobs[type]);
212 job = ln->value;
213 /* It is now possible to unlock the background system as we know have
214 * a stand alone job structure to process.*/
215 pthread_mutex_unlock(&bio_mutex[type]);
216
217 /* Process the job accordingly to its type. */
218 if (type == BIO_CLOSE_FILE) {
219 close(job->fd);
220 } else if (type == BIO_AOF_FSYNC) {
221 /* The fd may be closed by main thread and reused for another
222 * socket, pipe, or file. We just ignore these errno because
223 * aof fsync did not really fail. */
224 if (redis_fsync(job->fd) == -1 &&
225 errno != EBADF && errno != EINVAL)
226 {
227 int last_status;
228 atomicGet(server.aof_bio_fsync_status,last_status);
229 atomicSet(server.aof_bio_fsync_status,C_ERR);
230 atomicSet(server.aof_bio_fsync_errno,errno);
231 if (last_status == C_OK) {
232 serverLog(LL_WARNING,
233 "Fail to fsync the AOF file: %s",strerror(errno));
234 }
235 } else {
236 atomicSet(server.aof_bio_fsync_status,C_OK);
237 }
238 } else if (type == BIO_LAZY_FREE) {
239 job->free_fn(job->free_args);
240 } else {
241 serverPanic("Wrong job type in bioProcessBackgroundJobs().");
242 }
243 zfree(job);
244
245 /* Lock again before reiterating the loop, if there are no longer
246 * jobs to process we'll block again in pthread_cond_wait(). */
247 pthread_mutex_lock(&bio_mutex[type]);
248 listDelNode(bio_jobs[type],ln);
249 bio_pending[type]--;
250
251 /* Unblock threads blocked on bioWaitStepOfType() if any. */
252 pthread_cond_broadcast(&bio_step_cond[type]);
253 }
254}
255
256/* Return the number of pending jobs of the specified type. */
257unsigned long long bioPendingJobsOfType(int type) {
258 unsigned long long val;
259 pthread_mutex_lock(&bio_mutex[type]);
260 val = bio_pending[type];
261 pthread_mutex_unlock(&bio_mutex[type]);
262 return val;
263}
264
265/* If there are pending jobs for the specified type, the function blocks
266 * and waits that the next job was processed. Otherwise the function
267 * does not block and returns ASAP.
268 *
269 * The function returns the number of jobs still to process of the
270 * requested type.
271 *
272 * This function is useful when from another thread, we want to wait
273 * a bio.c thread to do more work in a blocking way.
274 */
275unsigned long long bioWaitStepOfType(int type) {
276 unsigned long long val;
277 pthread_mutex_lock(&bio_mutex[type]);
278 val = bio_pending[type];
279 if (val != 0) {
280 pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
281 val = bio_pending[type];
282 }
283 pthread_mutex_unlock(&bio_mutex[type]);
284 return val;
285}
286
287/* Kill the running bio threads in an unclean way. This function should be
288 * used only when it's critical to stop the threads for some reason.
289 * Currently Redis does this only on crash (for instance on SIGSEGV) in order
290 * to perform a fast memory check without other threads messing with memory. */
291void bioKillThreads(void) {
292 int err, j;
293
294 for (j = 0; j < BIO_NUM_OPS; j++) {
295 if (bio_threads[j] == pthread_self()) continue;
296 if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {
297 if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
298 serverLog(LL_WARNING,
299 "Bio thread for job type #%d can not be joined: %s",
300 j, strerror(err));
301 } else {
302 serverLog(LL_WARNING,
303 "Bio thread for job type #%d terminated",j);
304 }
305 }
306 }
307}
308