1/*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "fmacros.h"
33#include "alloc.h"
34#include <stdlib.h>
35#include <string.h>
36#ifndef _MSC_VER
37#include <strings.h>
38#endif
39#include <assert.h>
40#include <ctype.h>
41#include <errno.h>
42#include "async.h"
43#include "net.h"
44#include "dict.c"
45#include "sds.h"
46#include "win32.h"
47
48#include "async_private.h"
49
50#ifdef NDEBUG
51#undef assert
52#define assert(e) (void)(e)
53#endif
54
55/* Forward declarations of hiredis.c functions */
56int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
57void __redisSetError(redisContext *c, int type, const char *str);
58
59/* Functions managing dictionary of callbacks for pub/sub. */
60static unsigned int callbackHash(const void *key) {
61 return dictGenHashFunction((const unsigned char *)key,
62 hi_sdslen((const hisds)key));
63}
64
65static void *callbackValDup(void *privdata, const void *src) {
66 ((void) privdata);
67 redisCallback *dup;
68
69 dup = hi_malloc(sizeof(*dup));
70 if (dup == NULL)
71 return NULL;
72
73 memcpy(dup,src,sizeof(*dup));
74 return dup;
75}
76
77static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
78 int l1, l2;
79 ((void) privdata);
80
81 l1 = hi_sdslen((const hisds)key1);
82 l2 = hi_sdslen((const hisds)key2);
83 if (l1 != l2) return 0;
84 return memcmp(key1,key2,l1) == 0;
85}
86
87static void callbackKeyDestructor(void *privdata, void *key) {
88 ((void) privdata);
89 hi_sdsfree((hisds)key);
90}
91
92static void callbackValDestructor(void *privdata, void *val) {
93 ((void) privdata);
94 hi_free(val);
95}
96
97static dictType callbackDict = {
98 callbackHash,
99 NULL,
100 callbackValDup,
101 callbackKeyCompare,
102 callbackKeyDestructor,
103 callbackValDestructor
104};
105
106static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
107 redisAsyncContext *ac;
108 dict *channels = NULL, *patterns = NULL;
109
110 channels = dictCreate(&callbackDict,NULL);
111 if (channels == NULL)
112 goto oom;
113
114 patterns = dictCreate(&callbackDict,NULL);
115 if (patterns == NULL)
116 goto oom;
117
118 ac = hi_realloc(c,sizeof(redisAsyncContext));
119 if (ac == NULL)
120 goto oom;
121
122 c = &(ac->c);
123
124 /* The regular connect functions will always set the flag REDIS_CONNECTED.
125 * For the async API, we want to wait until the first write event is
126 * received up before setting this flag, so reset it here. */
127 c->flags &= ~REDIS_CONNECTED;
128
129 ac->err = 0;
130 ac->errstr = NULL;
131 ac->data = NULL;
132 ac->dataCleanup = NULL;
133
134 ac->ev.data = NULL;
135 ac->ev.addRead = NULL;
136 ac->ev.delRead = NULL;
137 ac->ev.addWrite = NULL;
138 ac->ev.delWrite = NULL;
139 ac->ev.cleanup = NULL;
140 ac->ev.scheduleTimer = NULL;
141
142 ac->onConnect = NULL;
143 ac->onDisconnect = NULL;
144
145 ac->replies.head = NULL;
146 ac->replies.tail = NULL;
147 ac->sub.replies.head = NULL;
148 ac->sub.replies.tail = NULL;
149 ac->sub.channels = channels;
150 ac->sub.patterns = patterns;
151
152 return ac;
153oom:
154 if (channels) dictRelease(channels);
155 if (patterns) dictRelease(patterns);
156 return NULL;
157}
158
159/* We want the error field to be accessible directly instead of requiring
160 * an indirection to the redisContext struct. */
161static void __redisAsyncCopyError(redisAsyncContext *ac) {
162 if (!ac)
163 return;
164
165 redisContext *c = &(ac->c);
166 ac->err = c->err;
167 ac->errstr = c->errstr;
168}
169
170redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
171 redisOptions myOptions = *options;
172 redisContext *c;
173 redisAsyncContext *ac;
174
175 /* Clear any erroneously set sync callback and flag that we don't want to
176 * use freeReplyObject by default. */
177 myOptions.push_cb = NULL;
178 myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
179
180 myOptions.options |= REDIS_OPT_NONBLOCK;
181 c = redisConnectWithOptions(&myOptions);
182 if (c == NULL) {
183 return NULL;
184 }
185
186 ac = redisAsyncInitialize(c);
187 if (ac == NULL) {
188 redisFree(c);
189 return NULL;
190 }
191
192 /* Set any configured async push handler */
193 redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
194
195 __redisAsyncCopyError(ac);
196 return ac;
197}
198
199redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
200 redisOptions options = {0};
201 REDIS_OPTIONS_SET_TCP(&options, ip, port);
202 return redisAsyncConnectWithOptions(&options);
203}
204
205redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
206 const char *source_addr) {
207 redisOptions options = {0};
208 REDIS_OPTIONS_SET_TCP(&options, ip, port);
209 options.endpoint.tcp.source_addr = source_addr;
210 return redisAsyncConnectWithOptions(&options);
211}
212
213redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
214 const char *source_addr) {
215 redisOptions options = {0};
216 REDIS_OPTIONS_SET_TCP(&options, ip, port);
217 options.options |= REDIS_OPT_REUSEADDR;
218 options.endpoint.tcp.source_addr = source_addr;
219 return redisAsyncConnectWithOptions(&options);
220}
221
222redisAsyncContext *redisAsyncConnectUnix(const char *path) {
223 redisOptions options = {0};
224 REDIS_OPTIONS_SET_UNIX(&options, path);
225 return redisAsyncConnectWithOptions(&options);
226}
227
228int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
229 if (ac->onConnect == NULL) {
230 ac->onConnect = fn;
231
232 /* The common way to detect an established connection is to wait for
233 * the first write event to be fired. This assumes the related event
234 * library functions are already set. */
235 _EL_ADD_WRITE(ac);
236 return REDIS_OK;
237 }
238 return REDIS_ERR;
239}
240
241int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
242 if (ac->onDisconnect == NULL) {
243 ac->onDisconnect = fn;
244 return REDIS_OK;
245 }
246 return REDIS_ERR;
247}
248
249/* Helper functions to push/shift callbacks */
250static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
251 redisCallback *cb;
252
253 /* Copy callback from stack to heap */
254 cb = hi_malloc(sizeof(*cb));
255 if (cb == NULL)
256 return REDIS_ERR_OOM;
257
258 if (source != NULL) {
259 memcpy(cb,source,sizeof(*cb));
260 cb->next = NULL;
261 }
262
263 /* Store callback in list */
264 if (list->head == NULL)
265 list->head = cb;
266 if (list->tail != NULL)
267 list->tail->next = cb;
268 list->tail = cb;
269 return REDIS_OK;
270}
271
272static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
273 redisCallback *cb = list->head;
274 if (cb != NULL) {
275 list->head = cb->next;
276 if (cb == list->tail)
277 list->tail = NULL;
278
279 /* Copy callback from heap to stack */
280 if (target != NULL)
281 memcpy(target,cb,sizeof(*cb));
282 hi_free(cb);
283 return REDIS_OK;
284 }
285 return REDIS_ERR;
286}
287
288static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
289 redisContext *c = &(ac->c);
290 if (cb->fn != NULL) {
291 c->flags |= REDIS_IN_CALLBACK;
292 cb->fn(ac,reply,cb->privdata);
293 c->flags &= ~REDIS_IN_CALLBACK;
294 }
295}
296
297static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
298 if (ac->push_cb != NULL) {
299 ac->c.flags |= REDIS_IN_CALLBACK;
300 ac->push_cb(ac, reply);
301 ac->c.flags &= ~REDIS_IN_CALLBACK;
302 }
303}
304
305/* Helper function to free the context. */
306static void __redisAsyncFree(redisAsyncContext *ac) {
307 redisContext *c = &(ac->c);
308 redisCallback cb;
309 dictIterator it;
310 dictEntry *de;
311
312 /* Execute pending callbacks with NULL reply. */
313 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
314 __redisRunCallback(ac,&cb,NULL);
315 while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
316 __redisRunCallback(ac,&cb,NULL);
317
318 /* Run subscription callbacks with NULL reply */
319 if (ac->sub.channels) {
320 dictInitIterator(&it,ac->sub.channels);
321 while ((de = dictNext(&it)) != NULL)
322 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
323
324 dictRelease(ac->sub.channels);
325 }
326
327 if (ac->sub.patterns) {
328 dictInitIterator(&it,ac->sub.patterns);
329 while ((de = dictNext(&it)) != NULL)
330 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
331
332 dictRelease(ac->sub.patterns);
333 }
334
335 /* Signal event lib to clean up */
336 _EL_CLEANUP(ac);
337
338 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
339 * this context, the status will always be REDIS_OK. */
340 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
341 if (c->flags & REDIS_FREEING) {
342 ac->onDisconnect(ac,REDIS_OK);
343 } else {
344 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
345 }
346 }
347
348 if (ac->dataCleanup) {
349 ac->dataCleanup(ac->data);
350 }
351
352 /* Cleanup self */
353 redisFree(c);
354}
355
356/* Free the async context. When this function is called from a callback,
357 * control needs to be returned to redisProcessCallbacks() before actual
358 * free'ing. To do so, a flag is set on the context which is picked up by
359 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
360void redisAsyncFree(redisAsyncContext *ac) {
361 redisContext *c = &(ac->c);
362 c->flags |= REDIS_FREEING;
363 if (!(c->flags & REDIS_IN_CALLBACK))
364 __redisAsyncFree(ac);
365}
366
367/* Helper function to make the disconnect happen and clean up. */
368void __redisAsyncDisconnect(redisAsyncContext *ac) {
369 redisContext *c = &(ac->c);
370
371 /* Make sure error is accessible if there is any */
372 __redisAsyncCopyError(ac);
373
374 if (ac->err == 0) {
375 /* For clean disconnects, there should be no pending callbacks. */
376 int ret = __redisShiftCallback(&ac->replies,NULL);
377 assert(ret == REDIS_ERR);
378 } else {
379 /* Disconnection is caused by an error, make sure that pending
380 * callbacks cannot call new commands. */
381 c->flags |= REDIS_DISCONNECTING;
382 }
383
384 /* cleanup event library on disconnect.
385 * this is safe to call multiple times */
386 _EL_CLEANUP(ac);
387
388 /* For non-clean disconnects, __redisAsyncFree() will execute pending
389 * callbacks with a NULL-reply. */
390 if (!(c->flags & REDIS_NO_AUTO_FREE)) {
391 __redisAsyncFree(ac);
392 }
393}
394
395/* Tries to do a clean disconnect from Redis, meaning it stops new commands
396 * from being issued, but tries to flush the output buffer and execute
397 * callbacks for all remaining replies. When this function is called from a
398 * callback, there might be more replies and we can safely defer disconnecting
399 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
400 * when there are no pending callbacks. */
401void redisAsyncDisconnect(redisAsyncContext *ac) {
402 redisContext *c = &(ac->c);
403 c->flags |= REDIS_DISCONNECTING;
404
405 /** unset the auto-free flag here, because disconnect undoes this */
406 c->flags &= ~REDIS_NO_AUTO_FREE;
407 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
408 __redisAsyncDisconnect(ac);
409}
410
411static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
412 redisContext *c = &(ac->c);
413 dict *callbacks;
414 redisCallback *cb;
415 dictEntry *de;
416 int pvariant;
417 char *stype;
418 hisds sname;
419
420 /* Match reply with the expected format of a pushed message.
421 * The type and number of elements (3 to 4) are specified at:
422 * https://redis.io/topics/pubsub#format-of-pushed-messages */
423 if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
424 reply->type == REDIS_REPLY_PUSH) {
425 assert(reply->element[0]->type == REDIS_REPLY_STRING);
426 stype = reply->element[0]->str;
427 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
428
429 if (pvariant)
430 callbacks = ac->sub.patterns;
431 else
432 callbacks = ac->sub.channels;
433
434 /* Locate the right callback */
435 assert(reply->element[1]->type == REDIS_REPLY_STRING);
436 sname = hi_sdsnewlen(reply->element[1]->str,reply->element[1]->len);
437 if (sname == NULL)
438 goto oom;
439
440 de = dictFind(callbacks,sname);
441 if (de != NULL) {
442 cb = dictGetEntryVal(de);
443
444 /* If this is an subscribe reply decrease pending counter. */
445 if (strcasecmp(stype+pvariant,"subscribe") == 0) {
446 cb->pending_subs -= 1;
447 }
448
449 memcpy(dstcb,cb,sizeof(*dstcb));
450
451 /* If this is an unsubscribe message, remove it. */
452 if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
453 if (cb->pending_subs == 0)
454 dictDelete(callbacks,sname);
455
456 /* If this was the last unsubscribe message, revert to
457 * non-subscribe mode. */
458 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
459
460 /* Unset subscribed flag only when no pipelined pending subscribe. */
461 if (reply->element[2]->integer == 0
462 && dictSize(ac->sub.channels) == 0
463 && dictSize(ac->sub.patterns) == 0) {
464 c->flags &= ~REDIS_SUBSCRIBED;
465
466 /* Move ongoing regular command callbacks. */
467 redisCallback cb;
468 while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
469 __redisPushCallback(&ac->replies,&cb);
470 }
471 }
472 }
473 }
474 hi_sdsfree(sname);
475 } else {
476 /* Shift callback for pending command in subscribed context. */
477 __redisShiftCallback(&ac->sub.replies,dstcb);
478 }
479 return REDIS_OK;
480oom:
481 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
482 return REDIS_ERR;
483}
484
485#define redisIsSpontaneousPushReply(r) \
486 (redisIsPushReply(r) && !redisIsSubscribeReply(r))
487
488static int redisIsSubscribeReply(redisReply *reply) {
489 char *str;
490 size_t len, off;
491
492 /* We will always have at least one string with the subscribe/message type */
493 if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
494 reply->element[0]->len < sizeof("message") - 1)
495 {
496 return 0;
497 }
498
499 /* Get the string/len moving past 'p' if needed */
500 off = tolower(reply->element[0]->str[0]) == 'p';
501 str = reply->element[0]->str + off;
502 len = reply->element[0]->len - off;
503
504 return !strncasecmp(str, "subscribe", len) ||
505 !strncasecmp(str, "message", len) ||
506 !strncasecmp(str, "unsubscribe", len);
507}
508
509void redisProcessCallbacks(redisAsyncContext *ac) {
510 redisContext *c = &(ac->c);
511 void *reply = NULL;
512 int status;
513
514 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
515 if (reply == NULL) {
516 /* When the connection is being disconnected and there are
517 * no more replies, this is the cue to really disconnect. */
518 if (c->flags & REDIS_DISCONNECTING && hi_sdslen(c->obuf) == 0
519 && ac->replies.head == NULL) {
520 __redisAsyncDisconnect(ac);
521 return;
522 }
523 /* When the connection is not being disconnected, simply stop
524 * trying to get replies and wait for the next loop tick. */
525 break;
526 }
527
528 /* Keep track of push message support for subscribe handling */
529 if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
530
531 /* Send any non-subscribe related PUSH messages to our PUSH handler
532 * while allowing subscribe related PUSH messages to pass through.
533 * This allows existing code to be backward compatible and work in
534 * either RESP2 or RESP3 mode. */
535 if (redisIsSpontaneousPushReply(reply)) {
536 __redisRunPushCallback(ac, reply);
537 c->reader->fn->freeObject(reply);
538 continue;
539 }
540
541 /* Even if the context is subscribed, pending regular
542 * callbacks will get a reply before pub/sub messages arrive. */
543 redisCallback cb = {NULL, NULL, 0, NULL};
544 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
545 /*
546 * A spontaneous reply in a not-subscribed context can be the error
547 * reply that is sent when a new connection exceeds the maximum
548 * number of allowed connections on the server side.
549 *
550 * This is seen as an error instead of a regular reply because the
551 * server closes the connection after sending it.
552 *
553 * To prevent the error from being overwritten by an EOF error the
554 * connection is closed here. See issue #43.
555 *
556 * Another possibility is that the server is loading its dataset.
557 * In this case we also want to close the connection, and have the
558 * user wait until the server is ready to take our request.
559 */
560 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
561 c->err = REDIS_ERR_OTHER;
562 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
563 c->reader->fn->freeObject(reply);
564 __redisAsyncDisconnect(ac);
565 return;
566 }
567 /* No more regular callbacks and no errors, the context *must* be subscribed. */
568 assert(c->flags & REDIS_SUBSCRIBED);
569 if (c->flags & REDIS_SUBSCRIBED)
570 __redisGetSubscribeCallback(ac,reply,&cb);
571 }
572
573 if (cb.fn != NULL) {
574 __redisRunCallback(ac,&cb,reply);
575 if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
576 c->reader->fn->freeObject(reply);
577 }
578
579 /* Proceed with free'ing when redisAsyncFree() was called. */
580 if (c->flags & REDIS_FREEING) {
581 __redisAsyncFree(ac);
582 return;
583 }
584 } else {
585 /* No callback for this reply. This can either be a NULL callback,
586 * or there were no callbacks to begin with. Either way, don't
587 * abort with an error, but simply ignore it because the client
588 * doesn't know what the server will spit out over the wire. */
589 c->reader->fn->freeObject(reply);
590 }
591
592 /* If in monitor mode, repush the callback */
593 if (c->flags & REDIS_MONITORING) {
594 __redisPushCallback(&ac->replies,&cb);
595 }
596 }
597
598 /* Disconnect when there was an error reading the reply */
599 if (status != REDIS_OK)
600 __redisAsyncDisconnect(ac);
601}
602
603static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
604 if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
605 __redisAsyncDisconnect(ac);
606}
607
608/* Internal helper function to detect socket status the first time a read or
609 * write event fires. When connecting was not successful, the connect callback
610 * is called with a REDIS_ERR status and the context is free'd. */
611static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
612 int completed = 0;
613 redisContext *c = &(ac->c);
614
615 if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
616 /* Error! */
617 if (redisCheckSocketError(c) == REDIS_ERR)
618 __redisAsyncCopyError(ac);
619 __redisAsyncHandleConnectFailure(ac);
620 return REDIS_ERR;
621 } else if (completed == 1) {
622 /* connected! */
623 if (c->connection_type == REDIS_CONN_TCP &&
624 redisSetTcpNoDelay(c) == REDIS_ERR) {
625 __redisAsyncHandleConnectFailure(ac);
626 return REDIS_ERR;
627 }
628
629 if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
630 c->flags |= REDIS_CONNECTED;
631 return REDIS_OK;
632 } else {
633 return REDIS_OK;
634 }
635}
636
637void redisAsyncRead(redisAsyncContext *ac) {
638 redisContext *c = &(ac->c);
639
640 if (redisBufferRead(c) == REDIS_ERR) {
641 __redisAsyncDisconnect(ac);
642 } else {
643 /* Always re-schedule reads */
644 _EL_ADD_READ(ac);
645 redisProcessCallbacks(ac);
646 }
647}
648
649/* This function should be called when the socket is readable.
650 * It processes all replies that can be read and executes their callbacks.
651 */
652void redisAsyncHandleRead(redisAsyncContext *ac) {
653 redisContext *c = &(ac->c);
654
655 if (!(c->flags & REDIS_CONNECTED)) {
656 /* Abort connect was not successful. */
657 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
658 return;
659 /* Try again later when the context is still not connected. */
660 if (!(c->flags & REDIS_CONNECTED))
661 return;
662 }
663
664 c->funcs->async_read(ac);
665}
666
667void redisAsyncWrite(redisAsyncContext *ac) {
668 redisContext *c = &(ac->c);
669 int done = 0;
670
671 if (redisBufferWrite(c,&done) == REDIS_ERR) {
672 __redisAsyncDisconnect(ac);
673 } else {
674 /* Continue writing when not done, stop writing otherwise */
675 if (!done)
676 _EL_ADD_WRITE(ac);
677 else
678 _EL_DEL_WRITE(ac);
679
680 /* Always schedule reads after writes */
681 _EL_ADD_READ(ac);
682 }
683}
684
685void redisAsyncHandleWrite(redisAsyncContext *ac) {
686 redisContext *c = &(ac->c);
687
688 if (!(c->flags & REDIS_CONNECTED)) {
689 /* Abort connect was not successful. */
690 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
691 return;
692 /* Try again later when the context is still not connected. */
693 if (!(c->flags & REDIS_CONNECTED))
694 return;
695 }
696
697 c->funcs->async_write(ac);
698}
699
700void redisAsyncHandleTimeout(redisAsyncContext *ac) {
701 redisContext *c = &(ac->c);
702 redisCallback cb;
703
704 if ((c->flags & REDIS_CONNECTED)) {
705 if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
706 /* Nothing to do - just an idle timeout */
707 return;
708 }
709
710 if (!ac->c.command_timeout ||
711 (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) {
712 /* A belated connect timeout arriving, ignore */
713 return;
714 }
715 }
716
717 if (!c->err) {
718 __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
719 __redisAsyncCopyError(ac);
720 }
721
722 if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
723 ac->onConnect(ac, REDIS_ERR);
724 }
725
726 while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
727 __redisRunCallback(ac, &cb, NULL);
728 }
729
730 /**
731 * TODO: Don't automatically sever the connection,
732 * rather, allow to ignore <x> responses before the queue is clear
733 */
734 __redisAsyncDisconnect(ac);
735}
736
737/* Sets a pointer to the first argument and its length starting at p. Returns
738 * the number of bytes to skip to get to the following argument. */
739static const char *nextArgument(const char *start, const char **str, size_t *len) {
740 const char *p = start;
741 if (p[0] != '$') {
742 p = strchr(p,'$');
743 if (p == NULL) return NULL;
744 }
745
746 *len = (int)strtol(p+1,NULL,10);
747 p = strchr(p,'\r');
748 assert(p);
749 *str = p+2;
750 return p+2+(*len)+2;
751}
752
753/* Helper function for the redisAsyncCommand* family of functions. Writes a
754 * formatted command to the output buffer and registers the provided callback
755 * function with the context. */
756static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
757 redisContext *c = &(ac->c);
758 redisCallback cb;
759 struct dict *cbdict;
760 dictEntry *de;
761 redisCallback *existcb;
762 int pvariant, hasnext;
763 const char *cstr, *astr;
764 size_t clen, alen;
765 const char *p;
766 hisds sname;
767 int ret;
768
769 /* Don't accept new commands when the connection is about to be closed. */
770 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
771
772 /* Setup callback */
773 cb.fn = fn;
774 cb.privdata = privdata;
775 cb.pending_subs = 1;
776
777 /* Find out which command will be appended. */
778 p = nextArgument(cmd,&cstr,&clen);
779 assert(p != NULL);
780 hasnext = (p[0] == '$');
781 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
782 cstr += pvariant;
783 clen -= pvariant;
784
785 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
786 c->flags |= REDIS_SUBSCRIBED;
787
788 /* Add every channel/pattern to the list of subscription callbacks. */
789 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
790 sname = hi_sdsnewlen(astr,alen);
791 if (sname == NULL)
792 goto oom;
793
794 if (pvariant)
795 cbdict = ac->sub.patterns;
796 else
797 cbdict = ac->sub.channels;
798
799 de = dictFind(cbdict,sname);
800
801 if (de != NULL) {
802 existcb = dictGetEntryVal(de);
803 cb.pending_subs = existcb->pending_subs + 1;
804 }
805
806 ret = dictReplace(cbdict,sname,&cb);
807
808 if (ret == 0) hi_sdsfree(sname);
809 }
810 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
811 /* It is only useful to call (P)UNSUBSCRIBE when the context is
812 * subscribed to one or more channels or patterns. */
813 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
814
815 /* (P)UNSUBSCRIBE does not have its own response: every channel or
816 * pattern that is unsubscribed will receive a message. This means we
817 * should not append a callback function for this command. */
818 } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
819 /* Set monitor flag and push callback */
820 c->flags |= REDIS_MONITORING;
821 if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
822 goto oom;
823 } else {
824 if (c->flags & REDIS_SUBSCRIBED) {
825 if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
826 goto oom;
827 } else {
828 if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
829 goto oom;
830 }
831 }
832
833 __redisAppendCommand(c,cmd,len);
834
835 /* Always schedule a write when the write buffer is non-empty */
836 _EL_ADD_WRITE(ac);
837
838 return REDIS_OK;
839oom:
840 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
841 __redisAsyncCopyError(ac);
842 return REDIS_ERR;
843}
844
845int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
846 char *cmd;
847 int len;
848 int status;
849 len = redisvFormatCommand(&cmd,format,ap);
850
851 /* We don't want to pass -1 or -2 to future functions as a length. */
852 if (len < 0)
853 return REDIS_ERR;
854
855 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
856 hi_free(cmd);
857 return status;
858}
859
860int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
861 va_list ap;
862 int status;
863 va_start(ap,format);
864 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
865 va_end(ap);
866 return status;
867}
868
869int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
870 hisds cmd;
871 long long len;
872 int status;
873 len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
874 if (len < 0)
875 return REDIS_ERR;
876 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
877 hi_sdsfree(cmd);
878 return status;
879}
880
881int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
882 int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
883 return status;
884}
885
886redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
887 redisAsyncPushFn *old = ac->push_cb;
888 ac->push_cb = fn;
889 return old;
890}
891
892int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
893 if (!ac->c.command_timeout) {
894 ac->c.command_timeout = hi_calloc(1, sizeof(tv));
895 if (ac->c.command_timeout == NULL) {
896 __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
897 __redisAsyncCopyError(ac);
898 return REDIS_ERR;
899 }
900 }
901
902 if (tv.tv_sec != ac->c.command_timeout->tv_sec ||
903 tv.tv_usec != ac->c.command_timeout->tv_usec)
904 {
905 *ac->c.command_timeout = tv;
906 }
907
908 return REDIS_OK;
909}
910