1/* A simple event-driven programming library. Originally I wrote this code
2 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3 * it in form of a library for easy reuse.
4 *
5 * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * * Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * * Neither the name of Redis nor the names of its contributors may be used
17 * to endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
#include "ae.h"
#include "anet.h"
#include "redisassert.h"

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <limits.h>

#include "zmalloc.h"
#include "config.h"
49
/* Include the best multiplexing layer supported by this system.
 * The following are ordered by performance, descending. */
52#ifdef HAVE_EVPORT
53#include "ae_evport.c"
54#else
55 #ifdef HAVE_EPOLL
56 #include "ae_epoll.c"
57 #else
58 #ifdef HAVE_KQUEUE
59 #include "ae_kqueue.c"
60 #else
61 #include "ae_select.c"
62 #endif
63 #endif
64#endif
65
66
67aeEventLoop *aeCreateEventLoop(int setsize) {
68 aeEventLoop *eventLoop;
69 int i;
70
71 monotonicInit(); /* just in case the calling app didn't initialize */
72
73 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
74 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
75 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
76 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
77 eventLoop->setsize = setsize;
78 eventLoop->timeEventHead = NULL;
79 eventLoop->timeEventNextId = 0;
80 eventLoop->stop = 0;
81 eventLoop->maxfd = -1;
82 eventLoop->beforesleep = NULL;
83 eventLoop->aftersleep = NULL;
84 eventLoop->flags = 0;
85 if (aeApiCreate(eventLoop) == -1) goto err;
86 /* Events with mask == AE_NONE are not set. So let's initialize the
87 * vector with it. */
88 for (i = 0; i < setsize; i++)
89 eventLoop->events[i].mask = AE_NONE;
90 return eventLoop;
91
92err:
93 if (eventLoop) {
94 zfree(eventLoop->events);
95 zfree(eventLoop->fired);
96 zfree(eventLoop);
97 }
98 return NULL;
99}
100
101/* Return the current set size. */
102int aeGetSetSize(aeEventLoop *eventLoop) {
103 return eventLoop->setsize;
104}
105
106/* Tells the next iteration/s of the event processing to set timeout of 0. */
107void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
108 if (noWait)
109 eventLoop->flags |= AE_DONT_WAIT;
110 else
111 eventLoop->flags &= ~AE_DONT_WAIT;
112}
113
114/* Resize the maximum set size of the event loop.
115 * If the requested set size is smaller than the current set size, but
116 * there is already a file descriptor in use that is >= the requested
117 * set size minus one, AE_ERR is returned and the operation is not
118 * performed at all.
119 *
120 * Otherwise AE_OK is returned and the operation is successful. */
121int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
122 int i;
123
124 if (setsize == eventLoop->setsize) return AE_OK;
125 if (eventLoop->maxfd >= setsize) return AE_ERR;
126 if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
127
128 eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
129 eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
130 eventLoop->setsize = setsize;
131
132 /* Make sure that if we created new slots, they are initialized with
133 * an AE_NONE mask. */
134 for (i = eventLoop->maxfd+1; i < setsize; i++)
135 eventLoop->events[i].mask = AE_NONE;
136 return AE_OK;
137}
138
139void aeDeleteEventLoop(aeEventLoop *eventLoop) {
140 aeApiFree(eventLoop);
141 zfree(eventLoop->events);
142 zfree(eventLoop->fired);
143
144 /* Free the time events list. */
145 aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
146 while (te) {
147 next_te = te->next;
148 zfree(te);
149 te = next_te;
150 }
151 zfree(eventLoop);
152}
153
154void aeStop(aeEventLoop *eventLoop) {
155 eventLoop->stop = 1;
156}
157
158int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
159 aeFileProc *proc, void *clientData)
160{
161 if (fd >= eventLoop->setsize) {
162 errno = ERANGE;
163 return AE_ERR;
164 }
165 aeFileEvent *fe = &eventLoop->events[fd];
166
167 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
168 return AE_ERR;
169 fe->mask |= mask;
170 if (mask & AE_READABLE) fe->rfileProc = proc;
171 if (mask & AE_WRITABLE) fe->wfileProc = proc;
172 fe->clientData = clientData;
173 if (fd > eventLoop->maxfd)
174 eventLoop->maxfd = fd;
175 return AE_OK;
176}
177
178void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
179{
180 if (fd >= eventLoop->setsize) return;
181 aeFileEvent *fe = &eventLoop->events[fd];
182 if (fe->mask == AE_NONE) return;
183
184 /* We want to always remove AE_BARRIER if set when AE_WRITABLE
185 * is removed. */
186 if (mask & AE_WRITABLE) mask |= AE_BARRIER;
187
188 aeApiDelEvent(eventLoop, fd, mask);
189 fe->mask = fe->mask & (~mask);
190 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
191 /* Update the max fd */
192 int j;
193
194 for (j = eventLoop->maxfd-1; j >= 0; j--)
195 if (eventLoop->events[j].mask != AE_NONE) break;
196 eventLoop->maxfd = j;
197 }
198}
199
200void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
201 if (fd >= eventLoop->setsize) return NULL;
202 aeFileEvent *fe = &eventLoop->events[fd];
203 if (fe->mask == AE_NONE) return NULL;
204
205 return fe->clientData;
206}
207
208int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
209 if (fd >= eventLoop->setsize) return 0;
210 aeFileEvent *fe = &eventLoop->events[fd];
211
212 return fe->mask;
213}
214
215long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
216 aeTimeProc *proc, void *clientData,
217 aeEventFinalizerProc *finalizerProc)
218{
219 long long id = eventLoop->timeEventNextId++;
220 aeTimeEvent *te;
221
222 te = zmalloc(sizeof(*te));
223 if (te == NULL) return AE_ERR;
224 te->id = id;
225 te->when = getMonotonicUs() + milliseconds * 1000;
226 te->timeProc = proc;
227 te->finalizerProc = finalizerProc;
228 te->clientData = clientData;
229 te->prev = NULL;
230 te->next = eventLoop->timeEventHead;
231 te->refcount = 0;
232 if (te->next)
233 te->next->prev = te;
234 eventLoop->timeEventHead = te;
235 return id;
236}
237
238int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
239{
240 aeTimeEvent *te = eventLoop->timeEventHead;
241 while(te) {
242 if (te->id == id) {
243 te->id = AE_DELETED_EVENT_ID;
244 return AE_OK;
245 }
246 te = te->next;
247 }
248 return AE_ERR; /* NO event with the specified ID found */
249}
250
251/* How many microseconds until the first timer should fire.
252 * If there are no timers, -1 is returned.
253 *
254 * Note that's O(N) since time events are unsorted.
255 * Possible optimizations (not needed by Redis so far, but...):
256 * 1) Insert the event in order, so that the nearest is just the head.
257 * Much better but still insertion or deletion of timers is O(N).
258 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
259 */
260static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
261 aeTimeEvent *te = eventLoop->timeEventHead;
262 if (te == NULL) return -1;
263
264 aeTimeEvent *earliest = NULL;
265 while (te) {
266 if (!earliest || te->when < earliest->when)
267 earliest = te;
268 te = te->next;
269 }
270
271 monotime now = getMonotonicUs();
272 return (now >= earliest->when) ? 0 : earliest->when - now;
273}
274
275/* Process time events */
276static int processTimeEvents(aeEventLoop *eventLoop) {
277 int processed = 0;
278 aeTimeEvent *te;
279 long long maxId;
280
281 te = eventLoop->timeEventHead;
282 maxId = eventLoop->timeEventNextId-1;
283 monotime now = getMonotonicUs();
284 while(te) {
285 long long id;
286
287 /* Remove events scheduled for deletion. */
288 if (te->id == AE_DELETED_EVENT_ID) {
289 aeTimeEvent *next = te->next;
290 /* If a reference exists for this timer event,
291 * don't free it. This is currently incremented
292 * for recursive timerProc calls */
293 if (te->refcount) {
294 te = next;
295 continue;
296 }
297 if (te->prev)
298 te->prev->next = te->next;
299 else
300 eventLoop->timeEventHead = te->next;
301 if (te->next)
302 te->next->prev = te->prev;
303 if (te->finalizerProc) {
304 te->finalizerProc(eventLoop, te->clientData);
305 now = getMonotonicUs();
306 }
307 zfree(te);
308 te = next;
309 continue;
310 }
311
312 /* Make sure we don't process time events created by time events in
313 * this iteration. Note that this check is currently useless: we always
314 * add new timers on the head, however if we change the implementation
315 * detail, this check may be useful again: we keep it here for future
316 * defense. */
317 if (te->id > maxId) {
318 te = te->next;
319 continue;
320 }
321
322 if (te->when <= now) {
323 int retval;
324
325 id = te->id;
326 te->refcount++;
327 retval = te->timeProc(eventLoop, id, te->clientData);
328 te->refcount--;
329 processed++;
330 now = getMonotonicUs();
331 if (retval != AE_NOMORE) {
332 te->when = now + retval * 1000;
333 } else {
334 te->id = AE_DELETED_EVENT_ID;
335 }
336 }
337 te = te->next;
338 }
339 return processed;
340}
341
342/* Process every pending time event, then every pending file event
343 * (that may be registered by time event callbacks just processed).
344 * Without special flags the function sleeps until some file event
345 * fires, or when the next time event occurs (if any).
346 *
347 * If flags is 0, the function does nothing and returns.
348 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
349 * if flags has AE_FILE_EVENTS set, file events are processed.
350 * if flags has AE_TIME_EVENTS set, time events are processed.
351 * if flags has AE_DONT_WAIT set, the function returns ASAP once all
352 * the events that can be handled without a wait are processed.
353 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
354 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
355 *
356 * The function returns the number of events processed. */
357int aeProcessEvents(aeEventLoop *eventLoop, int flags)
358{
359 int processed = 0, numevents;
360
361 /* Nothing to do? return ASAP */
362 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
363
364 /* Note that we want to call select() even if there are no
365 * file events to process as long as we want to process time
366 * events, in order to sleep until the next time event is ready
367 * to fire. */
368 if (eventLoop->maxfd != -1 ||
369 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
370 int j;
371 struct timeval tv, *tvp;
372 int64_t usUntilTimer = -1;
373
374 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
375 usUntilTimer = usUntilEarliestTimer(eventLoop);
376
377 if (usUntilTimer >= 0) {
378 tv.tv_sec = usUntilTimer / 1000000;
379 tv.tv_usec = usUntilTimer % 1000000;
380 tvp = &tv;
381 } else {
382 /* If we have to check for events but need to return
383 * ASAP because of AE_DONT_WAIT we need to set the timeout
384 * to zero */
385 if (flags & AE_DONT_WAIT) {
386 tv.tv_sec = tv.tv_usec = 0;
387 tvp = &tv;
388 } else {
389 /* Otherwise we can block */
390 tvp = NULL; /* wait forever */
391 }
392 }
393
394 if (eventLoop->flags & AE_DONT_WAIT) {
395 tv.tv_sec = tv.tv_usec = 0;
396 tvp = &tv;
397 }
398
399 if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
400 eventLoop->beforesleep(eventLoop);
401
402 /* Call the multiplexing API, will return only on timeout or when
403 * some event fires. */
404 numevents = aeApiPoll(eventLoop, tvp);
405
406 /* After sleep callback. */
407 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
408 eventLoop->aftersleep(eventLoop);
409
410 for (j = 0; j < numevents; j++) {
411 int fd = eventLoop->fired[j].fd;
412 aeFileEvent *fe = &eventLoop->events[fd];
413 int mask = eventLoop->fired[j].mask;
414 int fired = 0; /* Number of events fired for current fd. */
415
416 /* Normally we execute the readable event first, and the writable
417 * event later. This is useful as sometimes we may be able
418 * to serve the reply of a query immediately after processing the
419 * query.
420 *
421 * However if AE_BARRIER is set in the mask, our application is
422 * asking us to do the reverse: never fire the writable event
423 * after the readable. In such a case, we invert the calls.
424 * This is useful when, for instance, we want to do things
425 * in the beforeSleep() hook, like fsyncing a file to disk,
426 * before replying to a client. */
427 int invert = fe->mask & AE_BARRIER;
428
429 /* Note the "fe->mask & mask & ..." code: maybe an already
430 * processed event removed an element that fired and we still
431 * didn't processed, so we check if the event is still valid.
432 *
433 * Fire the readable event if the call sequence is not
434 * inverted. */
435 if (!invert && fe->mask & mask & AE_READABLE) {
436 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
437 fired++;
438 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
439 }
440
441 /* Fire the writable event. */
442 if (fe->mask & mask & AE_WRITABLE) {
443 if (!fired || fe->wfileProc != fe->rfileProc) {
444 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
445 fired++;
446 }
447 }
448
449 /* If we have to invert the call, fire the readable event now
450 * after the writable one. */
451 if (invert) {
452 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
453 if ((fe->mask & mask & AE_READABLE) &&
454 (!fired || fe->wfileProc != fe->rfileProc))
455 {
456 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
457 fired++;
458 }
459 }
460
461 processed++;
462 }
463 }
464 /* Check time events */
465 if (flags & AE_TIME_EVENTS)
466 processed += processTimeEvents(eventLoop);
467
468 return processed; /* return the number of processed file/time events */
469}
470
471/* Wait for milliseconds until the given file descriptor becomes
472 * writable/readable/exception */
473int aeWait(int fd, int mask, long long milliseconds) {
474 struct pollfd pfd;
475 int retmask = 0, retval;
476
477 memset(&pfd, 0, sizeof(pfd));
478 pfd.fd = fd;
479 if (mask & AE_READABLE) pfd.events |= POLLIN;
480 if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
481
482 if ((retval = poll(&pfd, 1, milliseconds))== 1) {
483 if (pfd.revents & POLLIN) retmask |= AE_READABLE;
484 if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
485 if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
486 if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
487 return retmask;
488 } else {
489 return retval;
490 }
491}
492
493void aeMain(aeEventLoop *eventLoop) {
494 eventLoop->stop = 0;
495 while (!eventLoop->stop) {
496 aeProcessEvents(eventLoop, AE_ALL_EVENTS|
497 AE_CALL_BEFORE_SLEEP|
498 AE_CALL_AFTER_SLEEP);
499 }
500}
501
/* Return the name reported by the multiplexing backend selected at compile
 * time (the ae_*.c file included at the top of this file). */
char *aeGetApiName(void) {
    char *name = aeApiName();
    return name;
}
505
506void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
507 eventLoop->beforesleep = beforesleep;
508}
509
510void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
511 eventLoop->aftersleep = aftersleep;
512}
513