1/* rio.c is a simple stream-oriented I/O abstraction that provides an interface
2 * to write code that can consume/produce data using different concrete input
3 * and output devices. For instance the same rdb.c code using the rio
4 * abstraction can be used to read and write the RDB format using in-memory
5 * buffers or files.
6 *
7 * A rio object provides the following methods:
8 * read: read from stream.
9 * write: write to stream.
10 * tell: get the current offset.
11 *
12 * It is also possible to set a 'checksum' method that is used by rio.c in order
13 * to compute a checksum of the data written or read, or to query the rio object
14 * for the current checksum.
15 *
16 * ----------------------------------------------------------------------------
17 *
18 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
19 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
20 * All rights reserved.
21 *
22 * Redistribution and use in source and binary forms, with or without
23 * modification, are permitted provided that the following conditions are met:
24 *
25 * * Redistributions of source code must retain the above copyright notice,
26 * this list of conditions and the following disclaimer.
27 * * Redistributions in binary form must reproduce the above copyright
28 * notice, this list of conditions and the following disclaimer in the
29 * documentation and/or other materials provided with the distribution.
30 * * Neither the name of Redis nor the names of its contributors may be used
31 * to endorse or promote products derived from this software without
32 * specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
35 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
38 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
39 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
40 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
41 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
42 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
43 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44 * POSSIBILITY OF SUCH DAMAGE.
45 */
46
47
48#include "fmacros.h"
49#include <string.h>
50#include <stdio.h>
51#include <unistd.h>
52#include "rio.h"
53#include "util.h"
54#include "crc64.h"
55#include "config.h"
56#include "server.h"
57
58/* ------------------------- Buffer I/O implementation ----------------------- */
59
60/* Returns 1 or 0 for success/failure. */
61static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
62 r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
63 r->io.buffer.pos += len;
64 return 1;
65}
66
67/* Returns 1 or 0 for success/failure. */
68static size_t rioBufferRead(rio *r, void *buf, size_t len) {
69 if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
70 return 0; /* not enough buffer to return len bytes. */
71 memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
72 r->io.buffer.pos += len;
73 return 1;
74}
75
76/* Returns read/write position in buffer. */
77static off_t rioBufferTell(rio *r) {
78 return r->io.buffer.pos;
79}
80
81/* Flushes any buffer to target device if applicable. Returns 1 on success
82 * and 0 on failures. */
83static int rioBufferFlush(rio *r) {
84 UNUSED(r);
85 return 1; /* Nothing to do, our write just appends to the buffer. */
86}
87
88static const rio rioBufferIO = {
89 rioBufferRead,
90 rioBufferWrite,
91 rioBufferTell,
92 rioBufferFlush,
93 NULL, /* update_checksum */
94 0, /* current checksum */
95 0, /* flags */
96 0, /* bytes read or written */
97 0, /* read/write chunk size */
98 { { NULL, 0 } } /* union for io-specific vars */
99};
100
101void rioInitWithBuffer(rio *r, sds s) {
102 *r = rioBufferIO;
103 r->io.buffer.ptr = s;
104 r->io.buffer.pos = 0;
105}
106
107/* --------------------- Stdio file pointer implementation ------------------- */
108
109/* Returns 1 or 0 for success/failure. */
110static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
111 if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp);
112
113 size_t nwritten = 0;
114 /* Incrementally write data to the file, avoid a single write larger than
115 * the autosync threshold (so that the kernel's buffer cache never has too
116 * many dirty pages at once). */
117 while (len != nwritten) {
118 serverAssert(r->io.file.autosync > r->io.file.buffered);
119 size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered);
120 size_t towrite = nalign > len-nwritten ? len-nwritten : nalign;
121
122 if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0;
123 nwritten += towrite;
124 r->io.file.buffered += towrite;
125
126 if (r->io.file.buffered >= r->io.file.autosync) {
127 fflush(r->io.file.fp);
128
129 size_t processed = r->processed_bytes + nwritten;
130 serverAssert(processed % r->io.file.autosync == 0);
131 serverAssert(r->io.file.buffered == r->io.file.autosync);
132
133#if HAVE_SYNC_FILE_RANGE
134 /* Start writeout asynchronously. */
135 if (sync_file_range(fileno(r->io.file.fp),
136 processed - r->io.file.autosync, r->io.file.autosync,
137 SYNC_FILE_RANGE_WRITE) == -1)
138 return 0;
139
140 if (processed >= (size_t)r->io.file.autosync * 2) {
141 /* To keep the promise to 'autosync', we should make sure last
142 * asynchronous writeout persists into disk. This call may block
143 * if last writeout is not finished since disk is slow. */
144 if (sync_file_range(fileno(r->io.file.fp),
145 processed - r->io.file.autosync*2,
146 r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE|
147 SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1)
148 return 0;
149 }
150#else
151 if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
152#endif
153 r->io.file.buffered = 0;
154 }
155 }
156 return 1;
157}
158
159/* Returns 1 or 0 for success/failure. */
160static size_t rioFileRead(rio *r, void *buf, size_t len) {
161 return fread(buf,len,1,r->io.file.fp);
162}
163
164/* Returns read/write position in file. */
165static off_t rioFileTell(rio *r) {
166 return ftello(r->io.file.fp);
167}
168
169/* Flushes any buffer to target device if applicable. Returns 1 on success
170 * and 0 on failures. */
171static int rioFileFlush(rio *r) {
172 return (fflush(r->io.file.fp) == 0) ? 1 : 0;
173}
174
175static const rio rioFileIO = {
176 rioFileRead,
177 rioFileWrite,
178 rioFileTell,
179 rioFileFlush,
180 NULL, /* update_checksum */
181 0, /* current checksum */
182 0, /* flags */
183 0, /* bytes read or written */
184 0, /* read/write chunk size */
185 { { NULL, 0 } } /* union for io-specific vars */
186};
187
188void rioInitWithFile(rio *r, FILE *fp) {
189 *r = rioFileIO;
190 r->io.file.fp = fp;
191 r->io.file.buffered = 0;
192 r->io.file.autosync = 0;
193}
194
195/* ------------------- Connection implementation -------------------
196 * We use this RIO implementation when reading an RDB file directly from
197 * the connection to the memory via rdbLoadRio(), thus this implementation
198 * only implements reading from a connection that is, normally,
199 * just a socket. */
200
201static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
202 UNUSED(r);
203 UNUSED(buf);
204 UNUSED(len);
205 return 0; /* Error, this target does not yet support writing. */
206}
207
208/* Returns 1 or 0 for success/failure. */
209static size_t rioConnRead(rio *r, void *buf, size_t len) {
210 size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
211
212 /* If the buffer is too small for the entire request: realloc. */
213 if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
214 r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
215
216 /* If the remaining unused buffer is not large enough: memmove so that we
217 * can read the rest. */
218 if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
219 sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
220 r->io.conn.pos = 0;
221 }
222
223 /* Make sure the caller didn't request to read past the limit.
224 * If they didn't we'll buffer till the limit, if they did, we'll
225 * return an error. */
226 if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
227 errno = EOVERFLOW;
228 return 0;
229 }
230
231 /* If we don't already have all the data in the sds, read more */
232 while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
233 size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
234 size_t needs = len - buffered;
235 /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
236 * the two. */
237 size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
238 if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
239 if (r->io.conn.read_limit != 0 &&
240 r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
241 {
242 toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
243 }
244 int retval = connRead(r->io.conn.conn,
245 (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
246 toread);
247 if (retval == 0) {
248 return 0;
249 } else if (retval < 0) {
250 if (connLastErrorRetryable(r->io.conn.conn)) continue;
251 if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
252 return 0;
253 }
254 sdsIncrLen(r->io.conn.buf, retval);
255 }
256
257 memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
258 r->io.conn.read_so_far += len;
259 r->io.conn.pos += len;
260 return len;
261}
262
263/* Returns read/write position in file. */
264static off_t rioConnTell(rio *r) {
265 return r->io.conn.read_so_far;
266}
267
268/* Flushes any buffer to target device if applicable. Returns 1 on success
269 * and 0 on failures. */
270static int rioConnFlush(rio *r) {
271 /* Our flush is implemented by the write method, that recognizes a
272 * buffer set to NULL with a count of zero as a flush request. */
273 return rioConnWrite(r,NULL,0);
274}
275
276static const rio rioConnIO = {
277 rioConnRead,
278 rioConnWrite,
279 rioConnTell,
280 rioConnFlush,
281 NULL, /* update_checksum */
282 0, /* current checksum */
283 0, /* flags */
284 0, /* bytes read or written */
285 0, /* read/write chunk size */
286 { { NULL, 0 } } /* union for io-specific vars */
287};
288
289/* Create an RIO that implements a buffered read from an fd
290 * read_limit argument stops buffering when the reaching the limit. */
291void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
292 *r = rioConnIO;
293 r->io.conn.conn = conn;
294 r->io.conn.pos = 0;
295 r->io.conn.read_limit = read_limit;
296 r->io.conn.read_so_far = 0;
297 r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
298 sdsclear(r->io.conn.buf);
299}
300
301/* Release the RIO stream. Optionally returns the unread buffered data
302 * when the SDS pointer 'remaining' is passed. */
303void rioFreeConn(rio *r, sds *remaining) {
304 if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
305 if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
306 *remaining = r->io.conn.buf;
307 } else {
308 sdsfree(r->io.conn.buf);
309 if (remaining) *remaining = NULL;
310 }
311 r->io.conn.buf = NULL;
312}
313
314/* ------------------- File descriptor implementation ------------------
315 * This target is used to write the RDB file to pipe, when the master just
316 * streams the data to the replicas without creating an RDB on-disk image
317 * (diskless replication option).
318 * It only implements writes. */
319
320/* Returns 1 or 0 for success/failure.
321 *
322 * When buf is NULL and len is 0, the function performs a flush operation
323 * if there is some pending buffer, so this function is also used in order
324 * to implement rioFdFlush(). */
325static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
326 ssize_t retval;
327 unsigned char *p = (unsigned char*) buf;
328 int doflush = (buf == NULL && len == 0);
329
330 /* For small writes, we rather keep the data in user-space buffer, and flush
331 * it only when it grows. however for larger writes, we prefer to flush
332 * any pre-existing buffer, and write the new one directly without reallocs
333 * and memory copying. */
334 if (len > PROTO_IOBUF_LEN) {
335 /* First, flush any pre-existing buffered data. */
336 if (sdslen(r->io.fd.buf)) {
337 if (rioFdWrite(r, NULL, 0) == 0)
338 return 0;
339 }
340 /* Write the new data, keeping 'p' and 'len' from the input. */
341 } else {
342 if (len) {
343 r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
344 if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
345 doflush = 1;
346 if (!doflush)
347 return 1;
348 }
349 /* Flushing the buffered data. set 'p' and 'len' accordingly. */
350 p = (unsigned char*) r->io.fd.buf;
351 len = sdslen(r->io.fd.buf);
352 }
353
354 size_t nwritten = 0;
355 while(nwritten != len) {
356 retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
357 if (retval <= 0) {
358 if (retval == -1 && errno == EINTR) continue;
359 /* With blocking io, which is the sole user of this
360 * rio target, EWOULDBLOCK is returned only because of
361 * the SO_SNDTIMEO socket option, so we translate the error
362 * into one more recognizable by the user. */
363 if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
364 return 0; /* error. */
365 }
366 nwritten += retval;
367 }
368
369 r->io.fd.pos += len;
370 sdsclear(r->io.fd.buf);
371 return 1;
372}
373
374/* Returns 1 or 0 for success/failure. */
375static size_t rioFdRead(rio *r, void *buf, size_t len) {
376 UNUSED(r);
377 UNUSED(buf);
378 UNUSED(len);
379 return 0; /* Error, this target does not support reading. */
380}
381
382/* Returns read/write position in file. */
383static off_t rioFdTell(rio *r) {
384 return r->io.fd.pos;
385}
386
387/* Flushes any buffer to target device if applicable. Returns 1 on success
388 * and 0 on failures. */
389static int rioFdFlush(rio *r) {
390 /* Our flush is implemented by the write method, that recognizes a
391 * buffer set to NULL with a count of zero as a flush request. */
392 return rioFdWrite(r,NULL,0);
393}
394
395static const rio rioFdIO = {
396 rioFdRead,
397 rioFdWrite,
398 rioFdTell,
399 rioFdFlush,
400 NULL, /* update_checksum */
401 0, /* current checksum */
402 0, /* flags */
403 0, /* bytes read or written */
404 0, /* read/write chunk size */
405 { { NULL, 0 } } /* union for io-specific vars */
406};
407
408void rioInitWithFd(rio *r, int fd) {
409 *r = rioFdIO;
410 r->io.fd.fd = fd;
411 r->io.fd.pos = 0;
412 r->io.fd.buf = sdsempty();
413}
414
415/* release the rio stream. */
416void rioFreeFd(rio *r) {
417 sdsfree(r->io.fd.buf);
418}
419
420/* ---------------------------- Generic functions ---------------------------- */
421
422/* This function can be installed both in memory and file streams when checksum
423 * computation is needed. */
424void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
425 r->cksum = crc64(r->cksum,buf,len);
426}
427
428/* Set the file-based rio object to auto-fsync every 'bytes' file written.
429 * By default this is set to zero that means no automatic file sync is
430 * performed.
431 *
432 * This feature is useful in a few contexts since when we rely on OS write
433 * buffers sometimes the OS buffers way too much, resulting in too many
434 * disk I/O concentrated in very little time. When we fsync in an explicit
435 * way instead the I/O pressure is more distributed across time. */
436void rioSetAutoSync(rio *r, off_t bytes) {
437 if(r->write != rioFileIO.write) return;
438 r->io.file.autosync = bytes;
439}
440
441/* Check the type of rio. */
442uint8_t rioCheckType(rio *r) {
443 if (r->read == rioFileRead) {
444 return RIO_TYPE_FILE;
445 } else if (r->read == rioBufferRead) {
446 return RIO_TYPE_BUFFER;
447 } else if (r->read == rioConnRead) {
448 return RIO_TYPE_CONN;
449 } else {
450 /* r->read == rioFdRead */
451 return RIO_TYPE_FD;
452 }
453}
454
455/* --------------------------- Higher level interface --------------------------
456 *
457 * The following higher level functions use lower level rio.c functions to help
458 * generating the Redis protocol for the Append Only File. */
459
460/* Write multi bulk count in the format: "*<count>\r\n". */
461size_t rioWriteBulkCount(rio *r, char prefix, long count) {
462 char cbuf[128];
463 int clen;
464
465 cbuf[0] = prefix;
466 clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
467 cbuf[clen++] = '\r';
468 cbuf[clen++] = '\n';
469 if (rioWrite(r,cbuf,clen) == 0) return 0;
470 return clen;
471}
472
473/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
474size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
475 size_t nwritten;
476
477 if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
478 if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
479 if (rioWrite(r,"\r\n",2) == 0) return 0;
480 return nwritten+len+2;
481}
482
483/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
484size_t rioWriteBulkLongLong(rio *r, long long l) {
485 char lbuf[32];
486 unsigned int llen;
487
488 llen = ll2string(lbuf,sizeof(lbuf),l);
489 return rioWriteBulkString(r,lbuf,llen);
490}
491
492/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
493size_t rioWriteBulkDouble(rio *r, double d) {
494 char dbuf[128];
495 unsigned int dlen;
496
497 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
498 return rioWriteBulkString(r,dbuf,dlen);
499}
500