1/*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2014, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * Copyright (c) 2015, Matt Stancliff <matt at genges dot com>,
5 * Jan-Erik Rediger <janerik at fnordig dot com>
6 *
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 * * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 * * Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 * * Neither the name of Redis nor the names of its contributors may be used
18 * to endorse or promote products derived from this software without
19 * specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
25 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 * POSSIBILITY OF SUCH DAMAGE.
32 */
33
34#include "fmacros.h"
35#include <string.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <errno.h>
39#include <ctype.h>
40
41#include "hiredis.h"
42#include "net.h"
43#include "sds.h"
44#include "async.h"
45#include "win32.h"
46
/* Timeout helpers defined in another translation unit. In this file they are
 * used by redisConnectWithOptions(), which treats any result other than
 * REDIS_OK as an out-of-memory condition. */
extern int redisContextUpdateConnectTimeout(redisContext *c, const struct timeval *timeout);
extern int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout);
49
/* Default I/O callback table. redisContextInit() installs it on every new
 * context: reads and writes go through the redisNet* / redisAsync* routines
 * and no private-context destructor is registered. */
static redisContextFuncs redisContextDefaultFuncs = {
    .free_privctx = NULL,
    .async_read = redisAsyncRead,
    .async_write = redisAsyncWrite,
    .read = redisNetRead,
    .write = redisNetWrite
};
57
/* Forward declarations of the reply-construction helpers so that they can be
 * referenced by the defaultFunctions table before their definitions. */
static redisReply *createReplyObject(int type);
static void *createStringObject(const redisReadTask *task, char *str, size_t len);
static void *createArrayObject(const redisReadTask *task, size_t elements);
static void *createIntegerObject(const redisReadTask *task, long long value);
static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len);
static void *createNilObject(const redisReadTask *task);
static void *createBoolObject(const redisReadTask *task, int bval);
65
/* Default set of functions used to build reply objects; it is handed to
 * redisReaderCreateWithFunctions() by redisReaderCreate(). Keep in mind that
 * a creator function returning NULL is interpreted as OOM. The entries are
 * positional, so their order must match the member order of
 * redisReplyObjectFunctions. */
static redisReplyObjectFunctions defaultFunctions = {
    createStringObject,
    createArrayObject,
    createIntegerObject,
    createDoubleObject,
    createNilObject,
    createBoolObject,
    freeReplyObject
};
77
78/* Create a reply object */
79static redisReply *createReplyObject(int type) {
80 redisReply *r = hi_calloc(1,sizeof(*r));
81
82 if (r == NULL)
83 return NULL;
84
85 r->type = type;
86 return r;
87}
88
89/* Free a reply object */
90void freeReplyObject(void *reply) {
91 redisReply *r = reply;
92 size_t j;
93
94 if (r == NULL)
95 return;
96
97 switch(r->type) {
98 case REDIS_REPLY_INTEGER:
99 case REDIS_REPLY_NIL:
100 case REDIS_REPLY_BOOL:
101 break; /* Nothing to free */
102 case REDIS_REPLY_ARRAY:
103 case REDIS_REPLY_MAP:
104 case REDIS_REPLY_SET:
105 case REDIS_REPLY_PUSH:
106 if (r->element != NULL) {
107 for (j = 0; j < r->elements; j++)
108 freeReplyObject(r->element[j]);
109 hi_free(r->element);
110 }
111 break;
112 case REDIS_REPLY_ERROR:
113 case REDIS_REPLY_STATUS:
114 case REDIS_REPLY_STRING:
115 case REDIS_REPLY_DOUBLE:
116 case REDIS_REPLY_VERB:
117 case REDIS_REPLY_BIGNUM:
118 hi_free(r->str);
119 break;
120 }
121 hi_free(r);
122}
123
124static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
125 redisReply *r, *parent;
126 char *buf;
127
128 r = createReplyObject(task->type);
129 if (r == NULL)
130 return NULL;
131
132 assert(task->type == REDIS_REPLY_ERROR ||
133 task->type == REDIS_REPLY_STATUS ||
134 task->type == REDIS_REPLY_STRING ||
135 task->type == REDIS_REPLY_VERB ||
136 task->type == REDIS_REPLY_BIGNUM);
137
138 /* Copy string value */
139 if (task->type == REDIS_REPLY_VERB) {
140 buf = hi_malloc(len-4+1); /* Skip 4 bytes of verbatim type header. */
141 if (buf == NULL) goto oom;
142
143 memcpy(r->vtype,str,3);
144 r->vtype[3] = '\0';
145 memcpy(buf,str+4,len-4);
146 buf[len-4] = '\0';
147 r->len = len - 4;
148 } else {
149 buf = hi_malloc(len+1);
150 if (buf == NULL) goto oom;
151
152 memcpy(buf,str,len);
153 buf[len] = '\0';
154 r->len = len;
155 }
156 r->str = buf;
157
158 if (task->parent) {
159 parent = task->parent->obj;
160 assert(parent->type == REDIS_REPLY_ARRAY ||
161 parent->type == REDIS_REPLY_MAP ||
162 parent->type == REDIS_REPLY_SET ||
163 parent->type == REDIS_REPLY_PUSH);
164 parent->element[task->idx] = r;
165 }
166 return r;
167
168oom:
169 freeReplyObject(r);
170 return NULL;
171}
172
173static void *createArrayObject(const redisReadTask *task, size_t elements) {
174 redisReply *r, *parent;
175
176 r = createReplyObject(task->type);
177 if (r == NULL)
178 return NULL;
179
180 if (elements > 0) {
181 r->element = hi_calloc(elements,sizeof(redisReply*));
182 if (r->element == NULL) {
183 freeReplyObject(r);
184 return NULL;
185 }
186 }
187
188 r->elements = elements;
189
190 if (task->parent) {
191 parent = task->parent->obj;
192 assert(parent->type == REDIS_REPLY_ARRAY ||
193 parent->type == REDIS_REPLY_MAP ||
194 parent->type == REDIS_REPLY_SET ||
195 parent->type == REDIS_REPLY_PUSH);
196 parent->element[task->idx] = r;
197 }
198 return r;
199}
200
201static void *createIntegerObject(const redisReadTask *task, long long value) {
202 redisReply *r, *parent;
203
204 r = createReplyObject(REDIS_REPLY_INTEGER);
205 if (r == NULL)
206 return NULL;
207
208 r->integer = value;
209
210 if (task->parent) {
211 parent = task->parent->obj;
212 assert(parent->type == REDIS_REPLY_ARRAY ||
213 parent->type == REDIS_REPLY_MAP ||
214 parent->type == REDIS_REPLY_SET ||
215 parent->type == REDIS_REPLY_PUSH);
216 parent->element[task->idx] = r;
217 }
218 return r;
219}
220
221static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len) {
222 redisReply *r, *parent;
223
224 r = createReplyObject(REDIS_REPLY_DOUBLE);
225 if (r == NULL)
226 return NULL;
227
228 r->dval = value;
229 r->str = hi_malloc(len+1);
230 if (r->str == NULL) {
231 freeReplyObject(r);
232 return NULL;
233 }
234
235 /* The double reply also has the original protocol string representing a
236 * double as a null terminated string. This way the caller does not need
237 * to format back for string conversion, especially since Redis does efforts
238 * to make the string more human readable avoiding the calssical double
239 * decimal string conversion artifacts. */
240 memcpy(r->str, str, len);
241 r->str[len] = '\0';
242 r->len = len;
243
244 if (task->parent) {
245 parent = task->parent->obj;
246 assert(parent->type == REDIS_REPLY_ARRAY ||
247 parent->type == REDIS_REPLY_MAP ||
248 parent->type == REDIS_REPLY_SET ||
249 parent->type == REDIS_REPLY_PUSH);
250 parent->element[task->idx] = r;
251 }
252 return r;
253}
254
255static void *createNilObject(const redisReadTask *task) {
256 redisReply *r, *parent;
257
258 r = createReplyObject(REDIS_REPLY_NIL);
259 if (r == NULL)
260 return NULL;
261
262 if (task->parent) {
263 parent = task->parent->obj;
264 assert(parent->type == REDIS_REPLY_ARRAY ||
265 parent->type == REDIS_REPLY_MAP ||
266 parent->type == REDIS_REPLY_SET ||
267 parent->type == REDIS_REPLY_PUSH);
268 parent->element[task->idx] = r;
269 }
270 return r;
271}
272
273static void *createBoolObject(const redisReadTask *task, int bval) {
274 redisReply *r, *parent;
275
276 r = createReplyObject(REDIS_REPLY_BOOL);
277 if (r == NULL)
278 return NULL;
279
280 r->integer = bval != 0;
281
282 if (task->parent) {
283 parent = task->parent->obj;
284 assert(parent->type == REDIS_REPLY_ARRAY ||
285 parent->type == REDIS_REPLY_MAP ||
286 parent->type == REDIS_REPLY_SET ||
287 parent->type == REDIS_REPLY_PUSH);
288 parent->element[task->idx] = r;
289 }
290 return r;
291}
292
/* Return how many characters 'v' occupies when printed in radix 10.
 * Four digits are stripped per iteration; the remainder (0..9999) is then
 * classified with at most three comparisons. The approach follows the one
 * referenced from redis/src/util.c:string2ll(). */
static uint32_t countDigits(uint64_t v) {
    uint32_t digits = 1;

    while (v >= 10000) {
        v /= 10000U;
        digits += 4;
    }

    if (v >= 1000) return digits + 3;
    if (v >= 100) return digits + 2;
    if (v >= 10) return digits + 1;
    return digits;
}
306
/* Number of bytes a bulk string of 'len' payload bytes occupies on the wire:
 * the '$' marker, the decimal length, CRLF, the payload and a final CRLF. */
static size_t bulklen(size_t len) {
    size_t header = 1 + countDigits(len) + 2; /* '$' + digits + CRLF */
    size_t body = len + 2;                    /* payload + CRLF */

    return header + body;
}
311
312int redisvFormatCommand(char **target, const char *format, va_list ap) {
313 const char *c = format;
314 char *cmd = NULL; /* final command */
315 int pos; /* position in final command */
316 hisds curarg, newarg; /* current argument */
317 int touched = 0; /* was the current argument touched? */
318 char **curargv = NULL, **newargv = NULL;
319 int argc = 0;
320 int totlen = 0;
321 int error_type = 0; /* 0 = no error; -1 = memory error; -2 = format error */
322 int j;
323
324 /* Abort if there is not target to set */
325 if (target == NULL)
326 return -1;
327
328 /* Build the command string accordingly to protocol */
329 curarg = hi_sdsempty();
330 if (curarg == NULL)
331 return -1;
332
333 while(*c != '\0') {
334 if (*c != '%' || c[1] == '\0') {
335 if (*c == ' ') {
336 if (touched) {
337 newargv = hi_realloc(curargv,sizeof(char*)*(argc+1));
338 if (newargv == NULL) goto memory_err;
339 curargv = newargv;
340 curargv[argc++] = curarg;
341 totlen += bulklen(hi_sdslen(curarg));
342
343 /* curarg is put in argv so it can be overwritten. */
344 curarg = hi_sdsempty();
345 if (curarg == NULL) goto memory_err;
346 touched = 0;
347 }
348 } else {
349 newarg = hi_sdscatlen(curarg,c,1);
350 if (newarg == NULL) goto memory_err;
351 curarg = newarg;
352 touched = 1;
353 }
354 } else {
355 char *arg;
356 size_t size;
357
358 /* Set newarg so it can be checked even if it is not touched. */
359 newarg = curarg;
360
361 switch(c[1]) {
362 case 's':
363 arg = va_arg(ap,char*);
364 size = strlen(arg);
365 if (size > 0)
366 newarg = hi_sdscatlen(curarg,arg,size);
367 break;
368 case 'b':
369 arg = va_arg(ap,char*);
370 size = va_arg(ap,size_t);
371 if (size > 0)
372 newarg = hi_sdscatlen(curarg,arg,size);
373 break;
374 case '%':
375 newarg = hi_sdscat(curarg,"%");
376 break;
377 default:
378 /* Try to detect printf format */
379 {
380 static const char intfmts[] = "diouxX";
381 static const char flags[] = "#0-+ ";
382 char _format[16];
383 const char *_p = c+1;
384 size_t _l = 0;
385 va_list _cpy;
386
387 /* Flags */
388 while (*_p != '\0' && strchr(flags,*_p) != NULL) _p++;
389
390 /* Field width */
391 while (*_p != '\0' && isdigit(*_p)) _p++;
392
393 /* Precision */
394 if (*_p == '.') {
395 _p++;
396 while (*_p != '\0' && isdigit(*_p)) _p++;
397 }
398
399 /* Copy va_list before consuming with va_arg */
400 va_copy(_cpy,ap);
401
402 /* Integer conversion (without modifiers) */
403 if (strchr(intfmts,*_p) != NULL) {
404 va_arg(ap,int);
405 goto fmt_valid;
406 }
407
408 /* Double conversion (without modifiers) */
409 if (strchr("eEfFgGaA",*_p) != NULL) {
410 va_arg(ap,double);
411 goto fmt_valid;
412 }
413
414 /* Size: char */
415 if (_p[0] == 'h' && _p[1] == 'h') {
416 _p += 2;
417 if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
418 va_arg(ap,int); /* char gets promoted to int */
419 goto fmt_valid;
420 }
421 goto fmt_invalid;
422 }
423
424 /* Size: short */
425 if (_p[0] == 'h') {
426 _p += 1;
427 if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
428 va_arg(ap,int); /* short gets promoted to int */
429 goto fmt_valid;
430 }
431 goto fmt_invalid;
432 }
433
434 /* Size: long long */
435 if (_p[0] == 'l' && _p[1] == 'l') {
436 _p += 2;
437 if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
438 va_arg(ap,long long);
439 goto fmt_valid;
440 }
441 goto fmt_invalid;
442 }
443
444 /* Size: long */
445 if (_p[0] == 'l') {
446 _p += 1;
447 if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
448 va_arg(ap,long);
449 goto fmt_valid;
450 }
451 goto fmt_invalid;
452 }
453
454 fmt_invalid:
455 va_end(_cpy);
456 goto format_err;
457
458 fmt_valid:
459 _l = (_p+1)-c;
460 if (_l < sizeof(_format)-2) {
461 memcpy(_format,c,_l);
462 _format[_l] = '\0';
463 newarg = hi_sdscatvprintf(curarg,_format,_cpy);
464
465 /* Update current position (note: outer blocks
466 * increment c twice so compensate here) */
467 c = _p-1;
468 }
469
470 va_end(_cpy);
471 break;
472 }
473 }
474
475 if (newarg == NULL) goto memory_err;
476 curarg = newarg;
477
478 touched = 1;
479 c++;
480 }
481 c++;
482 }
483
484 /* Add the last argument if needed */
485 if (touched) {
486 newargv = hi_realloc(curargv,sizeof(char*)*(argc+1));
487 if (newargv == NULL) goto memory_err;
488 curargv = newargv;
489 curargv[argc++] = curarg;
490 totlen += bulklen(hi_sdslen(curarg));
491 } else {
492 hi_sdsfree(curarg);
493 }
494
495 /* Clear curarg because it was put in curargv or was free'd. */
496 curarg = NULL;
497
498 /* Add bytes needed to hold multi bulk count */
499 totlen += 1+countDigits(argc)+2;
500
501 /* Build the command at protocol level */
502 cmd = hi_malloc(totlen+1);
503 if (cmd == NULL) goto memory_err;
504
505 pos = sprintf(cmd,"*%d\r\n",argc);
506 for (j = 0; j < argc; j++) {
507 pos += sprintf(cmd+pos,"$%zu\r\n",hi_sdslen(curargv[j]));
508 memcpy(cmd+pos,curargv[j],hi_sdslen(curargv[j]));
509 pos += hi_sdslen(curargv[j]);
510 hi_sdsfree(curargv[j]);
511 cmd[pos++] = '\r';
512 cmd[pos++] = '\n';
513 }
514 assert(pos == totlen);
515 cmd[pos] = '\0';
516
517 hi_free(curargv);
518 *target = cmd;
519 return totlen;
520
521format_err:
522 error_type = -2;
523 goto cleanup;
524
525memory_err:
526 error_type = -1;
527 goto cleanup;
528
529cleanup:
530 if (curargv) {
531 while(argc--)
532 hi_sdsfree(curargv[argc]);
533 hi_free(curargv);
534 }
535
536 hi_sdsfree(curarg);
537 hi_free(cmd);
538
539 return error_type;
540}
541
/* Variadic front end of redisvFormatCommand(). The format is printf-like:
 *
 * %s interpolates a NUL-terminated C string
 * %b interpolates a binary-safe string, given as a pointer followed by its
 *    length in bytes as a size_t
 *
 * Examples:
 *
 * len = redisFormatCommand(target, "GET %s", mykey);
 * len = redisFormatCommand(target, "SET %s %b", mykey, myval, myvallen);
 *
 * Returns the length of the formatted command. The documented failure value
 * is -1, so every negative result of redisvFormatCommand() (which may also
 * be -2) is collapsed to -1. */
int redisFormatCommand(char **target, const char *format, ...) {
    va_list args;
    int rc;

    va_start(args, format);
    rc = redisvFormatCommand(target, format, args);
    va_end(args);

    return (rc < 0) ? -1 : rc;
}
568
569/* Format a command according to the Redis protocol using an hisds string and
570 * hi_sdscatfmt for the processing of arguments. This function takes the
571 * number of arguments, an array with arguments and an array with their
572 * lengths. If the latter is set to NULL, strlen will be used to compute the
573 * argument lengths.
574 */
575long long redisFormatSdsCommandArgv(hisds *target, int argc, const char **argv,
576 const size_t *argvlen)
577{
578 hisds cmd, aux;
579 unsigned long long totlen, len;
580 int j;
581
582 /* Abort on a NULL target */
583 if (target == NULL)
584 return -1;
585
586 /* Calculate our total size */
587 totlen = 1+countDigits(argc)+2;
588 for (j = 0; j < argc; j++) {
589 len = argvlen ? argvlen[j] : strlen(argv[j]);
590 totlen += bulklen(len);
591 }
592
593 /* Use an SDS string for command construction */
594 cmd = hi_sdsempty();
595 if (cmd == NULL)
596 return -1;
597
598 /* We already know how much storage we need */
599 aux = hi_sdsMakeRoomFor(cmd, totlen);
600 if (aux == NULL) {
601 hi_sdsfree(cmd);
602 return -1;
603 }
604
605 cmd = aux;
606
607 /* Construct command */
608 cmd = hi_sdscatfmt(cmd, "*%i\r\n", argc);
609 for (j=0; j < argc; j++) {
610 len = argvlen ? argvlen[j] : strlen(argv[j]);
611 cmd = hi_sdscatfmt(cmd, "$%U\r\n", len);
612 cmd = hi_sdscatlen(cmd, argv[j], len);
613 cmd = hi_sdscatlen(cmd, "\r\n", sizeof("\r\n")-1);
614 }
615
616 assert(hi_sdslen(cmd)==totlen);
617
618 *target = cmd;
619 return totlen;
620}
621
/* Release a command string; the argument is handed to hi_sdsfree(), so it
 * must be an hisds string such as the one produced by
 * redisFormatSdsCommandArgv(). */
void redisFreeSdsCommand(hisds cmd) {
    hi_sdsfree(cmd);
}
625
/* Format a command according to the Redis protocol into a plain heap buffer.
 * Takes the number of arguments, the argument array and an array with their
 * lengths; when 'argvlen' is NULL, strlen() is used for every argument.
 *
 * On success *target receives a NUL-terminated buffer obtained from
 * hi_malloc and the command length (terminator excluded) is returned.
 * Returns -1 when target is NULL or the allocation fails. */
long long redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
    char *out = NULL;
    size_t off;            /* write offset into 'out' */
    size_t arglen, total;
    int i;

    /* Nothing to do without a place to store the result. */
    if (target == NULL)
        return -1;

    /* First pass: number of bytes needed for the whole command. */
    total = 1+countDigits(argc)+2;
    for (i = 0; i < argc; i++) {
        arglen = (argvlen != NULL) ? argvlen[i] : strlen(argv[i]);
        total += bulklen(arglen);
    }

    out = hi_malloc(total+1);
    if (out == NULL)
        return -1;

    /* Second pass: multi bulk header, then one bulk string per argument. */
    off = sprintf(out,"*%d\r\n",argc);
    for (i = 0; i < argc; i++) {
        arglen = (argvlen != NULL) ? argvlen[i] : strlen(argv[i]);
        off += sprintf(out+off,"$%zu\r\n",arglen);
        memcpy(out+off,argv[i],arglen);
        off += arglen;
        memcpy(out+off,"\r\n",2);
        off += 2;
    }
    assert(off == total);
    out[off] = '\0';

    *target = out;
    return total;
}
668
/* Release a command buffer; the argument is handed to hi_free(), matching
 * the hi_malloc() used by redisvFormatCommand() and
 * redisFormatCommandArgv(). */
void redisFreeCommand(char *cmd) {
    hi_free(cmd);
}
672
673void __redisSetError(redisContext *c, int type, const char *str) {
674 size_t len;
675
676 c->err = type;
677 if (str != NULL) {
678 len = strlen(str);
679 len = len < (sizeof(c->errstr)-1) ? len : (sizeof(c->errstr)-1);
680 memcpy(c->errstr,str,len);
681 c->errstr[len] = '\0';
682 } else {
683 /* Only REDIS_ERR_IO may lack a description! */
684 assert(type == REDIS_ERR_IO);
685 strerror_r(errno, c->errstr, sizeof(c->errstr));
686 }
687}
688
/* Create a reader that builds replies with the defaultFunctions table.
 * Returns whatever redisReaderCreateWithFunctions() returns; callers in this
 * file treat NULL as an allocation failure. */
redisReader *redisReaderCreate(void) {
    return redisReaderCreateWithFunctions(&defaultFunctions);
}
692
/* Default PUSH handler installed by redisConnectWithOptions(): it simply
 * frees the reply. 'privdata' is unused. */
static void redisPushAutoFree(void *privdata, void *reply) {
    (void)privdata;
    freeReplyObject(reply);
}
697
698static redisContext *redisContextInit(void) {
699 redisContext *c;
700
701 c = hi_calloc(1, sizeof(*c));
702 if (c == NULL)
703 return NULL;
704
705 c->funcs = &redisContextDefaultFuncs;
706
707 c->obuf = hi_sdsempty();
708 c->reader = redisReaderCreate();
709 c->fd = REDIS_INVALID_FD;
710
711 if (c->obuf == NULL || c->reader == NULL) {
712 redisFree(c);
713 return NULL;
714 }
715
716 return c;
717}
718
719void redisFree(redisContext *c) {
720 if (c == NULL)
721 return;
722 redisNetClose(c);
723
724 hi_sdsfree(c->obuf);
725 redisReaderFree(c->reader);
726 hi_free(c->tcp.host);
727 hi_free(c->tcp.source_addr);
728 hi_free(c->unix_sock.path);
729 hi_free(c->connect_timeout);
730 hi_free(c->command_timeout);
731 hi_free(c->saddr);
732
733 if (c->privdata && c->free_privdata)
734 c->free_privdata(c->privdata);
735
736 if (c->funcs->free_privctx)
737 c->funcs->free_privctx(c->privctx);
738
739 memset(c, 0xff, sizeof(*c));
740 hi_free(c);
741}
742
743redisFD redisFreeKeepFd(redisContext *c) {
744 redisFD fd = c->fd;
745 c->fd = REDIS_INVALID_FD;
746 redisFree(c);
747 return fd;
748}
749
750int redisReconnect(redisContext *c) {
751 c->err = 0;
752 memset(c->errstr, '\0', strlen(c->errstr));
753
754 if (c->privctx && c->funcs->free_privctx) {
755 c->funcs->free_privctx(c->privctx);
756 c->privctx = NULL;
757 }
758
759 redisNetClose(c);
760
761 hi_sdsfree(c->obuf);
762 redisReaderFree(c->reader);
763
764 c->obuf = hi_sdsempty();
765 c->reader = redisReaderCreate();
766
767 if (c->obuf == NULL || c->reader == NULL) {
768 __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
769 return REDIS_ERR;
770 }
771
772 int ret = REDIS_ERR;
773 if (c->connection_type == REDIS_CONN_TCP) {
774 ret = redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
775 c->connect_timeout, c->tcp.source_addr);
776 } else if (c->connection_type == REDIS_CONN_UNIX) {
777 ret = redisContextConnectUnix(c, c->unix_sock.path, c->connect_timeout);
778 } else {
779 /* Something bad happened here and shouldn't have. There isn't
780 enough information in the context to reconnect. */
781 __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
782 ret = REDIS_ERR;
783 }
784
785 if (c->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) {
786 redisContextSetTimeout(c, *c->command_timeout);
787 }
788
789 return ret;
790}
791
792redisContext *redisConnectWithOptions(const redisOptions *options) {
793 redisContext *c = redisContextInit();
794 if (c == NULL) {
795 return NULL;
796 }
797 if (!(options->options & REDIS_OPT_NONBLOCK)) {
798 c->flags |= REDIS_BLOCK;
799 }
800 if (options->options & REDIS_OPT_REUSEADDR) {
801 c->flags |= REDIS_REUSEADDR;
802 }
803 if (options->options & REDIS_OPT_NOAUTOFREE) {
804 c->flags |= REDIS_NO_AUTO_FREE;
805 }
806 if (options->options & REDIS_OPT_NOAUTOFREEREPLIES) {
807 c->flags |= REDIS_NO_AUTO_FREE_REPLIES;
808 }
809
810 /* Set any user supplied RESP3 PUSH handler or use freeReplyObject
811 * as a default unless specifically flagged that we don't want one. */
812 if (options->push_cb != NULL)
813 redisSetPushCallback(c, options->push_cb);
814 else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
815 redisSetPushCallback(c, redisPushAutoFree);
816
817 c->privdata = options->privdata;
818 c->free_privdata = options->free_privdata;
819
820 if (redisContextUpdateConnectTimeout(c, options->connect_timeout) != REDIS_OK ||
821 redisContextUpdateCommandTimeout(c, options->command_timeout) != REDIS_OK) {
822 __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
823 return c;
824 }
825
826 if (options->type == REDIS_CONN_TCP) {
827 redisContextConnectBindTcp(c, options->endpoint.tcp.ip,
828 options->endpoint.tcp.port, options->connect_timeout,
829 options->endpoint.tcp.source_addr);
830 } else if (options->type == REDIS_CONN_UNIX) {
831 redisContextConnectUnix(c, options->endpoint.unix_socket,
832 options->connect_timeout);
833 } else if (options->type == REDIS_CONN_USERFD) {
834 c->fd = options->endpoint.fd;
835 c->flags |= REDIS_CONNECTED;
836 } else {
837 redisFree(c);
838 return NULL;
839 }
840
841 if (options->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) {
842 redisContextSetTimeout(c, *options->command_timeout);
843 }
844
845 return c;
846}
847
/* Connect to a Redis instance over TCP with default options (blocking, no
 * timeouts). Returns the result of redisConnectWithOptions(): NULL when the
 * context cannot be allocated, otherwise a context whose error fields should
 * be checked by the caller. The default set of reply functions is used. */
redisContext *redisConnect(const char *ip, int port) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    return redisConnectWithOptions(&options);
}
856
/* Like redisConnect(), but with a connect timeout. 'tv' is received by
 * value; the options point at that local copy, which stays valid for the
 * duration of the redisConnectWithOptions() call. */
redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.connect_timeout = &tv;
    return redisConnectWithOptions(&options);
}
863
/* TCP connect with REDIS_OPT_NONBLOCK set, so the resulting context does not
 * get the REDIS_BLOCK flag. */
redisContext *redisConnectNonBlock(const char *ip, int port) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.options |= REDIS_OPT_NONBLOCK;
    return redisConnectWithOptions(&options);
}
870
/* Non-blocking TCP connect that also passes 'source_addr' as the TCP source
 * address option. */
redisContext *redisConnectBindNonBlock(const char *ip, int port,
                                       const char *source_addr) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.endpoint.tcp.source_addr = source_addr;
    options.options |= REDIS_OPT_NONBLOCK;
    return redisConnectWithOptions(&options);
}
879
/* Same as redisConnectBindNonBlock(), additionally setting
 * REDIS_OPT_REUSEADDR so the context gets the REDIS_REUSEADDR flag. */
redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port,
                                                const char *source_addr) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.endpoint.tcp.source_addr = source_addr;
    options.options |= REDIS_OPT_NONBLOCK|REDIS_OPT_REUSEADDR;
    return redisConnectWithOptions(&options);
}
888
/* Connect through the UNIX domain socket at 'path' with default options
 * (blocking, no timeouts). */
redisContext *redisConnectUnix(const char *path) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_UNIX(&options, path);
    return redisConnectWithOptions(&options);
}
894
/* Like redisConnectUnix(), but with a connect timeout. 'tv' is received by
 * value; the options point at that local copy, which stays valid for the
 * duration of the redisConnectWithOptions() call. */
redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_UNIX(&options, path);
    options.connect_timeout = &tv;
    return redisConnectWithOptions(&options);
}
901
/* UNIX socket connect with REDIS_OPT_NONBLOCK set, so the resulting context
 * does not get the REDIS_BLOCK flag. */
redisContext *redisConnectUnixNonBlock(const char *path) {
    redisOptions options = {0};
    REDIS_OPTIONS_SET_UNIX(&options, path);
    options.options |= REDIS_OPT_NONBLOCK;
    return redisConnectWithOptions(&options);
}
908
/* Wrap an already open descriptor in a context. No connect is performed:
 * redisConnectWithOptions() stores 'fd' in the context and marks it as
 * connected. */
redisContext *redisConnectFd(redisFD fd) {
    redisOptions options = {0};
    options.type = REDIS_CONN_USERFD;
    options.endpoint.fd = fd;
    return redisConnectWithOptions(&options);
}
915
916/* Set read/write timeout on a blocking socket. */
917int redisSetTimeout(redisContext *c, const struct timeval tv) {
918 if (c->flags & REDIS_BLOCK)
919 return redisContextSetTimeout(c,tv);
920 return REDIS_ERR;
921}
922
923/* Enable connection KeepAlive. */
924int redisEnableKeepAlive(redisContext *c) {
925 if (redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL) != REDIS_OK)
926 return REDIS_ERR;
927 return REDIS_OK;
928}
929
930/* Set a user provided RESP3 PUSH handler and return any old one set. */
931redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) {
932 redisPushFn *old = c->push_cb;
933 c->push_cb = fn;
934 return old;
935}
936
937/* Use this function to handle a read event on the descriptor. It will try
938 * and read some bytes from the socket and feed them to the reply parser.
939 *
940 * After this function is called, you may use redisGetReplyFromReader to
941 * see if there is a reply available. */
942int redisBufferRead(redisContext *c) {
943 char buf[1024*16];
944 int nread;
945
946 /* Return early when the context has seen an error. */
947 if (c->err)
948 return REDIS_ERR;
949
950 nread = c->funcs->read(c, buf, sizeof(buf));
951 if (nread < 0) {
952 return REDIS_ERR;
953 }
954 if (nread > 0 && redisReaderFeed(c->reader, buf, nread) != REDIS_OK) {
955 __redisSetError(c, c->reader->err, c->reader->errstr);
956 return REDIS_ERR;
957 }
958 return REDIS_OK;
959}
960
961/* Write the output buffer to the socket.
962 *
963 * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
964 * successfully written to the socket. When the buffer is empty after the
965 * write operation, "done" is set to 1 (if given).
966 *
967 * Returns REDIS_ERR if an error occurred trying to write and sets
968 * c->errstr to hold the appropriate error string.
969 */
970int redisBufferWrite(redisContext *c, int *done) {
971
972 /* Return early when the context has seen an error. */
973 if (c->err)
974 return REDIS_ERR;
975
976 if (hi_sdslen(c->obuf) > 0) {
977 ssize_t nwritten = c->funcs->write(c);
978 if (nwritten < 0) {
979 return REDIS_ERR;
980 } else if (nwritten > 0) {
981 if (nwritten == (ssize_t)hi_sdslen(c->obuf)) {
982 hi_sdsfree(c->obuf);
983 c->obuf = hi_sdsempty();
984 if (c->obuf == NULL)
985 goto oom;
986 } else {
987 if (hi_sdsrange(c->obuf,nwritten,-1) < 0) goto oom;
988 }
989 }
990 }
991 if (done != NULL) *done = (hi_sdslen(c->obuf) == 0);
992 return REDIS_OK;
993
994oom:
995 __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
996 return REDIS_ERR;
997}
998
999/* Internal helper that returns 1 if the reply was a RESP3 PUSH
1000 * message and we handled it with a user-provided callback. */
1001static int redisHandledPushReply(redisContext *c, void *reply) {
1002 if (reply && c->push_cb && redisIsPushReply(reply)) {
1003 c->push_cb(c->privdata, reply);
1004 return 1;
1005 }
1006
1007 return 0;
1008}
1009
1010/* Get a reply from our reader or set an error in the context. */
1011int redisGetReplyFromReader(redisContext *c, void **reply) {
1012 if (redisReaderGetReply(c->reader, reply) == REDIS_ERR) {
1013 __redisSetError(c,c->reader->err,c->reader->errstr);
1014 return REDIS_ERR;
1015 }
1016
1017 return REDIS_OK;
1018}
1019
1020/* Internal helper to get the next reply from our reader while handling
1021 * any PUSH messages we encounter along the way. This is separate from
1022 * redisGetReplyFromReader so as to not change its behavior. */
1023static int redisNextInBandReplyFromReader(redisContext *c, void **reply) {
1024 do {
1025 if (redisGetReplyFromReader(c, reply) == REDIS_ERR)
1026 return REDIS_ERR;
1027 } while (redisHandledPushReply(c, *reply));
1028
1029 return REDIS_OK;
1030}
1031
1032int redisGetReply(redisContext *c, void **reply) {
1033 int wdone = 0;
1034 void *aux = NULL;
1035
1036 /* Try to read pending replies */
1037 if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR)
1038 return REDIS_ERR;
1039
1040 /* For the blocking context, flush output buffer and read reply */
1041 if (aux == NULL && c->flags & REDIS_BLOCK) {
1042 /* Write until done */
1043 do {
1044 if (redisBufferWrite(c,&wdone) == REDIS_ERR)
1045 return REDIS_ERR;
1046 } while (!wdone);
1047
1048 /* Read until there is a reply */
1049 do {
1050 if (redisBufferRead(c) == REDIS_ERR)
1051 return REDIS_ERR;
1052
1053 if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR)
1054 return REDIS_ERR;
1055 } while (aux == NULL);
1056 }
1057
1058 /* Set reply or free it if we were passed NULL */
1059 if (reply != NULL) {
1060 *reply = aux;
1061 } else {
1062 freeReplyObject(aux);
1063 }
1064
1065 return REDIS_OK;
1066}
1067
1068
1069/* Helper function for the redisAppendCommand* family of functions.
1070 *
1071 * Write a formatted command to the output buffer. When this family
1072 * is used, you need to call redisGetReply yourself to retrieve
1073 * the reply (or replies in pub/sub).
1074 */
1075int __redisAppendCommand(redisContext *c, const char *cmd, size_t len) {
1076 hisds newbuf;
1077
1078 newbuf = hi_sdscatlen(c->obuf,cmd,len);
1079 if (newbuf == NULL) {
1080 __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
1081 return REDIS_ERR;
1082 }
1083
1084 c->obuf = newbuf;
1085 return REDIS_OK;
1086}
1087
1088int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len) {
1089
1090 if (__redisAppendCommand(c, cmd, len) != REDIS_OK) {
1091 return REDIS_ERR;
1092 }
1093
1094 return REDIS_OK;
1095}
1096
1097int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
1098 char *cmd;
1099 int len;
1100
1101 len = redisvFormatCommand(&cmd,format,ap);
1102 if (len == -1) {
1103 __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
1104 return REDIS_ERR;
1105 } else if (len == -2) {
1106 __redisSetError(c,REDIS_ERR_OTHER,"Invalid format string");
1107 return REDIS_ERR;
1108 }
1109
1110 if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
1111 hi_free(cmd);
1112 return REDIS_ERR;
1113 }
1114
1115 hi_free(cmd);
1116 return REDIS_OK;
1117}
1118
1119int redisAppendCommand(redisContext *c, const char *format, ...) {
1120 va_list ap;
1121 int ret;
1122
1123 va_start(ap,format);
1124 ret = redisvAppendCommand(c,format,ap);
1125 va_end(ap);
1126 return ret;
1127}
1128
1129int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
1130 hisds cmd;
1131 long long len;
1132
1133 len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
1134 if (len == -1) {
1135 __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
1136 return REDIS_ERR;
1137 }
1138
1139 if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
1140 hi_sdsfree(cmd);
1141 return REDIS_ERR;
1142 }
1143
1144 hi_sdsfree(cmd);
1145 return REDIS_OK;
1146}
1147
1148/* Helper function for the redisCommand* family of functions.
1149 *
1150 * Write a formatted command to the output buffer. If the given context is
1151 * blocking, immediately read the reply into the "reply" pointer. When the
1152 * context is non-blocking, the "reply" pointer will not be used and the
1153 * command is simply appended to the write buffer.
1154 *
1155 * Returns the reply when a reply was successfully retrieved. Returns NULL
1156 * otherwise. When NULL is returned in a blocking context, the error field
1157 * in the context will be set.
1158 */
1159static void *__redisBlockForReply(redisContext *c) {
1160 void *reply;
1161
1162 if (c->flags & REDIS_BLOCK) {
1163 if (redisGetReply(c,&reply) != REDIS_OK)
1164 return NULL;
1165 return reply;
1166 }
1167 return NULL;
1168}
1169
1170void *redisvCommand(redisContext *c, const char *format, va_list ap) {
1171 if (redisvAppendCommand(c,format,ap) != REDIS_OK)
1172 return NULL;
1173 return __redisBlockForReply(c);
1174}
1175
1176void *redisCommand(redisContext *c, const char *format, ...) {
1177 va_list ap;
1178 va_start(ap,format);
1179 void *reply = redisvCommand(c,format,ap);
1180 va_end(ap);
1181 return reply;
1182}
1183
1184void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
1185 if (redisAppendCommandArgv(c,argc,argv,argvlen) != REDIS_OK)
1186 return NULL;
1187 return __redisBlockForReply(c);
1188}
1189