1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "lzf.h" /* LZF compression library */
32#include "zipmap.h"
33#include "endianconv.h"
34#include "stream.h"
35#include "functions.h"
36
37#include <math.h>
38#include <fcntl.h>
39#include <sys/types.h>
40#include <sys/time.h>
41#include <sys/resource.h>
42#include <sys/wait.h>
43#include <arpa/inet.h>
44#include <sys/stat.h>
45#include <sys/param.h>
46
47/* This macro is called when the internal RDB structure is corrupt */
48#define rdbReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
49/* This macro is called when RDB read failed (possibly a short read) */
50#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
51
52/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */
53#define isRestoreContext() \
54 (server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 0 : 1
55
56char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
57extern int rdbCheckMode;
58void rdbCheckError(const char *fmt, ...);
59void rdbCheckSetError(const char *fmt, ...);
60
61#ifdef __GNUC__
62void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__ ((format (printf, 3, 4)));
63#endif
64void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
65 va_list ap;
66 char msg[1024];
67 int len;
68
69 len = snprintf(msg,sizeof(msg),
70 "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
71 (unsigned long long)server.loading_loaded_bytes, linenum);
72 va_start(ap,reason);
73 vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
74 va_end(ap);
75
76 if (isRestoreContext()) {
77 /* If we're in the context of a RESTORE command, just propagate the error. */
78 /* log in VERBOSE, and return (don't exit). */
79 serverLog(LL_VERBOSE, "%s", msg);
80 return;
81 } else if (rdbCheckMode) {
82 /* If we're inside the rdb checker, let it handle the error. */
83 rdbCheckError("%s",msg);
84 } else if (rdbFileBeingLoaded) {
85 /* If we're loading an rdb file form disk, run rdb check (and exit) */
86 serverLog(LL_WARNING, "%s", msg);
87 char *argv[2] = {"",rdbFileBeingLoaded};
88 redis_check_rdb_main(2,argv,NULL);
89 } else if (corruption_error) {
90 /* In diskless loading, in case of corrupt file, log and exit. */
91 serverLog(LL_WARNING, "%s. Failure loading rdb format", msg);
92 } else {
93 /* In diskless loading, in case of a short read (not a corrupt
94 * file), log and proceed (don't exit). */
95 serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
96 return;
97 }
98 serverLog(LL_WARNING, "Terminating server after rdb file reading failure.");
99 exit(1);
100}
101
102static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
103 if (rdb && rioWrite(rdb,p,len) == 0)
104 return -1;
105 return len;
106}
107
108int rdbSaveType(rio *rdb, unsigned char type) {
109 return rdbWriteRaw(rdb,&type,1);
110}
111
112/* Load a "type" in RDB format, that is a one byte unsigned integer.
113 * This function is not only used to load object types, but also special
114 * "types" like the end-of-file type, the EXPIRE type, and so forth. */
115int rdbLoadType(rio *rdb) {
116 unsigned char type;
117 if (rioRead(rdb,&type,1) == 0) return -1;
118 return type;
119}
120
121/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
122 * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
123 * opcode. On error -1 is returned, however this could be a valid time, so
124 * to check for loading errors the caller should call rioGetReadError() after
125 * calling this function. */
126time_t rdbLoadTime(rio *rdb) {
127 int32_t t32;
128 if (rioRead(rdb,&t32,4) == 0) return -1;
129 return (time_t)t32;
130}
131
132int rdbSaveMillisecondTime(rio *rdb, long long t) {
133 int64_t t64 = (int64_t) t;
134 memrev64ifbe(&t64); /* Store in little endian. */
135 return rdbWriteRaw(rdb,&t64,8);
136}
137
138/* This function loads a time from the RDB file. It gets the version of the
139 * RDB because, unfortunately, before Redis 5 (RDB version 9), the function
140 * failed to convert data to/from little endian, so RDB files with keys having
141 * expires could not be shared between big endian and little endian systems
142 * (because the expire time will be totally wrong). The fix for this is just
143 * to call memrev64ifbe(), however if we fix this for all the RDB versions,
144 * this call will introduce an incompatibility for big endian systems:
145 * after upgrading to Redis version 5 they will no longer be able to load their
146 * own old RDB files. Because of that, we instead fix the function only for new
147 * RDB versions, and load older RDB versions as we used to do in the past,
148 * allowing big endian systems to load their own old RDB files.
149 *
150 * On I/O error the function returns LLONG_MAX, however if this is also a
151 * valid stored value, the caller should use rioGetReadError() to check for
152 * errors after calling this function. */
153long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
154 int64_t t64;
155 if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
156 if (rdbver >= 9) /* Check the top comment of this function. */
157 memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
158 return (long long)t64;
159}
160
161/* Saves an encoded length. The first two bits in the first byte are used to
162 * hold the encoding type. See the RDB_* definitions for more information
163 * on the types of encoding. */
164int rdbSaveLen(rio *rdb, uint64_t len) {
165 unsigned char buf[2];
166 size_t nwritten;
167
168 if (len < (1<<6)) {
169 /* Save a 6 bit len */
170 buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
171 if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
172 nwritten = 1;
173 } else if (len < (1<<14)) {
174 /* Save a 14 bit len */
175 buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
176 buf[1] = len&0xFF;
177 if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
178 nwritten = 2;
179 } else if (len <= UINT32_MAX) {
180 /* Save a 32 bit len */
181 buf[0] = RDB_32BITLEN;
182 if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
183 uint32_t len32 = htonl(len);
184 if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
185 nwritten = 1+4;
186 } else {
187 /* Save a 64 bit len */
188 buf[0] = RDB_64BITLEN;
189 if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
190 len = htonu64(len);
191 if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
192 nwritten = 1+8;
193 }
194 return nwritten;
195}
196
197
198/* Load an encoded length. If the loaded length is a normal length as stored
199 * with rdbSaveLen(), the read length is set to '*lenptr'. If instead the
200 * loaded length describes a special encoding that follows, then '*isencoded'
201 * is set to 1 and the encoding format is stored at '*lenptr'.
202 *
203 * See the RDB_ENC_* definitions in rdb.h for more information on special
204 * encodings.
205 *
206 * The function returns -1 on error, 0 on success. */
207int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
208 unsigned char buf[2];
209 int type;
210
211 if (isencoded) *isencoded = 0;
212 if (rioRead(rdb,buf,1) == 0) return -1;
213 type = (buf[0]&0xC0)>>6;
214 if (type == RDB_ENCVAL) {
215 /* Read a 6 bit encoding type. */
216 if (isencoded) *isencoded = 1;
217 *lenptr = buf[0]&0x3F;
218 } else if (type == RDB_6BITLEN) {
219 /* Read a 6 bit len. */
220 *lenptr = buf[0]&0x3F;
221 } else if (type == RDB_14BITLEN) {
222 /* Read a 14 bit len. */
223 if (rioRead(rdb,buf+1,1) == 0) return -1;
224 *lenptr = ((buf[0]&0x3F)<<8)|buf[1];
225 } else if (buf[0] == RDB_32BITLEN) {
226 /* Read a 32 bit len. */
227 uint32_t len;
228 if (rioRead(rdb,&len,4) == 0) return -1;
229 *lenptr = ntohl(len);
230 } else if (buf[0] == RDB_64BITLEN) {
231 /* Read a 64 bit len. */
232 uint64_t len;
233 if (rioRead(rdb,&len,8) == 0) return -1;
234 *lenptr = ntohu64(len);
235 } else {
236 rdbReportCorruptRDB(
237 "Unknown length encoding %d in rdbLoadLen()",type);
238 return -1; /* Never reached. */
239 }
240 return 0;
241}
242
243/* This is like rdbLoadLenByRef() but directly returns the value read
244 * from the RDB stream, signaling an error by returning RDB_LENERR
245 * (since it is a too large count to be applicable in any Redis data
246 * structure). */
247uint64_t rdbLoadLen(rio *rdb, int *isencoded) {
248 uint64_t len;
249
250 if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR;
251 return len;
252}
253
254/* Encodes the "value" argument as integer when it fits in the supported ranges
255 * for encoded types. If the function successfully encodes the integer, the
256 * representation is stored in the buffer pointer to by "enc" and the string
257 * length is returned. Otherwise 0 is returned. */
258int rdbEncodeInteger(long long value, unsigned char *enc) {
259 if (value >= -(1<<7) && value <= (1<<7)-1) {
260 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8;
261 enc[1] = value&0xFF;
262 return 2;
263 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
264 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
265 enc[1] = value&0xFF;
266 enc[2] = (value>>8)&0xFF;
267 return 3;
268 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
269 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
270 enc[1] = value&0xFF;
271 enc[2] = (value>>8)&0xFF;
272 enc[3] = (value>>16)&0xFF;
273 enc[4] = (value>>24)&0xFF;
274 return 5;
275 } else {
276 return 0;
277 }
278}
279
280/* Loads an integer-encoded object with the specified encoding type "enctype".
281 * The returned value changes according to the flags, see
282 * rdbGenericLoadStringObject() for more info. */
283void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
284 int plain = flags & RDB_LOAD_PLAIN;
285 int sds = flags & RDB_LOAD_SDS;
286 int encode = flags & RDB_LOAD_ENC;
287 unsigned char enc[4];
288 long long val;
289
290 if (enctype == RDB_ENC_INT8) {
291 if (rioRead(rdb,enc,1) == 0) return NULL;
292 val = (signed char)enc[0];
293 } else if (enctype == RDB_ENC_INT16) {
294 uint16_t v;
295 if (rioRead(rdb,enc,2) == 0) return NULL;
296 v = ((uint32_t)enc[0])|
297 ((uint32_t)enc[1]<<8);
298 val = (int16_t)v;
299 } else if (enctype == RDB_ENC_INT32) {
300 uint32_t v;
301 if (rioRead(rdb,enc,4) == 0) return NULL;
302 v = ((uint32_t)enc[0])|
303 ((uint32_t)enc[1]<<8)|
304 ((uint32_t)enc[2]<<16)|
305 ((uint32_t)enc[3]<<24);
306 val = (int32_t)v;
307 } else {
308 rdbReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
309 return NULL; /* Never reached. */
310 }
311 if (plain || sds) {
312 char buf[LONG_STR_SIZE], *p;
313 int len = ll2string(buf,sizeof(buf),val);
314 if (lenptr) *lenptr = len;
315 p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
316 memcpy(p,buf,len);
317 return p;
318 } else if (encode) {
319 return createStringObjectFromLongLongForValue(val);
320 } else {
321 return createObject(OBJ_STRING,sdsfromlonglong(val));
322 }
323}
324
/* If the 'len' bytes at 's' parse as an integer (e.g. "2391", "-100", with
 * no spaces) that fits in 8, 16 or 32 signed bits, write its compact
 * encoding to 'enc' and return the encoded length. Otherwise return 0. */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
    long long parsed;

    if (!string2ll(s,len,&parsed)) return 0;
    return rdbEncodeInteger(parsed,enc);
}
336
337ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
338 size_t original_len) {
339 unsigned char byte;
340 ssize_t n, nwritten = 0;
341
342 /* Data compressed! Let's save it on disk */
343 byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
344 if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
345 nwritten += n;
346
347 if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;
348 nwritten += n;
349
350 if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;
351 nwritten += n;
352
353 if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;
354 nwritten += n;
355
356 return nwritten;
357
358writeerr:
359 return -1;
360}
361
362ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
363 size_t comprlen, outlen;
364 void *out;
365
366 /* We require at least four bytes compression for this to be worth it */
367 if (len <= 4) return 0;
368 outlen = len-4;
369 if ((out = zmalloc(outlen+1)) == NULL) return 0;
370 comprlen = lzf_compress(s, len, out, outlen);
371 if (comprlen == 0) {
372 zfree(out);
373 return 0;
374 }
375 ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);
376 zfree(out);
377 return nwritten;
378}
379
380/* Load an LZF compressed string in RDB format. The returned value
381 * changes according to 'flags'. For more info check the
382 * rdbGenericLoadStringObject() function. */
383void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
384 int plain = flags & RDB_LOAD_PLAIN;
385 int sds = flags & RDB_LOAD_SDS;
386 uint64_t len, clen;
387 unsigned char *c = NULL;
388 char *val = NULL;
389
390 if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
391 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
392 if ((c = ztrymalloc(clen)) == NULL) {
393 serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
394 goto err;
395 }
396
397 /* Allocate our target according to the uncompressed size. */
398 if (plain) {
399 val = ztrymalloc(len);
400 } else {
401 val = sdstrynewlen(SDS_NOINIT,len);
402 }
403 if (!val) {
404 serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
405 goto err;
406 }
407
408 if (lenptr) *lenptr = len;
409
410 /* Load the compressed representation and uncompress it to target. */
411 if (rioRead(rdb,c,clen) == 0) goto err;
412 if (lzf_decompress(c,clen,val,len) != len) {
413 rdbReportCorruptRDB("Invalid LZF compressed string");
414 goto err;
415 }
416 zfree(c);
417
418 if (plain || sds) {
419 return val;
420 } else {
421 return createObject(OBJ_STRING,val);
422 }
423err:
424 zfree(c);
425 if (plain)
426 zfree(val);
427 else
428 sdsfree(val);
429 return NULL;
430}
431
432/* Save a string object as [len][data] on disk. If the object is a string
433 * representation of an integer value we try to save it in a special form */
434ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
435 int enclen;
436 ssize_t n, nwritten = 0;
437
438 /* Try integer encoding */
439 if (len <= 11) {
440 unsigned char buf[5];
441 if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
442 if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
443 return enclen;
444 }
445 }
446
447 /* Try LZF compression - under 20 bytes it's unable to compress even
448 * aaaaaaaaaaaaaaaaaa so skip it */
449 if (server.rdb_compression && len > 20) {
450 n = rdbSaveLzfStringObject(rdb,s,len);
451 if (n == -1) return -1;
452 if (n > 0) return n;
453 /* Return value of 0 means data can't be compressed, save the old way */
454 }
455
456 /* Store verbatim */
457 if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
458 nwritten += n;
459 if (len > 0) {
460 if (rdbWriteRaw(rdb,s,len) == -1) return -1;
461 nwritten += len;
462 }
463 return nwritten;
464}
465
466/* Save a long long value as either an encoded string or a string. */
467ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
468 unsigned char buf[32];
469 ssize_t n, nwritten = 0;
470 int enclen = rdbEncodeInteger(value,buf);
471 if (enclen > 0) {
472 return rdbWriteRaw(rdb,buf,enclen);
473 } else {
474 /* Encode as string */
475 enclen = ll2string((char*)buf,32,value);
476 serverAssert(enclen < 32);
477 if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
478 nwritten += n;
479 if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
480 nwritten += n;
481 }
482 return nwritten;
483}
484
485/* Like rdbSaveRawString() gets a Redis object instead. */
486ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
487 /* Avoid to decode the object, then encode it again, if the
488 * object is already integer encoded. */
489 if (obj->encoding == OBJ_ENCODING_INT) {
490 return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
491 } else {
492 serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
493 return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
494 }
495}
496
497/* Load a string object from an RDB file according to flags:
498 *
499 * RDB_LOAD_NONE (no flags): load an RDB object, unencoded.
500 * RDB_LOAD_ENC: If the returned type is a Redis object, try to
501 * encode it in a special way to be more memory
502 * efficient. When this flag is passed the function
503 * no longer guarantees that obj->ptr is an SDS string.
504 * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()
505 * instead of a Redis object with an sds in it.
506 * RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
507 *
508 * On I/O error NULL is returned.
509 */
510void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
511 int encode = flags & RDB_LOAD_ENC;
512 int plain = flags & RDB_LOAD_PLAIN;
513 int sds = flags & RDB_LOAD_SDS;
514 int isencoded;
515 unsigned long long len;
516
517 len = rdbLoadLen(rdb,&isencoded);
518 if (len == RDB_LENERR) return NULL;
519
520 if (isencoded) {
521 switch(len) {
522 case RDB_ENC_INT8:
523 case RDB_ENC_INT16:
524 case RDB_ENC_INT32:
525 return rdbLoadIntegerObject(rdb,len,flags,lenptr);
526 case RDB_ENC_LZF:
527 return rdbLoadLzfStringObject(rdb,flags,lenptr);
528 default:
529 rdbReportCorruptRDB("Unknown RDB string encoding type %llu",len);
530 return NULL;
531 }
532 }
533
534 if (plain || sds) {
535 void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len);
536 if (!buf) {
537 serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
538 return NULL;
539 }
540 if (lenptr) *lenptr = len;
541 if (len && rioRead(rdb,buf,len) == 0) {
542 if (plain)
543 zfree(buf);
544 else
545 sdsfree(buf);
546 return NULL;
547 }
548 return buf;
549 } else {
550 robj *o = encode ? tryCreateStringObject(SDS_NOINIT,len) :
551 tryCreateRawStringObject(SDS_NOINIT,len);
552 if (!o) {
553 serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
554 return NULL;
555 }
556 if (len && rioRead(rdb,o->ptr,len) == 0) {
557 decrRefCount(o);
558 return NULL;
559 }
560 return o;
561 }
562}
563
564robj *rdbLoadStringObject(rio *rdb) {
565 return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
566}
567
568robj *rdbLoadEncodedStringObject(rio *rdb) {
569 return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL);
570}
571
572/* Save a double value. Doubles are saved as strings prefixed by an unsigned
573 * 8 bit integer specifying the length of the representation.
574 * This 8 bit integer has special values in order to specify the following
575 * conditions:
576 * 253: not a number
577 * 254: + inf
578 * 255: - inf
579 */
580int rdbSaveDoubleValue(rio *rdb, double val) {
581 unsigned char buf[128];
582 int len;
583
584 if (isnan(val)) {
585 buf[0] = 253;
586 len = 1;
587 } else if (!isfinite(val)) {
588 len = 1;
589 buf[0] = (val < 0) ? 255 : 254;
590 } else {
591 long long lvalue;
592 /* Integer printing function is much faster, check if we can safely use it. */
593 if (double2ll(val, &lvalue))
594 ll2string((char*)buf+1,sizeof(buf)-1,lvalue);
595 else
596 snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
597 buf[0] = strlen((char*)buf+1);
598 len = buf[0]+1;
599 }
600 return rdbWriteRaw(rdb,buf,len);
601}
602
603/* For information about double serialization check rdbSaveDoubleValue() */
604int rdbLoadDoubleValue(rio *rdb, double *val) {
605 char buf[256];
606 unsigned char len;
607
608 if (rioRead(rdb,&len,1) == 0) return -1;
609 switch(len) {
610 case 255: *val = R_NegInf; return 0;
611 case 254: *val = R_PosInf; return 0;
612 case 253: *val = R_Nan; return 0;
613 default:
614 if (rioRead(rdb,buf,len) == 0) return -1;
615 buf[len] = '\0';
616 if (sscanf(buf, "%lg", val)!=1) return -1;
617 return 0;
618 }
619}
620
621/* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed.
622 * We just make sure the integer is always stored in little endian, otherwise
623 * the value is copied verbatim from memory to disk.
624 *
625 * Return -1 on error, the size of the serialized value on success. */
626int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
627 memrev64ifbe(&val);
628 return rdbWriteRaw(rdb,&val,sizeof(val));
629}
630
631/* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for
632 * more info. On error -1 is returned, otherwise 0. */
633int rdbLoadBinaryDoubleValue(rio *rdb, double *val) {
634 if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
635 memrev64ifbe(val);
636 return 0;
637}
638
639/* Like rdbSaveBinaryDoubleValue() but single precision. */
640int rdbSaveBinaryFloatValue(rio *rdb, float val) {
641 memrev32ifbe(&val);
642 return rdbWriteRaw(rdb,&val,sizeof(val));
643}
644
645/* Like rdbLoadBinaryDoubleValue() but single precision. */
646int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
647 if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
648 memrev32ifbe(val);
649 return 0;
650}
651
652/* Save the object type of object "o". */
653int rdbSaveObjectType(rio *rdb, robj *o) {
654 switch (o->type) {
655 case OBJ_STRING:
656 return rdbSaveType(rdb,RDB_TYPE_STRING);
657 case OBJ_LIST:
658 if (o->encoding == OBJ_ENCODING_QUICKLIST)
659 return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
660 else
661 serverPanic("Unknown list encoding");
662 case OBJ_SET:
663 if (o->encoding == OBJ_ENCODING_INTSET)
664 return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
665 else if (o->encoding == OBJ_ENCODING_HT)
666 return rdbSaveType(rdb,RDB_TYPE_SET);
667 else
668 serverPanic("Unknown set encoding");
669 case OBJ_ZSET:
670 if (o->encoding == OBJ_ENCODING_LISTPACK)
671 return rdbSaveType(rdb,RDB_TYPE_ZSET_LISTPACK);
672 else if (o->encoding == OBJ_ENCODING_SKIPLIST)
673 return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
674 else
675 serverPanic("Unknown sorted set encoding");
676 case OBJ_HASH:
677 if (o->encoding == OBJ_ENCODING_LISTPACK)
678 return rdbSaveType(rdb,RDB_TYPE_HASH_LISTPACK);
679 else if (o->encoding == OBJ_ENCODING_HT)
680 return rdbSaveType(rdb,RDB_TYPE_HASH);
681 else
682 serverPanic("Unknown hash encoding");
683 case OBJ_STREAM:
684 return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2);
685 case OBJ_MODULE:
686 return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
687 default:
688 serverPanic("Unknown object type");
689 }
690 return -1; /* avoid warning */
691}
692
693/* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the
694 * type is not specifically a valid Object Type. */
695int rdbLoadObjectType(rio *rdb) {
696 int type;
697 if ((type = rdbLoadType(rdb)) == -1) return -1;
698 if (!rdbIsObjectType(type)) return -1;
699 return type;
700}
701
702/* This helper function serializes a consumer group Pending Entries List (PEL)
703 * into the RDB file. The 'nacks' argument tells the function if also persist
704 * the information about the not acknowledged message, or if to persist
705 * just the IDs: this is useful because for the global consumer group PEL
706 * we serialized the NACKs as well, but when serializing the local consumer
707 * PELs we just add the ID, that will be resolved inside the global PEL to
708 * put a reference to the same structure. */
709ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
710 ssize_t n, nwritten = 0;
711
712 /* Number of entries in the PEL. */
713 if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
714 nwritten += n;
715
716 /* Save each entry. */
717 raxIterator ri;
718 raxStart(&ri,pel);
719 raxSeek(&ri,"^",NULL,0);
720 while(raxNext(&ri)) {
721 /* We store IDs in raw form as 128 big big endian numbers, like
722 * they are inside the radix tree key. */
723 if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) {
724 raxStop(&ri);
725 return -1;
726 }
727 nwritten += n;
728
729 if (nacks) {
730 streamNACK *nack = ri.data;
731 if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) {
732 raxStop(&ri);
733 return -1;
734 }
735 nwritten += n;
736 if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) {
737 raxStop(&ri);
738 return -1;
739 }
740 nwritten += n;
741 /* We don't save the consumer name: we'll save the pending IDs
742 * for each consumer in the consumer PEL, and resolve the consumer
743 * at loading time. */
744 }
745 }
746 raxStop(&ri);
747 return nwritten;
748}
749
750/* Serialize the consumers of a stream consumer group into the RDB. Helper
751 * function for the stream data type serialization. What we do here is to
752 * persist the consumer metadata, and it's PEL, for each consumer. */
753size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
754 ssize_t n, nwritten = 0;
755
756 /* Number of consumers in this consumer group. */
757 if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1;
758 nwritten += n;
759
760 /* Save each consumer. */
761 raxIterator ri;
762 raxStart(&ri,cg->consumers);
763 raxSeek(&ri,"^",NULL,0);
764 while(raxNext(&ri)) {
765 streamConsumer *consumer = ri.data;
766
767 /* Consumer name. */
768 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
769 raxStop(&ri);
770 return -1;
771 }
772 nwritten += n;
773
774 /* Last seen time. */
775 if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) {
776 raxStop(&ri);
777 return -1;
778 }
779 nwritten += n;
780
781 /* Consumer PEL, without the ACKs (see last parameter of the function
782 * passed with value of 0), at loading time we'll lookup the ID
783 * in the consumer group global PEL and will put a reference in the
784 * consumer local PEL. */
785 if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) {
786 raxStop(&ri);
787 return -1;
788 }
789 nwritten += n;
790 }
791 raxStop(&ri);
792 return nwritten;
793}
794
795/* Save a Redis object.
796 * Returns -1 on error, number of bytes written on success. */
797ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
798 ssize_t n = 0, nwritten = 0;
799
800 if (o->type == OBJ_STRING) {
801 /* Save a string value */
802 if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
803 nwritten += n;
804 } else if (o->type == OBJ_LIST) {
805 /* Save a list value */
806 if (o->encoding == OBJ_ENCODING_QUICKLIST) {
807 quicklist *ql = o->ptr;
808 quicklistNode *node = ql->head;
809
810 if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
811 nwritten += n;
812
813 while(node) {
814 if ((n = rdbSaveLen(rdb,node->container)) == -1) return -1;
815 nwritten += n;
816
817 if (quicklistNodeIsCompressed(node)) {
818 void *data;
819 size_t compress_len = quicklistGetLzf(node, &data);
820 if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
821 nwritten += n;
822 } else {
823 if ((n = rdbSaveRawString(rdb,node->entry,node->sz)) == -1) return -1;
824 nwritten += n;
825 }
826 node = node->next;
827 }
828 } else {
829 serverPanic("Unknown list encoding");
830 }
831 } else if (o->type == OBJ_SET) {
832 /* Save a set value */
833 if (o->encoding == OBJ_ENCODING_HT) {
834 dict *set = o->ptr;
835 dictIterator *di = dictGetIterator(set);
836 dictEntry *de;
837
838 if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
839 dictReleaseIterator(di);
840 return -1;
841 }
842 nwritten += n;
843
844 while((de = dictNext(di)) != NULL) {
845 sds ele = dictGetKey(de);
846 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
847 == -1)
848 {
849 dictReleaseIterator(di);
850 return -1;
851 }
852 nwritten += n;
853 }
854 dictReleaseIterator(di);
855 } else if (o->encoding == OBJ_ENCODING_INTSET) {
856 size_t l = intsetBlobLen((intset*)o->ptr);
857
858 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
859 nwritten += n;
860 } else {
861 serverPanic("Unknown set encoding");
862 }
863 } else if (o->type == OBJ_ZSET) {
864 /* Save a sorted set value */
865 if (o->encoding == OBJ_ENCODING_LISTPACK) {
866 size_t l = lpBytes((unsigned char*)o->ptr);
867
868 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
869 nwritten += n;
870 } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
871 zset *zs = o->ptr;
872 zskiplist *zsl = zs->zsl;
873
874 if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
875 nwritten += n;
876
877 /* We save the skiplist elements from the greatest to the smallest
878 * (that's trivial since the elements are already ordered in the
879 * skiplist): this improves the load process, since the next loaded
880 * element will always be the smaller, so adding to the skiplist
881 * will always immediately stop at the head, making the insertion
882 * O(1) instead of O(log(N)). */
883 zskiplistNode *zn = zsl->tail;
884 while (zn != NULL) {
885 if ((n = rdbSaveRawString(rdb,
886 (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
887 {
888 return -1;
889 }
890 nwritten += n;
891 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
892 return -1;
893 nwritten += n;
894 zn = zn->backward;
895 }
896 } else {
897 serverPanic("Unknown sorted set encoding");
898 }
899 } else if (o->type == OBJ_HASH) {
900 /* Save a hash value */
901 if (o->encoding == OBJ_ENCODING_LISTPACK) {
902 size_t l = lpBytes((unsigned char*)o->ptr);
903
904 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
905 nwritten += n;
906 } else if (o->encoding == OBJ_ENCODING_HT) {
907 dictIterator *di = dictGetIterator(o->ptr);
908 dictEntry *de;
909
910 if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
911 dictReleaseIterator(di);
912 return -1;
913 }
914 nwritten += n;
915
916 while((de = dictNext(di)) != NULL) {
917 sds field = dictGetKey(de);
918 sds value = dictGetVal(de);
919
920 if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
921 sdslen(field))) == -1)
922 {
923 dictReleaseIterator(di);
924 return -1;
925 }
926 nwritten += n;
927 if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
928 sdslen(value))) == -1)
929 {
930 dictReleaseIterator(di);
931 return -1;
932 }
933 nwritten += n;
934 }
935 dictReleaseIterator(di);
936 } else {
937 serverPanic("Unknown hash encoding");
938 }
939 } else if (o->type == OBJ_STREAM) {
940 /* Store how many listpacks we have inside the radix tree. */
941 stream *s = o->ptr;
942 rax *rax = s->rax;
943 if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
944 nwritten += n;
945
946 /* Serialize all the listpacks inside the radix tree as they are,
947 * when loading back, we'll use the first entry of each listpack
948 * to insert it back into the radix tree. */
949 raxIterator ri;
950 raxStart(&ri,rax);
951 raxSeek(&ri,"^",NULL,0);
952 while (raxNext(&ri)) {
953 unsigned char *lp = ri.data;
954 size_t lp_bytes = lpBytes(lp);
955 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
956 raxStop(&ri);
957 return -1;
958 }
959 nwritten += n;
960 if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) {
961 raxStop(&ri);
962 return -1;
963 }
964 nwritten += n;
965 }
966 raxStop(&ri);
967
968 /* Save the number of elements inside the stream. We cannot obtain
969 * this easily later, since our macro nodes should be checked for
970 * number of items: not a great CPU / space tradeoff. */
971 if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
972 nwritten += n;
973 /* Save the last entry ID. */
974 if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
975 nwritten += n;
976 if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
977 nwritten += n;
978 /* Save the first entry ID. */
979 if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1;
980 nwritten += n;
981 if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1;
982 nwritten += n;
983 /* Save the maximal tombstone ID. */
984 if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1;
985 nwritten += n;
986 if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1;
987 nwritten += n;
988 /* Save the offset. */
989 if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1;
990 nwritten += n;
991
992 /* The consumer groups and their clients are part of the stream
993 * type, so serialize every consumer group. */
994
995 /* Save the number of groups. */
996 size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
997 if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
998 nwritten += n;
999
1000 if (num_cgroups) {
1001 /* Serialize each consumer group. */
1002 raxStart(&ri,s->cgroups);
1003 raxSeek(&ri,"^",NULL,0);
1004 while(raxNext(&ri)) {
1005 streamCG *cg = ri.data;
1006
1007 /* Save the group name. */
1008 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
1009 raxStop(&ri);
1010 return -1;
1011 }
1012 nwritten += n;
1013
1014 /* Last ID. */
1015 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) {
1016 raxStop(&ri);
1017 return -1;
1018 }
1019 nwritten += n;
1020 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) {
1021 raxStop(&ri);
1022 return -1;
1023 }
1024 nwritten += n;
1025
1026 /* Save the group's logical reads counter. */
1027 if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
1028 raxStop(&ri);
1029 return -1;
1030 }
1031 nwritten += n;
1032
1033 /* Save the global PEL. */
1034 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
1035 raxStop(&ri);
1036 return -1;
1037 }
1038 nwritten += n;
1039
1040 /* Save the consumers of this group. */
1041 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) {
1042 raxStop(&ri);
1043 return -1;
1044 }
1045 nwritten += n;
1046 }
1047 raxStop(&ri);
1048 }
1049 } else if (o->type == OBJ_MODULE) {
1050 /* Save a module-specific value. */
1051 RedisModuleIO io;
1052 moduleValue *mv = o->ptr;
1053 moduleType *mt = mv->type;
1054
1055 /* Write the "module" identifier as prefix, so that we'll be able
1056 * to call the right module during loading. */
1057 int retval = rdbSaveLen(rdb,mt->id);
1058 if (retval == -1) return -1;
1059 moduleInitIOContext(io,mt,rdb,key,dbid);
1060 io.bytes += retval;
1061
1062 /* Then write the module-specific representation + EOF marker. */
1063 mt->rdb_save(&io,mv->value);
1064 retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
1065 if (retval == -1)
1066 io.error = 1;
1067 else
1068 io.bytes += retval;
1069
1070 if (io.ctx) {
1071 moduleFreeContext(io.ctx);
1072 zfree(io.ctx);
1073 }
1074 return io.error ? -1 : (ssize_t)io.bytes;
1075 } else {
1076 serverPanic("Unknown object type");
1077 }
1078 return nwritten;
1079}
1080
1081/* Return the length the object will have on disk if saved with
1082 * the rdbSaveObject() function. Currently we use a trick to get
1083 * this length with very little changes to the code. In the future
1084 * we could switch to a faster solution. */
1085size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
1086 ssize_t len = rdbSaveObject(NULL,o,key,dbid);
1087 serverAssertWithInfo(NULL,o,len != -1);
1088 return len;
1089}
1090
1091/* Save a key-value pair, with expire time, type, key, value.
1092 * On error -1 is returned.
1093 * On success if the key was actually saved 1 is returned. */
1094int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
1095 int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
1096 int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
1097
1098 /* Save the expire time */
1099 if (expiretime != -1) {
1100 if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
1101 if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
1102 }
1103
1104 /* Save the LRU info. */
1105 if (savelru) {
1106 uint64_t idletime = estimateObjectIdleTime(val);
1107 idletime /= 1000; /* Using seconds is enough and requires less space.*/
1108 if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
1109 if (rdbSaveLen(rdb,idletime) == -1) return -1;
1110 }
1111
1112 /* Save the LFU info. */
1113 if (savelfu) {
1114 uint8_t buf[1];
1115 buf[0] = LFUDecrAndReturn(val);
1116 /* We can encode this in exactly two bytes: the opcode and an 8
1117 * bit counter, since the frequency is logarithmic with a 0-255 range.
1118 * Note that we do not store the halving time because to reset it
1119 * a single time when loading does not affect the frequency much. */
1120 if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
1121 if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
1122 }
1123
1124 /* Save type, key, value */
1125 if (rdbSaveObjectType(rdb,val) == -1) return -1;
1126 if (rdbSaveStringObject(rdb,key) == -1) return -1;
1127 if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1;
1128
1129 /* Delay return if required (for testing) */
1130 if (server.rdb_key_save_delay)
1131 debugDelay(server.rdb_key_save_delay);
1132
1133 return 1;
1134}
1135
1136/* Save an AUX field. */
1137ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
1138 ssize_t ret, len = 0;
1139 if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
1140 len += ret;
1141 if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
1142 len += ret;
1143 if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
1144 len += ret;
1145 return len;
1146}
1147
1148/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
1149 * with strlen(). */
1150ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
1151 return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
1152}
1153
1154/* Wrapper for strlen(key) + integer type (up to long long range). */
1155ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
1156 char buf[LONG_STR_SIZE];
1157 int vlen = ll2string(buf,sizeof(buf),val);
1158 return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
1159}
1160
1161/* Save a few default AUX fields with information about the RDB generated. */
1162int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
1163 int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
1164 int aof_base = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
1165
1166 /* Add a few fields about the state when the RDB was created. */
1167 if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
1168 if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
1169 if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
1170 if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
1171
1172 /* Handle saving options that generate aux fields. */
1173 if (rsi) {
1174 if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
1175 == -1) return -1;
1176 if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
1177 == -1) return -1;
1178 if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
1179 == -1) return -1;
1180 }
1181 if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1;
1182 return 1;
1183}
1184
1185ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
1186 /* Save a module-specific aux value. */
1187 RedisModuleIO io;
1188 int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
1189 if (retval == -1) return -1;
1190 moduleInitIOContext(io,mt,rdb,NULL,-1);
1191 io.bytes += retval;
1192
1193 /* Write the "module" identifier as prefix, so that we'll be able
1194 * to call the right module during loading. */
1195 retval = rdbSaveLen(rdb,mt->id);
1196 if (retval == -1) return -1;
1197 io.bytes += retval;
1198
1199 /* write the 'when' so that we can provide it on loading. add a UINT opcode
1200 * for backwards compatibility, everything after the MT needs to be prefixed
1201 * by an opcode. */
1202 retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT);
1203 if (retval == -1) return -1;
1204 io.bytes += retval;
1205 retval = rdbSaveLen(rdb,when);
1206 if (retval == -1) return -1;
1207 io.bytes += retval;
1208
1209 /* Then write the module-specific representation + EOF marker. */
1210 mt->aux_save(&io,when);
1211 retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
1212 if (retval == -1)
1213 io.error = 1;
1214 else
1215 io.bytes += retval;
1216
1217 if (io.ctx) {
1218 moduleFreeContext(io.ctx);
1219 zfree(io.ctx);
1220 }
1221 if (io.error)
1222 return -1;
1223 return io.bytes;
1224}
1225
1226ssize_t rdbSaveFunctions(rio *rdb) {
1227 dict *functions = functionsLibGet();
1228 dictIterator *iter = dictGetIterator(functions);
1229 dictEntry *entry = NULL;
1230 ssize_t written = 0;
1231 ssize_t ret;
1232 while ((entry = dictNext(iter))) {
1233 if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION2)) < 0) goto werr;
1234 written += ret;
1235 functionLibInfo *li = dictGetVal(entry);
1236 if ((ret = rdbSaveRawString(rdb, (unsigned char *) li->code, sdslen(li->code))) < 0) goto werr;
1237 written += ret;
1238 }
1239 dictReleaseIterator(iter);
1240 return written;
1241
1242werr:
1243 dictReleaseIterator(iter);
1244 return -1;
1245}
1246
1247ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
1248 dictIterator *di;
1249 dictEntry *de;
1250 ssize_t written = 0;
1251 ssize_t res;
1252 static long long info_updated_time = 0;
1253 char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
1254
1255 redisDb *db = server.db + dbid;
1256 dict *d = db->dict;
1257 if (dictSize(d) == 0) return 0;
1258 di = dictGetSafeIterator(d);
1259
1260 /* Write the SELECT DB opcode */
1261 if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
1262 written += res;
1263 if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
1264 written += res;
1265
1266 /* Write the RESIZE DB opcode. */
1267 uint64_t db_size, expires_size;
1268 db_size = dictSize(db->dict);
1269 expires_size = dictSize(db->expires);
1270 if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
1271 written += res;
1272 if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
1273 written += res;
1274 if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
1275 written += res;
1276
1277 /* Iterate this DB writing every entry */
1278 while((de = dictNext(di)) != NULL) {
1279 sds keystr = dictGetKey(de);
1280 robj key, *o = dictGetVal(de);
1281 long long expire;
1282 size_t rdb_bytes_before_key = rdb->processed_bytes;
1283
1284 initStaticStringObject(key,keystr);
1285 expire = getExpire(db,&key);
1286 if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
1287 written += res;
1288
1289 /* In fork child process, we can try to release memory back to the
1290 * OS and possibly avoid or decrease COW. We give the dismiss
1291 * mechanism a hint about an estimated size of the object we stored. */
1292 size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
1293 if (server.in_fork_child) dismissObject(o, dump_size);
1294
1295 /* Update child info every 1 second (approximately).
1296 * in order to avoid calling mstime() on each iteration, we will
1297 * check the diff every 1024 keys */
1298 if (((*key_counter)++ & 1023) == 0) {
1299 long long now = mstime();
1300 if (now - info_updated_time >= 1000) {
1301 sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
1302 info_updated_time = now;
1303 }
1304 }
1305 }
1306
1307 dictReleaseIterator(di);
1308 return written;
1309
1310werr:
1311 dictReleaseIterator(di);
1312 return -1;
1313}
1314
1315/* Produces a dump of the database in RDB format sending it to the specified
1316 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
1317 * is returned and part of the output, or all the output, can be
1318 * missing because of I/O errors.
1319 *
1320 * When the function returns C_ERR and if 'error' is not NULL, the
1321 * integer pointed by 'error' is set to the value of errno just after the I/O
1322 * error. */
1323int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
1324 char magic[10];
1325 uint64_t cksum;
1326 long key_counter = 0;
1327 int j;
1328
1329 if (server.rdb_checksum)
1330 rdb->update_cksum = rioGenericUpdateChecksum;
1331 snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
1332 if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
1333 if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
1334 if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
1335
1336 /* save functions */
1337 if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr;
1338
1339 /* save all databases, skip this if we're in functions-only mode */
1340 if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) {
1341 for (j = 0; j < server.dbnum; j++) {
1342 if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
1343 }
1344 }
1345
1346 if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
1347
1348 /* EOF opcode */
1349 if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
1350
1351 /* CRC64 checksum. It will be zero if checksum computation is disabled, the
1352 * loading code skips the check in this case. */
1353 cksum = rdb->cksum;
1354 memrev64ifbe(&cksum);
1355 if (rioWrite(rdb,&cksum,8) == 0) goto werr;
1356 return C_OK;
1357
1358werr:
1359 if (error) *error = errno;
1360 return C_ERR;
1361}
1362
1363/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
1364 * and a suffix to the generated RDB dump. The prefix is:
1365 *
1366 * $EOF:<40 bytes unguessable hex string>\r\n
1367 *
1368 * While the suffix is the 40 bytes hex string we announced in the prefix.
1369 * This way processes receiving the payload can understand when it ends
1370 * without doing any processing of the content. */
1371int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
1372 char eofmark[RDB_EOF_MARK_SIZE];
1373
1374 startSaving(RDBFLAGS_REPLICATION);
1375 getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
1376 if (error) *error = 0;
1377 if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
1378 if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1379 if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
1380 if (rdbSaveRio(req,rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
1381 if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1382 stopSaving(1);
1383 return C_OK;
1384
1385werr: /* Write error. */
1386 /* Set 'error' only if not already set by rdbSaveRio() call. */
1387 if (error && *error == 0) *error = errno;
1388 stopSaving(0);
1389 return C_ERR;
1390}
1391
/* Save the dataset to 'filename' from the current process.
 *
 * The dump is first written to "temp-<pid>.rdb" (relative to the current
 * working directory). It is then fflush()ed, fsync()ed and closed, and finally
 * rename()d onto 'filename'. This way the destination is only replaced once a
 * complete file exists. fsyncFileDir(filename) is called afterwards
 * (presumably to make the rename itself durable; confirm in its definition).
 *
 * 'req' is forwarded to rdbSaveRio() and selects what is excluded from the
 * dump. 'rsi' optionally supplies replication AUX fields.
 *
 * Returns C_OK on success, C_ERR on error. On success it resets server.dirty
 * and updates server.lastsave and server.lastbgsave_status. On failure it
 * unlinks the temp file. Once the temp file has been opened, the save is
 * always bracketed by startSaving()/stopSaving(). */
int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp = NULL;
    rio rdb;
    int error = 0;
    char *err_op; /* For a detailed log */

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        /* Capture the error string before getcwd() can overwrite errno. */
        char *str_err = strerror(errno);
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the temp RDB file %s (in server root dir %s) "
            "for saving: %s",
            tmpfile,
            cwdp ? cwdp : "unknown",
            str_err);
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);
    startSaving(RDBFLAGS_NONE);

    /* Optionally fsync periodically while writing, every REDIS_AUTOSYNC_BYTES. */
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

    if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        /* Restore the errno captured inside rdbSaveRio() for the log below. */
        errno = error;
        err_op = "rdbSaveRio";
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp)) { err_op = "fflush"; goto werr; }
    if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
    /* fclose() disassociates the stream even when it fails, so clear fp to
     * keep the error path from closing it a second time. */
    if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generated DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        char *str_err = strerror(errno);
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            str_err);
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    /* NOTE(review): at this point tmpfile has already been renamed, so the
     * unlink(tmpfile) on the werr path is a harmless no-op for this failure. */
    if (fsyncFileDir(filename) == -1) { err_op = "fsyncFileDir"; goto werr; }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;

werr:
    /* Common failure path: log, close the stream if still open, and
     * remove the partial temp file. */
    serverLog(LL_WARNING,"Write error saving DB on disk(%s): %s", err_op, strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}
1465
1466int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
1467 pid_t childpid;
1468
1469 if (hasActiveChildProcess()) return C_ERR;
1470 server.stat_rdb_saves++;
1471
1472 server.dirty_before_bgsave = server.dirty;
1473 server.lastbgsave_try = time(NULL);
1474
1475 if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
1476 int retval;
1477
1478 /* Child */
1479 redisSetProcTitle("redis-rdb-bgsave");
1480 redisSetCpuAffinity(server.bgsave_cpulist);
1481 retval = rdbSave(req, filename,rsi);
1482 if (retval == C_OK) {
1483 sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
1484 }
1485 exitFromChild((retval == C_OK) ? 0 : 1);
1486 } else {
1487 /* Parent */
1488 if (childpid == -1) {
1489 server.lastbgsave_status = C_ERR;
1490 serverLog(LL_WARNING,"Can't save in background: fork: %s",
1491 strerror(errno));
1492 return C_ERR;
1493 }
1494 serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
1495 server.rdb_save_time_start = time(NULL);
1496 server.rdb_child_type = RDB_CHILD_TYPE_DISK;
1497 return C_OK;
1498 }
1499 return C_OK; /* unreached */
1500}
1501
/* Remove the "temp-<childpid>.rdb" file left behind by a save child.
 *
 * This may run inside the signal handler 'sigShutdownHandler', so the
 * from_signal path must only use async-signal-safe calls. That is why the
 * name is assembled by hand instead of with snprintf(), and why bg_unlink()
 * (not async-signal-safe) is only used when 'from_signal' is 0. */
void rdbRemoveTempFile(pid_t childpid, int from_signal) {
    char tmpfile[256];
    char digits[32];
    int ndigits = ll2string(digits, sizeof(digits), childpid);
    char *cursor = tmpfile;

    memcpy(cursor, "temp-", 5);
    cursor += 5;
    memcpy(cursor, digits, ndigits);
    cursor += ndigits;
    memcpy(cursor, ".rdb", 5); /* 5 bytes: ".rdb" plus the terminating NUL */

    if (!from_signal) {
        bg_unlink(tmpfile);
        return;
    }

    /* Signal context: open the file, then unlink it. The fd is intentionally
     * never closed; it is released when the process exits. */
    int fd = open(tmpfile, O_RDONLY|O_NONBLOCK);
    UNUSED(fd);
    unlink(tmpfile);
}
1526
1527/* This function is called by rdbLoadObject() when the code is in RDB-check
1528 * mode and we find a module value of type 2 that can be parsed without
1529 * the need of the actual module. The value is parsed for errors, finally
1530 * a dummy redis object is returned just to conform to the API. */
1531robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
1532 uint64_t opcode;
1533 while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
1534 if (opcode == RDB_MODULE_OPCODE_SINT ||
1535 opcode == RDB_MODULE_OPCODE_UINT)
1536 {
1537 uint64_t len;
1538 if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
1539 rdbReportCorruptRDB(
1540 "Error reading integer from module %s value", modulename);
1541 }
1542 } else if (opcode == RDB_MODULE_OPCODE_STRING) {
1543 robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
1544 if (o == NULL) {
1545 rdbReportCorruptRDB(
1546 "Error reading string from module %s value", modulename);
1547 }
1548 decrRefCount(o);
1549 } else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
1550 float val;
1551 if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
1552 rdbReportCorruptRDB(
1553 "Error reading float from module %s value", modulename);
1554 }
1555 } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
1556 double val;
1557 if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
1558 rdbReportCorruptRDB(
1559 "Error reading double from module %s value", modulename);
1560 }
1561 }
1562 }
1563 return createStringObject("module-dummy-value",18);
1564}
1565
1566/* callback for hashZiplistConvertAndValidateIntegrity.
1567 * Check that the ziplist doesn't have duplicate hash field names.
1568 * The ziplist element pointed by 'p' will be converted and stored into listpack. */
1569static int _ziplistPairsEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) {
1570 unsigned char *str;
1571 unsigned int slen;
1572 long long vll;
1573
1574 struct {
1575 long count;
1576 dict *fields;
1577 unsigned char **lp;
1578 } *data = userdata;
1579
1580 if (data->fields == NULL) {
1581 data->fields = dictCreate(&hashDictType);
1582 dictExpand(data->fields, head_count/2);
1583 }
1584
1585 if (!ziplistGet(p, &str, &slen, &vll))
1586 return 0;
1587
1588 /* Even records are field names, add to dict and check that's not a dup */
1589 if (((data->count) & 1) == 0) {
1590 sds field = str? sdsnewlen(str, slen): sdsfromlonglong(vll);
1591 if (dictAdd(data->fields, field, NULL) != DICT_OK) {
1592 /* Duplicate, return an error */
1593 sdsfree(field);
1594 return 0;
1595 }
1596 }
1597
1598 if (str) {
1599 *(data->lp) = lpAppend(*(data->lp), (unsigned char*)str, slen);
1600 } else {
1601 *(data->lp) = lpAppendInteger(*(data->lp), vll);
1602 }
1603
1604 (data->count)++;
1605 return 1;
1606}
1607
1608/* Validate the integrity of the data structure while converting it to
1609 * listpack and storing it at 'lp'.
1610 * The function is safe to call on non-validated ziplists, it returns 0
1611 * when encounter an integrity validation issue. */
1612int ziplistPairsConvertAndValidateIntegrity(unsigned char *zl, size_t size, unsigned char **lp) {
1613 /* Keep track of the field names to locate duplicate ones */
1614 struct {
1615 long count;
1616 dict *fields; /* Initialisation at the first callback. */
1617 unsigned char **lp;
1618 } data = {0, NULL, lp};
1619
1620 int ret = ziplistValidateIntegrity(zl, size, 1, _ziplistPairsEntryConvertAndValidate, &data);
1621
1622 /* make sure we have an even number of records. */
1623 if (data.count & 1)
1624 ret = 0;
1625
1626 if (data.fields) dictRelease(data.fields);
1627 return ret;
1628}
1629
/* Per-entry callback for ziplistValidateIntegrity().
 *
 * Appends the ziplist entry at 'p' to the listpack pointed to by 'userdata'
 * (an 'unsigned char **'). String entries are appended with lpAppend(),
 * integer entries with lpAppendInteger().
 * Returns 1 to continue, 0 if the entry cannot be read. */
static int _ziplistEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) {
    unsigned char **dst = (unsigned char**)userdata;
    unsigned char *sval;
    unsigned int slen;
    long long ival;
    UNUSED(head_count);

    if (ziplistGet(p, &sval, &slen, &ival) == 0) return 0;

    *dst = sval ? lpAppend(*dst, sval, slen) : lpAppendInteger(*dst, ival);
    return 1;
}
1648
1649/* callback for ziplistValidateIntegrity.
1650 * The ziplist element pointed by 'p' will be converted and stored into quicklist. */
1651static int _listZiplistEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) {
1652 UNUSED(head_count);
1653 unsigned char *str;
1654 unsigned int slen;
1655 long long vll;
1656 char longstr[32] = {0};
1657 quicklist *ql = (quicklist*)userdata;
1658
1659 if (!ziplistGet(p, &str, &slen, &vll)) return 0;
1660 if (!str) {
1661 /* Write the longval as a string so we can re-add it */
1662 slen = ll2string(longstr, sizeof(longstr), vll);
1663 str = (unsigned char *)longstr;
1664 }
1665 quicklistPushTail(ql, str, slen);
1666 return 1;
1667}
1668
1669/* callback for to check the listpack doesn't have duplicate records */
1670static int _lpPairsEntryValidation(unsigned char *p, unsigned int head_count, void *userdata) {
1671 struct {
1672 long count;
1673 dict *fields;
1674 } *data = userdata;
1675
1676 if (data->fields == NULL) {
1677 data->fields = dictCreate(&hashDictType);
1678 dictExpand(data->fields, head_count/2);
1679 }
1680
1681 /* Even records are field names, add to dict and check that's not a dup */
1682 if (((data->count) & 1) == 0) {
1683 unsigned char *str;
1684 int64_t slen;
1685 unsigned char buf[LP_INTBUF_SIZE];
1686
1687 str = lpGet(p, &slen, buf);
1688 sds field = sdsnewlen(str, slen);
1689 if (dictAdd(data->fields, field, NULL) != DICT_OK) {
1690 /* Duplicate, return an error */
1691 sdsfree(field);
1692 return 0;
1693 }
1694 }
1695
1696 (data->count)++;
1697 return 1;
1698}
1699
1700/* Validate the integrity of the listpack structure.
1701 * when `deep` is 0, only the integrity of the header is validated.
1702 * when `deep` is 1, we scan all the entries one by one. */
1703int lpPairsValidateIntegrityAndDups(unsigned char *lp, size_t size, int deep) {
1704 if (!deep)
1705 return lpValidateIntegrity(lp, size, 0, NULL, NULL);
1706
1707 /* Keep track of the field names to locate duplicate ones */
1708 struct {
1709 long count;
1710 dict *fields; /* Initialisation at the first callback. */
1711 } data = {0, NULL};
1712
1713 int ret = lpValidateIntegrity(lp, size, 1, _lpPairsEntryValidation, &data);
1714
1715 /* make sure we have an even number of records. */
1716 if (data.count & 1)
1717 ret = 0;
1718
1719 if (data.fields) dictRelease(data.fields);
1720 return ret;
1721}
1722
1723/* Load a Redis object of the specified type from the specified file.
1724 * On success a newly allocated object is returned, otherwise NULL.
1725 * When the function returns NULL and if 'error' is not NULL, the
1726 * integer pointed by 'error' is set to the type of error that occurred */
1727robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
1728 robj *o = NULL, *ele, *dec;
1729 uint64_t len;
1730 unsigned int i;
1731
1732 /* Set default error of load object, it will be set to 0 on success. */
1733 if (error) *error = RDB_LOAD_ERR_OTHER;
1734
1735 int deep_integrity_validation = server.sanitize_dump_payload == SANITIZE_DUMP_YES;
1736 if (server.sanitize_dump_payload == SANITIZE_DUMP_CLIENTS) {
1737 /* Skip sanitization when loading (an RDB), or getting a RESTORE command
1738 * from either the master or a client using an ACL user with the skip-sanitize-payload flag. */
1739 int skip = server.loading ||
1740 (server.current_client && (server.current_client->flags & CLIENT_MASTER));
1741 if (!skip && server.current_client && server.current_client->user)
1742 skip = !!(server.current_client->user->flags & USER_FLAG_SANITIZE_PAYLOAD_SKIP);
1743 deep_integrity_validation = !skip;
1744 }
1745
1746 if (rdbtype == RDB_TYPE_STRING) {
1747 /* Read string value */
1748 if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1749 o = tryObjectEncoding(o);
1750 } else if (rdbtype == RDB_TYPE_LIST) {
1751 /* Read list value */
1752 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1753 if (len == 0) goto emptykey;
1754
1755 o = createQuicklistObject();
1756 quicklistSetOptions(o->ptr, server.list_max_listpack_size,
1757 server.list_compress_depth);
1758
1759 /* Load every single element of the list */
1760 while(len--) {
1761 if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {
1762 decrRefCount(o);
1763 return NULL;
1764 }
1765 dec = getDecodedObject(ele);
1766 size_t len = sdslen(dec->ptr);
1767 quicklistPushTail(o->ptr, dec->ptr, len);
1768 decrRefCount(dec);
1769 decrRefCount(ele);
1770 }
1771 } else if (rdbtype == RDB_TYPE_SET) {
1772 /* Read Set value */
1773 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1774 if (len == 0) goto emptykey;
1775
1776 /* Use a regular set when there are too many entries. */
1777 size_t max_entries = server.set_max_intset_entries;
1778 if (max_entries >= 1<<30) max_entries = 1<<30;
1779 if (len > max_entries) {
1780 o = createSetObject();
1781 /* It's faster to expand the dict to the right size asap in order
1782 * to avoid rehashing */
1783 if (len > DICT_HT_INITIAL_SIZE && dictTryExpand(o->ptr,len) != DICT_OK) {
1784 rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);
1785 decrRefCount(o);
1786 return NULL;
1787 }
1788 } else {
1789 o = createIntsetObject();
1790 }
1791
1792 /* Load every single element of the set */
1793 for (i = 0; i < len; i++) {
1794 long long llval;
1795 sds sdsele;
1796
1797 if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1798 decrRefCount(o);
1799 return NULL;
1800 }
1801
1802 if (o->encoding == OBJ_ENCODING_INTSET) {
1803 /* Fetch integer value from element. */
1804 if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
1805 uint8_t success;
1806 o->ptr = intsetAdd(o->ptr,llval,&success);
1807 if (!success) {
1808 rdbReportCorruptRDB("Duplicate set members detected");
1809 decrRefCount(o);
1810 sdsfree(sdsele);
1811 return NULL;
1812 }
1813 } else {
1814 setTypeConvert(o,OBJ_ENCODING_HT);
1815 if (dictTryExpand(o->ptr,len) != DICT_OK) {
1816 rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);
1817 sdsfree(sdsele);
1818 decrRefCount(o);
1819 return NULL;
1820 }
1821 }
1822 }
1823
1824 /* This will also be called when the set was just converted
1825 * to a regular hash table encoded set. */
1826 if (o->encoding == OBJ_ENCODING_HT) {
1827 if (dictAdd((dict*)o->ptr,sdsele,NULL) != DICT_OK) {
1828 rdbReportCorruptRDB("Duplicate set members detected");
1829 decrRefCount(o);
1830 sdsfree(sdsele);
1831 return NULL;
1832 }
1833 } else {
1834 sdsfree(sdsele);
1835 }
1836 }
1837 } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {
1838 /* Read sorted set value. */
1839 uint64_t zsetlen;
1840 size_t maxelelen = 0, totelelen = 0;
1841 zset *zs;
1842
1843 if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1844 if (zsetlen == 0) goto emptykey;
1845
1846 o = createZsetObject();
1847 zs = o->ptr;
1848
1849 if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict,zsetlen) != DICT_OK) {
1850 rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)zsetlen);
1851 decrRefCount(o);
1852 return NULL;
1853 }
1854
1855 /* Load every single element of the sorted set. */
1856 while(zsetlen--) {
1857 sds sdsele;
1858 double score;
1859 zskiplistNode *znode;
1860
1861 if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1862 decrRefCount(o);
1863 return NULL;
1864 }
1865
1866 if (rdbtype == RDB_TYPE_ZSET_2) {
1867 if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
1868 decrRefCount(o);
1869 sdsfree(sdsele);
1870 return NULL;
1871 }
1872 } else {
1873 if (rdbLoadDoubleValue(rdb,&score) == -1) {
1874 decrRefCount(o);
1875 sdsfree(sdsele);
1876 return NULL;
1877 }
1878 }
1879
1880 if (isnan(score)) {
1881 rdbReportCorruptRDB("Zset with NAN score detected");
1882 decrRefCount(o);
1883 sdsfree(sdsele);
1884 return NULL;
1885 }
1886
1887 /* Don't care about integer-encoded strings. */
1888 if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
1889 totelelen += sdslen(sdsele);
1890
1891 znode = zslInsert(zs->zsl,score,sdsele);
1892 if (dictAdd(zs->dict,sdsele,&znode->score) != DICT_OK) {
1893 rdbReportCorruptRDB("Duplicate zset fields detected");
1894 decrRefCount(o);
1895 /* no need to free 'sdsele', will be released by zslFree together with 'o' */
1896 return NULL;
1897 }
1898 }
1899
1900 /* Convert *after* loading, since sorted sets are not stored ordered. */
1901 if (zsetLength(o) <= server.zset_max_listpack_entries &&
1902 maxelelen <= server.zset_max_listpack_value &&
1903 lpSafeToAdd(NULL, totelelen))
1904 {
1905 zsetConvert(o,OBJ_ENCODING_LISTPACK);
1906 }
1907 } else if (rdbtype == RDB_TYPE_HASH) {
1908 uint64_t len;
1909 int ret;
1910 sds field, value;
1911 dict *dupSearchDict = NULL;
1912
1913 len = rdbLoadLen(rdb, NULL);
1914 if (len == RDB_LENERR) return NULL;
1915 if (len == 0) goto emptykey;
1916
1917 o = createHashObject();
1918
1919 /* Too many entries? Use a hash table right from the start. */
1920 if (len > server.hash_max_listpack_entries)
1921 hashTypeConvert(o, OBJ_ENCODING_HT);
1922 else if (deep_integrity_validation) {
1923 /* In this mode, we need to guarantee that the server won't crash
1924 * later when the ziplist is converted to a dict.
1925 * Create a set (dict with no values) to for a dup search.
1926 * We can dismiss it as soon as we convert the ziplist to a hash. */
1927 dupSearchDict = dictCreate(&hashDictType);
1928 }
1929
1930
1931 /* Load every field and value into the ziplist */
1932 while (o->encoding == OBJ_ENCODING_LISTPACK && len > 0) {
1933 len--;
1934 /* Load raw strings */
1935 if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1936 decrRefCount(o);
1937 if (dupSearchDict) dictRelease(dupSearchDict);
1938 return NULL;
1939 }
1940 if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1941 sdsfree(field);
1942 decrRefCount(o);
1943 if (dupSearchDict) dictRelease(dupSearchDict);
1944 return NULL;
1945 }
1946
1947 if (dupSearchDict) {
1948 sds field_dup = sdsdup(field);
1949 if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) {
1950 rdbReportCorruptRDB("Hash with dup elements");
1951 dictRelease(dupSearchDict);
1952 decrRefCount(o);
1953 sdsfree(field_dup);
1954 sdsfree(field);
1955 sdsfree(value);
1956 return NULL;
1957 }
1958 }
1959
1960 /* Convert to hash table if size threshold is exceeded */
1961 if (sdslen(field) > server.hash_max_listpack_value ||
1962 sdslen(value) > server.hash_max_listpack_value ||
1963 !lpSafeToAdd(o->ptr, sdslen(field)+sdslen(value)))
1964 {
1965 hashTypeConvert(o, OBJ_ENCODING_HT);
1966 ret = dictAdd((dict*)o->ptr, field, value);
1967 if (ret == DICT_ERR) {
1968 rdbReportCorruptRDB("Duplicate hash fields detected");
1969 if (dupSearchDict) dictRelease(dupSearchDict);
1970 sdsfree(value);
1971 sdsfree(field);
1972 decrRefCount(o);
1973 return NULL;
1974 }
1975 break;
1976 }
1977
1978 /* Add pair to listpack */
1979 o->ptr = lpAppend(o->ptr, (unsigned char*)field, sdslen(field));
1980 o->ptr = lpAppend(o->ptr, (unsigned char*)value, sdslen(value));
1981
1982 sdsfree(field);
1983 sdsfree(value);
1984 }
1985
1986 if (dupSearchDict) {
1987 /* We no longer need this, from now on the entries are added
1988 * to a dict so the check is performed implicitly. */
1989 dictRelease(dupSearchDict);
1990 dupSearchDict = NULL;
1991 }
1992
1993 if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE) {
1994 if (dictTryExpand(o->ptr,len) != DICT_OK) {
1995 rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);
1996 decrRefCount(o);
1997 return NULL;
1998 }
1999 }
2000
2001 /* Load remaining fields and values into the hash table */
2002 while (o->encoding == OBJ_ENCODING_HT && len > 0) {
2003 len--;
2004 /* Load encoded strings */
2005 if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
2006 decrRefCount(o);
2007 return NULL;
2008 }
2009 if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
2010 sdsfree(field);
2011 decrRefCount(o);
2012 return NULL;
2013 }
2014
2015 /* Add pair to hash table */
2016 ret = dictAdd((dict*)o->ptr, field, value);
2017 if (ret == DICT_ERR) {
2018 rdbReportCorruptRDB("Duplicate hash fields detected");
2019 sdsfree(value);
2020 sdsfree(field);
2021 decrRefCount(o);
2022 return NULL;
2023 }
2024 }
2025
2026 /* All pairs should be read by now */
2027 serverAssert(len == 0);
2028 } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
2029 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
2030 if (len == 0) goto emptykey;
2031
2032 o = createQuicklistObject();
2033 quicklistSetOptions(o->ptr, server.list_max_listpack_size,
2034 server.list_compress_depth);
2035 uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
2036 while (len--) {
2037 unsigned char *lp;
2038 size_t encoded_len;
2039
2040 if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
2041 if ((container = rdbLoadLen(rdb,NULL)) == RDB_LENERR) {
2042 decrRefCount(o);
2043 return NULL;
2044 }
2045
2046 if (container != QUICKLIST_NODE_CONTAINER_PACKED && container != QUICKLIST_NODE_CONTAINER_PLAIN) {
2047 rdbReportCorruptRDB("Quicklist integrity check failed.");
2048 decrRefCount(o);
2049 return NULL;
2050 }
2051 }
2052
2053 unsigned char *data =
2054 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len);
2055 if (data == NULL || (encoded_len == 0)) {
2056 zfree(data);
2057 decrRefCount(o);
2058 return NULL;
2059 }
2060
2061 if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
2062 quicklistAppendPlainNode(o->ptr, data, encoded_len);
2063 continue;
2064 }
2065
2066 if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
2067 lp = data;
2068 if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
2069 if (!lpValidateIntegrity(lp, encoded_len, deep_integrity_validation, NULL, NULL)) {
2070 rdbReportCorruptRDB("Listpack integrity check failed.");
2071 decrRefCount(o);
2072 zfree(lp);
2073 return NULL;
2074 }
2075 } else {
2076 lp = lpNew(encoded_len);
2077 if (!ziplistValidateIntegrity(data, encoded_len, 1,
2078 _ziplistEntryConvertAndValidate, &lp))
2079 {
2080 rdbReportCorruptRDB("Ziplist integrity check failed.");
2081 decrRefCount(o);
2082 zfree(data);
2083 zfree(lp);
2084 return NULL;
2085 }
2086 zfree(data);
2087 lp = lpShrinkToFit(lp);
2088 }
2089
2090 /* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */
2091 if (lpLength(lp) == 0) {
2092 zfree(lp);
2093 continue;
2094 } else {
2095 quicklistAppendListpack(o->ptr, lp);
2096 }
2097 }
2098
2099 if (quicklistCount(o->ptr) == 0) {
2100 decrRefCount(o);
2101 goto emptykey;
2102 }
2103 } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
2104 rdbtype == RDB_TYPE_LIST_ZIPLIST ||
2105 rdbtype == RDB_TYPE_SET_INTSET ||
2106 rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
2107 rdbtype == RDB_TYPE_ZSET_LISTPACK ||
2108 rdbtype == RDB_TYPE_HASH_ZIPLIST ||
2109 rdbtype == RDB_TYPE_HASH_LISTPACK)
2110 {
2111 size_t encoded_len;
2112 unsigned char *encoded =
2113 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len);
2114 if (encoded == NULL) return NULL;
2115
2116 o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */
2117
2118 /* Fix the object encoding, and make sure to convert the encoded
2119 * data type into the base type if accordingly to the current
2120 * configuration there are too many elements in the encoded data
2121 * type. Note that we only check the length and not max element
2122 * size as this is an O(N) scan. Eventually everything will get
2123 * converted. */
2124 switch(rdbtype) {
2125 case RDB_TYPE_HASH_ZIPMAP:
2126 /* Since we don't keep zipmaps anymore, the rdb loading for these
2127 * is O(n) anyway, use `deep` validation. */
2128 if (!zipmapValidateIntegrity(encoded, encoded_len, 1)) {
2129 rdbReportCorruptRDB("Zipmap integrity check failed.");
2130 zfree(encoded);
2131 o->ptr = NULL;
2132 decrRefCount(o);
2133 return NULL;
2134 }
2135 /* Convert to ziplist encoded hash. This must be deprecated
2136 * when loading dumps created by Redis 2.4 gets deprecated. */
2137 {
2138 unsigned char *lp = lpNew(0);
2139 unsigned char *zi = zipmapRewind(o->ptr);
2140 unsigned char *fstr, *vstr;
2141 unsigned int flen, vlen;
2142 unsigned int maxlen = 0;
2143 dict *dupSearchDict = dictCreate(&hashDictType);
2144
2145 while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
2146 if (flen > maxlen) maxlen = flen;
2147 if (vlen > maxlen) maxlen = vlen;
2148
2149 /* search for duplicate records */
2150 sds field = sdstrynewlen(fstr, flen);
2151 if (!field || dictAdd(dupSearchDict, field, NULL) != DICT_OK ||
2152 !lpSafeToAdd(lp, (size_t)flen + vlen)) {
2153 rdbReportCorruptRDB("Hash zipmap with dup elements, or big length (%u)", flen);
2154 dictRelease(dupSearchDict);
2155 sdsfree(field);
2156 zfree(encoded);
2157 o->ptr = NULL;
2158 decrRefCount(o);
2159 return NULL;
2160 }
2161
2162 lp = lpAppend(lp, fstr, flen);
2163 lp = lpAppend(lp, vstr, vlen);
2164 }
2165
2166 dictRelease(dupSearchDict);
2167 zfree(o->ptr);
2168 o->ptr = lp;
2169 o->type = OBJ_HASH;
2170 o->encoding = OBJ_ENCODING_LISTPACK;
2171
2172 if (hashTypeLength(o) > server.hash_max_listpack_entries ||
2173 maxlen > server.hash_max_listpack_value)
2174 {
2175 hashTypeConvert(o, OBJ_ENCODING_HT);
2176 }
2177 }
2178 break;
2179 case RDB_TYPE_LIST_ZIPLIST:
2180 {
2181 quicklist *ql = quicklistNew(server.list_max_listpack_size,
2182 server.list_compress_depth);
2183
2184 if (!ziplistValidateIntegrity(encoded, encoded_len, 1,
2185 _listZiplistEntryConvertAndValidate, ql))
2186 {
2187 rdbReportCorruptRDB("List ziplist integrity check failed.");
2188 zfree(encoded);
2189 o->ptr = NULL;
2190 decrRefCount(o);
2191 quicklistRelease(ql);
2192 return NULL;
2193 }
2194
2195 if (ql->len == 0) {
2196 zfree(encoded);
2197 o->ptr = NULL;
2198 decrRefCount(o);
2199 quicklistRelease(ql);
2200 goto emptykey;
2201 }
2202
2203 zfree(encoded);
2204 o->type = OBJ_LIST;
2205 o->ptr = ql;
2206 o->encoding = OBJ_ENCODING_QUICKLIST;
2207 break;
2208 }
2209 case RDB_TYPE_SET_INTSET:
2210 if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
2211 if (!intsetValidateIntegrity(encoded, encoded_len, deep_integrity_validation)) {
2212 rdbReportCorruptRDB("Intset integrity check failed.");
2213 zfree(encoded);
2214 o->ptr = NULL;
2215 decrRefCount(o);
2216 return NULL;
2217 }
2218 o->type = OBJ_SET;
2219 o->encoding = OBJ_ENCODING_INTSET;
2220 if (intsetLen(o->ptr) > server.set_max_intset_entries)
2221 setTypeConvert(o,OBJ_ENCODING_HT);
2222 break;
2223 case RDB_TYPE_ZSET_ZIPLIST:
2224 {
2225 unsigned char *lp = lpNew(encoded_len);
2226 if (!ziplistPairsConvertAndValidateIntegrity(encoded, encoded_len, &lp)) {
2227 rdbReportCorruptRDB("Zset ziplist integrity check failed.");
2228 zfree(lp);
2229 zfree(encoded);
2230 o->ptr = NULL;
2231 decrRefCount(o);
2232 return NULL;
2233 }
2234
2235 zfree(o->ptr);
2236 o->type = OBJ_ZSET;
2237 o->ptr = lp;
2238 o->encoding = OBJ_ENCODING_LISTPACK;
2239 if (zsetLength(o) == 0) {
2240 decrRefCount(o);
2241 goto emptykey;
2242 }
2243
2244 if (zsetLength(o) > server.zset_max_listpack_entries)
2245 zsetConvert(o,OBJ_ENCODING_SKIPLIST);
2246 else
2247 o->ptr = lpShrinkToFit(o->ptr);
2248 break;
2249 }
2250 case RDB_TYPE_ZSET_LISTPACK:
2251 if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
2252 if (!lpPairsValidateIntegrityAndDups(encoded, encoded_len, deep_integrity_validation)) {
2253 rdbReportCorruptRDB("Zset listpack integrity check failed.");
2254 zfree(encoded);
2255 o->ptr = NULL;
2256 decrRefCount(o);
2257 return NULL;
2258 }
2259 o->type = OBJ_ZSET;
2260 o->encoding = OBJ_ENCODING_LISTPACK;
2261 if (zsetLength(o) == 0) {
2262 decrRefCount(o);
2263 goto emptykey;
2264 }
2265
2266 if (zsetLength(o) > server.zset_max_listpack_entries)
2267 zsetConvert(o,OBJ_ENCODING_SKIPLIST);
2268 break;
2269 case RDB_TYPE_HASH_ZIPLIST:
2270 {
2271 unsigned char *lp = lpNew(encoded_len);
2272 if (!ziplistPairsConvertAndValidateIntegrity(encoded, encoded_len, &lp)) {
2273 rdbReportCorruptRDB("Hash ziplist integrity check failed.");
2274 zfree(lp);
2275 zfree(encoded);
2276 o->ptr = NULL;
2277 decrRefCount(o);
2278 return NULL;
2279 }
2280
2281 zfree(o->ptr);
2282 o->ptr = lp;
2283 o->type = OBJ_HASH;
2284 o->encoding = OBJ_ENCODING_LISTPACK;
2285 if (hashTypeLength(o) == 0) {
2286 decrRefCount(o);
2287 goto emptykey;
2288 }
2289
2290 if (hashTypeLength(o) > server.hash_max_listpack_entries)
2291 hashTypeConvert(o, OBJ_ENCODING_HT);
2292 else
2293 o->ptr = lpShrinkToFit(o->ptr);
2294 break;
2295 }
2296 case RDB_TYPE_HASH_LISTPACK:
2297 if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
2298 if (!lpPairsValidateIntegrityAndDups(encoded, encoded_len, deep_integrity_validation)) {
2299 rdbReportCorruptRDB("Hash listpack integrity check failed.");
2300 zfree(encoded);
2301 o->ptr = NULL;
2302 decrRefCount(o);
2303 return NULL;
2304 }
2305 o->type = OBJ_HASH;
2306 o->encoding = OBJ_ENCODING_LISTPACK;
2307 if (hashTypeLength(o) == 0) {
2308 decrRefCount(o);
2309 goto emptykey;
2310 }
2311
2312 if (hashTypeLength(o) > server.hash_max_listpack_entries)
2313 hashTypeConvert(o, OBJ_ENCODING_HT);
2314 break;
2315 default:
2316 /* totally unreachable */
2317 rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
2318 break;
2319 }
2320 } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
2321 o = createStreamObject();
2322 stream *s = o->ptr;
2323 uint64_t listpacks = rdbLoadLen(rdb,NULL);
2324 if (listpacks == RDB_LENERR) {
2325 rdbReportReadError("Stream listpacks len loading failed.");
2326 decrRefCount(o);
2327 return NULL;
2328 }
2329
2330 while(listpacks--) {
2331 /* Get the master ID, the one we'll use as key of the radix tree
2332 * node: the entries inside the listpack itself are delta-encoded
2333 * relatively to this ID. */
2334 sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
2335 if (nodekey == NULL) {
2336 rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
2337 decrRefCount(o);
2338 return NULL;
2339 }
2340 if (sdslen(nodekey) != sizeof(streamID)) {
2341 rdbReportCorruptRDB("Stream node key entry is not the "
2342 "size of a stream ID");
2343 sdsfree(nodekey);
2344 decrRefCount(o);
2345 return NULL;
2346 }
2347
2348 /* Load the listpack. */
2349 size_t lp_size;
2350 unsigned char *lp =
2351 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&lp_size);
2352 if (lp == NULL) {
2353 rdbReportReadError("Stream listpacks loading failed.");
2354 sdsfree(nodekey);
2355 decrRefCount(o);
2356 return NULL;
2357 }
2358 if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
2359 if (!streamValidateListpackIntegrity(lp, lp_size, deep_integrity_validation)) {
2360 rdbReportCorruptRDB("Stream listpack integrity check failed.");
2361 sdsfree(nodekey);
2362 decrRefCount(o);
2363 zfree(lp);
2364 return NULL;
2365 }
2366
2367 unsigned char *first = lpFirst(lp);
2368 if (first == NULL) {
2369 /* Serialized listpacks should never be empty, since on
2370 * deletion we should remove the radix tree key if the
2371 * resulting listpack is empty. */
2372 rdbReportCorruptRDB("Empty listpack inside stream");
2373 sdsfree(nodekey);
2374 decrRefCount(o);
2375 zfree(lp);
2376 return NULL;
2377 }
2378
2379 /* Insert the key in the radix tree. */
2380 int retval = raxTryInsert(s->rax,
2381 (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
2382 sdsfree(nodekey);
2383 if (!retval) {
2384 rdbReportCorruptRDB("Listpack re-added with existing key");
2385 decrRefCount(o);
2386 zfree(lp);
2387 return NULL;
2388 }
2389 }
2390 /* Load total number of items inside the stream. */
2391 s->length = rdbLoadLen(rdb,NULL);
2392
2393 /* Load the last entry ID. */
2394 s->last_id.ms = rdbLoadLen(rdb,NULL);
2395 s->last_id.seq = rdbLoadLen(rdb,NULL);
2396
2397 if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
2398 /* Load the first entry ID. */
2399 s->first_id.ms = rdbLoadLen(rdb,NULL);
2400 s->first_id.seq = rdbLoadLen(rdb,NULL);
2401
2402 /* Load the maximal deleted entry ID. */
2403 s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL);
2404 s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL);
2405
2406 /* Load the offset. */
2407 s->entries_added = rdbLoadLen(rdb,NULL);
2408 } else {
2409 /* During migration the offset can be initialized to the stream's
2410 * length. At this point, we also don't care about tombstones
2411 * because CG offsets will be later initialized as well. */
2412 s->max_deleted_entry_id.ms = 0;
2413 s->max_deleted_entry_id.seq = 0;
2414 s->entries_added = s->length;
2415
2416 /* Since the rax is already loaded, we can find the first entry's
2417 * ID. */
2418 streamGetEdgeID(s,1,1,&s->first_id);
2419 }
2420
2421 if (rioGetReadError(rdb)) {
2422 rdbReportReadError("Stream object metadata loading failed.");
2423 decrRefCount(o);
2424 return NULL;
2425 }
2426
2427 if (s->length && !raxSize(s->rax)) {
2428 rdbReportCorruptRDB("Stream length inconsistent with rax entries");
2429 decrRefCount(o);
2430 return NULL;
2431 }
2432
2433 /* Consumer groups loading */
2434 uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
2435 if (cgroups_count == RDB_LENERR) {
2436 rdbReportReadError("Stream cgroup count loading failed.");
2437 decrRefCount(o);
2438 return NULL;
2439 }
2440 while(cgroups_count--) {
2441 /* Get the consumer group name and ID. We can then create the
2442 * consumer group ASAP and populate its structure as
2443 * we read more data. */
2444 streamID cg_id;
2445 sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
2446 if (cgname == NULL) {
2447 rdbReportReadError(
2448 "Error reading the consumer group name from Stream");
2449 decrRefCount(o);
2450 return NULL;
2451 }
2452
2453 cg_id.ms = rdbLoadLen(rdb,NULL);
2454 cg_id.seq = rdbLoadLen(rdb,NULL);
2455 if (rioGetReadError(rdb)) {
2456 rdbReportReadError("Stream cgroup ID loading failed.");
2457 sdsfree(cgname);
2458 decrRefCount(o);
2459 return NULL;
2460 }
2461
2462 /* Load group offset. */
2463 uint64_t cg_offset;
2464 if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
2465 cg_offset = rdbLoadLen(rdb,NULL);
2466 if (rioGetReadError(rdb)) {
2467 rdbReportReadError("Stream cgroup offset loading failed.");
2468 sdsfree(cgname);
2469 decrRefCount(o);
2470 return NULL;
2471 }
2472 } else {
2473 cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id);
2474 }
2475
2476 streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset);
2477 if (cgroup == NULL) {
2478 rdbReportCorruptRDB("Duplicated consumer group name %s",
2479 cgname);
2480 decrRefCount(o);
2481 sdsfree(cgname);
2482 return NULL;
2483 }
2484 sdsfree(cgname);
2485
2486 /* Load the global PEL for this consumer group, however we'll
2487 * not yet populate the NACK structures with the message
2488 * owner, since consumers for this group and their messages will
2489 * be read as a next step. So for now leave them not resolved
2490 * and later populate it. */
2491 uint64_t pel_size = rdbLoadLen(rdb,NULL);
2492 if (pel_size == RDB_LENERR) {
2493 rdbReportReadError("Stream PEL size loading failed.");
2494 decrRefCount(o);
2495 return NULL;
2496 }
2497 while(pel_size--) {
2498 unsigned char rawid[sizeof(streamID)];
2499 if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
2500 rdbReportReadError("Stream PEL ID loading failed.");
2501 decrRefCount(o);
2502 return NULL;
2503 }
2504 streamNACK *nack = streamCreateNACK(NULL);
2505 nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
2506 nack->delivery_count = rdbLoadLen(rdb,NULL);
2507 if (rioGetReadError(rdb)) {
2508 rdbReportReadError("Stream PEL NACK loading failed.");
2509 decrRefCount(o);
2510 streamFreeNACK(nack);
2511 return NULL;
2512 }
2513 if (!raxTryInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) {
2514 rdbReportCorruptRDB("Duplicated global PEL entry "
2515 "loading stream consumer group");
2516 decrRefCount(o);
2517 streamFreeNACK(nack);
2518 return NULL;
2519 }
2520 }
2521
2522 /* Now that we loaded our global PEL, we need to load the
2523 * consumers and their local PELs. */
2524 uint64_t consumers_num = rdbLoadLen(rdb,NULL);
2525 if (consumers_num == RDB_LENERR) {
2526 rdbReportReadError("Stream consumers num loading failed.");
2527 decrRefCount(o);
2528 return NULL;
2529 }
2530 while(consumers_num--) {
2531 sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
2532 if (cname == NULL) {
2533 rdbReportReadError(
2534 "Error reading the consumer name from Stream group.");
2535 decrRefCount(o);
2536 return NULL;
2537 }
2538 streamConsumer *consumer = streamCreateConsumer(cgroup,cname,NULL,0,
2539 SCC_NO_NOTIFY|SCC_NO_DIRTIFY);
2540 sdsfree(cname);
2541 if (!consumer) {
2542 rdbReportCorruptRDB("Duplicate stream consumer detected.");
2543 decrRefCount(o);
2544 return NULL;
2545 }
2546 consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
2547 if (rioGetReadError(rdb)) {
2548 rdbReportReadError("Stream short read reading seen time.");
2549 decrRefCount(o);
2550 return NULL;
2551 }
2552
2553 /* Load the PEL about entries owned by this specific
2554 * consumer. */
2555 pel_size = rdbLoadLen(rdb,NULL);
2556 if (pel_size == RDB_LENERR) {
2557 rdbReportReadError(
2558 "Stream consumer PEL num loading failed.");
2559 decrRefCount(o);
2560 return NULL;
2561 }
2562 while(pel_size--) {
2563 unsigned char rawid[sizeof(streamID)];
2564 if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
2565 rdbReportReadError(
2566 "Stream short read reading PEL streamID.");
2567 decrRefCount(o);
2568 return NULL;
2569 }
2570 streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
2571 if (nack == raxNotFound) {
2572 rdbReportCorruptRDB("Consumer entry not found in "
2573 "group global PEL");
2574 decrRefCount(o);
2575 return NULL;
2576 }
2577
2578 /* Set the NACK consumer, that was left to NULL when
2579 * loading the global PEL. Then set the same shared
2580 * NACK structure also in the consumer-specific PEL. */
2581 nack->consumer = consumer;
2582 if (!raxTryInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) {
2583 rdbReportCorruptRDB("Duplicated consumer PEL entry "
2584 " loading a stream consumer "
2585 "group");
2586 decrRefCount(o);
2587 streamFreeNACK(nack);
2588 return NULL;
2589 }
2590 }
2591 }
2592
2593 /* Verify that each PEL eventually got a consumer assigned to it. */
2594 if (deep_integrity_validation) {
2595 raxIterator ri_cg_pel;
2596 raxStart(&ri_cg_pel,cgroup->pel);
2597 raxSeek(&ri_cg_pel,"^",NULL,0);
2598 while(raxNext(&ri_cg_pel)) {
2599 streamNACK *nack = ri_cg_pel.data;
2600 if (!nack->consumer) {
2601 raxStop(&ri_cg_pel);
2602 rdbReportCorruptRDB("Stream CG PEL entry without consumer");
2603 decrRefCount(o);
2604 return NULL;
2605 }
2606 }
2607 raxStop(&ri_cg_pel);
2608 }
2609 }
2610 } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
2611 uint64_t moduleid = rdbLoadLen(rdb,NULL);
2612 if (rioGetReadError(rdb)) {
2613 rdbReportReadError("Short read module id");
2614 return NULL;
2615 }
2616 moduleType *mt = moduleTypeLookupModuleByID(moduleid);
2617
2618 if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2) {
2619 char name[10];
2620 moduleTypeNameByID(name,moduleid);
2621 return rdbLoadCheckModuleValue(rdb,name);
2622 }
2623
2624 if (mt == NULL) {
2625 char name[10];
2626 moduleTypeNameByID(name,moduleid);
2627 rdbReportCorruptRDB("The RDB file contains module data I can't load: no matching module type '%s'", name);
2628 return NULL;
2629 }
2630 RedisModuleIO io;
2631 robj keyobj;
2632 initStaticStringObject(keyobj,key);
2633 moduleInitIOContext(io,mt,rdb,&keyobj,dbid);
2634 io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
2635 /* Call the rdb_load method of the module providing the 10 bit
2636 * encoding version in the lower 10 bits of the module ID. */
2637 void *ptr = mt->rdb_load(&io,moduleid&1023);
2638 if (io.ctx) {
2639 moduleFreeContext(io.ctx);
2640 zfree(io.ctx);
2641 }
2642
2643 /* Module v2 serialization has an EOF mark at the end. */
2644 if (io.ver == 2) {
2645 uint64_t eof = rdbLoadLen(rdb,NULL);
2646 if (eof == RDB_LENERR) {
2647 if (ptr) {
2648 o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
2649 decrRefCount(o);
2650 }
2651 return NULL;
2652 }
2653 if (eof != RDB_MODULE_OPCODE_EOF) {
2654 rdbReportCorruptRDB("The RDB file contains module data for the module '%s' that is not terminated by "
2655 "the proper module value EOF marker", moduleTypeModuleName(mt));
2656 if (ptr) {
2657 o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
2658 decrRefCount(o);
2659 }
2660 return NULL;
2661 }
2662 }
2663
2664 if (ptr == NULL) {
2665 rdbReportCorruptRDB("The RDB file contains module data for the module type '%s', that the responsible "
2666 "module is not able to load. Check for modules log above for additional clues.",
2667 moduleTypeModuleName(mt));
2668 return NULL;
2669 }
2670 o = createModuleObject(mt,ptr);
2671 } else {
2672 rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
2673 return NULL;
2674 }
2675 if (error) *error = 0;
2676 return o;
2677
2678emptykey:
2679 if (error) *error = RDB_LOAD_ERR_EMPTY_KEY;
2680 return NULL;
2681}
2682
2683/* Mark that we are loading in the global state and setup the fields
2684 * needed to provide loading stats. */
2685void startLoading(size_t size, int rdbflags, int async) {
2686 /* Load the DB */
2687 server.loading = 1;
2688 if (async == 1) server.async_loading = 1;
2689 server.loading_start_time = time(NULL);
2690 server.loading_loaded_bytes = 0;
2691 server.loading_total_bytes = size;
2692 server.loading_rdb_used_mem = 0;
2693 server.rdb_last_load_keys_expired = 0;
2694 server.rdb_last_load_keys_loaded = 0;
2695 blockingOperationStarts();
2696
2697 /* Fire the loading modules start event. */
2698 int subevent;
2699 if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
2700 subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
2701 else if(rdbflags & RDBFLAGS_REPLICATION)
2702 subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
2703 else
2704 subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
2705 moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
2706}
2707
2708/* Mark that we are loading in the global state and setup the fields
2709 * needed to provide loading stats.
2710 * 'filename' is optional and used for rdb-check on error */
2711void startLoadingFile(size_t size, char* filename, int rdbflags) {
2712 rdbFileBeingLoaded = filename;
2713 startLoading(size, rdbflags, 0);
2714}
2715
2716/* Refresh the absolute loading progress info */
2717void loadingAbsProgress(off_t pos) {
2718 server.loading_loaded_bytes = pos;
2719 if (server.stat_peak_memory < zmalloc_used_memory())
2720 server.stat_peak_memory = zmalloc_used_memory();
2721}
2722
2723/* Refresh the incremental loading progress info */
2724void loadingIncrProgress(off_t size) {
2725 server.loading_loaded_bytes += size;
2726 if (server.stat_peak_memory < zmalloc_used_memory())
2727 server.stat_peak_memory = zmalloc_used_memory();
2728}
2729
2730/* Update the file name currently being loaded */
2731void updateLoadingFileName(char* filename) {
2732 rdbFileBeingLoaded = filename;
2733}
2734
2735/* Loading finished */
2736void stopLoading(int success) {
2737 server.loading = 0;
2738 server.async_loading = 0;
2739 blockingOperationEnds();
2740 rdbFileBeingLoaded = NULL;
2741
2742 /* Fire the loading modules end event. */
2743 moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
2744 success?
2745 REDISMODULE_SUBEVENT_LOADING_ENDED:
2746 REDISMODULE_SUBEVENT_LOADING_FAILED,
2747 NULL);
2748}
2749
2750void startSaving(int rdbflags) {
2751 /* Fire the persistence modules start event. */
2752 int subevent;
2753 if (rdbflags & RDBFLAGS_AOF_PREAMBLE && getpid() != server.pid)
2754 subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
2755 else if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
2756 subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START;
2757 else if (getpid()!=server.pid)
2758 subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
2759 else
2760 subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
2761 moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
2762}
2763
2764void stopSaving(int success) {
2765 /* Fire the persistence modules end event. */
2766 moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
2767 success?
2768 REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
2769 REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
2770 NULL);
2771}
2772
2773/* Track loading progress in order to serve client's from time to time
2774 and if needed calculate rdb checksum */
2775void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
2776 if (server.rdb_checksum)
2777 rioGenericUpdateChecksum(r, buf, len);
2778 if (server.loading_process_events_interval_bytes &&
2779 (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
2780 {
2781 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
2782 replicationSendNewlineToMaster();
2783 loadingAbsProgress(r->processed_bytes);
2784 processEventsWhileBlocked();
2785 processModuleLoadingProgressEvent(0);
2786 }
2787 if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
2788 atomicIncr(server.stat_net_repl_input_bytes, len);
2789 }
2790}
2791
2792/* Save the given functions_ctx to the rdb.
2793 * The err output parameter is optional and will be set with relevant error
2794 * message on failure, it is the caller responsibility to free the error
2795 * message on failure.
2796 *
2797 * The lib_ctx argument is also optional. If NULL is given, only verify rdb
2798 * structure with out performing the actual functions loading. */
2799int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx* lib_ctx, int type, int rdbflags, sds *err) {
2800 UNUSED(ver);
2801 sds error = NULL;
2802 sds final_payload = NULL;
2803 int res = C_ERR;
2804 if (type == RDB_OPCODE_FUNCTION) {
2805 /* RDB that was generated on versions 7.0 rc1 and 7.0 rc2 has another
2806 * an old format that contains the library name, engine and description.
2807 * To support this format we must read those values. */
2808 sds name = NULL;
2809 sds engine_name = NULL;
2810 sds desc = NULL;
2811 sds blob = NULL;
2812 uint64_t has_desc;
2813
2814 if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
2815 error = sdsnew("Failed loading library name");
2816 goto cleanup;
2817 }
2818
2819 if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
2820 error = sdsnew("Failed loading engine name");
2821 goto cleanup;
2822 }
2823
2824 if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) {
2825 error = sdsnew("Failed loading library description indicator");
2826 goto cleanup;
2827 }
2828
2829 if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
2830 error = sdsnew("Failed loading library description");
2831 goto cleanup;
2832 }
2833
2834 if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
2835 error = sdsnew("Failed loading library blob");
2836 goto cleanup;
2837 }
2838 /* Translate old format (versions 7.0 rc1 and 7.0 rc2) to new format.
2839 * The new format has the library name and engine inside the script payload.
2840 * Add those parameters to the original script payload (ignore the description if exists). */
2841 final_payload = sdscatfmt(sdsempty(), "#!%s name=%s\n%s", engine_name, name, blob);
2842cleanup:
2843 if (name) sdsfree(name);
2844 if (engine_name) sdsfree(engine_name);
2845 if (desc) sdsfree(desc);
2846 if (blob) sdsfree(blob);
2847 if (error) goto done;
2848 } else if (type == RDB_OPCODE_FUNCTION2) {
2849 if (!(final_payload = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
2850 error = sdsnew("Failed loading library payload");
2851 goto done;
2852 }
2853 } else {
2854 serverPanic("Bad function type was given to rdbFunctionLoad");
2855 }
2856
2857 if (lib_ctx) {
2858 sds library_name = NULL;
2859 if (!(library_name = functionsCreateWithLibraryCtx(final_payload, rdbflags & RDBFLAGS_ALLOW_DUP, &error, lib_ctx))) {
2860 if (!error) {
2861 error = sdsnew("Failed creating the library");
2862 }
2863 goto done;
2864 }
2865 sdsfree(library_name);
2866 }
2867
2868 res = C_OK;
2869
2870done:
2871 if (final_payload) sdsfree(final_payload);
2872 if (error) {
2873 if (err) {
2874 *err = error;
2875 } else {
2876 serverLog(LL_WARNING, "Failed creating function, %s", error);
2877 sdsfree(error);
2878 }
2879 }
2880 return res;
2881}
2882
2883/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
2884 * otherwise C_ERR is returned and 'errno' is set accordingly. */
2885int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
2886 functionsLibCtx* functions_lib_ctx = functionsLibCtxGetCurrent();
2887 rdbLoadingCtx loading_ctx = { .dbarray = server.db, .functions_lib_ctx = functions_lib_ctx };
2888 int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx);
2889 return retval;
2890}
2891
2892
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
 * otherwise C_ERR is returned and 'errno' is set accordingly (a wrong
 * signature or an unsupported format version sets it to EINVAL).
 *
 * The rdb_loading_ctx argument holds the objects into which the RDB is
 * loaded: currently the database array ('dbarray') and the functions
 * library context ('functions_lib_ctx'). More such objects may be added
 * in the future.
 *
 * 'rdbflags' bits consulted here:
 *   - RDBFLAGS_AOF_PREAMBLE: never drop keys that are already expired.
 *   - RDBFLAGS_FEED_REPL: when an expired key is dropped, feed a DEL/UNLINK
 *     to the replication stream.
 *   - RDBFLAGS_ALLOW_DUP: replace duplicated keys instead of panicking
 *     (the flags are also forwarded to rdbFunctionLoad()).
 * If 'rsi' is not NULL, replication information found in AUX fields
 * (repl-stream-db, repl-id, repl-offset) is stored into it. */
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
    uint64_t dbid = 0;
    int type, rdbver;
    redisDb *db = rdb_loading_ctx->dbarray+0; /* DB 0 until a SELECTDB opcode is read. */
    char buf[1024];
    int error; /* Set by rdbLoadObject() when it fails to load a value. */
    long long empty_keys_skipped = 0;

    /* Install rdbLoadProgressCallback as the rio checksum-update hook and
     * cap the chunk size processed per read with the configured
     * loading_process_events_interval_bytes. (Presumably the callback also
     * drives loading progress / event processing — see its definition.) */
    rdb->update_cksum = rdbLoadProgressCallback;
    rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
    /* Header: 9 bytes, the "REDIS" magic followed by 4 ASCII characters
     * holding the format version. */
    if (rioRead(rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return C_ERR;
    }
    rdbver = atoi(buf+5);
    /* Refuse versions newer than the one this server understands. */
    if (rdbver < 1 || rdbver > RDB_VERSION) {
        serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return C_ERR;
    }

    /* Key-specific attributes, set by opcodes before the key type. They are
     * reset to -1 after every key. 'now' and 'lru_clock' are sampled once
     * for the whole load. */
    long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
    long long lru_clock = LRU_CLOCK();

    while(1) {
        sds key;
        robj *val;

        /* Read type. */
        if ((type = rdbLoadType(rdb)) == -1) goto eoferr;

        /* Handle special types (opcodes). Each branch either consumes its
         * payload and continues with the next opcode, breaks out on EOF,
         * or jumps to eoferr. */
        if (type == RDB_OPCODE_EXPIRETIME) {
            /* EXPIRETIME: load an expire associated with the next key
             * to load. Note that after loading an expire we need to
             * load the actual type, and continue. The value is in
             * seconds and is converted to milliseconds here. */
            expiretime = rdbLoadTime(rdb);
            expiretime *= 1000;
            if (rioGetReadError(rdb)) goto eoferr;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
            /* EXPIRETIME_MS: milliseconds precision expire times introduced
             * with RDB v3. Like EXPIRETIME but with more precision. */
            expiretime = rdbLoadMillisecondTime(rdb,rdbver);
            if (rioGetReadError(rdb)) goto eoferr;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_FREQ) {
            /* FREQ: LFU frequency, stored as a single byte. */
            uint8_t byte;
            if (rioRead(rdb,&byte,1) == 0) goto eoferr;
            lfu_freq = byte;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_IDLE) {
            /* IDLE: LRU idle time, stored as a length-encoded integer. */
            uint64_t qword;
            if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
            lru_idle = qword;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_EOF) {
            /* EOF: End of file, exit the main loop. */
            break;
        } else if (type == RDB_OPCODE_SELECTDB) {
            /* SELECTDB: Select the specified database. An index beyond the
             * configured number of databases terminates the process. */
            if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                serverLog(LL_WARNING,
                    "FATAL: Data file was created with a Redis "
                    "server configured to handle more than %d "
                    "databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = rdb_loading_ctx->dbarray+dbid;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_RESIZEDB) {
            /* RESIZEDB: Hint about the size of the keys in the currently
             * selected data base, in order to avoid useless rehashing. */
            uint64_t db_size, expires_size;
            if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
                goto eoferr;
            if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
                goto eoferr;
            dictExpand(db->dict,db_size);
            dictExpand(db->expires,expires_size);
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_AUX) {
            /* AUX: generic string-string fields. Use to add state to RDB
             * which is backward compatible. Implementations of RDB loading
             * are required to skip AUX fields they don't understand.
             *
             * An AUX field is composed of two strings: key and value. */
            robj *auxkey, *auxval;
            if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
            if ((auxval = rdbLoadStringObject(rdb)) == NULL) {
                decrRefCount(auxkey);
                goto eoferr;
            }

            if (((char*)auxkey->ptr)[0] == '%') {
                /* All the fields with a name starting with '%' are considered
                 * information fields and are logged at startup with a log
                 * level of NOTICE. */
                serverLog(LL_NOTICE,"RDB '%s': %s",
                    (char*)auxkey->ptr,
                    (char*)auxval->ptr);
            } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
                if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
            } else if (!strcasecmp(auxkey->ptr,"repl-id")) {
                /* Accept only an ID of exactly CONFIG_RUN_ID_SIZE chars; the
                 * +1 also copies the sds null terminator. */
                if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
                    memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
                    rsi->repl_id_is_set = 1;
                }
            } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
                if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
            } else if (!strcasecmp(auxkey->ptr,"lua")) {
                /* Won't load the script back in memory anymore. */
            } else if (!strcasecmp(auxkey->ptr,"redis-ver")) {
                serverLog(LL_NOTICE,"Loading RDB produced by version %s",
                    (char*)auxval->ptr);
            } else if (!strcasecmp(auxkey->ptr,"ctime")) {
                /* Creation time in seconds; a negative age (e.g. clock skew)
                 * is clamped to zero. */
                time_t age = time(NULL)-strtol(auxval->ptr,NULL,10);
                if (age < 0) age = 0;
                serverLog(LL_NOTICE,"RDB age %ld seconds",
                    (unsigned long) age);
            } else if (!strcasecmp(auxkey->ptr,"used-mem")) {
                long long usedmem = strtoll(auxval->ptr,NULL,10);
                serverLog(LL_NOTICE,"RDB memory usage when created %.2f Mb",
                    (double) usedmem / (1024*1024));
                server.loading_rdb_used_mem = usedmem;
            } else if (!strcasecmp(auxkey->ptr,"aof-preamble")) {
                long long haspreamble = strtoll(auxval->ptr,NULL,10);
                if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail");
            } else if (!strcasecmp(auxkey->ptr, "aof-base")) {
                long long isbase = strtoll(auxval->ptr, NULL, 10);
                if (isbase) serverLog(LL_NOTICE, "RDB is base AOF");
            } else if (!strcasecmp(auxkey->ptr,"redis-bits")) {
                /* Just ignored. */
            } else {
                /* We ignore fields we don't understand, as by AUX field
                 * contract. */
                serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
                    (char*)auxkey->ptr);
            }

            decrRefCount(auxkey);
            decrRefCount(auxval);
            continue; /* Read type again. */
        } else if (type == RDB_OPCODE_MODULE_AUX) {
            /* Load module data that is not related to the Redis key space.
             * Such data can potentially be stored both before and after the
             * RDB keys-values section. */
            uint64_t moduleid = rdbLoadLen(rdb,NULL);
            int when_opcode = rdbLoadLen(rdb,NULL);
            int when = rdbLoadLen(rdb,NULL);
            /* The three reads above are validated together through the rio
             * read-error flag. */
            if (rioGetReadError(rdb)) goto eoferr;
            if (when_opcode != RDB_MODULE_OPCODE_UINT) {
                rdbReportReadError("bad when_opcode");
                goto eoferr;
            }
            moduleType *mt = moduleTypeLookupModuleByID(moduleid);
            char name[10];
            moduleTypeNameByID(name,moduleid);

            if (!rdbCheckMode && mt == NULL) {
                /* Unknown module. */
                serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
                exit(1);
            } else if (!rdbCheckMode && mt != NULL) {
                if (!mt->aux_load) {
                    /* Module doesn't support AUX. */
                    serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
                    exit(1);
                }

                RedisModuleIO io;
                moduleInitIOContext(io,mt,rdb,NULL,-1);
                io.ver = 2;
                /* Call the aux_load method of the module providing the 10 bit
                 * encoding version in the lower 10 bits of the module ID. */
                int rc = mt->aux_load(&io,moduleid&1023, when);
                if (io.ctx) {
                    moduleFreeContext(io.ctx);
                    zfree(io.ctx);
                }
                if (rc != REDISMODULE_OK || io.error) {
                    moduleTypeNameByID(name,moduleid);
                    serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
                    goto eoferr;
                }
                /* The module payload must be followed by the module EOF
                 * marker; anything else means the stream is out of sync. */
                uint64_t eof = rdbLoadLen(rdb,NULL);
                if (eof != RDB_MODULE_OPCODE_EOF) {
                    serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
                    goto eoferr;
                }
                continue;
            } else {
                /* RDB check mode. */
                robj *aux = rdbLoadCheckModuleValue(rdb,name);
                decrRefCount(aux);
                continue; /* Read next opcode. */
            }
        } else if (type == RDB_OPCODE_FUNCTION || type == RDB_OPCODE_FUNCTION2) {
            /* Function library: a failure to load it aborts the whole load. */
            sds err = NULL;
            if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_lib_ctx, type, rdbflags, &err) != C_OK) {
                serverLog(LL_WARNING,"Failed loading library, %s", err);
                sdsfree(err);
                goto eoferr;
            }
            continue;
        }

        /* Not an opcode: 'type' is the object type of a key-value pair. */

        /* Read key */
        if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
            goto eoferr;
        /* Read value */
        val = rdbLoadObject(type,rdb,key,db->id,&error);

        /* Three outcomes: the value failed to load, the key is already
         * expired and is dropped, or the key is added to the DB.
         *
         * About expiry: this function is used when loading an RDB file from
         * disk, either at startup, or when an RDB was received from the
         * master. In the latter case, the master is responsible for key
         * expiry. If we would expire keys here, the snapshot taken by the
         * master may not be reflected on the slave. Similarly, if the base
         * AOF is RDB format, we want to load all the keys as they are, since
         * the log of operations in the incr AOF is assumed to work in the
         * exact keyspace state. */
        if (val == NULL) {
            /* Since we used to have bug that could lead to empty keys
             * (See #8453), we rather not fail when empty key is encountered
             * in an RDB file, instead we will silently discard it and
             * continue loading. Only the first 10 are logged. */
            if (error == RDB_LOAD_ERR_EMPTY_KEY) {
                if(empty_keys_skipped++ < 10)
                    serverLog(LL_WARNING, "rdbLoadObject skipping empty key: %s", key);
                sdsfree(key);
            } else {
                sdsfree(key);
                goto eoferr;
            }
        } else if (iAmMaster() &&
            !(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
            expiretime != -1 && expiretime < now)
        {
            if (rdbflags & RDBFLAGS_FEED_REPL) {
                /* Caller should have created replication backlog,
                 * and now this path only works when rebooting,
                 * so we don't have replicas yet. */
                serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0);
                robj keyobj;
                initStaticStringObject(keyobj,key);
                robj *argv[2];
                argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
                argv[1] = &keyobj;
                replicationFeedSlaves(server.slaves,dbid,argv,2);
            }
            sdsfree(key);
            decrRefCount(val);
            server.rdb_last_load_keys_expired++;
        } else {
            robj keyobj;
            initStaticStringObject(keyobj,key);

            /* Add the new object in the hash table */
            int added = dbAddRDBLoad(db,key,val);
            server.rdb_last_load_keys_loaded++;
            if (!added) {
                if (rdbflags & RDBFLAGS_ALLOW_DUP) {
                    /* This flag is useful for DEBUG RELOAD special modes.
                     * When it's set we allow new keys to replace the current
                     * keys with the same name. */
                    dbSyncDelete(db,&keyobj);
                    dbAddRDBLoad(db,key,val);
                } else {
                    serverLog(LL_WARNING,
                        "RDB has duplicated key '%s' in DB %d",key,db->id);
                    serverPanic("Duplicated key found in RDB file");
                }
            }

            /* Set the expire time if needed */
            if (expiretime != -1) {
                setExpire(NULL,db,&keyobj,expiretime);
            }

            /* Set usage information (for eviction). */
            objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);

            /* call key space notification on key loaded for modules only */
            moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
        }

        /* Loading the database more slowly is useful in order to test
         * certain edge cases. */
        if (server.key_load_delay)
            debugDelay(server.key_load_delay);

        /* Reset the state that is key-specified and is populated by
         * opcodes before the key, so that we start from scratch again. */
        expiretime = -1;
        lfu_freq = -1;
        lru_idle = -1;
    }
    /* Verify the checksum if RDB version is >= 5. The running checksum is
     * captured before the 8 stored checksum bytes are read. A stored value
     * of 0 means the file was written with checksums disabled. */
    if (rdbver >= 5) {
        uint64_t cksum, expected = rdb->cksum;

        if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
        if (server.rdb_checksum && !server.skip_checksum_validation) {
            memrev64ifbe(&cksum);
            if (cksum == 0) {
                serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
            } else if (cksum != expected) {
                serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but "
                    "got (%llx). Aborting now.",
                        (unsigned long long)expected,
                        (unsigned long long)cksum);
                rdbReportCorruptRDB("RDB CRC error");
                return C_ERR;
            }
        }
    }

    /* Log a summary; escalate to WARNING when empty keys were discarded. */
    if (empty_keys_skipped) {
        serverLog(LL_WARNING,
            "Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.",
                server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped);
    } else {
        serverLog(LL_NOTICE,
            "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
                server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
    }
    return C_OK;

    /* Unexpected end of file is handled here calling rdbReportReadError():
     * this will in turn either abort Redis in most cases, or if we are loading
     * the RDB file from a socket during initial SYNC (diskless replica mode),
     * we'll report the error to the caller, so that we can retry. */
eoferr:
    serverLog(LL_WARNING,
        "Short read or OOM loading DB. Unrecoverable error, aborting now.");
    rdbReportReadError("Unexpected EOF reading RDB file");
    return C_ERR;
}
3243
3244/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
3245 * filename is open for reading and a rio stream object created in order
3246 * to do the actual loading. Moreover the ETA displayed in the INFO
3247 * output is initialized and finalized.
3248 *
3249 * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the
3250 * loading code will fill the information fields in the structure. */
3251int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
3252 FILE *fp;
3253 rio rdb;
3254 int retval;
3255 struct stat sb;
3256
3257 if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
3258
3259 if (fstat(fileno(fp), &sb) == -1)
3260 sb.st_size = 0;
3261
3262 startLoadingFile(sb.st_size, filename, rdbflags);
3263 rioInitWithFile(&rdb,fp);
3264
3265 retval = rdbLoadRio(&rdb,rdbflags,rsi);
3266
3267 fclose(fp);
3268 stopLoading(retval==C_OK);
3269 return retval;
3270}
3271
3272/* A background saving child (BGSAVE) terminated its work. Handle this.
3273 * This function covers the case of actual BGSAVEs. */
3274static void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
3275 if (!bysignal && exitcode == 0) {
3276 serverLog(LL_NOTICE,
3277 "Background saving terminated with success");
3278 server.dirty = server.dirty - server.dirty_before_bgsave;
3279 server.lastsave = time(NULL);
3280 server.lastbgsave_status = C_OK;
3281 } else if (!bysignal && exitcode != 0) {
3282 serverLog(LL_WARNING, "Background saving error");
3283 server.lastbgsave_status = C_ERR;
3284 } else {
3285 mstime_t latency;
3286
3287 serverLog(LL_WARNING,
3288 "Background saving terminated by signal %d", bysignal);
3289 latencyStartMonitor(latency);
3290 rdbRemoveTempFile(server.child_pid, 0);
3291 latencyEndMonitor(latency);
3292 latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
3293 /* SIGUSR1 is whitelisted, so we have a way to kill a child without
3294 * triggering an error condition. */
3295 if (bysignal != SIGUSR1)
3296 server.lastbgsave_status = C_ERR;
3297 }
3298}
3299
3300/* A background saving child (BGSAVE) terminated its work. Handle this.
3301 * This function covers the case of RDB -> Slaves socket transfers for
3302 * diskless replication. */
3303static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
3304 if (!bysignal && exitcode == 0) {
3305 serverLog(LL_NOTICE,
3306 "Background RDB transfer terminated with success");
3307 } else if (!bysignal && exitcode != 0) {
3308 serverLog(LL_WARNING, "Background transfer error");
3309 } else {
3310 serverLog(LL_WARNING,
3311 "Background transfer terminated by signal %d", bysignal);
3312 }
3313 if (server.rdb_child_exit_pipe!=-1)
3314 close(server.rdb_child_exit_pipe);
3315 aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
3316 close(server.rdb_pipe_read);
3317 server.rdb_child_exit_pipe = -1;
3318 server.rdb_pipe_read = -1;
3319 zfree(server.rdb_pipe_conns);
3320 server.rdb_pipe_conns = NULL;
3321 server.rdb_pipe_numconns = 0;
3322 server.rdb_pipe_numconns_writing = 0;
3323 zfree(server.rdb_pipe_buff);
3324 server.rdb_pipe_buff = NULL;
3325 server.rdb_pipe_bufflen = 0;
3326}
3327
3328/* When a background RDB saving/transfer terminates, call the right handler. */
3329void backgroundSaveDoneHandler(int exitcode, int bysignal) {
3330 int type = server.rdb_child_type;
3331 switch(server.rdb_child_type) {
3332 case RDB_CHILD_TYPE_DISK:
3333 backgroundSaveDoneHandlerDisk(exitcode,bysignal);
3334 break;
3335 case RDB_CHILD_TYPE_SOCKET:
3336 backgroundSaveDoneHandlerSocket(exitcode,bysignal);
3337 break;
3338 default:
3339 serverPanic("Unknown RDB child type.");
3340 break;
3341 }
3342
3343 server.rdb_child_type = RDB_CHILD_TYPE_NONE;
3344 server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
3345 server.rdb_save_time_start = -1;
3346 /* Possibly there are slaves waiting for a BGSAVE in order to be served
3347 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
3348 updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, type);
3349}
3350
3351/* Kill the RDB saving child using SIGUSR1 (so that the parent will know
3352 * the child did not exit for an error, but because we wanted), and performs
3353 * the cleanup needed. */
3354void killRDBChild(void) {
3355 kill(server.child_pid, SIGUSR1);
3356 /* Because we are not using here waitpid (like we have in killAppendOnlyChild
3357 * and TerminateModuleForkChild), all the cleanup operations is done by
3358 * checkChildrenDone, that later will find that the process killed.
3359 * This includes:
3360 * - resetChildState
3361 * - rdbRemoveTempFile */
3362}
3363
3364/* Spawn an RDB child that writes the RDB to the sockets of the slaves
3365 * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
3366int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
3367 listNode *ln;
3368 listIter li;
3369 pid_t childpid;
3370 int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
3371
3372 if (hasActiveChildProcess()) return C_ERR;
3373
3374 /* Even if the previous fork child exited, don't start a new one until we
3375 * drained the pipe. */
3376 if (server.rdb_pipe_conns) return C_ERR;
3377
3378 /* Before to fork, create a pipe that is used to transfer the rdb bytes to
3379 * the parent, we can't let it write directly to the sockets, since in case
3380 * of TLS we must let the parent handle a continuous TLS state when the
3381 * child terminates and parent takes over. */
3382 if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
3383 server.rdb_pipe_read = pipefds[0]; /* read end */
3384 rdb_pipe_write = pipefds[1]; /* write end */
3385
3386 /* create another pipe that is used by the parent to signal to the child
3387 * that it can exit. */
3388 if (anetPipe(pipefds, 0, 0) == -1) {
3389 close(rdb_pipe_write);
3390 close(server.rdb_pipe_read);
3391 return C_ERR;
3392 }
3393 safe_to_exit_pipe = pipefds[0]; /* read end */
3394 server.rdb_child_exit_pipe = pipefds[1]; /* write end */
3395
3396 /* Collect the connections of the replicas we want to transfer
3397 * the RDB to, which are i WAIT_BGSAVE_START state. */
3398 server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
3399 server.rdb_pipe_numconns = 0;
3400 server.rdb_pipe_numconns_writing = 0;
3401 listRewind(server.slaves,&li);
3402 while((ln = listNext(&li))) {
3403 client *slave = ln->value;
3404 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
3405 /* Check slave has the exact requirements */
3406 if (slave->slave_req != req)
3407 continue;
3408 server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
3409 replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
3410 }
3411 }
3412
3413 /* Create the child process. */
3414 if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
3415 /* Child */
3416 int retval, dummy;
3417 rio rdb;
3418
3419 rioInitWithFd(&rdb,rdb_pipe_write);
3420
3421 redisSetProcTitle("redis-rdb-to-slaves");
3422 redisSetCpuAffinity(server.bgsave_cpulist);
3423
3424 retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi);
3425 if (retval == C_OK && rioFlush(&rdb) == 0)
3426 retval = C_ERR;
3427
3428 if (retval == C_OK) {
3429 sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
3430 }
3431
3432 rioFreeFd(&rdb);
3433 /* wake up the reader, tell it we're done. */
3434 close(rdb_pipe_write);
3435 close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
3436 /* hold exit until the parent tells us it's safe. we're not expecting
3437 * to read anything, just get the error when the pipe is closed. */
3438 dummy = read(safe_to_exit_pipe, pipefds, 1);
3439 UNUSED(dummy);
3440 exitFromChild((retval == C_OK) ? 0 : 1);
3441 } else {
3442 /* Parent */
3443 if (childpid == -1) {
3444 serverLog(LL_WARNING,"Can't save in background: fork: %s",
3445 strerror(errno));
3446
3447 /* Undo the state change. The caller will perform cleanup on
3448 * all the slaves in BGSAVE_START state, but an early call to
3449 * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
3450 listRewind(server.slaves,&li);
3451 while((ln = listNext(&li))) {
3452 client *slave = ln->value;
3453 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
3454 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
3455 }
3456 }
3457 close(rdb_pipe_write);
3458 close(server.rdb_pipe_read);
3459 zfree(server.rdb_pipe_conns);
3460 server.rdb_pipe_conns = NULL;
3461 server.rdb_pipe_numconns = 0;
3462 server.rdb_pipe_numconns_writing = 0;
3463 } else {
3464 serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
3465 (long) childpid);
3466 server.rdb_save_time_start = time(NULL);
3467 server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
3468 close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
3469 if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
3470 serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
3471 }
3472 }
3473 close(safe_to_exit_pipe);
3474 return (childpid == -1) ? C_ERR : C_OK;
3475 }
3476 return C_OK; /* Unreached. */
3477}
3478
3479void saveCommand(client *c) {
3480 if (server.child_type == CHILD_TYPE_RDB) {
3481 addReplyError(c,"Background save already in progress");
3482 return;
3483 }
3484
3485 server.stat_rdb_saves++;
3486
3487 rdbSaveInfo rsi, *rsiptr;
3488 rsiptr = rdbPopulateSaveInfo(&rsi);
3489 if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
3490 addReply(c,shared.ok);
3491 } else {
3492 addReplyErrorObject(c,shared.err);
3493 }
3494}
3495
3496/* BGSAVE [SCHEDULE] */
3497void bgsaveCommand(client *c) {
3498 int schedule = 0;
3499
3500 /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
3501 * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
3502 if (c->argc > 1) {
3503 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
3504 schedule = 1;
3505 } else {
3506 addReplyErrorObject(c,shared.syntaxerr);
3507 return;
3508 }
3509 }
3510
3511 rdbSaveInfo rsi, *rsiptr;
3512 rsiptr = rdbPopulateSaveInfo(&rsi);
3513
3514 if (server.child_type == CHILD_TYPE_RDB) {
3515 addReplyError(c,"Background save already in progress");
3516 } else if (hasActiveChildProcess() || server.in_exec) {
3517 if (schedule || server.in_exec) {
3518 server.rdb_bgsave_scheduled = 1;
3519 addReplyStatus(c,"Background saving scheduled");
3520 } else {
3521 addReplyError(c,
3522 "Another child process is active (AOF?): can't BGSAVE right now. "
3523 "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
3524 "possible.");
3525 }
3526 } else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
3527 addReplyStatus(c,"Background saving started");
3528 } else {
3529 addReplyErrorObject(c,shared.err);
3530 }
3531}
3532
3533/* Populate the rdbSaveInfo structure used to persist the replication
3534 * information inside the RDB file. Currently the structure explicitly
3535 * contains just the currently selected DB from the master stream, however
3536 * if the rdbSave*() family functions receive a NULL rsi structure also
3537 * the Replication ID/offset is not saved. The function populates 'rsi'
3538 * that is normally stack-allocated in the caller, returns the populated
3539 * pointer if the instance has a valid master client, otherwise NULL
3540 * is returned, and the RDB saving will not persist any replication related
3541 * information. */
3542rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
3543 rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
3544 *rsi = rsi_init;
3545
3546 /* If the instance is a master, we can populate the replication info
3547 * only when repl_backlog is not NULL. If the repl_backlog is NULL,
3548 * it means that the instance isn't in any replication chains. In this
3549 * scenario the replication info is useless, because when a slave
3550 * connects to us, the NULL repl_backlog will trigger a full
3551 * synchronization, at the same time we will use a new replid and clear
3552 * replid2. */
3553 if (!server.masterhost && server.repl_backlog) {
3554 /* Note that when server.slaveseldb is -1, it means that this master
3555 * didn't apply any write commands after a full synchronization.
3556 * So we can let repl_stream_db be 0, this allows a restarted slave
3557 * to reload replication ID/offset, it's safe because the next write
3558 * command must generate a SELECT statement. */
3559 rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
3560 return rsi;
3561 }
3562
3563 /* If the instance is a slave we need a connected master
3564 * in order to fetch the currently selected DB. */
3565 if (server.master) {
3566 rsi->repl_stream_db = server.master->db->id;
3567 return rsi;
3568 }
3569
3570 /* If we have a cached master we can use it in order to populate the
3571 * replication selected DB info inside the RDB file: the slave can
3572 * increment the master_repl_offset only from data arriving from the
3573 * master, so if we are disconnected the offset in the cached master
3574 * is valid. */
3575 if (server.cached_master) {
3576 rsi->repl_stream_db = server.cached_master->db->id;
3577 return rsi;
3578 }
3579 return NULL;
3580}
3581